diff --git a/Cargo.lock b/Cargo.lock index f1c7220398..678291cea5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -741,6 +741,7 @@ dependencies = [ name = "common-base" version = "0.1.0" dependencies = [ + "bitvec", "bytes", "common-error", "paste", @@ -4111,8 +4112,8 @@ dependencies = [ "async-stream", "async-trait", "atomic_float", - "bitvec", "bytes", + "common-base", "common-error", "common-runtime", "common-telemetry", diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml index c59f3356ea..0ecd8e2629 100644 --- a/src/common/base/Cargo.toml +++ b/src/common/base/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +bitvec = "1.0" bytes = { version = "1.1", features = ["serde"] } common-error = { path = "../error" } paste = "1.0" diff --git a/src/common/base/src/bitset.rs b/src/common/base/src/bitset.rs index 441356b0b8..dd3529a81b 100644 --- a/src/common/base/src/bitset.rs +++ b/src/common/base/src/bitset.rs @@ -1,214 +1,5 @@ -// TODO(fys): Use bitset crate to replace it -pub struct BitSet { - // TODO(fys): Is SmallVec or TinyVec better? - buffer: Vec, - nbits: usize, -} +use bitvec::prelude as bv; -impl BitSet { - pub fn from_bytes(data: Vec, nbits: usize) -> Self { - debug_assert!(data.len() << 3 >= nbits); - Self { - buffer: data, - nbits, - } - } - - pub fn with_capacity(size: usize) -> Self { - let buffer_len = (size + 7) >> 3; - Self { - buffer: vec![0; buffer_len], - nbits: 0, - } - } - - pub fn len(&self) -> usize { - self.nbits - } - - pub fn is_empty(&self) -> bool { - self.nbits == 0 - } - - pub fn ones_count(&self) -> usize { - (0..self.nbits) - .into_iter() - .filter(|&i| matches!(self.get(i), Some(true))) - .count() - } - - pub fn get(&self, idx: usize) -> Option { - if idx >= self.nbits { - return None; - } - - let byte_idx = idx >> 3; - let bit_idx = idx & 7; - Some((self.buffer[byte_idx] >> bit_idx) & 1 != 0) - } - - pub fn extend(&mut self, other: &BitSet) { - let nbits = self.len() + other.len(); - - if self.buffer.len() << 3 < nbits { - let buffer_len = (nbits + 7) >> 3; - self.buffer.resize(buffer_len, 0); - } - - for idx in 0..other.len() { - if let Some(true) = other.get(idx) { - self.set_bit_uncheck(idx + self.nbits); - } - } - - self.nbits = nbits; - } - - pub fn append(&mut self, to_set: &[bool]) { - let nbits = self.nbits + to_set.len(); - - if self.buffer.len() << 3 < nbits { - let buffer_len = (nbits + 7) >> 3; - self.buffer.resize(buffer_len, 0); - } - - for (idx, is_set) in to_set.iter().enumerate() { - if *is_set { - self.set_bit_uncheck(self.nbits + idx); - } - } - - self.nbits = nbits; - } - - pub fn set(&mut self, idx: usize) { - debug_assert!(idx < self.nbits, "idx should be less than nbits"); - - self.set_bit_uncheck(idx); - } - - pub fn buffer(self) -> Vec { - self.buffer - } - - fn set_bit_uncheck(&mut self, idx: usize) { - let byte_idx = idx >> 3; - let bit_idx = idx & 7; - self.buffer[byte_idx] |= 1 << bit_idx; - } -} - -impl From> for BitSet { - fn from(data: Vec) -> Self { - BitSet { - nbits: data.len() << 3, - buffer: data, - } - } -} - -impl From<&[u8]> for BitSet { - fn from(data: &[u8]) -> Self { - BitSet { - buffer: data.into(), - nbits: data.len() << 3, - } - } -} - -#[cfg(test)] -mod tests { - use std::usize; - - use crate::bitset::BitSet; - - #[test] - fn test_bit_set_get_and_set() { - let mut bit_set: BitSet = vec![0b0000_0000, 0b0000_0000].into(); - - check_bit_set(&bit_set, &[]); - assert_eq!(16, bit_set.len()); - assert_eq!(0, bit_set.ones_count()); - - bit_set.set(0); - bit_set.set(7); - bit_set.set(15); - - check_bit_set(&bit_set, &[0, 7, 15]); - assert_eq!(None, bit_set.get(16)); - assert_eq!(16, bit_set.len()); - assert_eq!(3, bit_set.ones_count()); - } - - #[test] - fn test_bit_set_extend() { - let mut bit_set = BitSet::with_capacity(10); - - bit_set.extend(&vec![0b1000_0111].into()); - - check_bit_set(&bit_set, &[0, 1, 2, 7]); - assert_eq!(None, bit_set.get(9)); - assert_eq!(8, bit_set.len()); - assert_eq!(4, bit_set.ones_count()); - - let other = BitSet::from_bytes(vec![0b0000_0111], 3); - bit_set.extend(&other); - - check_bit_set(&bit_set, &[0, 1, 2, 7, 8, 9, 10]); - assert_eq!(None, bit_set.get(11)); - assert_eq!(11, bit_set.len()); - assert_eq!(7, bit_set.ones_count()); - } - - #[test] - fn test_ones_count() { - let bit_set = BitSet::from_bytes(vec![0b1111_1111], 4); - - let ones_count = bit_set.ones_count(); - - assert_eq!(4, ones_count); - } - - #[test] - fn test_bit_set_append() { - let mut bit_set: BitSet = vec![0b0000_0001, 0b0000_1000].into(); - - bit_set.append(&[true, false]); - - check_bit_set(&bit_set, &[0, 11, 16]); - assert_eq!(None, bit_set.get(18)); - assert_eq!(18, bit_set.len()); - assert_eq!(3, bit_set.ones_count()); - } - - #[test] - fn test_bit_set_buffer() { - let empty_bit_set = BitSet::with_capacity(10); - - assert!(empty_bit_set.is_empty()); - assert_eq!(vec![0, 0], empty_bit_set.buffer()); - - let mut bit_set = BitSet::with_capacity(10); - - bit_set.append(&[true, false]); - - let buffer = bit_set.buffer(); - assert_eq!(vec![1, 0], buffer); - } - - fn check_bit_set(bit_set: &BitSet, set_positions: &[usize]) { - (0..bit_set.len()).for_each(|idx| { - let mut is_hit = false; - for position in set_positions { - if idx == *position { - assert_eq!(Some(true), bit_set.get(idx)); - is_hit = true; - break; - } - } - if !is_hit { - assert_eq!(Some(false), bit_set.get(idx)); - } - }); - } -} +// `Lsb0` provides the best codegen for bit manipulation, +// see https://github.com/bitvecto-rs/bitvec/blob/main/doc/order/Lsb0.md +pub type BitVec = bv::BitVec; diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index 903a9bf90a..d31fb9e30f 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -1,3 +1,5 @@ pub mod bitset; pub mod buffer; pub mod bytes; + +pub use bitset::BitVec; diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index 0a38ea501a..854899580f 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -4,7 +4,7 @@ use std::{ }; use api::v1::{codec::InsertBatch, column::Values, Column, InsertExpr}; -use common_base::bitset::BitSet; +use common_base::BitVec; use datatypes::{data_type::ConcreteDataType, value::Value, vectors::VectorBuilder}; use snafu::{ensure, OptionExt, ResultExt}; use table::{requests::InsertRequest, Table}; @@ -50,7 +50,6 @@ pub fn insertion_expr_to_request( )) } }; - add_values_to_builder(vector_builder, values, row_count as usize, null_mask)?; } } @@ -78,10 +77,9 @@ fn add_values_to_builder( builder: &mut VectorBuilder, values: Values, row_count: usize, - null_mask: impl Into, + null_mask: Vec, ) -> Result<()> { let data_type = builder.data_type(); - let null_mask: BitSet = null_mask.into(); let values = convert_values(&data_type, values); if null_mask.is_empty() { @@ -91,8 +89,9 @@ fn add_values_to_builder( builder.push(value); }); } else { + let null_mask = BitVec::from_vec(null_mask); ensure!( - null_mask.ones_count() + values.len() == row_count, + null_mask.count_ones() + values.len() == row_count, IllegalInsertDataSnafu ); @@ -107,7 +106,6 @@ fn add_values_to_builder( } } } - Ok(()) } @@ -175,8 +173,8 @@ fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec { } } -fn is_null(null_mask: &BitSet, idx: usize) -> Option { - null_mask.get(idx) +fn is_null(null_mask: &BitVec, idx: usize) -> Option { + null_mask.get(idx).as_deref().copied() } #[cfg(test)] @@ -188,7 +186,7 @@ mod tests { column::{self, Values}, Column, InsertExpr, }; - use common_base::bitset::BitSet; + use common_base::BitVec; use common_query::prelude::Expr; use common_recordbatch::SendableRecordBatchStream; use datatypes::{ @@ -252,7 +250,7 @@ mod tests { #[test] fn test_is_null() { - let null_mask: BitSet = vec![0b0000_0001, 0b0000_1000].into(); + let null_mask = BitVec::from_slice(&[0b0000_0001, 0b0000_1000]); assert_eq!(Some(true), is_null(&null_mask, 0)); assert_eq!(Some(false), is_null(&null_mask, 1)); diff --git a/src/datanode/src/server/grpc/select.rs b/src/datanode/src/server/grpc/select.rs index 4ff13036fb..c5344b4d52 100644 --- a/src/datanode/src/server/grpc/select.rs +++ b/src/datanode/src/server/grpc/select.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use api::v1::{codec::SelectResult, column::Values, Column, ObjectResult}; use arrow::array::{Array, BooleanArray, PrimitiveArray}; -use common_base::bitset::BitSet; +use common_base::BitVec; use common_error::prelude::ErrorExt; use common_error::status_code::StatusCode; use common_recordbatch::{util, RecordBatch, SendableRecordBatchStream}; @@ -87,19 +87,16 @@ fn null_mask(arrays: &Vec>, row_count: usize) -> Vec { return Vec::default(); } - let mut nulls_set = BitSet::with_capacity(row_count); + let mut null_mask = BitVec::with_capacity(row_count); for array in arrays { let validity = array.validity(); - // TODO(fys): Improve in the future, better way: repeat(false, len). if let Some(v) = validity { - let nulls: Vec = v.iter().map(|x| !x).collect(); - nulls_set.append(&nulls); + v.iter().for_each(|x| null_mask.push(!x)); } else { - nulls_set.append(&vec![false; array.len()]); + null_mask.extend_from_bitslice(&BitVec::repeat(false, array.len())); } } - - nulls_set.buffer() + null_mask.into_vec() } macro_rules! convert_arrow_array_to_grpc_vals { diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index a2037d67b5..e61552e195 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -10,8 +10,8 @@ arc-swap = "1.0" arrow-format = { version = "0.4", features = ["ipc"] } async-stream = "0.3" async-trait = "0.1" -bitvec = "1.0" bytes = "1.1" +common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } diff --git a/src/storage/src/arrow_stream.rs b/src/storage/src/arrow_stream.rs index 174ad1e8de..3c49bd2d5f 100644 --- a/src/storage/src/arrow_stream.rs +++ b/src/storage/src/arrow_stream.rs @@ -5,6 +5,7 @@ use std::io::Read; use arrow_format::{self, ipc::planus::ReadAsRoot}; +use common_base::BitVec; use datatypes::arrow::{ datatypes::Schema, error::{ArrowError, Result}, @@ -14,8 +15,6 @@ use datatypes::arrow::{ }, }; -use crate::bit_vec; - const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; pub struct ArrowStreamReader { @@ -92,7 +91,7 @@ impl ArrowStreamReader { } fn valid_metadata(metadata: &StreamMetadata, column_null_mask: &[u8]) -> StreamMetadata { - let column_null_mask = bit_vec::BitVec::from_slice(column_null_mask); + let column_null_mask = BitVec::from_slice(column_null_mask); let schema = Schema::from( metadata diff --git a/src/storage/src/bit_vec.rs b/src/storage/src/bit_vec.rs deleted file mode 100644 index dd3529a81b..0000000000 --- a/src/storage/src/bit_vec.rs +++ /dev/null @@ -1,5 +0,0 @@ -use bitvec::prelude as bv; - -// `Lsb0` provides the best codegen for bit manipulation, -// see https://github.com/bitvecto-rs/bitvec/blob/main/doc/order/Lsb0.md -pub type BitVec = bv::BitVec; diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 3901f12140..2dce68781f 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -1,7 +1,6 @@ //! Storage engine implementation. mod arrow_stream; mod background; -mod bit_vec; mod chunk; pub mod codec; pub mod config; diff --git a/src/storage/src/proto/wal.rs b/src/storage/src/proto/wal.rs index 2ad59e910b..012ba34c33 100644 --- a/src/storage/src/proto/wal.rs +++ b/src/storage/src/proto/wal.rs @@ -1,10 +1,9 @@ #![allow(clippy::all)] tonic::include_proto!("greptime.storage.wal.v1"); -use crate::{ - bit_vec, - write_batch::{Mutation, WriteBatch}, -}; +use common_base::BitVec; + +use crate::write_batch::{Mutation, WriteBatch}; pub fn gen_mutation_extras(write_batch: &WriteBatch) -> Vec { let column_schemas = write_batch.schema().column_schemas(); @@ -18,7 +17,7 @@ pub fn gen_mutation_extras(write_batch: &WriteBatch) -> Vec { column_null_mask: Default::default(), } } else { - let mut column_null_mask = bit_vec::BitVec::repeat(false, column_schemas.len()); + let mut column_null_mask = BitVec::repeat(false, column_schemas.len()); for (i, cs) in column_schemas.iter().enumerate() { if put.column_by_name(&cs.name).is_none() { column_null_mask.set(i, true); diff --git a/src/storage/src/proto/write_batch.rs b/src/storage/src/proto/write_batch.rs index fe29451e87..14e2c048a8 100644 --- a/src/storage/src/proto/write_batch.rs +++ b/src/storage/src/proto/write_batch.rs @@ -3,6 +3,7 @@ tonic::include_proto!("greptime.storage.write_batch.v1"); use std::sync::Arc; +use common_base::BitVec; use common_error::prelude::*; use datatypes::schema; use datatypes::{ @@ -20,8 +21,6 @@ use datatypes::{ use paste::paste; use snafu::OptionExt; -use crate::bit_vec; - #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Failed to convert datafusion type: {}", from))] @@ -178,7 +177,7 @@ macro_rules! gen_columns { .with_context(|| ConversionSnafu { from: std::format!("{:?}", vector.as_ref().data_type()), })?; - let mut bits: Option = None; + let mut bits: Option = None; vector_ref .iter_data() @@ -187,7 +186,7 @@ macro_rules! gen_columns { Some($vari) => values.[<$key _values>].push($cast), None => { if (bits.is_none()) { - bits = Some(bit_vec::BitVec::repeat(false, vector_ref.len())); + bits = Some(BitVec::repeat(false, vector_ref.len())); } bits.as_mut().map(|x| x.set(i, true)); } @@ -237,7 +236,7 @@ macro_rules! gen_put_data { (0..num_rows) .for_each(|_| builder.push(vector_iter.next().map(|$vari| $cast))); } else { - bit_vec::BitVec::from_vec(column.value_null_mask) + BitVec::from_vec(column.value_null_mask) .into_iter() .take(num_rows) .for_each(|is_null| { diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs index 38c98bae1d..7071c14347 100644 --- a/src/storage/src/write_batch.rs +++ b/src/storage/src/write_batch.rs @@ -432,6 +432,7 @@ pub mod codec { use std::{io::Cursor, sync::Arc}; + use common_base::BitVec; use datatypes::{ arrow::{ chunk::Chunk as ArrowChunk, @@ -454,18 +455,15 @@ pub mod codec { Error as WriteBatchError, FromProtobufSnafu, Mutation, ParseSchemaSnafu, Result, ToProtobufSnafu, WriteBatch, }; + use crate::proto::{ + wal::{MutationExtra, MutationType}, + write_batch::{self, gen_columns, gen_put_data_vector}, + }; use crate::write_batch::{DecodeProtobufSnafu, EncodeProtobufSnafu, PutData}; use crate::{ arrow_stream::ArrowStreamReader, codec::{Decoder, Encoder}, }; - use crate::{ - bit_vec, - proto::{ - wal::{MutationExtra, MutationType}, - write_batch::{self, gen_columns, gen_put_data_vector}, - }, - }; // TODO(jiachun): The codec logic is too complex, maybe we should use protobuf to // serialize/deserialize all our data. @@ -523,7 +521,7 @@ pub mod codec { } else { let valid_ipc_fields = ipc_fields .iter() - .zip(bit_vec::BitVec::from_slice(column_null_mask)) + .zip(BitVec::from_slice(column_null_mask)) .filter(|(_, is_null)| !*is_null) .map(|(ipc_field, _)| ipc_field.clone()) .collect::>(); @@ -644,13 +642,12 @@ pub mod codec { if ext.column_null_mask.is_empty() { gen_mutation_put(&column_names) } else { - let valid_columns = - bit_vec::BitVec::from_slice(&ext.column_null_mask) - .into_iter() - .zip(column_names.iter()) - .filter(|(is_null, _)| !*is_null) - .map(|(_, column_name)| column_name.clone()) - .collect::>(); + let valid_columns = BitVec::from_slice(&ext.column_null_mask) + .into_iter() + .zip(column_names.iter()) + .filter(|(is_null, _)| !*is_null) + .map(|(_, column_name)| column_name.clone()) + .collect::>(); gen_mutation_put(&valid_columns) } @@ -765,7 +762,7 @@ pub mod codec { .map(|column| (column.name.clone(), column.data_type.clone())) .collect::>() } else { - bit_vec::BitVec::from_slice(&ext.column_null_mask) + BitVec::from_slice(&ext.column_null_mask) .into_iter() .zip(column_schemas.iter()) .filter(|(is_null, _)| !*is_null)