feat: Dedup strategy that keeps the last not null field (#4184)

* feat: dedup strategy: last not null

* fix: fix tests

* fix: fix single batch

* chore: warning

* chore: skip has_null check

* refactor: rename fields

* fix: merge last fields may not reset builder

* chore: clear before filter deleted

* chore: remove debug logs

* chore: Update comment

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Author: Yingwen
Date: 2024-06-25 14:38:48 +08:00
Committed by: GitHub
Parent: 4a4237115a
Commit: 9aaf7d79bf
2 changed files with 661 additions and 8 deletions


@@ -14,13 +14,18 @@
//! Utilities to remove duplicate rows from a sorted batch.
use api::v1::OpType;
use async_trait::async_trait;
use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::data_type::DataType;
use datatypes::prelude::ScalarVector;
use datatypes::value::Value;
use datatypes::vectors::MutableVector;
use crate::error::Result;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::{Batch, BatchColumn, BatchReader};
/// A reader that dedups sorted batches from a source based on the
/// dedup strategy.
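For context, the `DedupStrategy` trait that `DedupReader` drives is defined elsewhere in this file and is not part of this diff; the sketch below is inferred from the `impl DedupStrategy` blocks in this change (the `Send` bound is an assumption):

```rust
/// Sketch of the strategy interface, inferred from the impls in this diff.
trait DedupStrategy: Send {
    /// Pushes a sorted batch to the strategy. The strategy may buffer rows
    /// internally and emit a deduplicated batch once it has enough context.
    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;

    /// Flushes any batch the strategy still buffers.
    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
}
```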
@@ -181,12 +186,7 @@ impl DedupStrategy for LastRow {
// Filters deleted rows.
if self.filter_deleted {
filter_deleted_from_batch(&mut batch, metrics)?;
}
// The batch can become empty if all rows are deleted.
@@ -202,6 +202,18 @@ impl DedupStrategy for LastRow {
}
}
/// Removes deleted rows from the batch and updates metrics.
fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> Result<()> {
let num_rows = batch.num_rows();
batch.filter_deleted()?;
let num_rows_after_filter = batch.num_rows();
let num_deleted = num_rows - num_rows_after_filter;
metrics.num_deleted_rows += num_deleted;
metrics.num_unselected_rows += num_deleted;
Ok(())
}
/// Metrics for deduplication.
#[derive(Debug, Default)]
pub(crate) struct DedupMetrics {
@@ -211,11 +223,260 @@ pub(crate) struct DedupMetrics {
pub(crate) num_deleted_rows: usize,
}
/// Buffer to store fields in the last row to merge.
struct LastFieldsBuilder {
/// Filter deleted rows.
filter_deleted: bool,
/// Field builders, lazily initialized.
builders: Vec<Box<dyn MutableVector>>,
/// Last fields to merge, lazily initialized.
/// Only initialized when `skip_merge()` is false.
last_fields: Vec<Value>,
/// Whether the last row (including `last_fields`) has a null field.
/// Only set when `contains_deletion` is false.
contains_null: bool,
/// Whether the last row has a delete op. If true, skips merging fields.
contains_deletion: bool,
/// Whether the builder is initialized.
initialized: bool,
}
impl LastFieldsBuilder {
/// Returns a new builder with the given `filter_deleted` flag.
fn new(filter_deleted: bool) -> Self {
Self {
filter_deleted,
builders: Vec::new(),
last_fields: Vec::new(),
contains_null: false,
contains_deletion: false,
initialized: false,
}
}
/// Initializes the builders with the last row of the batch.
fn maybe_init(&mut self, batch: &Batch) {
debug_assert!(!batch.is_empty());
if self.initialized || batch.fields().is_empty() {
// Already initialized or no fields to merge.
return;
}
self.initialized = true;
let last_idx = batch.num_rows() - 1;
let fields = batch.fields();
// Safety: The last_idx is valid.
self.contains_deletion =
batch.op_types().get_data(last_idx).unwrap() == OpType::Delete as u8;
// If the row has been deleted, then we don't need to merge fields.
if !self.contains_deletion {
self.contains_null = fields.iter().any(|col| col.data.is_null(last_idx));
}
if self.skip_merge() {
// No null field or the row has been deleted, no need to merge.
return;
}
if self.builders.is_empty() {
self.builders = fields
.iter()
.map(|col| col.data.data_type().create_mutable_vector(1))
.collect();
}
self.last_fields = fields.iter().map(|col| col.data.get(last_idx)).collect();
}
/// Returns true if the builder doesn't need to merge the rows.
fn skip_merge(&self) -> bool {
debug_assert!(self.initialized);
// No null field or the row has been deleted, no need to merge.
self.contains_deletion || !self.contains_null
}
/// Pushes the first row of a batch to the builder.
fn push_first_row(&mut self, batch: &Batch) {
debug_assert!(self.initialized);
debug_assert!(!batch.is_empty());
if self.skip_merge() {
// No remaining null field, skips this batch.
return;
}
let fields = batch.fields();
for (idx, value) in self.last_fields.iter_mut().enumerate() {
if value.is_null() && !fields[idx].data.is_null(0) {
// Updates the value.
*value = fields[idx].data.get(0);
}
}
// Updates the flag.
self.contains_null = self.last_fields.iter().any(Value::is_null);
}
/// Merges last not null fields, builds a new batch and resets the builder.
/// It may overwrite the last row of the `buffer`.
fn merge_last_not_null(
&mut self,
buffer: Batch,
metrics: &mut DedupMetrics,
) -> Result<Option<Batch>> {
debug_assert!(self.initialized);
let mut output = if self.last_fields.is_empty() {
// No need to overwrite the last row.
buffer
} else {
// Builds last fields.
for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) {
// Safety: Vectors of the batch have the same type.
builder.push_value_ref(value.as_value_ref());
}
let fields = self
.builders
.iter_mut()
.zip(buffer.fields())
.map(|(builder, col)| BatchColumn {
column_id: col.column_id,
data: builder.to_vector(),
})
.collect();
if buffer.num_rows() == 1 {
// Replaces the buffer directly if it only has one row.
buffer.with_fields(fields)?
} else {
// Replaces the last row of the buffer.
let front = buffer.slice(0, buffer.num_rows() - 1);
let last = buffer.slice(buffer.num_rows() - 1, 1);
let last = last.with_fields(fields)?;
Batch::concat(vec![front, last])?
}
};
// Resets itself. `self.builders` is already reset in `to_vector()`.
self.clear();
if self.filter_deleted {
filter_deleted_from_batch(&mut output, metrics)?;
}
if output.is_empty() {
Ok(None)
} else {
Ok(Some(output))
}
}
/// Clears the builder.
fn clear(&mut self) {
self.last_fields.clear();
self.contains_null = false;
self.contains_deletion = false;
self.initialized = false;
}
}
/// Dedup strategy that keeps the last not null field for the same key.
///
/// It assumes that batches from files and memtables don't contain duplicate rows
/// and the merge reader never concatenates batches from different sources.
///
/// We might implement a new strategy if we need to process files with duplicate rows.
pub(crate) struct LastNotNull {
/// Buffered batch whose last-row fields may still be updated.
buffer: Option<Batch>,
/// Fields that overlap with the last row of the `buffer`.
last_fields: LastFieldsBuilder,
}
impl LastNotNull {
/// Creates a new strategy with the given `filter_deleted` flag.
#[allow(dead_code)]
pub(crate) fn new(filter_deleted: bool) -> Self {
Self {
buffer: None,
last_fields: LastFieldsBuilder::new(filter_deleted),
}
}
}
impl DedupStrategy for LastNotNull {
fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
if batch.is_empty() {
return Ok(None);
}
let Some(buffer) = self.buffer.as_mut() else {
// The buffer is empty, store the batch and return. We need to observe the next batch.
self.buffer = Some(batch);
return Ok(None);
};
// Initializes last fields with the first buffer.
self.last_fields.maybe_init(buffer);
if buffer.primary_key() != batch.primary_key() {
// Next key is different.
let buffer = std::mem::replace(buffer, batch);
let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
return Ok(merged);
}
if buffer.last_timestamp() != batch.first_timestamp() {
// The next batch starts with a different timestamp.
let buffer = std::mem::replace(buffer, batch);
let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
return Ok(merged);
}
// The next batch has the same key and timestamp.
metrics.num_unselected_rows += 1;
// We assume each batch doesn't contain duplicate rows, so we only need to check the first row.
if batch.num_rows() == 1 {
self.last_fields.push_first_row(&batch);
return Ok(None);
}
// The next batch has the same key and timestamp but contains multiple rows.
// We can merge the first row and buffer the remaining rows.
let first = batch.slice(0, 1);
self.last_fields.push_first_row(&first);
// Moves the remaining rows to the buffer.
let batch = batch.slice(1, batch.num_rows() - 1);
let buffer = std::mem::replace(buffer, batch);
let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
Ok(merged)
}
fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
let Some(buffer) = self.buffer.take() else {
return Ok(None);
};
// Initializes last fields with the first buffer.
self.last_fields.maybe_init(&buffer);
let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
Ok(merged)
}
}
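For illustration, a minimal sketch of driving the strategy by hand, mirroring the `check_dedup_strategy` helper in the tests below (`input` is assumed to be a slice of sorted, non-empty `Batch`es):

```rust
// Collect deduplicated output from LastNotNull, filtering deleted rows.
let mut strategy = LastNotNull::new(true);
let mut metrics = DedupMetrics::default();
let mut output = Vec::new();
for batch in input {
    // push_batch may buffer the batch and emit a merged batch later.
    if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() {
        output.push(out);
    }
}
// finish flushes the buffered batch, merging any remaining null fields.
if let Some(out) = strategy.finish(&mut metrics).unwrap() {
    output.push(out);
}
```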
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::OpType;
use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
use super::*;
use crate::read::BatchBuilder;
use crate::test_util::{check_reader_result, new_batch, VecBatchReader};
#[tokio::test]
@@ -237,11 +498,20 @@ mod tests {
&[31, 32],
),
];
// Test last row.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(true));
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
// Test last not null.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNotNull::new(true));
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
#[tokio::test]
@@ -277,8 +547,8 @@ mod tests {
// is filtered out by the previous batch.
new_batch(b"k3", &[2], &[19], &[OpType::Delete], &[0]),
];
// Filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(true));
check_reader_result(
&mut reader,
@@ -333,4 +603,383 @@ mod tests {
assert_eq!(3, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
/// Returns a new [Batch] whose fields have column ids 1 and 2.
fn new_batch_multi_fields(
primary_key: &[u8],
timestamps: &[i64],
sequences: &[u64],
op_types: &[OpType],
fields: &[(Option<u64>, Option<u64>)],
) -> Batch {
let mut builder = BatchBuilder::new(primary_key.to_vec());
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
timestamps.iter().copied(),
)))
.unwrap()
.sequences_array(Arc::new(UInt64Array::from_iter_values(
sequences.iter().copied(),
)))
.unwrap()
.op_types_array(Arc::new(UInt8Array::from_iter_values(
op_types.iter().map(|v| *v as u8),
)))
.unwrap()
.push_field_array(
1,
Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.0))),
)
.unwrap()
.push_field_array(
2,
Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.1))),
)
.unwrap();
builder.build().unwrap()
}
#[tokio::test]
async fn test_last_not_null_merge() {
let input = [
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (None, None)],
),
// empty batch.
new_batch_multi_fields(b"k1", &[], &[], &[], &[]),
// Duplicate with the previous batch.
new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(Some(12), None)]),
new_batch_multi_fields(
b"k1",
&[2, 3, 4],
&[10, 13, 13],
&[OpType::Put, OpType::Put, OpType::Delete],
&[(Some(2), Some(22)), (Some(13), None), (None, Some(14))],
),
new_batch_multi_fields(
b"k2",
&[1, 2],
&[20, 20],
&[OpType::Put, OpType::Delete],
&[(Some(101), Some(101)), (None, None)],
),
new_batch_multi_fields(
b"k2",
&[2],
&[19],
&[OpType::Put],
&[(Some(102), Some(102))],
),
new_batch_multi_fields(
b"k3",
&[2],
&[20],
&[OpType::Put],
&[(Some(202), Some(202))],
),
// This batch won't increase the deleted rows count as it
// is filtered out by the previous batch. (All fields are null).
new_batch_multi_fields(b"k3", &[2], &[19], &[OpType::Delete], &[(None, None)]),
];
// Filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNotNull::new(true));
check_reader_result(
&mut reader,
&[
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(12), Some(22))],
),
new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), None)]),
new_batch_multi_fields(
b"k2",
&[1],
&[20],
&[OpType::Put],
&[(Some(101), Some(101))],
),
new_batch_multi_fields(
b"k3",
&[2],
&[20],
&[OpType::Put],
&[(Some(202), Some(202))],
),
],
)
.await;
assert_eq!(6, reader.metrics().num_unselected_rows);
assert_eq!(2, reader.metrics().num_deleted_rows);
// Does not filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNotNull::new(false));
check_reader_result(
&mut reader,
&[
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(12), Some(22))],
),
new_batch_multi_fields(
b"k1",
&[3, 4],
&[13, 13],
&[OpType::Put, OpType::Delete],
&[(Some(13), None), (None, Some(14))],
),
new_batch_multi_fields(
b"k2",
&[1, 2],
&[20, 20],
&[OpType::Put, OpType::Delete],
&[(Some(101), Some(101)), (None, None)],
),
new_batch_multi_fields(
b"k3",
&[2],
&[20],
&[OpType::Put],
&[(Some(202), Some(202))],
),
],
)
.await;
assert_eq!(4, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
#[tokio::test]
async fn test_last_not_null_skip_merge_single() {
let input = [new_batch_multi_fields(
b"k1",
&[1, 2, 3],
&[13, 11, 13],
&[OpType::Put, OpType::Delete, OpType::Put],
&[(Some(11), Some(11)), (None, None), (Some(13), Some(13))],
)];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNotNull::new(true));
check_reader_result(
&mut reader,
&[new_batch_multi_fields(
b"k1",
&[1, 3],
&[13, 13],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(13), Some(13))],
)],
)
.await;
assert_eq!(1, reader.metrics().num_unselected_rows);
assert_eq!(1, reader.metrics().num_deleted_rows);
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNotNull::new(false));
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
#[tokio::test]
async fn test_last_not_null_skip_merge_no_null() {
let input = [
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(12), Some(12))],
),
new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
new_batch_multi_fields(
b"k1",
&[2, 3],
&[9, 13],
&[OpType::Put, OpType::Put],
&[(Some(32), None), (Some(13), Some(13))],
),
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNotNull::new(true));
check_reader_result(
&mut reader,
&[
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(12), Some(12))],
),
new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), Some(13))]),
],
)
.await;
assert_eq!(2, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
#[tokio::test]
async fn test_last_not_null_merge_null() {
let input = [
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (None, None)],
),
new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNotNull::new(true));
check_reader_result(
&mut reader,
&[
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (None, Some(22))],
),
new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
],
)
.await;
assert_eq!(1, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
fn check_dedup_strategy(input: &[Batch], strategy: &mut dyn DedupStrategy, expect: &[Batch]) {
let mut actual = Vec::new();
let mut metrics = DedupMetrics::default();
for batch in input {
if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() {
actual.push(out);
}
}
if let Some(out) = strategy.finish(&mut metrics).unwrap() {
actual.push(out);
}
assert_eq!(expect, actual);
}
#[test]
fn test_last_not_null_strategy_delete_last() {
let input = [
new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(
b"k1",
&[1, 2],
&[1, 7],
&[OpType::Put, OpType::Put],
&[(Some(1), None), (Some(22), Some(222))],
),
new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
new_batch_multi_fields(
b"k2",
&[2, 3],
&[2, 5],
&[OpType::Put, OpType::Delete],
&[(None, None), (Some(13), None)],
),
new_batch_multi_fields(b"k2", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
];
let mut strategy = LastNotNull::new(true);
check_dedup_strategy(
&input,
&mut strategy,
&[
new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
new_batch_multi_fields(b"k2", &[2], &[2], &[OpType::Put], &[(None, None)]),
],
);
}
#[test]
fn test_last_not_null_strategy_delete_one() {
let input = [
new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
];
let mut strategy = LastNotNull::new(true);
check_dedup_strategy(
&input,
&mut strategy,
&[new_batch_multi_fields(
b"k2",
&[1],
&[6],
&[OpType::Put],
&[(Some(11), None)],
)],
);
}
#[test]
fn test_last_not_null_strategy_delete_all() {
let input = [
new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Delete], &[(Some(11), None)]),
];
let mut strategy = LastNotNull::new(true);
check_dedup_strategy(&input, &mut strategy, &[]);
}
#[test]
fn test_last_not_null_strategy_same_batch() {
let input = [
new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(
b"k1",
&[1, 2],
&[1, 7],
&[OpType::Put, OpType::Put],
&[(Some(1), None), (Some(22), Some(222))],
),
new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
new_batch_multi_fields(
b"k1",
&[2, 3],
&[2, 5],
&[OpType::Put, OpType::Put],
&[(None, None), (Some(13), None)],
),
new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
];
let mut strategy = LastNotNull::new(true);
check_dedup_strategy(
&input,
&mut strategy,
&[
new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
new_batch_multi_fields(b"k1", &[3], &[5], &[OpType::Put], &[(Some(13), Some(3))]),
],
);
}
}


@@ -33,6 +33,10 @@ use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
/// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can
/// ignore op type as sequence is already unique).
/// 2. Batches from sources **must** not be empty.
///
/// The reader won't concatenate batches. Each batch returned by the reader also doesn't
/// contain duplicate rows. But the last (primary key, timestamp) of a batch may be the same
/// as that of the first row in the next batch.
pub struct MergeReader {
/// Holds [Node]s whose key range of current batch **is** overlapped with the merge window.
/// Each node yields batches from a `source`.
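To illustrate the last invariant above, a hedged sketch of two consecutive batches the reader may yield for the same primary key, built with the `new_batch` helper from `test_util` used in the dedup tests (arguments: key, timestamps, sequences, op types, field values):

```rust
// The last (primary key, timestamp) of `first` equals the first of `second`.
// Sequences stay descending within each (key, timestamp), so a dedup
// strategy such as LastRow or LastNotNull can resolve the overlap.
let first = new_batch(b"k1", &[1, 2], &[11, 10], &[OpType::Put, OpType::Put], &[1, 2]);
let second = new_batch(b"k1", &[2, 3], &[9, 9], &[OpType::Put, OpType::Put], &[22, 3]);
```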