feat: Implement dedup reader (#270)

* feat: Handle empty NullVector in replicate_null

* chore: Rename ChunkReaderImpl::sst_reader to batch_reader

* feat: dedup reader wip

* feat: Add BatchOp

Add BatchOp to support dedup/filter on Batch and implement BatchOp for
ProjectedSchema.

Moves compare_row_of_batch to BatchOp::compare_row.

* feat: Allow Batch to have empty columns

* feat: Implement DedupReader

Also add From<MutableBitmap> for BooleanVector

* test: Test dedup reader

Fix an issue where compare_row compared by the full key instead of the row key

* chore: Add comments to BatchOp

* feat: Dedup results from merge reader

* test: Test merge read after flush

* test: Test merge read after flush and reopen

* test: Test replicate empty NullVector

* test: Add tests for `ProjectedSchema::dedup/filter`

* feat: Filter empty batch in DedupReader

Also fix clippy warnings and refactor some code
evenyag
2022-09-21 17:49:53 +08:00
committed by GitHub
parent 9489862417
commit a954ba862a
11 changed files with 442 additions and 65 deletions
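For orientation before the per-file diffs: the commit stacks a DedupReader on top of the merge reader, so duplicated row keys produced by merging memtables and SSTs are dropped before chunks are returned. A minimal usage sketch, built only from the test helpers that appear in the diffs below (assumes it runs inside an async test; the data values are hypothetical):

use store_api::storage::OpType;

let schema = read_util::new_projected_schema();
// Merge output is ordered by (row_key asc, sequence desc, op_type desc),
// so the first row seen for each key is the newest one and is the row kept.
let inner = read_util::build_full_vec_reader(&[
    // key, value, sequence, op_type
    &[(100, 1, 1000, OpType::Put), (100, 2, 999, OpType::Put)],
]);
let mut reader = DedupReader::new(schema, inner);
let batch = reader.next_batch().await.unwrap().unwrap();
// Only the newest row for key 100 (value = 1) survives deduplication.
assert_eq!(1, batch.num_rows());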

View File

@@ -4,6 +4,8 @@ use std::sync::Arc;
use arrow::array::{Array, ArrayRef, BooleanArray, MutableArray, MutableBooleanArray};
use arrow::bitmap::utils::{BitmapIter, ZipValidity};
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::DataType as ArrowDataType;
use snafu::{OptionExt, ResultExt};
use crate::data_type::ConcreteDataType;
@@ -59,6 +61,14 @@ impl<Ptr: Borrow<Option<bool>>> FromIterator<Ptr> for BooleanVector {
}
}
impl From<MutableBitmap> for BooleanVector {
fn from(bitmap: MutableBitmap) -> BooleanVector {
BooleanVector {
array: BooleanArray::new(ArrowDataType::Boolean, bitmap.into(), None),
}
}
}
impl Vector for BooleanVector {
fn data_type(&self) -> ConcreteDataType {
ConcreteDataType::boolean_datatype()
@@ -327,4 +337,16 @@ mod tests {
let expect: VectorRef = Arc::new(BooleanVector::from_slice(&[true, false, true]));
assert_eq!(expect, vector);
}
#[test]
fn test_from_mutable_bitmap() {
let mut bitmap = MutableBitmap::new();
let values = [false, true, true, false, true];
for v in values {
bitmap.push(v);
}
let vector = BooleanVector::from(bitmap);
let expect = BooleanVector::from_slice(&values);
assert_eq!(expect, vector);
}
}

View File

@@ -175,6 +175,10 @@ impl MutableVector for NullVectorBuilder {
pub(crate) fn replicate_null(vector: &NullVector, offsets: &[usize]) -> VectorRef {
assert_eq!(offsets.len(), vector.len());
if offsets.is_empty() {
return vector.slice(0, 0);
}
Arc::new(NullVector::new(*offsets.last().unwrap()))
}
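For intuition: `offsets[i]` is the cumulative output length after replicating the `i-th` element, so for a vector of nulls only the total matters and the last offset is exactly the replicated length. A small worked example (hypothetical values):

// offsets = [1, 3, 5]: element 0 appears once (1 - 0), element 1 twice (3 - 1),
// element 2 twice (5 - 3); the result is 5 nulls, i.e. NullVector::new(5).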

View File

@@ -72,6 +72,11 @@ mod tests {
#[test]
fn test_replicate_null() {
let v = NullVector::new(0);
let offsets = [];
let v = v.replicate(&offsets);
assert!(v.is_empty());
let v = NullVector::new(3);
let offsets = [1, 3, 5];

View File

@@ -8,7 +8,7 @@ use table::predicate::Predicate;
use crate::error::{self, Error, Result};
use crate::memtable::{IterContext, MemtableRef, MemtableSet};
use crate::read::{BoxedBatchReader, MergeReaderBuilder};
use crate::read::{BoxedBatchReader, DedupReader, MergeReaderBuilder};
use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef};
use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor};
@@ -18,7 +18,7 @@ use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor};
// necessary to do so.
pub struct ChunkReaderImpl {
schema: ProjectedSchemaRef,
sst_reader: BoxedBatchReader,
batch_reader: BoxedBatchReader,
}
#[async_trait]
@@ -30,7 +30,7 @@ impl ChunkReader for ChunkReaderImpl {
}
async fn next_chunk(&mut self) -> Result<Option<Chunk>> {
let batch = match self.sst_reader.next_batch().await? {
let batch = match self.batch_reader.next_batch().await? {
Some(b) => b,
None => return Ok(None),
};
@@ -42,8 +42,11 @@ impl ChunkReader for ChunkReaderImpl {
}
impl ChunkReaderImpl {
pub fn new(schema: ProjectedSchemaRef, sst_reader: BoxedBatchReader) -> ChunkReaderImpl {
ChunkReaderImpl { schema, sst_reader }
pub fn new(schema: ProjectedSchemaRef, batch_reader: BoxedBatchReader) -> ChunkReaderImpl {
ChunkReaderImpl {
schema,
batch_reader,
}
}
}
@@ -142,6 +145,7 @@ impl ChunkReaderBuilder {
}
let reader = reader_builder.build();
let reader = DedupReader::new(schema.clone(), reader);
Ok(ChunkReaderImpl::new(schema, Box::new(reader)))
}

View File

@@ -246,6 +246,13 @@ pub enum Error {
#[snafu(display("Failed to build batch, {}", msg))]
BuildBatch { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to filter column {}, source: {}", name, source))]
FilterColumn {
name: String,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -276,7 +283,8 @@ impl ErrorExt for Error {
| VersionNotFound { .. }
| SequenceNotMonotonic { .. }
| ConvertStoreSchema { .. }
| InvalidRawRegion { .. } => StatusCode::Unexpected,
| InvalidRawRegion { .. }
| FilterColumn { .. } => StatusCode::Unexpected,
FlushIo { .. }
| WriteParquet { .. }

View File

@@ -1,21 +1,24 @@
//! Common structs and utilities for read.
mod dedup;
mod merge;
use std::cmp::Ordering;
use async_trait::async_trait;
use datatypes::arrow::bitmap::MutableBitmap;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::{MutableVector, VectorRef};
use datatypes::vectors::{BooleanVector, MutableVector, VectorRef};
pub use dedup::DedupReader;
pub use merge::{MergeReader, MergeReaderBuilder};
use snafu::{ensure, ResultExt};
use crate::error::{self, Result};
/// Storage internal representation of a batch of rows.
///
/// `Batch` must contain at least one column, but might not hold any row.
// Now the structure of `Batch` is still unstable, all pub fields may be changed.
#[derive(Debug, Default, PartialEq, Eq)]
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct Batch {
/// Rows organized in columnar format.
///
@@ -28,9 +31,7 @@ impl Batch {
/// Create a new `Batch` from `columns`.
///
/// # Panics
/// Panics if
/// - `columns` is empty.
/// - vectors in `columns` have different length.
/// Panics if vectors in `columns` have different lengths.
pub fn new(columns: Vec<VectorRef>) -> Batch {
Self::assert_columns(&columns);
@@ -44,8 +45,7 @@ impl Batch {
#[inline]
pub fn num_rows(&self) -> usize {
// The invariant of `Batch::new()` ensures columns isn't empty.
self.columns[0].len()
self.columns.get(0).map(|v| v.len()).unwrap_or(0)
}
#[inline]
@@ -77,12 +77,50 @@ impl Batch {
}
fn assert_columns(columns: &[VectorRef]) {
assert!(!columns.is_empty());
if columns.is_empty() {
return;
}
let length = columns[0].len();
assert!(columns.iter().all(|col| col.len() == length));
}
}
/// Compute operations for Batch.
pub trait BatchOp {
/// Compare the `i-th` row in `left` to the `j-th` row in `right` by key (row key + internal columns).
///
/// The caller should ensure `left` and `right` have the same schema as `self`.
///
/// # Panics
/// Panics if
/// - `i` or `j` is out of bounds.
/// - `left` or `right` has insufficient columns.
fn compare_row(&self, left: &Batch, i: usize, right: &Batch, j: usize) -> Ordering;
/// Dedup rows in `batch` by row key.
///
/// If `prev` is `Some` and not empty, the last row of `prev` is used to dedup the
/// current `batch`. The `i-th` bit of `selected` is set to `true` if the `i-th` row
/// needs to be kept, so the caller can use `selected` to build a [BooleanVector] to
/// filter the batch.
///
/// The caller must ensure `selected` is initialized with `batch.num_rows()` bits
/// set to zero.
///
/// # Panics
/// Panics if `batch` and `prev` have a different number of columns (unless `prev` is
/// empty).
fn dedup(&self, batch: &Batch, selected: &mut MutableBitmap, prev: Option<&Batch>);
/// Filters the `batch`, returning elements matching the `filter` (i.e. where the values
/// are true).
///
/// Note that the nulls of `filter` are interpreted as `false`, which leads to those
/// elements being masked out.
fn filter(&self, batch: &Batch, filter: &BooleanVector) -> Result<Batch>;
}
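The call pattern these doc comments describe (zero a bitmap, let the schema mark the rows to keep, then filter) looks roughly like this sketch, mirroring `DedupReader::dedup_batch` later in this commit; `schema` is a `ProjectedSchemaRef` and `prev_batch: Option<Batch>` is the caller's state:

let mut selected = MutableBitmap::from_len_zeroed(batch.num_rows());
schema.dedup(&batch, &mut selected, prev_batch.as_ref());
let filter = BooleanVector::from(selected);
let deduped = schema.filter(&batch, &filter)?;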
/// Reusable [Batch] builder.
pub struct BatchBuilder {
builders: Vec<Box<dyn MutableVector>>,

View File

@@ -0,0 +1,164 @@
use async_trait::async_trait;
use datatypes::arrow::bitmap::MutableBitmap;
use datatypes::vectors::BooleanVector;
use crate::error::Result;
use crate::read::{Batch, BatchOp, BatchReader};
use crate::schema::ProjectedSchemaRef;
/// A reader that dedups rows from the inner reader.
pub struct DedupReader<R> {
/// Projected schema to read.
schema: ProjectedSchemaRef,
/// The inner reader.
reader: R,
/// Previous batch from the reader.
prev_batch: Option<Batch>,
}
impl<R> DedupReader<R> {
pub fn new(schema: ProjectedSchemaRef, reader: R) -> DedupReader<R> {
DedupReader {
schema,
reader,
prev_batch: None,
}
}
/// Take `batch` and return a new batch with no duplicated rows.
///
/// This method may return an empty `Batch`.
fn dedup_batch(&mut self, batch: Batch) -> Result<Batch> {
if batch.is_empty() {
// No need to update `prev_batch` if the current batch is empty.
return Ok(batch);
}
// The `arrow` filter needs a `BooleanArray` as input, so there is no convenient
// and efficient way to reuse the bitmap. We could use `MutableBooleanArray`
// instead, but we couldn't easily zero all bits in the mutable array.
let mut selected = MutableBitmap::from_len_zeroed(batch.num_rows());
self.schema
.dedup(&batch, &mut selected, self.prev_batch.as_ref());
// Store the current batch in `prev_batch` so we can compare the next batch
// with it. We store the batch before filtering, mainly for correctness: once
// we support `DELETE`, rows with `OpType::Delete` would be removed from the
// batch after filtering, and we might then store an incorrect last row of the
// previous batch.
self.prev_batch
.get_or_insert_with(Batch::default)
.clone_from(&batch); // Use `clone_from` to reuse allocated memory if possible.
// TODO(yingwen): To support `DELETE`, we could find all rows whose op_types are
// equal to `OpType::Delete`, set their `selected` bits to false, then filter the batch.
let filter = BooleanVector::from(selected);
// Filter duplicate rows.
self.schema.filter(&batch, &filter)
}
}
#[async_trait]
impl<R: BatchReader> BatchReader for DedupReader<R> {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
while let Some(batch) = self.reader.next_batch().await? {
let filtered = self.dedup_batch(batch)?;
// Skip empty batch.
if !filtered.is_empty() {
return Ok(Some(filtered));
}
}
Ok(None)
}
}
#[cfg(test)]
mod tests {
use store_api::storage::OpType;
use super::*;
use crate::test_util::read_util;
#[tokio::test]
async fn test_dedup_reader_empty() {
let schema = read_util::new_projected_schema();
let reader = read_util::build_vec_reader(&[]);
let mut reader = DedupReader::new(schema, reader);
assert!(reader.next_batch().await.unwrap().is_none());
// Calling next_batch() again is allowed.
assert!(reader.next_batch().await.unwrap().is_none());
}
#[tokio::test]
async fn test_dedup_by_sequence() {
let schema = read_util::new_projected_schema();
let reader = read_util::build_full_vec_reader(&[
// key, value, sequence, op_type
&[
(100, 1, 1000, OpType::Put),
(100, 2, 999, OpType::Put),
(100, 3, 998, OpType::Put),
(101, 1, 1000, OpType::Put),
],
&[
(101, 2, 999, OpType::Put),
(102, 12, 1000, OpType::Put),
(103, 13, 1000, OpType::Put),
],
&[(103, 2, 999, OpType::Put)],
]);
let mut reader = DedupReader::new(schema, reader);
let result = read_util::collect_kv_batch(&mut reader).await;
let expect = [
(100, Some(1)),
(101, Some(1)),
(102, Some(12)),
(103, Some(13)),
];
assert_eq!(&expect, &result[..]);
}
#[tokio::test]
async fn test_dedup_contains_empty_input() {
let schema = read_util::new_projected_schema();
let reader = read_util::build_full_vec_reader(&[
// key, value, sequence, op_type
&[
(100, 1, 1000, OpType::Put),
(100, 2, 999, OpType::Put),
(101, 1, 1000, OpType::Put),
],
&[],
&[(101, 2, 999, OpType::Put), (102, 12, 1000, OpType::Put)],
]);
let mut reader = DedupReader::new(schema, reader);
let result = read_util::collect_kv_batch(&mut reader).await;
let expect = [(100, Some(1)), (101, Some(1)), (102, Some(12))];
assert_eq!(&expect, &result[..]);
}
#[tokio::test]
async fn test_dedup_contains_empty_output() {
let schema = read_util::new_projected_schema();
let reader = read_util::build_full_vec_reader(&[
// key, value, sequence, op_type
&[
(100, 1, 1000, OpType::Put),
(100, 2, 999, OpType::Put),
(101, 1, 1000, OpType::Put),
],
&[(101, 2, 999, OpType::Put)],
&[(101, 3, 998, OpType::Put), (101, 4, 997, OpType::Put)],
&[(102, 12, 998, OpType::Put)],
]);
let mut reader = DedupReader::new(schema, reader);
let result = read_util::collect_kv_batch(&mut reader).await;
let expect = [(100, Some(1)), (101, Some(1)), (102, Some(12))];
assert_eq!(&expect, &result[..]);
}
}

View File

@@ -49,7 +49,7 @@ use store_api::storage::consts;
use crate::error::Result;
use crate::memtable::BoxedBatchIterator;
use crate::read::{Batch, BatchBuilder, BatchReader, BoxedBatchReader};
use crate::read::{Batch, BatchBuilder, BatchOp, BatchReader, BoxedBatchReader};
use crate::schema::{ProjectedSchema, ProjectedSchemaRef};
/// Batch data source.
@@ -98,7 +98,7 @@ struct RowCursor<'a> {
impl<'a> RowCursor<'a> {
#[inline]
fn compare(&self, schema: &ProjectedSchema, other: &RowCursor) -> Ordering {
schema.compare_row_of_batch(self.batch, self.pos, other.batch, other.pos)
schema.compare_row(self.batch, self.pos, other.batch, other.pos)
}
}

View File

@@ -122,7 +122,6 @@ async fn test_flush_and_stall() {
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
// Always trigger flush before write.
let tester = FlushTester::new(store_dir, flush_switch.clone()).await;
let data = [(1000, Some(100))];
@@ -182,7 +181,6 @@ async fn test_read_after_flush() {
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
// Always trigger flush before write.
let tester = FlushTester::new(store_dir, flush_switch.clone()).await;
// Put elements so we have content to flush.
@@ -209,3 +207,48 @@ async fn test_read_after_flush() {
let output = tester.full_scan().await;
assert_eq!(expect, output);
}
#[tokio::test]
async fn test_merge_read_after_flush() {
let dir = TempDir::new("merge-read-flush").unwrap();
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
let tester = FlushTester::new(store_dir, flush_switch.clone()).await;
// Put elements so we have content to flush (In SST1).
tester.put(&[(3000, Some(300))]).await;
tester.put(&[(2000, Some(200))]).await;
// Now set should_flush to true to trigger a flush.
flush_switch.set_should_flush(true);
// Put element to trigger flush (In SST2).
tester.put(&[(2000, Some(201))]).await;
tester.wait_flush_done().await;
// Disable flush.
flush_switch.set_should_flush(false);
// In SST2.
tester.put(&[(2000, Some(202))]).await;
tester.put(&[(1000, Some(100))]).await;
// Enable flush.
flush_switch.set_should_flush(true);
// Trigger flush and overwrite row (In memtable).
tester.put(&[(2000, Some(203))]).await;
tester.wait_flush_done().await;
let expect = vec![(1000, Some(100)), (2000, Some(203)), (3000, Some(300))];
let output = tester.full_scan().await;
assert_eq!(expect, output);
// Reopen
let mut tester = tester;
tester.reopen().await;
// Scan after reopen.
let output = tester.full_scan().await;
assert_eq!(expect, output);
}

View File

@@ -4,16 +4,18 @@ use std::sync::Arc;
use common_error::prelude::*;
use datatypes::arrow::array::Array;
use datatypes::arrow::bitmap::MutableBitmap;
use datatypes::arrow::chunk::Chunk as ArrowChunk;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::Metadata;
use datatypes::vectors::{Helper, VectorRef};
use datatypes::vectors::{BooleanVector, Helper, VectorRef};
use serde::{Deserialize, Serialize};
use snafu::ensure;
use store_api::storage::{consts, Chunk, ColumnId, ColumnSchema, Schema, SchemaBuilder, SchemaRef};
use crate::error;
use crate::metadata::{ColumnMetadata, ColumnsMetadata, ColumnsMetadataRef};
use crate::read::Batch;
use crate::read::{Batch, BatchOp};
const ROW_KEY_END_KEY: &str = "greptime:storage:row_key_end";
const USER_COLUMN_END_KEY: &str = "greptime:storage:user_column_end";
@@ -231,20 +233,6 @@ impl StoreSchema {
Ok(Batch::new(columns))
}
fn compare_row_of_batch(&self, left: &Batch, i: usize, right: &Batch, j: usize) -> Ordering {
let indices = self.full_key_indices();
for idx in indices {
let (left_col, right_col) = (left.column(idx), right.column(idx));
// Comparison of vectors is currently done by virtual method calls. Consider using
// enum dispatch if this becomes a bottleneck.
let order = left_col.get_ref(i).cmp(&right_col.get_ref(j));
if order != Ordering::Equal {
return order;
}
}
Ordering::Equal
}
fn from_columns_metadata(columns: &ColumnsMetadata, version: u32) -> Result<StoreSchema> {
let column_schemas: Vec<_> = columns
.iter_all_columns()
@@ -315,13 +303,6 @@ impl StoreSchema {
fn num_columns(&self) -> usize {
self.schema.num_columns()
}
fn full_key_indices(&self) -> impl Iterator<Item = usize> {
// row key, sequence, op_type
(0..self.row_key_end)
.chain(std::iter::once(self.sequence_index()))
.chain(std::iter::once(self.op_type_index()))
}
}
impl TryFrom<ArrowSchema> for StoreSchema {
@@ -552,25 +533,6 @@ impl ProjectedSchema {
Batch::new(columns)
}
/// Compare the `i-th` row in `left` to the `j-th` row in `right` by key (row key + internal columns).
///
/// The caller should ensure `left` and `right` have the same schema as `self.schema_to_read()`.
///
/// # Panics
/// Panics if
/// - `i` or `j` is out of bounds.
/// - `left` or `right` has insufficient columns.
#[inline]
pub fn compare_row_of_batch(
&self,
left: &Batch,
i: usize,
right: &Batch,
j: usize,
) -> Ordering {
self.schema_to_read.compare_row_of_batch(left, i, right, j)
}
fn build_schema_to_read(
region_schema: &RegionSchema,
projection: &Projection,
@@ -652,6 +614,65 @@ impl ProjectedSchema {
}
}
impl BatchOp for ProjectedSchema {
fn compare_row(&self, left: &Batch, i: usize, right: &Batch, j: usize) -> Ordering {
// Ordered by (row_key asc, sequence desc, op_type desc).
let indices = self.schema_to_read.row_key_indices();
for idx in indices {
let (left_col, right_col) = (left.column(idx), right.column(idx));
// Comparison of vectors is currently done by virtual method calls. Consider using
// enum dispatch if this becomes a bottleneck.
let order = left_col.get_ref(i).cmp(&right_col.get_ref(j));
if order != Ordering::Equal {
return order;
}
}
let (sequence_index, op_type_index) = (
self.schema_to_read.sequence_index(),
self.schema_to_read.op_type_index(),
);
right
.column(sequence_index)
.get_ref(j)
.cmp(&left.column(sequence_index).get_ref(i))
.then_with(|| {
right
.column(op_type_index)
.get_ref(j)
.cmp(&left.column(op_type_index).get_ref(i))
})
}
fn dedup(&self, batch: &Batch, selected: &mut MutableBitmap, prev: Option<&Batch>) {
if let Some(prev) = prev {
assert_eq!(batch.num_columns(), prev.num_columns());
}
let indices = self.schema_to_read.row_key_indices();
for idx in indices {
let (current, prev_col) = (
batch.column(idx),
prev.map(|prev| prev.column(idx).as_ref()),
);
current.dedup(selected, prev_col);
}
}
fn filter(&self, batch: &Batch, filter: &BooleanVector) -> error::Result<Batch> {
let columns = batch
.columns()
.iter()
.enumerate()
.map(|(i, v)| {
v.filter(filter).context(error::FilterColumnSnafu {
name: self.schema_to_read.column_name(i),
})
})
.collect::<error::Result<Vec<_>>>()?;
Ok(Batch::new(columns))
}
}
fn parse_index_from_metadata(metadata: &Metadata, key: &str) -> Result<usize> {
let value = metadata.get(key).context(MissingMetaSnafu { key })?;
value.parse().context(ParseIndexSnafu { value })
@@ -673,12 +694,14 @@ fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result<Schema>
#[cfg(test)]
mod tests {
use datatypes::prelude::ScalarVector;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector};
use datatypes::vectors::{Int64Vector, TimestampVector, UInt64Vector, UInt8Vector};
use store_api::storage::OpType;
use super::*;
use crate::metadata::RegionMetadata;
use crate::test_util::{descriptor_util, schema_util};
use crate::test_util::{descriptor_util, read_util, schema_util};
fn new_batch() -> Batch {
let k0 = Int64Vector::from_slice(&[1, 2, 3]);
@@ -934,4 +957,48 @@ mod tests {
.unwrap();
assert!(matches!(err, Error::InvalidProjection { .. }));
}
#[test]
fn test_compare_batch() {
let schema = read_util::new_projected_schema();
let left = read_util::new_full_kv_batch(&[(1000, 1, 1000, OpType::Put)]);
let right = read_util::new_full_kv_batch(&[
(999, 1, 1000, OpType::Put),
(1000, 1, 999, OpType::Put),
(1000, 1, 1000, OpType::Put),
]);
assert_eq!(Ordering::Greater, schema.compare_row(&left, 0, &right, 0));
assert_eq!(Ordering::Less, schema.compare_row(&left, 0, &right, 1));
assert_eq!(Ordering::Equal, schema.compare_row(&left, 0, &right, 2));
}
#[test]
fn test_dedup_batch() {
let schema = read_util::new_projected_schema();
let batch = read_util::new_kv_batch(&[(1000, Some(1)), (2000, Some(2)), (2000, Some(2))]);
let mut selected = MutableBitmap::from_len_zeroed(3);
schema.dedup(&batch, &mut selected, None);
assert!(selected.get(0));
assert!(selected.get(1));
assert!(!selected.get(2));
let prev = read_util::new_kv_batch(&[(1000, Some(1))]);
// Re-zero the bitmap before the second dedup call, per the `dedup` contract.
let mut selected = MutableBitmap::from_len_zeroed(3);
schema.dedup(&batch, &mut selected, Some(&prev));
assert!(!selected.get(0));
assert!(selected.get(1));
assert!(!selected.get(2));
}
#[test]
fn test_filter_batch() {
let schema = read_util::new_projected_schema();
let batch = read_util::new_kv_batch(&[(1000, Some(1)), (2000, Some(2)), (3000, Some(3))]);
let filter = BooleanVector::from_slice(&[true, false, true]);
let res = schema.filter(&batch, &filter).unwrap();
let expect: VectorRef = Arc::new(TimestampVector::from_values([1000, 3000]));
assert_eq!(expect, *res.column(0));
}
}

View File

@@ -4,6 +4,7 @@ use async_trait::async_trait;
use datatypes::prelude::ScalarVector;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::{Int64Vector, TimestampVector, UInt64Vector, UInt8Vector};
use store_api::storage::OpType;
use crate::error::Result;
use crate::memtable::{BatchIterator, BoxedBatchIterator, RowOrdering};
@@ -29,7 +30,7 @@ pub fn new_projected_schema() -> ProjectedSchemaRef {
}
/// Build a new batch, with 0 sequence and op_type.
fn new_kv_batch(key_values: &[(i64, Option<i64>)]) -> Batch {
pub fn new_kv_batch(key_values: &[(i64, Option<i64>)]) -> Batch {
let key = Arc::new(TimestampVector::from_values(key_values.iter().map(|v| v.0)));
let value = Arc::new(Int64Vector::from_iter(key_values.iter().map(|v| v.1)));
let sequences = Arc::new(UInt64Vector::from_vec(vec![0; key_values.len()]));
@@ -38,6 +39,18 @@ fn new_kv_batch(key_values: &[(i64, Option<i64>)]) -> Batch {
Batch::new(vec![key, value, sequences, op_types])
}
/// Build a new batch from (key, value, sequence, op_type)
pub fn new_full_kv_batch(all_values: &[(i64, i64, u64, OpType)]) -> Batch {
let key = Arc::new(TimestampVector::from_values(all_values.iter().map(|v| v.0)));
let value = Arc::new(Int64Vector::from_values(all_values.iter().map(|v| v.1)));
let sequences = Arc::new(UInt64Vector::from_values(all_values.iter().map(|v| v.2)));
let op_types = Arc::new(UInt8Vector::from_values(
all_values.iter().map(|v| v.3.as_u8()),
));
Batch::new(vec![key, value, sequences, op_types])
}
fn check_kv_batch(batches: &[Batch], expect: &[&[(i64, Option<i64>)]]) {
for (batch, key_values) in batches.iter().zip(expect.iter()) {
let key = batch
@@ -145,6 +158,15 @@ pub fn build_vec_reader(batches: &[&[(i64, Option<i64>)]]) -> VecBatchReader {
VecBatchReader::new(batches)
}
pub fn build_full_vec_reader(batches: &[&[(i64, i64, u64, OpType)]]) -> VecBatchReader {
let batches: Vec<_> = batches
.iter()
.map(|key_values| new_full_kv_batch(key_values))
.collect();
VecBatchReader::new(batches)
}
pub fn build_boxed_reader(batches: &[&[(i64, Option<i64>)]]) -> BoxedBatchReader {
Box::new(build_vec_reader(batches))
}