diff --git a/src/datatypes/src/vectors/boolean.rs b/src/datatypes/src/vectors/boolean.rs index d28a825ee9..c1d98fa0f2 100644 --- a/src/datatypes/src/vectors/boolean.rs +++ b/src/datatypes/src/vectors/boolean.rs @@ -4,6 +4,8 @@ use std::sync::Arc; use arrow::array::{Array, ArrayRef, BooleanArray, MutableArray, MutableBooleanArray}; use arrow::bitmap::utils::{BitmapIter, ZipValidity}; +use arrow::bitmap::MutableBitmap; +use arrow::datatypes::DataType as ArrowDataType; use snafu::{OptionExt, ResultExt}; use crate::data_type::ConcreteDataType; @@ -59,6 +61,14 @@ impl>> FromIterator for BooleanVector { } } +impl From for BooleanVector { + fn from(bitmap: MutableBitmap) -> BooleanVector { + BooleanVector { + array: BooleanArray::new(ArrowDataType::Boolean, bitmap.into(), None), + } + } +} + impl Vector for BooleanVector { fn data_type(&self) -> ConcreteDataType { ConcreteDataType::boolean_datatype() @@ -327,4 +337,16 @@ mod tests { let expect: VectorRef = Arc::new(BooleanVector::from_slice(&[true, false, true])); assert_eq!(expect, vector); } + + #[test] + fn test_from_mutable_bitmap() { + let mut bitmap = MutableBitmap::new(); + let values = [false, true, true, false, true]; + for v in values { + bitmap.push(v); + } + let vector = BooleanVector::from(bitmap); + let expect = BooleanVector::from_slice(&values); + assert_eq!(expect, vector); + } } diff --git a/src/datatypes/src/vectors/null.rs b/src/datatypes/src/vectors/null.rs index 329210886e..84dd8f7567 100644 --- a/src/datatypes/src/vectors/null.rs +++ b/src/datatypes/src/vectors/null.rs @@ -175,6 +175,10 @@ impl MutableVector for NullVectorBuilder { pub(crate) fn replicate_null(vector: &NullVector, offsets: &[usize]) -> VectorRef { assert_eq!(offsets.len(), vector.len()); + if offsets.is_empty() { + return vector.slice(0, 0); + } + Arc::new(NullVector::new(*offsets.last().unwrap())) } diff --git a/src/datatypes/src/vectors/operations/replicate.rs b/src/datatypes/src/vectors/operations/replicate.rs index 8ed712fd40..1715777b3e 100644 --- a/src/datatypes/src/vectors/operations/replicate.rs +++ b/src/datatypes/src/vectors/operations/replicate.rs @@ -72,6 +72,11 @@ mod tests { #[test] fn test_replicate_null() { + let v = NullVector::new(0); + let offsets = []; + let v = v.replicate(&offsets); + assert!(v.is_empty()); + let v = NullVector::new(3); let offsets = [1, 3, 5]; diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index 3a94fb34d5..e816d6f60c 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -8,7 +8,7 @@ use table::predicate::Predicate; use crate::error::{self, Error, Result}; use crate::memtable::{IterContext, MemtableRef, MemtableSet}; -use crate::read::{BoxedBatchReader, MergeReaderBuilder}; +use crate::read::{BoxedBatchReader, DedupReader, MergeReaderBuilder}; use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef}; use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor}; @@ -18,7 +18,7 @@ use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor}; // necessary to do so. pub struct ChunkReaderImpl { schema: ProjectedSchemaRef, - sst_reader: BoxedBatchReader, + batch_reader: BoxedBatchReader, } #[async_trait] @@ -30,7 +30,7 @@ impl ChunkReader for ChunkReaderImpl { } async fn next_chunk(&mut self) -> Result> { - let batch = match self.sst_reader.next_batch().await? { + let batch = match self.batch_reader.next_batch().await? { Some(b) => b, None => return Ok(None), }; @@ -42,8 +42,11 @@ impl ChunkReader for ChunkReaderImpl { } impl ChunkReaderImpl { - pub fn new(schema: ProjectedSchemaRef, sst_reader: BoxedBatchReader) -> ChunkReaderImpl { - ChunkReaderImpl { schema, sst_reader } + pub fn new(schema: ProjectedSchemaRef, batch_reader: BoxedBatchReader) -> ChunkReaderImpl { + ChunkReaderImpl { + schema, + batch_reader, + } } } @@ -142,6 +145,7 @@ impl ChunkReaderBuilder { } let reader = reader_builder.build(); + let reader = DedupReader::new(schema.clone(), reader); Ok(ChunkReaderImpl::new(schema, Box::new(reader))) } diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 2a8bf019e8..af6cc3e1ad 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -246,6 +246,13 @@ pub enum Error { #[snafu(display("Failed to build batch, {}", msg))] BuildBatch { msg: String, backtrace: Backtrace }, + + #[snafu(display("Failed to filter column {}, source: {}", name, source))] + FilterColumn { + name: String, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } pub type Result = std::result::Result; @@ -276,7 +283,8 @@ impl ErrorExt for Error { | VersionNotFound { .. } | SequenceNotMonotonic { .. } | ConvertStoreSchema { .. } - | InvalidRawRegion { .. } => StatusCode::Unexpected, + | InvalidRawRegion { .. } + | FilterColumn { .. } => StatusCode::Unexpected, FlushIo { .. } | WriteParquet { .. } diff --git a/src/storage/src/read.rs b/src/storage/src/read.rs index 16102cbb9c..08b91f8077 100644 --- a/src/storage/src/read.rs +++ b/src/storage/src/read.rs @@ -1,21 +1,24 @@ //! Common structs and utilities for read. +mod dedup; mod merge; +use std::cmp::Ordering; + use async_trait::async_trait; +use datatypes::arrow::bitmap::MutableBitmap; use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; -use datatypes::vectors::{MutableVector, VectorRef}; +use datatypes::vectors::{BooleanVector, MutableVector, VectorRef}; +pub use dedup::DedupReader; pub use merge::{MergeReader, MergeReaderBuilder}; use snafu::{ensure, ResultExt}; use crate::error::{self, Result}; /// Storage internal representation of a batch of rows. -/// -/// `Batch` must contain at least one column, but might not hold any row. // Now the structure of `Batch` is still unstable, all pub fields may be changed. -#[derive(Debug, Default, PartialEq, Eq)] +#[derive(Debug, Default, PartialEq, Eq, Clone)] pub struct Batch { /// Rows organized in columnar format. /// @@ -28,9 +31,7 @@ impl Batch { /// Create a new `Batch` from `columns`. /// /// # Panics - /// Panics if - /// - `columns` is empty. - /// - vectors in `columns` have different length. + /// Panics if vectors in `columns` have different length. pub fn new(columns: Vec) -> Batch { Self::assert_columns(&columns); @@ -44,8 +45,7 @@ impl Batch { #[inline] pub fn num_rows(&self) -> usize { - // The invariant of `Batch::new()` ensure columns isn't empty. - self.columns[0].len() + self.columns.get(0).map(|v| v.len()).unwrap_or(0) } #[inline] @@ -77,12 +77,50 @@ impl Batch { } fn assert_columns(columns: &[VectorRef]) { - assert!(!columns.is_empty()); + if columns.is_empty() { + return; + } + let length = columns[0].len(); assert!(columns.iter().all(|col| col.len() == length)); } } +/// Compute operations for Batch. +pub trait BatchOp { + /// Compare `i-th` in `left` to `j-th` row in `right` by key (row key + internal columns). + /// + /// The caller should ensure `left` and `right` have same schema as `self`. + /// + /// # Panics + /// Panics if + /// - `i` or `j` is out of bound. + /// - `left` or `right` has insufficient column num. + fn compare_row(&self, left: &Batch, i: usize, right: &Batch, j: usize) -> Ordering; + + /// Dedup rows in `batch` by row key. + /// + /// If `prev` is `Some` and not empty, the last row of `prev` would be used to dedup + /// current `batch`. Set `i-th` bit of `selected` to `true` if we need to keep `i-th` + /// row. So the caller could use `selected` to build a [BooleanVector] to filter the + /// batch. + /// + /// The caller must ensure `selected` is initialized by filling `batch.num_rows()` bits + /// to zero. + /// + /// # Panics + /// Panics if `batch` and `prev` have different number of columns (unless `prev` is + /// empty). + fn dedup(&self, batch: &Batch, selected: &mut MutableBitmap, prev: Option<&Batch>); + + /// Filters the `batch`, returns elements matching the `filter` (i.e. where the values + /// are true). + /// + /// Note that the nulls of `filter` are interpreted as `false` will lead to these elements + /// being masked out. + fn filter(&self, batch: &Batch, filter: &BooleanVector) -> Result; +} + /// Reusable [Batch] builder. pub struct BatchBuilder { builders: Vec>, diff --git a/src/storage/src/read/dedup.rs b/src/storage/src/read/dedup.rs new file mode 100644 index 0000000000..6d6f93bdf7 --- /dev/null +++ b/src/storage/src/read/dedup.rs @@ -0,0 +1,164 @@ +use async_trait::async_trait; +use datatypes::arrow::bitmap::MutableBitmap; +use datatypes::vectors::BooleanVector; + +use crate::error::Result; +use crate::read::{Batch, BatchOp, BatchReader}; +use crate::schema::ProjectedSchemaRef; + +/// A reader that dedup rows from inner reader. +pub struct DedupReader { + /// Projected schema to read. + schema: ProjectedSchemaRef, + /// The inner reader. + reader: R, + /// Previous batch from the reader. + prev_batch: Option, +} + +impl DedupReader { + pub fn new(schema: ProjectedSchemaRef, reader: R) -> DedupReader { + DedupReader { + schema, + reader, + prev_batch: None, + } + } + + /// Take `batch` and then returns a new batch with no duplicated rows. + /// + /// This method may returns empty `Batch`. + fn dedup_batch(&mut self, batch: Batch) -> Result { + if batch.is_empty() { + // No need to update `prev_batch` if current batch is empty. + return Ok(batch); + } + + // The `arrow` filter needs `BooleanArray` as input so there is no convenient + // and efficient way to reuse the bitmap. Though we could use `MutableBooleanArray`, + // but we couldn't zero all bits in the mutable array easily. + let mut selected = MutableBitmap::from_len_zeroed(batch.num_rows()); + self.schema + .dedup(&batch, &mut selected, self.prev_batch.as_ref()); + + // Store current batch to `prev_batch` so we could compare the next batch + // with this batch. We store batch before filtering it mainly for correctness, as + // once we supports `DELETE`, rows with `OpType::Delete` would be removed from the + // batch after filter, then we may store an incorrect `last row` of previous batch. + self.prev_batch + .get_or_insert_with(Batch::default) + .clone_from(&batch); // Use `clone_from` to reuse allocated memory if possible. + + // TODO(yingwen): To support `DELETE`, we could find all rows whose op_types are equal + // to `OpType::Delete`, mark their `selected` to false, then filter the batch. + + let filter = BooleanVector::from(selected); + // Filter duplicate rows. + self.schema.filter(&batch, &filter) + } +} + +#[async_trait] +impl BatchReader for DedupReader { + async fn next_batch(&mut self) -> Result> { + while let Some(batch) = self.reader.next_batch().await? { + let filtered = self.dedup_batch(batch)?; + // Skip empty batch. + if !filtered.is_empty() { + return Ok(Some(filtered)); + } + } + + Ok(None) + } +} + +#[cfg(test)] +mod tests { + use store_api::storage::OpType; + + use super::*; + use crate::test_util::read_util; + + #[tokio::test] + async fn test_dedup_reader_empty() { + let schema = read_util::new_projected_schema(); + let reader = read_util::build_vec_reader(&[]); + let mut reader = DedupReader::new(schema, reader); + + assert!(reader.next_batch().await.unwrap().is_none()); + // Call next_batch() again is allowed. + assert!(reader.next_batch().await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_dedup_by_sequence() { + let schema = read_util::new_projected_schema(); + let reader = read_util::build_full_vec_reader(&[ + // key, value, sequence, op_type + &[ + (100, 1, 1000, OpType::Put), + (100, 2, 999, OpType::Put), + (100, 3, 998, OpType::Put), + (101, 1, 1000, OpType::Put), + ], + &[ + (101, 2, 999, OpType::Put), + (102, 12, 1000, OpType::Put), + (103, 13, 1000, OpType::Put), + ], + &[(103, 2, 999, OpType::Put)], + ]); + let mut reader = DedupReader::new(schema, reader); + + let result = read_util::collect_kv_batch(&mut reader).await; + let expect = [ + (100, Some(1)), + (101, Some(1)), + (102, Some(12)), + (103, Some(13)), + ]; + assert_eq!(&expect, &result[..]); + } + + #[tokio::test] + async fn test_dedup_contains_empty_input() { + let schema = read_util::new_projected_schema(); + let reader = read_util::build_full_vec_reader(&[ + // key, value, sequence, op_type + &[ + (100, 1, 1000, OpType::Put), + (100, 2, 999, OpType::Put), + (101, 1, 1000, OpType::Put), + ], + &[], + &[(101, 2, 999, OpType::Put), (102, 12, 1000, OpType::Put)], + ]); + let mut reader = DedupReader::new(schema, reader); + + let result = read_util::collect_kv_batch(&mut reader).await; + let expect = [(100, Some(1)), (101, Some(1)), (102, Some(12))]; + assert_eq!(&expect, &result[..]); + } + + #[tokio::test] + async fn test_dedup_contains_empty_output() { + let schema = read_util::new_projected_schema(); + let reader = read_util::build_full_vec_reader(&[ + // key, value, sequence, op_type + &[ + (100, 1, 1000, OpType::Put), + (100, 2, 999, OpType::Put), + (101, 1, 1000, OpType::Put), + ], + &[(101, 2, 999, OpType::Put)], + &[(101, 3, 998, OpType::Put), (101, 4, 997, OpType::Put)], + &[(102, 12, 998, OpType::Put)], + ]); + let mut reader = DedupReader::new(schema, reader); + + let result = read_util::collect_kv_batch(&mut reader).await; + let expect = [(100, Some(1)), (101, Some(1)), (102, Some(12))]; + assert_eq!(&expect, &result[..]); + } +} diff --git a/src/storage/src/read/merge.rs b/src/storage/src/read/merge.rs index a1f69b2e34..a620141830 100644 --- a/src/storage/src/read/merge.rs +++ b/src/storage/src/read/merge.rs @@ -49,7 +49,7 @@ use store_api::storage::consts; use crate::error::Result; use crate::memtable::BoxedBatchIterator; -use crate::read::{Batch, BatchBuilder, BatchReader, BoxedBatchReader}; +use crate::read::{Batch, BatchBuilder, BatchOp, BatchReader, BoxedBatchReader}; use crate::schema::{ProjectedSchema, ProjectedSchemaRef}; /// Batch data source. @@ -98,7 +98,7 @@ struct RowCursor<'a> { impl<'a> RowCursor<'a> { #[inline] fn compare(&self, schema: &ProjectedSchema, other: &RowCursor) -> Ordering { - schema.compare_row_of_batch(self.batch, self.pos, other.batch, other.pos) + schema.compare_row(self.batch, self.pos, other.batch, other.pos) } } diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs index 17d94c273f..fdf4e2528d 100644 --- a/src/storage/src/region/tests/flush.rs +++ b/src/storage/src/region/tests/flush.rs @@ -122,7 +122,6 @@ async fn test_flush_and_stall() { let store_dir = dir.path().to_str().unwrap(); let flush_switch = Arc::new(FlushSwitch::default()); - // Always trigger flush before write. let tester = FlushTester::new(store_dir, flush_switch.clone()).await; let data = [(1000, Some(100))]; @@ -182,7 +181,6 @@ async fn test_read_after_flush() { let store_dir = dir.path().to_str().unwrap(); let flush_switch = Arc::new(FlushSwitch::default()); - // Always trigger flush before write. let tester = FlushTester::new(store_dir, flush_switch.clone()).await; // Put elements so we have content to flush. @@ -209,3 +207,48 @@ async fn test_read_after_flush() { let output = tester.full_scan().await; assert_eq!(expect, output); } + +#[tokio::test] +async fn test_merge_read_after_flush() { + let dir = TempDir::new("merge-read-flush").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + + let flush_switch = Arc::new(FlushSwitch::default()); + let tester = FlushTester::new(store_dir, flush_switch.clone()).await; + + // Put elements so we have content to flush (In SST1). + tester.put(&[(3000, Some(300))]).await; + tester.put(&[(2000, Some(200))]).await; + + // Now set should flush to true to trigger flush. + flush_switch.set_should_flush(true); + + // Put element to trigger flush (In SST2). + tester.put(&[(2000, Some(201))]).await; + tester.wait_flush_done().await; + + // Disable flush. + flush_switch.set_should_flush(false); + // In SST2. + tester.put(&[(2000, Some(202))]).await; + tester.put(&[(1000, Some(100))]).await; + + // Enable flush. + flush_switch.set_should_flush(true); + // Trigger flush and overwrite row (In memtable). + tester.put(&[(2000, Some(203))]).await; + tester.wait_flush_done().await; + + let expect = vec![(1000, Some(100)), (2000, Some(203)), (3000, Some(300))]; + + let output = tester.full_scan().await; + assert_eq!(expect, output); + + // Reopen + let mut tester = tester; + tester.reopen().await; + + // Scan after reopen. + let output = tester.full_scan().await; + assert_eq!(expect, output); +} diff --git a/src/storage/src/schema.rs b/src/storage/src/schema.rs index 0e5ae4f50a..29c0a16f9e 100644 --- a/src/storage/src/schema.rs +++ b/src/storage/src/schema.rs @@ -4,16 +4,18 @@ use std::sync::Arc; use common_error::prelude::*; use datatypes::arrow::array::Array; +use datatypes::arrow::bitmap::MutableBitmap; use datatypes::arrow::chunk::Chunk as ArrowChunk; use datatypes::arrow::datatypes::Schema as ArrowSchema; use datatypes::schema::Metadata; -use datatypes::vectors::{Helper, VectorRef}; +use datatypes::vectors::{BooleanVector, Helper, VectorRef}; use serde::{Deserialize, Serialize}; use snafu::ensure; use store_api::storage::{consts, Chunk, ColumnId, ColumnSchema, Schema, SchemaBuilder, SchemaRef}; +use crate::error; use crate::metadata::{ColumnMetadata, ColumnsMetadata, ColumnsMetadataRef}; -use crate::read::Batch; +use crate::read::{Batch, BatchOp}; const ROW_KEY_END_KEY: &str = "greptime:storage:row_key_end"; const USER_COLUMN_END_KEY: &str = "greptime:storage:user_column_end"; @@ -231,20 +233,6 @@ impl StoreSchema { Ok(Batch::new(columns)) } - fn compare_row_of_batch(&self, left: &Batch, i: usize, right: &Batch, j: usize) -> Ordering { - let indices = self.full_key_indices(); - for idx in indices { - let (left_col, right_col) = (left.column(idx), right.column(idx)); - // Comparision of vector is done by virtual method calls currently. Consider using - // enum dispatch if this becomes bottleneck. - let order = left_col.get_ref(i).cmp(&right_col.get_ref(j)); - if order != Ordering::Equal { - return order; - } - } - Ordering::Equal - } - fn from_columns_metadata(columns: &ColumnsMetadata, version: u32) -> Result { let column_schemas: Vec<_> = columns .iter_all_columns() @@ -315,13 +303,6 @@ impl StoreSchema { fn num_columns(&self) -> usize { self.schema.num_columns() } - - fn full_key_indices(&self) -> impl Iterator { - // row key, sequence, op_type - (0..self.row_key_end) - .chain(std::iter::once(self.sequence_index())) - .chain(std::iter::once(self.op_type_index())) - } } impl TryFrom for StoreSchema { @@ -552,25 +533,6 @@ impl ProjectedSchema { Batch::new(columns) } - /// Compare `i-th` in `left` to `j-th` row in `right` by key (row key + internal columns). - /// - /// The caller should ensure `left` and `right` have same schema as `self.schema_to_read()`. - /// - /// # Panics - /// Panics if - /// - `i` or `j` is out of bound. - /// - `left` or `right` has insufficient column num. - #[inline] - pub fn compare_row_of_batch( - &self, - left: &Batch, - i: usize, - right: &Batch, - j: usize, - ) -> Ordering { - self.schema_to_read.compare_row_of_batch(left, i, right, j) - } - fn build_schema_to_read( region_schema: &RegionSchema, projection: &Projection, @@ -652,6 +614,65 @@ impl ProjectedSchema { } } +impl BatchOp for ProjectedSchema { + fn compare_row(&self, left: &Batch, i: usize, right: &Batch, j: usize) -> Ordering { + // Ordered by (row_key asc, sequence desc, op_type desc). + let indices = self.schema_to_read.row_key_indices(); + for idx in indices { + let (left_col, right_col) = (left.column(idx), right.column(idx)); + // Comparision of vector is done by virtual method calls currently. Consider using + // enum dispatch if this becomes bottleneck. + let order = left_col.get_ref(i).cmp(&right_col.get_ref(j)); + if order != Ordering::Equal { + return order; + } + } + let (sequence_index, op_type_index) = ( + self.schema_to_read.sequence_index(), + self.schema_to_read.op_type_index(), + ); + right + .column(sequence_index) + .get_ref(j) + .cmp(&left.column(sequence_index).get_ref(i)) + .then_with(|| { + right + .column(op_type_index) + .get_ref(j) + .cmp(&left.column(op_type_index).get_ref(i)) + }) + } + + fn dedup(&self, batch: &Batch, selected: &mut MutableBitmap, prev: Option<&Batch>) { + if let Some(prev) = prev { + assert_eq!(batch.num_columns(), prev.num_columns()); + } + let indices = self.schema_to_read.row_key_indices(); + for idx in indices { + let (current, prev_col) = ( + batch.column(idx), + prev.map(|prev| prev.column(idx).as_ref()), + ); + current.dedup(selected, prev_col); + } + } + + fn filter(&self, batch: &Batch, filter: &BooleanVector) -> error::Result { + let columns = batch + .columns() + .iter() + .enumerate() + .map(|(i, v)| { + v.filter(filter).context(error::FilterColumnSnafu { + name: self.schema_to_read.column_name(i), + }) + }) + .collect::>>()?; + + Ok(Batch::new(columns)) + } +} + fn parse_index_from_metadata(metadata: &Metadata, key: &str) -> Result { let value = metadata.get(key).context(MissingMetaSnafu { key })?; value.parse().context(ParseIndexSnafu { value }) @@ -673,12 +694,14 @@ fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result #[cfg(test)] mod tests { + use datatypes::prelude::ScalarVector; use datatypes::type_id::LogicalTypeId; - use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector}; + use datatypes::vectors::{Int64Vector, TimestampVector, UInt64Vector, UInt8Vector}; + use store_api::storage::OpType; use super::*; use crate::metadata::RegionMetadata; - use crate::test_util::{descriptor_util, schema_util}; + use crate::test_util::{descriptor_util, read_util, schema_util}; fn new_batch() -> Batch { let k0 = Int64Vector::from_slice(&[1, 2, 3]); @@ -934,4 +957,48 @@ mod tests { .unwrap(); assert!(matches!(err, Error::InvalidProjection { .. })); } + + #[test] + fn test_compare_batch() { + let schema = read_util::new_projected_schema(); + let left = read_util::new_full_kv_batch(&[(1000, 1, 1000, OpType::Put)]); + let right = read_util::new_full_kv_batch(&[ + (999, 1, 1000, OpType::Put), + (1000, 1, 999, OpType::Put), + (1000, 1, 1000, OpType::Put), + ]); + + assert_eq!(Ordering::Greater, schema.compare_row(&left, 0, &right, 0)); + assert_eq!(Ordering::Less, schema.compare_row(&left, 0, &right, 1)); + assert_eq!(Ordering::Equal, schema.compare_row(&left, 0, &right, 2)); + } + + #[test] + fn test_dedup_batch() { + let schema = read_util::new_projected_schema(); + let batch = read_util::new_kv_batch(&[(1000, Some(1)), (2000, Some(2)), (2000, Some(2))]); + let mut selected = MutableBitmap::from_len_zeroed(3); + + schema.dedup(&batch, &mut selected, None); + assert!(selected.get(0)); + assert!(selected.get(1)); + assert!(!selected.get(2)); + + let prev = read_util::new_kv_batch(&[(1000, Some(1))]); + schema.dedup(&batch, &mut selected, Some(&prev)); + assert!(!selected.get(0)); + assert!(selected.get(1)); + assert!(!selected.get(2)); + } + + #[test] + fn test_filter_batch() { + let schema = read_util::new_projected_schema(); + let batch = read_util::new_kv_batch(&[(1000, Some(1)), (2000, Some(2)), (3000, Some(3))]); + let filter = BooleanVector::from_slice(&[true, false, true]); + + let res = schema.filter(&batch, &filter).unwrap(); + let expect: VectorRef = Arc::new(TimestampVector::from_values([1000, 3000])); + assert_eq!(expect, *res.column(0)); + } } diff --git a/src/storage/src/test_util/read_util.rs b/src/storage/src/test_util/read_util.rs index d18868682b..b15b43a004 100644 --- a/src/storage/src/test_util/read_util.rs +++ b/src/storage/src/test_util/read_util.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use datatypes::prelude::ScalarVector; use datatypes::type_id::LogicalTypeId; use datatypes::vectors::{Int64Vector, TimestampVector, UInt64Vector, UInt8Vector}; +use store_api::storage::OpType; use crate::error::Result; use crate::memtable::{BatchIterator, BoxedBatchIterator, RowOrdering}; @@ -29,7 +30,7 @@ pub fn new_projected_schema() -> ProjectedSchemaRef { } /// Build a new batch, with 0 sequence and op_type. -fn new_kv_batch(key_values: &[(i64, Option)]) -> Batch { +pub fn new_kv_batch(key_values: &[(i64, Option)]) -> Batch { let key = Arc::new(TimestampVector::from_values(key_values.iter().map(|v| v.0))); let value = Arc::new(Int64Vector::from_iter(key_values.iter().map(|v| v.1))); let sequences = Arc::new(UInt64Vector::from_vec(vec![0; key_values.len()])); @@ -38,6 +39,18 @@ fn new_kv_batch(key_values: &[(i64, Option)]) -> Batch { Batch::new(vec![key, value, sequences, op_types]) } +/// Build a new batch from (key, value, sequence, op_type) +pub fn new_full_kv_batch(all_values: &[(i64, i64, u64, OpType)]) -> Batch { + let key = Arc::new(TimestampVector::from_values(all_values.iter().map(|v| v.0))); + let value = Arc::new(Int64Vector::from_values(all_values.iter().map(|v| v.1))); + let sequences = Arc::new(UInt64Vector::from_values(all_values.iter().map(|v| v.2))); + let op_types = Arc::new(UInt8Vector::from_values( + all_values.iter().map(|v| v.3.as_u8()), + )); + + Batch::new(vec![key, value, sequences, op_types]) +} + fn check_kv_batch(batches: &[Batch], expect: &[&[(i64, Option)]]) { for (batch, key_values) in batches.iter().zip(expect.iter()) { let key = batch @@ -145,6 +158,15 @@ pub fn build_vec_reader(batches: &[&[(i64, Option)]]) -> VecBatchReader { VecBatchReader::new(batches) } +pub fn build_full_vec_reader(batches: &[&[(i64, i64, u64, OpType)]]) -> VecBatchReader { + let batches: Vec<_> = batches + .iter() + .map(|key_values| new_full_kv_batch(key_values)) + .collect(); + + VecBatchReader::new(batches) +} + pub fn build_boxed_reader(batches: &[&[(i64, Option)]]) -> BoxedBatchReader { Box::new(build_vec_reader(batches)) }