From 00e4bd45f0527dad3698e03469e626f7f7f091a2 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 24 Nov 2023 14:33:17 +0800 Subject: [PATCH] feat: add put_only field to skip filtering deletion (#2801) * feat: add put_only field to skip filtering deletion * docs: fix typo --- src/datatypes/src/vectors/primitive.rs | 2 +- src/mito2/src/read.rs | 68 +++++++++++++++++++++++++- 2 files changed, 68 insertions(+), 2 deletions(-) diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 027f09fbfe..d570b67c66 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -288,7 +288,7 @@ impl PrimitiveVector { } // To distinguish with `Vector::slice()`. - /// Slice the batch, returning a new batch. + /// Slice the vector, returning a new vector. /// /// # Panics /// This function panics if `offset + length > self.len()`. diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 7c3fda2790..1ef80b3bb1 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -66,6 +66,8 @@ pub struct Batch { /// /// UInt8 type, not null. op_types: Arc, + /// True if op types only contains put operations. + put_only: bool, /// Fields organized in columnar format. fields: Vec, } @@ -195,6 +197,7 @@ impl Batch { sequences: Arc::new(self.sequences.get_slice(offset, length)), op_types: Arc::new(self.op_types.get_slice(offset, length)), fields, + put_only: self.put_only, } } @@ -261,6 +264,11 @@ impl Batch { /// Removes rows whose op type is delete. pub fn filter_deleted(&mut self) -> Result<()> { + if self.put_only { + // If there is only put operation, we can skip comparison and filtering. + return Ok(()); + } + // Safety: op type column is not null. let array = self.op_types.as_arrow(); // Find rows with non-delete op type. @@ -291,6 +299,10 @@ impl Batch { ) .unwrap(), ); + // Also updates put_only field if it contains other ops. + if !self.put_only { + self.put_only = is_put_only(&self.op_types); + } for batch_column in &mut self.fields { batch_column.data = batch_column .data @@ -411,6 +423,10 @@ impl Batch { let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None) .context(ComputeArrowSnafu)?; self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap()); + // Also updates put_only field if it contains other ops. + if !self.put_only { + self.put_only = is_put_only(&self.op_types); + } for batch_column in &mut self.fields { batch_column.data = batch_column .data @@ -444,6 +460,16 @@ impl Batch { } } +/// Returns whether the op types vector only contains put operation. +fn is_put_only(op_types: &UInt8Vector) -> bool { + // Safety: Op types is not null. + op_types + .as_arrow() + .values() + .iter() + .all(|v| *v == OpType::Put as u8) +} + /// Len of timestamp in arrow row format. const TIMESTAMP_KEY_LEN: usize = 9; @@ -619,12 +645,17 @@ impl BatchBuilder { ); } + // Checks whether op types are put only. In the future, we may get this from statistics + // in memtables and SSTs. + let put_only = is_put_only(&op_types); + Ok(Batch { primary_key: self.primary_key, timestamps, sequences, op_types, fields: self.fields, + put_only, }) } } @@ -873,13 +904,26 @@ mod tests { &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put], &[21, 22, 23, 24], ); + assert!(!batch.put_only); batch.filter_deleted().unwrap(); let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]); assert_eq!(expect, batch); + + let mut batch = new_batch( + &[1, 2, 3, 4], + &[11, 12, 13, 14], + &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], + &[21, 22, 23, 24], + ); + assert!(batch.put_only); + let expect = batch.clone(); + batch.filter_deleted().unwrap(); + assert_eq!(expect, batch); } #[test] fn test_filter() { + // Filters put only. let mut batch = new_batch( &[1, 2, 3, 4], &[11, 12, 13, 14], @@ -891,7 +935,19 @@ mod tests { let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]); assert_eq!(expect, batch); - // filter to empty. + // Filters deletion. + let mut batch = new_batch( + &[1, 2, 3, 4], + &[11, 12, 13, 14], + &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put], + &[21, 22, 23, 24], + ); + let predicate = BooleanVector::from_vec(vec![false, false, true, true]); + batch.filter(&predicate).unwrap(); + let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]); + assert_eq!(expect, batch); + + // Filters to empty. let predicate = BooleanVector::from_vec(vec![false, false]); batch.filter(&predicate).unwrap(); assert!(batch.is_empty()); @@ -927,5 +983,15 @@ mod tests { &[23, 26, 22, 24, 25], ); assert_eq!(expect, batch); + + let mut batch = new_batch( + &[2, 2, 1], + &[1, 6, 1], + &[OpType::Delete, OpType::Put, OpType::Put], + &[21, 22, 23], + ); + batch.sort_and_dedup().unwrap(); + let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]); + assert_eq!(expect, batch); } }