mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-09 06:42:57 +00:00
chore: replace bitvec impl (#214)
* chore: replace bitvec impl * chore: reduce one copy of nullmask * chore: move bitvec to common_base
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -741,6 +741,7 @@ dependencies = [
|
||||
name = "common-base"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bitvec",
|
||||
"bytes",
|
||||
"common-error",
|
||||
"paste",
|
||||
@@ -4111,8 +4112,8 @@ dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"atomic_float",
|
||||
"bitvec",
|
||||
"bytes",
|
||||
"common-base",
|
||||
"common-error",
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
|
||||
@@ -4,6 +4,7 @@ version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
bitvec = "1.0"
|
||||
bytes = { version = "1.1", features = ["serde"] }
|
||||
common-error = { path = "../error" }
|
||||
paste = "1.0"
|
||||
|
||||
@@ -1,214 +1,5 @@
|
||||
// TODO(fys): Use bitset crate to replace it
|
||||
pub struct BitSet {
|
||||
// TODO(fys): Is SmallVec or TinyVec better?
|
||||
buffer: Vec<u8>,
|
||||
nbits: usize,
|
||||
}
|
||||
use bitvec::prelude as bv;
|
||||
|
||||
impl BitSet {
|
||||
pub fn from_bytes(data: Vec<u8>, nbits: usize) -> Self {
|
||||
debug_assert!(data.len() << 3 >= nbits);
|
||||
Self {
|
||||
buffer: data,
|
||||
nbits,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_capacity(size: usize) -> Self {
|
||||
let buffer_len = (size + 7) >> 3;
|
||||
Self {
|
||||
buffer: vec![0; buffer_len],
|
||||
nbits: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.nbits
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.nbits == 0
|
||||
}
|
||||
|
||||
pub fn ones_count(&self) -> usize {
|
||||
(0..self.nbits)
|
||||
.into_iter()
|
||||
.filter(|&i| matches!(self.get(i), Some(true)))
|
||||
.count()
|
||||
}
|
||||
|
||||
pub fn get(&self, idx: usize) -> Option<bool> {
|
||||
if idx >= self.nbits {
|
||||
return None;
|
||||
}
|
||||
|
||||
let byte_idx = idx >> 3;
|
||||
let bit_idx = idx & 7;
|
||||
Some((self.buffer[byte_idx] >> bit_idx) & 1 != 0)
|
||||
}
|
||||
|
||||
pub fn extend(&mut self, other: &BitSet) {
|
||||
let nbits = self.len() + other.len();
|
||||
|
||||
if self.buffer.len() << 3 < nbits {
|
||||
let buffer_len = (nbits + 7) >> 3;
|
||||
self.buffer.resize(buffer_len, 0);
|
||||
}
|
||||
|
||||
for idx in 0..other.len() {
|
||||
if let Some(true) = other.get(idx) {
|
||||
self.set_bit_uncheck(idx + self.nbits);
|
||||
}
|
||||
}
|
||||
|
||||
self.nbits = nbits;
|
||||
}
|
||||
|
||||
pub fn append(&mut self, to_set: &[bool]) {
|
||||
let nbits = self.nbits + to_set.len();
|
||||
|
||||
if self.buffer.len() << 3 < nbits {
|
||||
let buffer_len = (nbits + 7) >> 3;
|
||||
self.buffer.resize(buffer_len, 0);
|
||||
}
|
||||
|
||||
for (idx, is_set) in to_set.iter().enumerate() {
|
||||
if *is_set {
|
||||
self.set_bit_uncheck(self.nbits + idx);
|
||||
}
|
||||
}
|
||||
|
||||
self.nbits = nbits;
|
||||
}
|
||||
|
||||
pub fn set(&mut self, idx: usize) {
|
||||
debug_assert!(idx < self.nbits, "idx should be less than nbits");
|
||||
|
||||
self.set_bit_uncheck(idx);
|
||||
}
|
||||
|
||||
pub fn buffer(self) -> Vec<u8> {
|
||||
self.buffer
|
||||
}
|
||||
|
||||
fn set_bit_uncheck(&mut self, idx: usize) {
|
||||
let byte_idx = idx >> 3;
|
||||
let bit_idx = idx & 7;
|
||||
self.buffer[byte_idx] |= 1 << bit_idx;
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<u8>> for BitSet {
|
||||
fn from(data: Vec<u8>) -> Self {
|
||||
BitSet {
|
||||
nbits: data.len() << 3,
|
||||
buffer: data,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&[u8]> for BitSet {
|
||||
fn from(data: &[u8]) -> Self {
|
||||
BitSet {
|
||||
buffer: data.into(),
|
||||
nbits: data.len() << 3,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::usize;
|
||||
|
||||
use crate::bitset::BitSet;
|
||||
|
||||
#[test]
|
||||
fn test_bit_set_get_and_set() {
|
||||
let mut bit_set: BitSet = vec![0b0000_0000, 0b0000_0000].into();
|
||||
|
||||
check_bit_set(&bit_set, &[]);
|
||||
assert_eq!(16, bit_set.len());
|
||||
assert_eq!(0, bit_set.ones_count());
|
||||
|
||||
bit_set.set(0);
|
||||
bit_set.set(7);
|
||||
bit_set.set(15);
|
||||
|
||||
check_bit_set(&bit_set, &[0, 7, 15]);
|
||||
assert_eq!(None, bit_set.get(16));
|
||||
assert_eq!(16, bit_set.len());
|
||||
assert_eq!(3, bit_set.ones_count());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bit_set_extend() {
|
||||
let mut bit_set = BitSet::with_capacity(10);
|
||||
|
||||
bit_set.extend(&vec![0b1000_0111].into());
|
||||
|
||||
check_bit_set(&bit_set, &[0, 1, 2, 7]);
|
||||
assert_eq!(None, bit_set.get(9));
|
||||
assert_eq!(8, bit_set.len());
|
||||
assert_eq!(4, bit_set.ones_count());
|
||||
|
||||
let other = BitSet::from_bytes(vec![0b0000_0111], 3);
|
||||
bit_set.extend(&other);
|
||||
|
||||
check_bit_set(&bit_set, &[0, 1, 2, 7, 8, 9, 10]);
|
||||
assert_eq!(None, bit_set.get(11));
|
||||
assert_eq!(11, bit_set.len());
|
||||
assert_eq!(7, bit_set.ones_count());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ones_count() {
|
||||
let bit_set = BitSet::from_bytes(vec![0b1111_1111], 4);
|
||||
|
||||
let ones_count = bit_set.ones_count();
|
||||
|
||||
assert_eq!(4, ones_count);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bit_set_append() {
|
||||
let mut bit_set: BitSet = vec![0b0000_0001, 0b0000_1000].into();
|
||||
|
||||
bit_set.append(&[true, false]);
|
||||
|
||||
check_bit_set(&bit_set, &[0, 11, 16]);
|
||||
assert_eq!(None, bit_set.get(18));
|
||||
assert_eq!(18, bit_set.len());
|
||||
assert_eq!(3, bit_set.ones_count());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bit_set_buffer() {
|
||||
let empty_bit_set = BitSet::with_capacity(10);
|
||||
|
||||
assert!(empty_bit_set.is_empty());
|
||||
assert_eq!(vec![0, 0], empty_bit_set.buffer());
|
||||
|
||||
let mut bit_set = BitSet::with_capacity(10);
|
||||
|
||||
bit_set.append(&[true, false]);
|
||||
|
||||
let buffer = bit_set.buffer();
|
||||
assert_eq!(vec![1, 0], buffer);
|
||||
}
|
||||
|
||||
fn check_bit_set(bit_set: &BitSet, set_positions: &[usize]) {
|
||||
(0..bit_set.len()).for_each(|idx| {
|
||||
let mut is_hit = false;
|
||||
for position in set_positions {
|
||||
if idx == *position {
|
||||
assert_eq!(Some(true), bit_set.get(idx));
|
||||
is_hit = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !is_hit {
|
||||
assert_eq!(Some(false), bit_set.get(idx));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
// `Lsb0` provides the best codegen for bit manipulation,
|
||||
// see https://github.com/bitvecto-rs/bitvec/blob/main/doc/order/Lsb0.md
|
||||
pub type BitVec = bv::BitVec<u8, bv::Lsb0>;
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
pub mod bitset;
|
||||
pub mod buffer;
|
||||
pub mod bytes;
|
||||
|
||||
pub use bitset::BitVec;
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::{
|
||||
};
|
||||
|
||||
use api::v1::{codec::InsertBatch, column::Values, Column, InsertExpr};
|
||||
use common_base::bitset::BitSet;
|
||||
use common_base::BitVec;
|
||||
use datatypes::{data_type::ConcreteDataType, value::Value, vectors::VectorBuilder};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::{requests::InsertRequest, Table};
|
||||
@@ -50,7 +50,6 @@ pub fn insertion_expr_to_request(
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
add_values_to_builder(vector_builder, values, row_count as usize, null_mask)?;
|
||||
}
|
||||
}
|
||||
@@ -78,10 +77,9 @@ fn add_values_to_builder(
|
||||
builder: &mut VectorBuilder,
|
||||
values: Values,
|
||||
row_count: usize,
|
||||
null_mask: impl Into<BitSet>,
|
||||
null_mask: Vec<u8>,
|
||||
) -> Result<()> {
|
||||
let data_type = builder.data_type();
|
||||
let null_mask: BitSet = null_mask.into();
|
||||
let values = convert_values(&data_type, values);
|
||||
|
||||
if null_mask.is_empty() {
|
||||
@@ -91,8 +89,9 @@ fn add_values_to_builder(
|
||||
builder.push(value);
|
||||
});
|
||||
} else {
|
||||
let null_mask = BitVec::from_vec(null_mask);
|
||||
ensure!(
|
||||
null_mask.ones_count() + values.len() == row_count,
|
||||
null_mask.count_ones() + values.len() == row_count,
|
||||
IllegalInsertDataSnafu
|
||||
);
|
||||
|
||||
@@ -107,7 +106,6 @@ fn add_values_to_builder(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -175,8 +173,8 @@ fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
fn is_null(null_mask: &BitSet, idx: usize) -> Option<bool> {
|
||||
null_mask.get(idx)
|
||||
fn is_null(null_mask: &BitVec, idx: usize) -> Option<bool> {
|
||||
null_mask.get(idx).as_deref().copied()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -188,7 +186,7 @@ mod tests {
|
||||
column::{self, Values},
|
||||
Column, InsertExpr,
|
||||
};
|
||||
use common_base::bitset::BitSet;
|
||||
use common_base::BitVec;
|
||||
use common_query::prelude::Expr;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use datatypes::{
|
||||
@@ -252,7 +250,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_is_null() {
|
||||
let null_mask: BitSet = vec![0b0000_0001, 0b0000_1000].into();
|
||||
let null_mask = BitVec::from_slice(&[0b0000_0001, 0b0000_1000]);
|
||||
|
||||
assert_eq!(Some(true), is_null(&null_mask, 0));
|
||||
assert_eq!(Some(false), is_null(&null_mask, 1));
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::sync::Arc;
|
||||
|
||||
use api::v1::{codec::SelectResult, column::Values, Column, ObjectResult};
|
||||
use arrow::array::{Array, BooleanArray, PrimitiveArray};
|
||||
use common_base::bitset::BitSet;
|
||||
use common_base::BitVec;
|
||||
use common_error::prelude::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_recordbatch::{util, RecordBatch, SendableRecordBatchStream};
|
||||
@@ -87,19 +87,16 @@ fn null_mask(arrays: &Vec<Arc<dyn Array>>, row_count: usize) -> Vec<u8> {
|
||||
return Vec::default();
|
||||
}
|
||||
|
||||
let mut nulls_set = BitSet::with_capacity(row_count);
|
||||
let mut null_mask = BitVec::with_capacity(row_count);
|
||||
for array in arrays {
|
||||
let validity = array.validity();
|
||||
// TODO(fys): Improve in the future, better way: repeat(false, len).
|
||||
if let Some(v) = validity {
|
||||
let nulls: Vec<bool> = v.iter().map(|x| !x).collect();
|
||||
nulls_set.append(&nulls);
|
||||
v.iter().for_each(|x| null_mask.push(!x));
|
||||
} else {
|
||||
nulls_set.append(&vec![false; array.len()]);
|
||||
null_mask.extend_from_bitslice(&BitVec::repeat(false, array.len()));
|
||||
}
|
||||
}
|
||||
|
||||
nulls_set.buffer()
|
||||
null_mask.into_vec()
|
||||
}
|
||||
|
||||
macro_rules! convert_arrow_array_to_grpc_vals {
|
||||
|
||||
@@ -10,8 +10,8 @@ arc-swap = "1.0"
|
||||
arrow-format = { version = "0.4", features = ["ipc"] }
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
bitvec = "1.0"
|
||||
bytes = "1.1"
|
||||
common-base = { path = "../common/base" }
|
||||
common-error = { path = "../common/error" }
|
||||
common-runtime = { path = "../common/runtime" }
|
||||
common-telemetry = { path = "../common/telemetry" }
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
use std::io::Read;
|
||||
|
||||
use arrow_format::{self, ipc::planus::ReadAsRoot};
|
||||
use common_base::BitVec;
|
||||
use datatypes::arrow::{
|
||||
datatypes::Schema,
|
||||
error::{ArrowError, Result},
|
||||
@@ -14,8 +15,6 @@ use datatypes::arrow::{
|
||||
},
|
||||
};
|
||||
|
||||
use crate::bit_vec;
|
||||
|
||||
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
|
||||
|
||||
pub struct ArrowStreamReader<R: Read> {
|
||||
@@ -92,7 +91,7 @@ impl<R: Read> ArrowStreamReader<R> {
|
||||
}
|
||||
|
||||
fn valid_metadata(metadata: &StreamMetadata, column_null_mask: &[u8]) -> StreamMetadata {
|
||||
let column_null_mask = bit_vec::BitVec::from_slice(column_null_mask);
|
||||
let column_null_mask = BitVec::from_slice(column_null_mask);
|
||||
|
||||
let schema = Schema::from(
|
||||
metadata
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
use bitvec::prelude as bv;
|
||||
|
||||
// `Lsb0` provides the best codegen for bit manipulation,
|
||||
// see https://github.com/bitvecto-rs/bitvec/blob/main/doc/order/Lsb0.md
|
||||
pub type BitVec = bv::BitVec<u8, bv::Lsb0>;
|
||||
@@ -1,7 +1,6 @@
|
||||
//! Storage engine implementation.
|
||||
mod arrow_stream;
|
||||
mod background;
|
||||
mod bit_vec;
|
||||
mod chunk;
|
||||
pub mod codec;
|
||||
pub mod config;
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
#![allow(clippy::all)]
|
||||
tonic::include_proto!("greptime.storage.wal.v1");
|
||||
|
||||
use crate::{
|
||||
bit_vec,
|
||||
write_batch::{Mutation, WriteBatch},
|
||||
};
|
||||
use common_base::BitVec;
|
||||
|
||||
use crate::write_batch::{Mutation, WriteBatch};
|
||||
|
||||
pub fn gen_mutation_extras(write_batch: &WriteBatch) -> Vec<MutationExtra> {
|
||||
let column_schemas = write_batch.schema().column_schemas();
|
||||
@@ -18,7 +17,7 @@ pub fn gen_mutation_extras(write_batch: &WriteBatch) -> Vec<MutationExtra> {
|
||||
column_null_mask: Default::default(),
|
||||
}
|
||||
} else {
|
||||
let mut column_null_mask = bit_vec::BitVec::repeat(false, column_schemas.len());
|
||||
let mut column_null_mask = BitVec::repeat(false, column_schemas.len());
|
||||
for (i, cs) in column_schemas.iter().enumerate() {
|
||||
if put.column_by_name(&cs.name).is_none() {
|
||||
column_null_mask.set(i, true);
|
||||
|
||||
@@ -3,6 +3,7 @@ tonic::include_proto!("greptime.storage.write_batch.v1");
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_base::BitVec;
|
||||
use common_error::prelude::*;
|
||||
use datatypes::schema;
|
||||
use datatypes::{
|
||||
@@ -20,8 +21,6 @@ use datatypes::{
|
||||
use paste::paste;
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::bit_vec;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("Failed to convert datafusion type: {}", from))]
|
||||
@@ -178,7 +177,7 @@ macro_rules! gen_columns {
|
||||
.with_context(|| ConversionSnafu {
|
||||
from: std::format!("{:?}", vector.as_ref().data_type()),
|
||||
})?;
|
||||
let mut bits: Option<bit_vec::BitVec> = None;
|
||||
let mut bits: Option<BitVec> = None;
|
||||
|
||||
vector_ref
|
||||
.iter_data()
|
||||
@@ -187,7 +186,7 @@ macro_rules! gen_columns {
|
||||
Some($vari) => values.[<$key _values>].push($cast),
|
||||
None => {
|
||||
if (bits.is_none()) {
|
||||
bits = Some(bit_vec::BitVec::repeat(false, vector_ref.len()));
|
||||
bits = Some(BitVec::repeat(false, vector_ref.len()));
|
||||
}
|
||||
bits.as_mut().map(|x| x.set(i, true));
|
||||
}
|
||||
@@ -237,7 +236,7 @@ macro_rules! gen_put_data {
|
||||
(0..num_rows)
|
||||
.for_each(|_| builder.push(vector_iter.next().map(|$vari| $cast)));
|
||||
} else {
|
||||
bit_vec::BitVec::from_vec(column.value_null_mask)
|
||||
BitVec::from_vec(column.value_null_mask)
|
||||
.into_iter()
|
||||
.take(num_rows)
|
||||
.for_each(|is_null| {
|
||||
|
||||
@@ -432,6 +432,7 @@ pub mod codec {
|
||||
|
||||
use std::{io::Cursor, sync::Arc};
|
||||
|
||||
use common_base::BitVec;
|
||||
use datatypes::{
|
||||
arrow::{
|
||||
chunk::Chunk as ArrowChunk,
|
||||
@@ -454,18 +455,15 @@ pub mod codec {
|
||||
Error as WriteBatchError, FromProtobufSnafu, Mutation, ParseSchemaSnafu, Result,
|
||||
ToProtobufSnafu, WriteBatch,
|
||||
};
|
||||
use crate::proto::{
|
||||
wal::{MutationExtra, MutationType},
|
||||
write_batch::{self, gen_columns, gen_put_data_vector},
|
||||
};
|
||||
use crate::write_batch::{DecodeProtobufSnafu, EncodeProtobufSnafu, PutData};
|
||||
use crate::{
|
||||
arrow_stream::ArrowStreamReader,
|
||||
codec::{Decoder, Encoder},
|
||||
};
|
||||
use crate::{
|
||||
bit_vec,
|
||||
proto::{
|
||||
wal::{MutationExtra, MutationType},
|
||||
write_batch::{self, gen_columns, gen_put_data_vector},
|
||||
},
|
||||
};
|
||||
|
||||
// TODO(jiachun): The codec logic is too complex, maybe we should use protobuf to
|
||||
// serialize/deserialize all our data.
|
||||
@@ -523,7 +521,7 @@ pub mod codec {
|
||||
} else {
|
||||
let valid_ipc_fields = ipc_fields
|
||||
.iter()
|
||||
.zip(bit_vec::BitVec::from_slice(column_null_mask))
|
||||
.zip(BitVec::from_slice(column_null_mask))
|
||||
.filter(|(_, is_null)| !*is_null)
|
||||
.map(|(ipc_field, _)| ipc_field.clone())
|
||||
.collect::<Vec<_>>();
|
||||
@@ -644,13 +642,12 @@ pub mod codec {
|
||||
if ext.column_null_mask.is_empty() {
|
||||
gen_mutation_put(&column_names)
|
||||
} else {
|
||||
let valid_columns =
|
||||
bit_vec::BitVec::from_slice(&ext.column_null_mask)
|
||||
.into_iter()
|
||||
.zip(column_names.iter())
|
||||
.filter(|(is_null, _)| !*is_null)
|
||||
.map(|(_, column_name)| column_name.clone())
|
||||
.collect::<Vec<_>>();
|
||||
let valid_columns = BitVec::from_slice(&ext.column_null_mask)
|
||||
.into_iter()
|
||||
.zip(column_names.iter())
|
||||
.filter(|(is_null, _)| !*is_null)
|
||||
.map(|(_, column_name)| column_name.clone())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
gen_mutation_put(&valid_columns)
|
||||
}
|
||||
@@ -765,7 +762,7 @@ pub mod codec {
|
||||
.map(|column| (column.name.clone(), column.data_type.clone()))
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
bit_vec::BitVec::from_slice(&ext.column_null_mask)
|
||||
BitVec::from_slice(&ext.column_null_mask)
|
||||
.into_iter()
|
||||
.zip(column_schemas.iter())
|
||||
.filter(|(is_null, _)| !*is_null)
|
||||
|
||||
Reference in New Issue
Block a user