refactor: Decouple dedup and merge (#4139)

* feat: remove dedup/filter deleted from merge reader

* feat: impl dedup reader

* feat: support filter deleted flag

* test: test dedup reader

* feat: remove put_only field

* chore: fix clippy

* feat: metrics

* test: test empty batch

* perf: optimize dedup strategy

Avoid iterating all timestamps.

* test: fix test

* feat: generic
Yingwen
2024-06-17 12:09:50 +08:00
committed by GitHub
parent f4a5a44549
commit 558272de61
4 changed files with 430 additions and 227 deletions
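In short: duplicate removal and delete filtering move out of MergeReader into a standalone DedupReader that can wrap any BatchReader. A rough sketch of the new composition (mirroring the seq_scan.rs hunk below; `sources` and `filter_deleted` stand in for values the scanner builds):

    // MergeReader now only merges sorted batches.
    let mut builder = MergeReaderBuilder::from_sources(sources);
    let merge_reader = builder.build().await?;
    // LastRow keeps the row with the highest sequence per (key, timestamp)
    // and optionally drops deletion markers.
    let reader = DedupReader::new(merge_reader, LastRow::new(filter_deleted));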

src/mito2/src/read.rs

@@ -15,6 +15,7 @@
//! Common structs and utilities for reading data.
pub mod compat;
pub mod dedup;
pub mod merge;
pub mod projection;
pub(crate) mod scan_region;
@@ -74,8 +75,6 @@ pub struct Batch {
///
/// UInt8 type, not null.
op_types: Arc<UInt8Vector>,
/// True if the op types contain only put operations.
put_only: bool,
/// Fields organized in columnar format.
fields: Vec<BatchColumn>,
}
@@ -225,7 +224,6 @@ impl Batch {
sequences: Arc::new(self.sequences.get_slice(offset, length)),
op_types: Arc::new(self.op_types.get_slice(offset, length)),
fields,
put_only: self.put_only,
}
}
@@ -292,11 +290,6 @@ impl Batch {
/// Removes rows whose op type is delete.
pub fn filter_deleted(&mut self) -> Result<()> {
if self.put_only {
// If there is only put operation, we can skip comparison and filtering.
return Ok(());
}
// Safety: op type column is not null.
let array = self.op_types.as_arrow();
// Find rows with non-delete op type.
@@ -327,10 +320,6 @@ impl Batch {
)
.unwrap(),
);
// Also updates put_only field if it contains other ops.
if !self.put_only {
self.put_only = is_put_only(&self.op_types);
}
for batch_column in &mut self.fields {
batch_column.data = batch_column
.data
@@ -454,10 +443,6 @@ impl Batch {
let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
.context(ComputeArrowSnafu)?;
self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
// Also updates put_only field if it contains other ops.
if !self.put_only {
self.put_only = is_put_only(&self.op_types);
}
for batch_column in &mut self.fields {
batch_column.data = batch_column
.data
@@ -491,16 +476,6 @@ impl Batch {
}
}
/// Returns whether the op types vector contains only put operations.
fn is_put_only(op_types: &UInt8Vector) -> bool {
// Safety: Op types is not null.
op_types
.as_arrow()
.values()
.iter()
.all(|v| *v == OpType::Put as u8)
}
/// Len of timestamp in arrow row format.
const TIMESTAMP_KEY_LEN: usize = 9;
@@ -676,10 +651,6 @@ impl BatchBuilder {
);
}
// Checks whether op types are put only. In the future, we may get this from statistics
// in memtables and SSTs.
let put_only = is_put_only(&op_types);
Ok(Batch {
primary_key: self.primary_key,
pk_values: None,
@@ -687,7 +658,6 @@ impl BatchBuilder {
sequences,
op_types,
fields: self.fields,
put_only,
})
}
}
@@ -994,7 +964,6 @@ mod tests {
&[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
&[21, 22, 23, 24],
);
assert!(!batch.put_only);
batch.filter_deleted().unwrap();
let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
assert_eq!(expect, batch);
@@ -1005,7 +974,6 @@ mod tests {
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
&[21, 22, 23, 24],
);
assert!(batch.put_only);
let expect = batch.clone();
batch.filter_deleted().unwrap();
assert_eq!(expect, batch);
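With the put_only fast path gone, filter_deleted always builds a predicate over the op type column and filters every column of the batch with it. A self-contained sketch of that step using arrow's filter kernel (not the exact mito2 code; the DELETE discriminant is an assumption here):

    use arrow::array::{BooleanArray, UInt8Array};
    use arrow::compute::filter;

    // True for every row whose op type is not a delete.
    fn keep_non_deleted(op_types: &UInt8Array) -> BooleanArray {
        const DELETE: u8 = 0; // assumed discriminant of OpType::Delete
        op_types.iter().map(|v| Some(v != Some(DELETE))).collect()
    }

    fn main() {
        let op_types = UInt8Array::from(vec![1u8, 0, 1]);
        let predicate = keep_non_deleted(&op_types);
        // `Batch::filter_deleted` applies one such predicate to timestamps,
        // sequences, op types and every field column alike.
        let kept = filter(&op_types, &predicate).unwrap();
        assert_eq!(2, kept.len());
    }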

src/mito2/src/read/dedup.rs (new file, +336)

@@ -0,0 +1,336 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities to remove duplicate rows from a sorted batch.
use async_trait::async_trait;
use common_telemetry::debug;
use common_time::Timestamp;
use crate::error::Result;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::{Batch, BatchReader};
/// A reader that dedups sorted batches from a source based on the
/// dedup strategy.
pub(crate) struct DedupReader<R, S> {
source: R,
strategy: S,
metrics: DedupMetrics,
}
impl<R, S> DedupReader<R, S> {
/// Creates a new dedup reader.
pub(crate) fn new(source: R, strategy: S) -> Self {
Self {
source,
strategy,
metrics: DedupMetrics::default(),
}
}
}
impl<R: BatchReader, S: DedupStrategy> DedupReader<R, S> {
/// Returns the next deduplicated batch.
async fn fetch_next_batch(&mut self) -> Result<Option<Batch>> {
while let Some(batch) = self.source.next_batch().await? {
if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
return Ok(Some(batch));
}
}
self.strategy.finish(&mut self.metrics)
}
}
#[async_trait]
impl<R: BatchReader, S: DedupStrategy> BatchReader for DedupReader<R, S> {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
self.fetch_next_batch().await
}
}
impl<R, S> Drop for DedupReader<R, S> {
fn drop(&mut self) {
debug!("Dedup reader finished, metrics: {:?}", self.metrics);
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["dedup"])
.inc_by(self.metrics.num_unselected_rows as u64);
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["delete"])
.inc_by(self.metrics.num_deleted_rows as u64);
}
}
#[cfg(test)]
impl<R, S> DedupReader<R, S> {
fn metrics(&self) -> &DedupMetrics {
&self.metrics
}
}
/// Strategy to remove duplicate rows from sorted batches.
pub(crate) trait DedupStrategy: Send {
/// Pushes a batch to the dedup strategy.
/// Returns the deduplicated batch.
fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
/// Finishes the deduplication and resets the strategy.
///
/// Users must ensure that `push_batch` is called for all batches before
/// calling this method.
fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
}
/// State of the last row in a batch for dedup.
struct BatchLastRow {
primary_key: Vec<u8>,
/// The last timestamp of the batch.
timestamp: Timestamp,
}
/// Dedup strategy that keeps the row with the latest sequence for each key.
///
/// This strategy is optimized specially based on the properties of the SST files,
/// memtables and the merge reader. It assumes that batches from files and memtables
/// don't contain duplicate rows and the merge reader never concatenates batches from
/// different sources.
///
/// We might implement a new strategy if we need to process files with duplicate rows.
pub(crate) struct LastRow {
/// Meta of the last row in the previous batch that has the same key
/// as the batch to push.
prev_batch: Option<BatchLastRow>,
/// Filter deleted rows.
filter_deleted: bool,
}
impl LastRow {
/// Creates a new strategy with the given `filter_deleted` flag.
pub(crate) fn new(filter_deleted: bool) -> Self {
Self {
prev_batch: None,
filter_deleted,
}
}
}
impl DedupStrategy for LastRow {
fn push_batch(
&mut self,
mut batch: Batch,
metrics: &mut DedupMetrics,
) -> Result<Option<Batch>> {
if batch.is_empty() {
return Ok(None);
}
debug_assert!(batch.first_timestamp().is_some());
let prev_timestamp = match &self.prev_batch {
Some(prev_batch) => {
if prev_batch.primary_key != batch.primary_key() {
// The key has changed. This is the first batch of the
// new key.
None
} else {
Some(prev_batch.timestamp)
}
}
None => None,
};
if batch.first_timestamp() == prev_timestamp {
metrics.num_unselected_rows += 1;
// This batch contains a duplicate row, skip it.
if batch.num_rows() == 1 {
// We don't need to update `prev_batch` because they have the same
// key and timestamp.
return Ok(None);
}
// Skips the first row.
batch = batch.slice(1, batch.num_rows() - 1);
}
// Store the current batch in `prev_batch` so we can compare the next batch
// against it. We must store the batch before filtering it, because the filter
// removes rows with `OpType::Delete`; otherwise we might record an incorrect
// last row for the previous batch.
match &mut self.prev_batch {
Some(prev) => {
// Reuse the primary key buffer.
prev.primary_key.clone_from(&batch.primary_key);
prev.timestamp = batch.last_timestamp().unwrap();
}
None => {
self.prev_batch = Some(BatchLastRow {
primary_key: batch.primary_key().to_vec(),
timestamp: batch.last_timestamp().unwrap(),
})
}
}
// Filters deleted rows.
if self.filter_deleted {
let num_rows = batch.num_rows();
batch.filter_deleted()?;
let num_rows_after_filter = batch.num_rows();
let num_deleted = num_rows - num_rows_after_filter;
metrics.num_deleted_rows += num_deleted;
metrics.num_unselected_rows += num_deleted;
}
// The batch can become empty if all rows are deleted.
if batch.is_empty() {
Ok(None)
} else {
Ok(Some(batch))
}
}
fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
Ok(None)
}
}
/// Metrics for deduplication.
#[derive(Debug, Default)]
pub(crate) struct DedupMetrics {
/// Number of rows removed during deduplication.
pub(crate) num_unselected_rows: usize,
/// Number of deleted rows.
pub(crate) num_deleted_rows: usize,
}
#[cfg(test)]
mod tests {
use api::v1::OpType;
use super::*;
use crate::test_util::{check_reader_result, new_batch, VecBatchReader};
#[tokio::test]
async fn test_dedup_reader_no_duplications() {
let input = [
new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Put, OpType::Put],
&[21, 22],
),
new_batch(b"k1", &[3], &[13], &[OpType::Put], &[23]),
new_batch(
b"k2",
&[1, 2],
&[111, 112],
&[OpType::Put, OpType::Put],
&[31, 32],
),
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(true));
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
#[tokio::test]
async fn test_dedup_reader_duplications() {
let input = [
new_batch(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[11, 12],
),
// empty batch.
new_batch(b"k1", &[], &[], &[], &[]),
// Duplicate with the previous batch.
new_batch(
b"k1",
&[2, 3, 4],
&[10, 13, 13],
&[OpType::Put, OpType::Put, OpType::Delete],
&[2, 13, 14],
),
new_batch(
b"k2",
&[1, 2],
&[20, 20],
&[OpType::Put, OpType::Delete],
&[101, 0],
),
new_batch(b"k2", &[2], &[19], &[OpType::Put], &[102]),
new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
// This batch won't increase the deleted rows count as it
// is filtered out by the previous batch.
new_batch(b"k3", &[2], &[19], &[OpType::Delete], &[0]),
];
let reader = VecBatchReader::new(&input);
// Filter deleted.
let mut reader = DedupReader::new(reader, LastRow::new(true));
check_reader_result(
&mut reader,
&[
new_batch(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[11, 12],
),
new_batch(b"k1", &[3], &[13], &[OpType::Put], &[13]),
new_batch(b"k2", &[1], &[20], &[OpType::Put], &[101]),
new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
],
)
.await;
assert_eq!(5, reader.metrics().num_unselected_rows);
assert_eq!(2, reader.metrics().num_deleted_rows);
// Does not filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(false));
check_reader_result(
&mut reader,
&[
new_batch(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[11, 12],
),
new_batch(
b"k1",
&[3, 4],
&[13, 13],
&[OpType::Put, OpType::Delete],
&[13, 14],
),
new_batch(
b"k2",
&[1, 2],
&[20, 20],
&[OpType::Put, OpType::Delete],
&[101, 0],
),
new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
],
)
.await;
assert_eq!(3, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
}
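Because sources yield batches sorted by key and timestamp, only the first row of an incoming batch can collide with the last row of the previous batch for the same key; that is why LastRow compares a single timestamp instead of iterating all of them (the "perf: optimize dedup strategy" commit above). A usage sketch built on the crate's test helpers (`new_batch`, `VecBatchReader`):

    // Within "k1", only timestamp 2 can repeat across the batch boundary.
    let input = [
        new_batch(b"k1", &[1, 2], &[13, 11], &[OpType::Put, OpType::Put], &[11, 12]),
        new_batch(b"k1", &[2, 3], &[10, 13], &[OpType::Put, OpType::Put], &[2, 13]),
    ];
    let mut reader = DedupReader::new(VecBatchReader::new(&input), LastRow::new(true));
    // The second batch's leading row (timestamp 2, sequence 10) is skipped:
    // the first batch already emitted timestamp 2 with the higher sequence 11.
    while let Some(batch) = reader.next_batch().await? {
        assert!(batch.num_rows() > 0);
    }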

src/mito2/src/read/merge.rs

@@ -21,11 +21,10 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use common_telemetry::debug;
use common_time::Timestamp;
use crate::error::Result;
use crate::memtable::BoxedBatchIterator;
use crate::metrics::{MERGE_FILTER_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
/// Reader to merge sorted batches.
@@ -33,9 +32,7 @@ use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
/// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
/// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can
/// ignore op type as sequence is already unique).
/// 2. Batch doesn't have duplicate elements (elements with the same primary key and time index) if
/// dedup is true.
/// 3. Batches from sources **must** not be empty.
/// 2. Batches from sources **must** not be empty.
pub struct MergeReader {
/// Holds [Node]s whose key range of current batch **is** overlapped with the merge window.
/// Each node yields batches from a `source`.
@@ -49,10 +46,6 @@ pub struct MergeReader {
cold: BinaryHeap<Node>,
/// Batch to output.
output_batch: Option<Batch>,
/// Remove duplicate timestamps.
dedup: bool,
/// Remove deletion markers
filter_deleted: bool,
/// Local metrics.
metrics: Metrics,
}
@@ -89,12 +82,6 @@ impl Drop for MergeReader {
fn drop(&mut self) {
debug!("Merge reader finished, metrics: {:?}", self.metrics);
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["dedup"])
.inc_by(self.metrics.num_duplicate_rows as u64);
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["delete"])
.inc_by(self.metrics.num_deleted_rows as u64);
READ_STAGE_ELAPSED
.with_label_values(&["merge"])
.observe(self.metrics.scan_cost.as_secs_f64());
@@ -103,11 +90,7 @@ impl Drop for MergeReader {
impl MergeReader {
/// Creates and initializes a new [MergeReader].
pub async fn new(
sources: Vec<Source>,
dedup: bool,
filter_deleted: bool,
) -> Result<MergeReader> {
pub async fn new(sources: Vec<Source>) -> Result<MergeReader> {
let start = Instant::now();
let mut metrics = Metrics::default();
@@ -121,15 +104,10 @@ impl MergeReader {
}
}
// If dedup is false, we don't expect deletes to happen and we skip filtering deletion markers.
let filter_deleted = filter_deleted && dedup;
let mut reader = MergeReader {
hot,
cold,
output_batch: None,
dedup,
filter_deleted,
metrics,
};
// Initializes the reader.
@@ -164,18 +142,11 @@ impl MergeReader {
let mut hottest = self.hot.pop().unwrap();
let batch = hottest.fetch_batch(&mut self.metrics).await?;
Self::maybe_output_batch(
batch,
&mut self.output_batch,
self.filter_deleted,
&mut self.metrics,
)?;
Self::maybe_output_batch(batch, &mut self.output_batch)?;
self.reheap(hottest)
}
/// Fetches non-duplicated rows from the hottest node.
///
/// If `dedup` is true, it skips the timestamp duplicated with the first timestamp in the next node.
async fn fetch_rows_from_hottest(&mut self) -> Result<()> {
// Safety: `fetch_batches_to_output()` ensures the hot heap has more than 1 element.
// Pop hottest node.
@@ -200,36 +171,12 @@ impl MergeReader {
Ok(pos) => pos,
Err(pos) => {
// No duplicate timestamp. Outputs timestamp before `pos`.
Self::maybe_output_batch(
top.slice(0, pos),
&mut self.output_batch,
self.filter_deleted,
&mut self.metrics,
)?;
Self::maybe_output_batch(top.slice(0, pos), &mut self.output_batch)?;
top_node.skip_rows(pos, &mut self.metrics).await?;
return self.reheap(top_node);
}
};
if self.dedup {
// They have duplicate timestamps. Outputs timestamps before the duplicated timestamp.
// Batch itself doesn't contain duplicate timestamps so timestamps before `duplicate_pos`
// must be less than `next_min_ts`.
Self::maybe_output_batch(
top.slice(0, duplicate_pos),
&mut self.output_batch,
self.filter_deleted,
&mut self.metrics,
)?;
// This keeps the duplicate timestamp in the node.
top_node.skip_rows(duplicate_pos, &mut self.metrics).await?;
// The merge window should contain this timestamp so only nodes in the hot heap
// have this timestamp.
return self
.filter_first_duplicate_timestamp_in_hot(top_node, next_min_ts)
.await;
}
// No need to remove duplicate timestamps.
let output_end = if duplicate_pos == 0 {
// If the first timestamp of the top node is duplicated, we can simply return the first row
@@ -240,69 +187,11 @@ impl MergeReader {
// the duplicate pos.
duplicate_pos
};
Self::maybe_output_batch(
top.slice(0, output_end),
&mut self.output_batch,
self.filter_deleted,
&mut self.metrics,
)?;
Self::maybe_output_batch(top.slice(0, output_end), &mut self.output_batch)?;
top_node.skip_rows(output_end, &mut self.metrics).await?;
self.reheap(top_node)
}
/// Filters the first duplicate `timestamp` in `top_node` and `hot` heap. Only keeps the timestamp
/// with the maximum sequence.
async fn filter_first_duplicate_timestamp_in_hot(
&mut self,
top_node: Node,
timestamp: Timestamp,
) -> Result<()> {
debug_assert_eq!(
top_node.current_batch().first_timestamp().unwrap(),
timestamp
);
// The node with maximum sequence.
let mut max_seq_node = top_node;
let mut max_seq = max_seq_node.current_batch().first_sequence().unwrap();
while let Some(mut next_node) = self.hot.pop() {
// Safety: Batches in the heap is not empty.
let next_first_ts = next_node.current_batch().first_timestamp().unwrap();
let next_first_seq = next_node.current_batch().first_sequence().unwrap();
if next_first_ts != timestamp {
// We are done, push the node with max seq.
self.cold.push(next_node);
break;
}
if max_seq < next_first_seq {
// The next node has larger seq.
max_seq_node.skip_rows(1, &mut self.metrics).await?;
self.metrics.num_duplicate_rows += 1;
if !max_seq_node.is_eof() {
self.cold.push(max_seq_node);
}
max_seq_node = next_node;
max_seq = next_first_seq;
} else {
next_node.skip_rows(1, &mut self.metrics).await?;
self.metrics.num_duplicate_rows += 1;
if !next_node.is_eof() {
// If the next node has smaller seq, skip that row.
self.cold.push(next_node);
}
}
}
debug_assert!(!max_seq_node.is_eof());
self.cold.push(max_seq_node);
// The merge window is updated, we need to refill the hot heap.
self.refill_hot();
Ok(())
}
/// Push the node popped from `hot` back to a proper heap.
fn reheap(&mut self, node: Node) -> Result<()> {
if node.is_eof() {
@@ -336,21 +225,8 @@ impl MergeReader {
/// If `filter_deleted` is set to true, removes deleted entries and sets the `batch` to the `output_batch`.
///
/// Ignores the `batch` if it is empty.
fn maybe_output_batch(
mut batch: Batch,
output_batch: &mut Option<Batch>,
filter_deleted: bool,
metrics: &mut Metrics,
) -> Result<()> {
fn maybe_output_batch(batch: Batch, output_batch: &mut Option<Batch>) -> Result<()> {
debug_assert!(output_batch.is_none());
let num_rows = batch.num_rows();
if filter_deleted {
batch.filter_deleted()?;
}
// Update deleted rows metrics.
metrics.num_deleted_rows += num_rows - batch.num_rows();
if batch.is_empty() {
return Ok(());
}
@@ -361,15 +237,12 @@ impl MergeReader {
}
/// Builder to build and initialize a [MergeReader].
#[derive(Default)]
pub struct MergeReaderBuilder {
/// Input sources.
///
/// All source must yield batches with the same schema.
sources: Vec<Source>,
/// Remove duplicate timestamps. Default is true.
dedup: bool,
/// Remove deletion markers.
filter_deleted: bool,
}
impl MergeReaderBuilder {
@@ -379,16 +252,8 @@ impl MergeReaderBuilder {
}
/// Creates a builder from sources.
pub fn from_sources(
sources: Vec<Source>,
dedup: bool,
filter_deleted: bool,
) -> MergeReaderBuilder {
MergeReaderBuilder {
sources,
dedup,
filter_deleted,
}
pub fn from_sources(sources: Vec<Source>) -> MergeReaderBuilder {
MergeReaderBuilder { sources }
}
/// Pushes a batch reader to sources.
@@ -406,17 +271,7 @@ impl MergeReaderBuilder {
/// Builds and initializes the reader, then resets the builder.
pub async fn build(&mut self) -> Result<MergeReader> {
let sources = mem::take(&mut self.sources);
MergeReader::new(sources, self.dedup, self.filter_deleted).await
}
}
impl Default for MergeReaderBuilder {
fn default() -> Self {
MergeReaderBuilder {
sources: Vec::new(),
dedup: true,
filter_deleted: true,
}
MergeReader::new(sources).await
}
}
@@ -431,12 +286,8 @@ struct Metrics {
num_fetch_by_rows: usize,
/// Number of input rows.
num_input_rows: usize,
/// Number of skipped duplicate rows.
num_duplicate_rows: usize,
/// Number of output rows.
num_output_rows: usize,
/// Number of deleted rows.
num_deleted_rows: usize,
/// Cost to fetch batches from sources.
fetch_cost: Duration,
}
@@ -672,15 +523,26 @@ mod tests {
&[OpType::Put, OpType::Put],
&[24, 25],
),
new_batch(b"k1", &[7], &[17], &[OpType::Put], &[27]),
new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]),
new_batch(
b"k1",
&[7, 8],
&[17, 18],
&[OpType::Put, OpType::Delete],
&[27, 28],
),
new_batch(
b"k2",
&[2, 3],
&[12, 13],
&[OpType::Delete, OpType::Put],
&[22, 23],
),
],
)
.await;
assert_eq!(8, reader.metrics.num_input_rows);
assert_eq!(6, reader.metrics.num_output_rows);
assert_eq!(2, reader.metrics.num_deleted_rows);
assert_eq!(8, reader.metrics.num_output_rows);
}
#[tokio::test]
@@ -782,17 +644,24 @@ mod tests {
),
new_batch(b"k1", &[3], &[10], &[OpType::Put], &[33]),
new_batch(b"k1", &[4], &[14], &[OpType::Put], &[24]),
new_batch(b"k1", &[4], &[10], &[OpType::Put], &[34]),
new_batch(b"k1", &[5], &[15], &[OpType::Delete], &[25]),
new_batch(b"k1", &[5], &[10], &[OpType::Put], &[35]),
new_batch(b"k2", &[1], &[11], &[OpType::Put], &[21]),
new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]),
new_batch(
b"k2",
&[2, 3],
&[12, 13],
&[OpType::Delete, OpType::Put],
&[22, 23],
),
new_batch(b"k2", &[10], &[20], &[OpType::Put], &[30]),
],
)
.await;
assert_eq!(11, reader.metrics.num_input_rows);
assert_eq!(7, reader.metrics.num_output_rows);
assert_eq!(2, reader.metrics.num_deleted_rows);
assert_eq!(2, reader.metrics.num_duplicate_rows);
assert_eq!(11, reader.metrics.num_output_rows);
}
#[tokio::test]
@@ -828,7 +697,29 @@ mod tests {
.unwrap();
check_reader_result(
&mut reader,
&[new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23])],
&[
new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Delete, OpType::Delete],
&[21, 22],
),
new_batch(
b"k1",
&[4, 5],
&[14, 15],
&[OpType::Delete, OpType::Delete],
&[24, 25],
),
new_batch(
b"k2",
&[2, 3],
&[12, 13],
&[OpType::Delete, OpType::Put],
&[22, 23],
),
],
)
.await;
}
@@ -842,7 +733,6 @@ mod tests {
&[OpType::Put, OpType::Put],
&[21, 22],
)]);
// This reader will be empty after skipping the timestamp.
let reader2 = VecBatchReader::new(&[new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33])]);
let mut reader = MergeReaderBuilder::new()
.push_batch_reader(Box::new(reader1))
@@ -852,20 +742,17 @@ mod tests {
.unwrap();
check_reader_result(
&mut reader,
&[new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Put, OpType::Put],
&[21, 22],
)],
&[
new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21]),
new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33]),
new_batch(b"k1", &[2], &[12], &[OpType::Put], &[22]),
],
)
.await;
}
#[tokio::test]
async fn test_merge_top_node_empty() {
// This reader will be empty after skipping the timestamp 2.
let reader1 = VecBatchReader::new(&[new_batch(
b"k1",
&[1, 2],
@@ -890,13 +777,9 @@ mod tests {
&mut reader,
&[
new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
new_batch(
b"k1",
&[2, 3],
&[11, 11],
&[OpType::Put, OpType::Put],
&[32, 33],
),
new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]),
new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]),
],
)
.await;
@@ -938,6 +821,7 @@ mod tests {
&mut reader,
&[
new_batch(b"k1", &[1], &[11], &[OpType::Put], &[31]),
new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
new_batch(
b"k1",
&[6, 8],
@@ -963,9 +847,13 @@ mod tests {
builder.push_batch_reader(Box::new(reader));
}
let mut reader = builder.build().await.unwrap();
let expect: Vec<_> = (0..8)
.map(|ts| new_batch(b"k1", &[ts], &[9], &[OpType::Put], &[100]))
.collect();
let mut expect = Vec::with_capacity(80);
for ts in 0..8 {
for i in 0..10 {
let batch = new_batch(b"k1", &[ts], &[9 - i], &[OpType::Put], &[100]);
expect.push(batch);
}
}
check_reader_result(&mut reader, &expect).await;
}
@@ -989,7 +877,7 @@ mod tests {
Source::Reader(Box::new(reader1)),
Source::Iter(Box::new(reader2)),
];
let mut reader = MergeReaderBuilder::from_sources(sources, false, true)
let mut reader = MergeReaderBuilder::from_sources(sources)
.build()
.await
.unwrap();
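After this change MergeReader performs a pure k-way merge ordered by (primary key, time index, sequence desc) and emits every input row, which is why the updated tests expect num_output_rows to equal num_input_rows. A sketch of the slimmed-down builder (test helpers assumed):

    // Overlapping sources: both rows for ("k1", timestamp 1) survive the merge,
    // ordered by sequence descending; deduplication is now the caller's choice.
    let reader1 = VecBatchReader::new(&[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])]);
    let reader2 = VecBatchReader::new(&[new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33])]);
    let mut merged = MergeReaderBuilder::new()
        .push_batch_reader(Box::new(reader1))
        .push_batch_reader(Box::new(reader2))
        .build()
        .await?;
    // Wrapping it in DedupReader::new(merged, LastRow::new(true)) restores the old behavior.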

src/mito2/src/read/seq_scan.rs

@@ -34,7 +34,8 @@ use tokio::sync::Semaphore;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::read::merge::{MergeReader, MergeReaderBuilder};
use crate::read::dedup::{DedupReader, LastRow};
use crate::read::merge::MergeReaderBuilder;
use crate::read::scan_region::{
FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext,
};
@@ -149,7 +150,7 @@ impl SeqScan {
partition: Option<usize>,
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
) -> Result<Option<MergeReader>> {
) -> Result<Option<BoxedBatchReader>> {
let mut parts = stream_ctx.parts.lock().await;
maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
@@ -186,10 +187,20 @@ impl SeqScan {
.create_parallel_sources(sources, semaphore.clone())?;
}
let mut builder = MergeReaderBuilder::from_sources(sources);
let reader = builder.build().await?;
let dedup = !stream_ctx.input.append_mode;
let mut builder =
MergeReaderBuilder::from_sources(sources, dedup, stream_ctx.input.filter_deleted);
builder.build().await.map(Some)
if dedup {
let reader = Box::new(DedupReader::new(
reader,
LastRow::new(stream_ctx.input.filter_deleted),
));
Ok(Some(reader))
} else {
let reader = Box::new(reader);
Ok(Some(reader))
}
}
/// Scans one partition or all partitions.