feat: add put_only field to skip filtering deletion (#2801)

* feat: add put_only field to skip filtering deletion

* docs: fix typo
This commit is contained in:
Yingwen
2023-11-24 14:33:17 +08:00
committed by GitHub
parent 85eebcb16f
commit 00e4bd45f0
2 changed files with 68 additions and 2 deletions

View File

@@ -288,7 +288,7 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
}
// To distinguish with `Vector::slice()`.
/// Slice the batch, returning a new batch.
/// Slice the vector, returning a new vector.
///
/// # Panics
/// This function panics if `offset + length > self.len()`.

View File

@@ -66,6 +66,8 @@ pub struct Batch {
///
/// UInt8 type, not null.
op_types: Arc<UInt8Vector>,
/// True if op types only contains put operations.
put_only: bool,
/// Fields organized in columnar format.
fields: Vec<BatchColumn>,
}
@@ -195,6 +197,7 @@ impl Batch {
sequences: Arc::new(self.sequences.get_slice(offset, length)),
op_types: Arc::new(self.op_types.get_slice(offset, length)),
fields,
put_only: self.put_only,
}
}
@@ -261,6 +264,11 @@ impl Batch {
/// Removes rows whose op type is delete.
pub fn filter_deleted(&mut self) -> Result<()> {
if self.put_only {
// If there are only put operations, we can skip comparison and filtering.
return Ok(());
}
// Safety: op type column is not null.
let array = self.op_types.as_arrow();
// Find rows with non-delete op type.
@@ -291,6 +299,10 @@ impl Batch {
)
.unwrap(),
);
// Also updates put_only field if it contains other ops.
if !self.put_only {
self.put_only = is_put_only(&self.op_types);
}
for batch_column in &mut self.fields {
batch_column.data = batch_column
.data
@@ -411,6 +423,10 @@ impl Batch {
let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
.context(ComputeArrowSnafu)?;
self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
// Also updates put_only field if it contains other ops.
if !self.put_only {
self.put_only = is_put_only(&self.op_types);
}
for batch_column in &mut self.fields {
batch_column.data = batch_column
.data
@@ -444,6 +460,16 @@ impl Batch {
}
}
/// Checks that every value in the op types vector is a put operation.
///
/// Yields `true` as well when the vector holds no values, since no value differs from put.
fn is_put_only(op_types: &UInt8Vector) -> bool {
    let put = OpType::Put as u8;
    // The op type column is never null, so the raw values can be scanned directly.
    let values = op_types.as_arrow().values();
    !values.iter().any(|&op| op != put)
}
/// Length of a timestamp in arrow row format.
// NOTE(review): presumably measured in bytes — confirm against the row encoder.
const TIMESTAMP_KEY_LEN: usize = 9;
@@ -619,12 +645,17 @@ impl BatchBuilder {
);
}
// Checks whether op types are put only. In the future, we may get this from statistics
// in memtables and SSTs.
let put_only = is_put_only(&op_types);
Ok(Batch {
primary_key: self.primary_key,
timestamps,
sequences,
op_types,
fields: self.fields,
put_only,
})
}
}
@@ -873,13 +904,26 @@ mod tests {
&[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
&[21, 22, 23, 24],
);
assert!(!batch.put_only);
batch.filter_deleted().unwrap();
let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
assert_eq!(expect, batch);
let mut batch = new_batch(
&[1, 2, 3, 4],
&[11, 12, 13, 14],
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
&[21, 22, 23, 24],
);
assert!(batch.put_only);
let expect = batch.clone();
batch.filter_deleted().unwrap();
assert_eq!(expect, batch);
}
#[test]
fn test_filter() {
// Filters put only.
let mut batch = new_batch(
&[1, 2, 3, 4],
&[11, 12, 13, 14],
@@ -891,7 +935,19 @@ mod tests {
let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
assert_eq!(expect, batch);
// filter to empty.
// Filters deletion.
let mut batch = new_batch(
&[1, 2, 3, 4],
&[11, 12, 13, 14],
&[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
&[21, 22, 23, 24],
);
let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
batch.filter(&predicate).unwrap();
let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
assert_eq!(expect, batch);
// Filters to empty.
let predicate = BooleanVector::from_vec(vec![false, false]);
batch.filter(&predicate).unwrap();
assert!(batch.is_empty());
@@ -927,5 +983,15 @@ mod tests {
&[23, 26, 22, 24, 25],
);
assert_eq!(expect, batch);
let mut batch = new_batch(
&[2, 2, 1],
&[1, 6, 1],
&[OpType::Delete, OpType::Put, OpType::Put],
&[21, 22, 23],
);
batch.sort_and_dedup().unwrap();
let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
assert_eq!(expect, batch);
}
}