diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index 042eaf2124..8b9c549207 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -15,6 +15,7 @@
 //! Common structs and utilities for reading data.
 
 pub mod compat;
+pub mod dedup;
 pub mod merge;
 pub mod projection;
 pub(crate) mod scan_region;
@@ -74,8 +75,6 @@ pub struct Batch {
     ///
     /// UInt8 type, not null.
     op_types: Arc<UInt8Vector>,
-    /// True if op types only contains put operations.
-    put_only: bool,
     /// Fields organized in columnar format.
     fields: Vec<BatchColumn>,
 }
@@ -225,7 +224,6 @@ impl Batch {
             sequences: Arc::new(self.sequences.get_slice(offset, length)),
             op_types: Arc::new(self.op_types.get_slice(offset, length)),
             fields,
-            put_only: self.put_only,
         }
     }
@@ -292,11 +290,6 @@ impl Batch {
     /// Removes rows whose op type is delete.
     pub fn filter_deleted(&mut self) -> Result<()> {
-        if self.put_only {
-            // If there is only put operation, we can skip comparison and filtering.
-            return Ok(());
-        }
-
         // Safety: op type column is not null.
         let array = self.op_types.as_arrow();
         // Find rows with non-delete op type.
@@ -327,10 +320,6 @@ impl Batch {
             )
             .unwrap(),
         );
-        // Also updates put_only field if it contains other ops.
-        if !self.put_only {
-            self.put_only = is_put_only(&self.op_types);
-        }
         for batch_column in &mut self.fields {
             batch_column.data = batch_column
                 .data
@@ -454,10 +443,6 @@ impl Batch {
         let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
             .context(ComputeArrowSnafu)?;
         self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
-        // Also updates put_only field if it contains other ops.
-        if !self.put_only {
-            self.put_only = is_put_only(&self.op_types);
-        }
         for batch_column in &mut self.fields {
             batch_column.data = batch_column
                 .data
@@ -491,16 +476,6 @@ impl Batch {
     }
 }
 
-/// Returns whether the op types vector only contains put operation.
-fn is_put_only(op_types: &UInt8Vector) -> bool {
-    // Safety: Op types is not null.
-    op_types
-        .as_arrow()
-        .values()
-        .iter()
-        .all(|v| *v == OpType::Put as u8)
-}
-
 /// Len of timestamp in arrow row format.
 const TIMESTAMP_KEY_LEN: usize = 9;
 
@@ -676,10 +651,6 @@ impl BatchBuilder {
             );
         }
 
-        // Checks whether op types are put only. In the future, we may get this from statistics
-        // in memtables and SSTs.
-        let put_only = is_put_only(&op_types);
-
         Ok(Batch {
             primary_key: self.primary_key,
             pk_values: None,
@@ -687,7 +658,6 @@ impl BatchBuilder {
             sequences,
             op_types,
             fields: self.fields,
-            put_only,
         })
     }
 }
@@ -994,7 +964,6 @@ mod tests {
             &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
             &[21, 22, 23, 24],
         );
-        assert!(!batch.put_only);
         batch.filter_deleted().unwrap();
         let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
         assert_eq!(expect, batch);
@@ -1005,7 +974,6 @@ mod tests {
             &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
             &[21, 22, 23, 24],
         );
-        assert!(batch.put_only);
         let expect = batch.clone();
         batch.filter_deleted().unwrap();
         assert_eq!(expect, batch);
diff --git a/src/mito2/src/read/dedup.rs b/src/mito2/src/read/dedup.rs
new file mode 100644
index 0000000000..0ba6f86113
--- /dev/null
+++ b/src/mito2/src/read/dedup.rs
@@ -0,0 +1,336 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Utilities to remove duplicate rows from a sorted batch.
+
+use async_trait::async_trait;
+use common_telemetry::debug;
+use common_time::Timestamp;
+
+use crate::error::Result;
+use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
+use crate::read::{Batch, BatchReader};
+
+/// A reader that dedups sorted batches from a source based on the
+/// dedup strategy.
+pub(crate) struct DedupReader<R, S> {
+    source: R,
+    strategy: S,
+    metrics: DedupMetrics,
+}
+
+impl<R, S> DedupReader<R, S> {
+    /// Creates a new dedup reader.
+    pub(crate) fn new(source: R, strategy: S) -> Self {
+        Self {
+            source,
+            strategy,
+            metrics: DedupMetrics::default(),
+        }
+    }
+}
+
+impl<R: BatchReader, S: DedupStrategy> DedupReader<R, S> {
+    /// Returns the next deduplicated batch.
+    async fn fetch_next_batch(&mut self) -> Result<Option<Batch>> {
+        while let Some(batch) = self.source.next_batch().await? {
+            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
+                return Ok(Some(batch));
+            }
+        }
+
+        self.strategy.finish(&mut self.metrics)
+    }
+}
+
+#[async_trait]
+impl<R: BatchReader, S: DedupStrategy> BatchReader for DedupReader<R, S> {
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        self.fetch_next_batch().await
+    }
+}
+
+impl<R, S> Drop for DedupReader<R, S> {
+    fn drop(&mut self) {
+        debug!("Dedup reader finished, metrics: {:?}", self.metrics);
+
+        MERGE_FILTER_ROWS_TOTAL
+            .with_label_values(&["dedup"])
+            .inc_by(self.metrics.num_unselected_rows as u64);
+        MERGE_FILTER_ROWS_TOTAL
+            .with_label_values(&["delete"])
+            .inc_by(self.metrics.num_deleted_rows as u64);
+    }
+}
+
+#[cfg(test)]
+impl<R, S> DedupReader<R, S> {
+    fn metrics(&self) -> &DedupMetrics {
+        &self.metrics
+    }
+}
+
+/// Strategy to remove duplicate rows from sorted batches.
+pub(crate) trait DedupStrategy: Send {
+    /// Pushes a batch to the dedup strategy.
+    /// Returns the deduplicated batch.
+    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
+
+    /// Finishes the deduplication and resets the strategy.
+    ///
+    /// Users must ensure that `push_batch` is called for all batches before
+    /// calling this method.
+    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
+}
+
+/// State of the last row in a batch for dedup.
+struct BatchLastRow {
+    primary_key: Vec<u8>,
+    /// The last timestamp of the batch.
+    timestamp: Timestamp,
+}
+
+/// Dedup strategy that keeps the row with the latest sequence for each key.
+///
+/// This strategy is specially optimized based on the properties of the SST files,
+/// memtables, and the merge reader. It assumes that batches from files and memtables
+/// don't contain duplicate rows and that the merge reader never concatenates batches
+/// from different sources.
+///
+/// We might implement a new strategy if we need to process files with duplicate rows.
+pub(crate) struct LastRow {
+    /// Meta of the last row in the previous batch that has the same key
+    /// as the batch to push.
+    prev_batch: Option<BatchLastRow>,
+    /// Filter deleted rows.
+    filter_deleted: bool,
+}
+
+impl LastRow {
+    /// Creates a new strategy with the given `filter_deleted` flag.
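+    ///
+    /// A usage sketch (the `source` here is hypothetical; any [BatchReader]
+    /// that yields sorted, duplicate-free batches works):
+    ///
+    /// ```ignore
+    /// let mut reader = DedupReader::new(source, LastRow::new(true));
+    /// while let Some(batch) = reader.next_batch().await? {
+    ///     // Each batch only keeps the row with the latest sequence per key.
+    /// }
+    /// ```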
+    pub(crate) fn new(filter_deleted: bool) -> Self {
+        Self {
+            prev_batch: None,
+            filter_deleted,
+        }
+    }
+}
+
+impl DedupStrategy for LastRow {
+    fn push_batch(
+        &mut self,
+        mut batch: Batch,
+        metrics: &mut DedupMetrics,
+    ) -> Result<Option<Batch>> {
+        if batch.is_empty() {
+            return Ok(None);
+        }
+        debug_assert!(batch.first_timestamp().is_some());
+        let prev_timestamp = match &self.prev_batch {
+            Some(prev_batch) => {
+                if prev_batch.primary_key != batch.primary_key() {
+                    // The key has changed. This is the first batch of the
+                    // new key.
+                    None
+                } else {
+                    Some(prev_batch.timestamp)
+                }
+            }
+            None => None,
+        };
+        if batch.first_timestamp() == prev_timestamp {
+            metrics.num_unselected_rows += 1;
+            // The first row duplicates the last row of the previous batch. If it is
+            // the only row, skip the whole batch.
+            if batch.num_rows() == 1 {
+                // We don't need to update `prev_batch` because they have the same
+                // key and timestamp.
+                return Ok(None);
+            }
+            // Skips the first row.
+            batch = batch.slice(1, batch.num_rows() - 1);
+        }
+
+        // Stores the current batch in `prev_batch` so we can compare the next batch
+        // with it. The batch must be stored before filtering: the filter removes rows
+        // with `OpType::Delete`, so filtering first could record an incorrect last row
+        // for the previous batch.
+        match &mut self.prev_batch {
+            Some(prev) => {
+                // Reuse the primary key buffer.
+                prev.primary_key.clone_from(&batch.primary_key);
+                prev.timestamp = batch.last_timestamp().unwrap();
+            }
+            None => {
+                self.prev_batch = Some(BatchLastRow {
+                    primary_key: batch.primary_key().to_vec(),
+                    timestamp: batch.last_timestamp().unwrap(),
+                })
+            }
+        }
+
+        // Filters deleted rows.
+        if self.filter_deleted {
+            let num_rows = batch.num_rows();
+            batch.filter_deleted()?;
+            let num_rows_after_filter = batch.num_rows();
+            let num_deleted = num_rows - num_rows_after_filter;
+            metrics.num_deleted_rows += num_deleted;
+            metrics.num_unselected_rows += num_deleted;
+        }
+
+        // The batch can become empty if all rows are deleted.
+        if batch.is_empty() {
+            Ok(None)
+        } else {
+            Ok(Some(batch))
+        }
+    }
+
+    fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
+        Ok(None)
+    }
+}
+
+/// Metrics for deduplication.
+#[derive(Debug, Default)]
+pub(crate) struct DedupMetrics {
+    /// Number of rows removed during deduplication.
+    pub(crate) num_unselected_rows: usize,
+    /// Number of deleted rows.
+    pub(crate) num_deleted_rows: usize,
+}
+
+#[cfg(test)]
+mod tests {
+    use api::v1::OpType;
+
+    use super::*;
+    use crate::test_util::{check_reader_result, new_batch, VecBatchReader};
+
+    #[tokio::test]
+    async fn test_dedup_reader_no_duplications() {
+        let input = [
+            new_batch(
+                b"k1",
+                &[1, 2],
+                &[11, 12],
+                &[OpType::Put, OpType::Put],
+                &[21, 22],
+            ),
+            new_batch(b"k1", &[3], &[13], &[OpType::Put], &[23]),
+            new_batch(
+                b"k2",
+                &[1, 2],
+                &[111, 112],
+                &[OpType::Put, OpType::Put],
+                &[31, 32],
+            ),
+        ];
+        let reader = VecBatchReader::new(&input);
+        let mut reader = DedupReader::new(reader, LastRow::new(true));
+        check_reader_result(&mut reader, &input).await;
+        assert_eq!(0, reader.metrics().num_unselected_rows);
+        assert_eq!(0, reader.metrics().num_deleted_rows);
+    }
+
+    #[tokio::test]
+    async fn test_dedup_reader_duplications() {
+        let input = [
+            new_batch(
+                b"k1",
+                &[1, 2],
+                &[13, 11],
+                &[OpType::Put, OpType::Put],
+                &[11, 12],
+            ),
+            // empty batch.
+            new_batch(b"k1", &[], &[], &[], &[]),
+            // Duplicate with the previous batch.
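+            // (Timestamp 2 repeats the last row of the first batch at a lower
+            // sequence, so that row is skipped.)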
+            new_batch(
+                b"k1",
+                &[2, 3, 4],
+                &[10, 13, 13],
+                &[OpType::Put, OpType::Put, OpType::Delete],
+                &[2, 13, 14],
+            ),
+            new_batch(
+                b"k2",
+                &[1, 2],
+                &[20, 20],
+                &[OpType::Put, OpType::Delete],
+                &[101, 0],
+            ),
+            new_batch(b"k2", &[2], &[19], &[OpType::Put], &[102]),
+            new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
+            // This batch won't increase the deleted rows count as it
+            // is filtered out by the previous batch.
+            new_batch(b"k3", &[2], &[19], &[OpType::Delete], &[0]),
+        ];
+        let reader = VecBatchReader::new(&input);
+        // Filter deleted.
+        let mut reader = DedupReader::new(reader, LastRow::new(true));
+        check_reader_result(
+            &mut reader,
+            &[
+                new_batch(
+                    b"k1",
+                    &[1, 2],
+                    &[13, 11],
+                    &[OpType::Put, OpType::Put],
+                    &[11, 12],
+                ),
+                new_batch(b"k1", &[3], &[13], &[OpType::Put], &[13]),
+                new_batch(b"k2", &[1], &[20], &[OpType::Put], &[101]),
+                new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
+            ],
+        )
+        .await;
+        assert_eq!(5, reader.metrics().num_unselected_rows);
+        assert_eq!(2, reader.metrics().num_deleted_rows);
+
+        // Does not filter deleted.
+        let reader = VecBatchReader::new(&input);
+        let mut reader = DedupReader::new(reader, LastRow::new(false));
+        check_reader_result(
+            &mut reader,
+            &[
+                new_batch(
+                    b"k1",
+                    &[1, 2],
+                    &[13, 11],
+                    &[OpType::Put, OpType::Put],
+                    &[11, 12],
+                ),
+                new_batch(
+                    b"k1",
+                    &[3, 4],
+                    &[13, 13],
+                    &[OpType::Put, OpType::Delete],
+                    &[13, 14],
+                ),
+                new_batch(
+                    b"k2",
+                    &[1, 2],
+                    &[20, 20],
+                    &[OpType::Put, OpType::Delete],
+                    &[101, 0],
+                ),
+                new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
+            ],
+        )
+        .await;
+        assert_eq!(3, reader.metrics().num_unselected_rows);
+        assert_eq!(0, reader.metrics().num_deleted_rows);
+    }
+}
diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs
index f8ba260645..9377bb447c 100644
--- a/src/mito2/src/read/merge.rs
+++ b/src/mito2/src/read/merge.rs
@@ -21,11 +21,10 @@ use std::time::{Duration, Instant};
 
 use async_trait::async_trait;
 use common_telemetry::debug;
-use common_time::Timestamp;
 
 use crate::error::Result;
 use crate::memtable::BoxedBatchIterator;
-use crate::metrics::{MERGE_FILTER_ROWS_TOTAL, READ_STAGE_ELAPSED};
+use crate::metrics::READ_STAGE_ELAPSED;
 use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
 
 /// Reader to merge sorted batches.
@@ -33,9 +32,7 @@ use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
 /// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
 /// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can
 ///    ignore op type as sequence is already unique).
-/// 2. Batch doesn't have duplicate elements (elements with the same primary key and time index) if
-///    dedup is true.
-/// 3. Batches from sources **must** not be empty.
+/// 2. Batches from sources **must** not be empty.
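+///
+/// The merge reader no longer removes duplicate rows or deletion markers; wrap
+/// it in a `DedupReader` (see the `dedup` module) when deduplication is needed.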
 pub struct MergeReader {
     /// Holds [Node]s whose key range of current batch **is** overlapped with the merge window.
     /// Each node yields batches from a `source`.
     hot: BinaryHeap<Node>,
@@ -49,10 +46,6 @@ pub struct MergeReader {
     cold: BinaryHeap<Node>,
     /// Batch to output.
     output_batch: Option<Batch>,
-    /// Remove duplicate timestamps.
-    dedup: bool,
-    /// Remove deletion markers
-    filter_deleted: bool,
     /// Local metrics.
     metrics: Metrics,
 }
 
@@ -89,12 +82,6 @@ impl Drop for MergeReader {
     fn drop(&mut self) {
         debug!("Merge reader finished, metrics: {:?}", self.metrics);
 
-        MERGE_FILTER_ROWS_TOTAL
-            .with_label_values(&["dedup"])
-            .inc_by(self.metrics.num_duplicate_rows as u64);
-        MERGE_FILTER_ROWS_TOTAL
-            .with_label_values(&["delete"])
-            .inc_by(self.metrics.num_deleted_rows as u64);
         READ_STAGE_ELAPSED
             .with_label_values(&["merge"])
             .observe(self.metrics.scan_cost.as_secs_f64());
@@ -103,11 +90,7 @@ impl MergeReader {
     /// Creates and initializes a new [MergeReader].
-    pub async fn new(
-        sources: Vec<Source>,
-        dedup: bool,
-        filter_deleted: bool,
-    ) -> Result<MergeReader> {
+    pub async fn new(sources: Vec<Source>) -> Result<MergeReader> {
         let start = Instant::now();
         let mut metrics = Metrics::default();
 
@@ -121,15 +104,10 @@ impl MergeReader {
             }
         }
 
-        // If dedup is false, we don't expect delete happens and we skip filtering deletion markers.
-        let filter_deleted = filter_deleted && dedup;
-
         let mut reader = MergeReader {
             hot,
             cold,
             output_batch: None,
-            dedup,
-            filter_deleted,
             metrics,
         };
         // Initializes the reader.
@@ -164,18 +142,11 @@ impl MergeReader {
         let mut hottest = self.hot.pop().unwrap();
         let batch = hottest.fetch_batch(&mut self.metrics).await?;
-        Self::maybe_output_batch(
-            batch,
-            &mut self.output_batch,
-            self.filter_deleted,
-            &mut self.metrics,
-        )?;
+        Self::maybe_output_batch(batch, &mut self.output_batch)?;
         self.reheap(hottest)
     }
 
-    /// Fetches non-duplicated rows from the hottest node.
-    ///
-    /// If `dedup` is true, it skips the timestamp duplicated with the first timestamp in the next node.
+    /// Fetches rows from the hottest node.
     async fn fetch_rows_from_hottest(&mut self) -> Result<()> {
         // Safety: `fetch_batches_to_output()` ensures the hot heap has more than 1 element.
         // Pop hottest node.
@@ -200,36 +171,12 @@ impl MergeReader {
             Ok(pos) => pos,
             Err(pos) => {
                 // No duplicate timestamp. Outputs timestamp before `pos`.
-                Self::maybe_output_batch(
-                    top.slice(0, pos),
-                    &mut self.output_batch,
-                    self.filter_deleted,
-                    &mut self.metrics,
-                )?;
+                Self::maybe_output_batch(top.slice(0, pos), &mut self.output_batch)?;
                 top_node.skip_rows(pos, &mut self.metrics).await?;
                 return self.reheap(top_node);
             }
         };
 
-        if self.dedup {
-            // They have duplicate timestamps. Outputs timestamps before the duplicated timestamp.
-            // Batch itself doesn't contain duplicate timestamps so timestamps before `duplicate_pos`
-            // must be less than `next_min_ts`.
-            Self::maybe_output_batch(
-                top.slice(0, duplicate_pos),
-                &mut self.output_batch,
-                self.filter_deleted,
-                &mut self.metrics,
-            )?;
-            // This keep the duplicate timestamp in the node.
-            top_node.skip_rows(duplicate_pos, &mut self.metrics).await?;
-            // The merge window should contain this timestamp so only nodes in the hot heap
-            // have this timestamp.
-            return self
-                .filter_first_duplicate_timestamp_in_hot(top_node, next_min_ts)
-                .await;
-        }
-
         // No need to remove duplicate timestamps.
         let output_end = if duplicate_pos == 0 {
             // If the first timestamp of the top node is duplicate, we can simply output the first row.
             1
         } else {
             // Outputs rows before the duplicate pos.
             duplicate_pos
         };
-        Self::maybe_output_batch(
-            top.slice(0, output_end),
-            &mut self.output_batch,
-            self.filter_deleted,
-            &mut self.metrics,
-        )?;
+        Self::maybe_output_batch(top.slice(0, output_end), &mut self.output_batch)?;
         top_node.skip_rows(output_end, &mut self.metrics).await?;
         self.reheap(top_node)
     }
 
-    /// Filters the first duplicate `timestamp` in `top_node` and `hot` heap. Only keeps the timestamp
-    /// with the maximum sequence.
-    async fn filter_first_duplicate_timestamp_in_hot(
-        &mut self,
-        top_node: Node,
-        timestamp: Timestamp,
-    ) -> Result<()> {
-        debug_assert_eq!(
-            top_node.current_batch().first_timestamp().unwrap(),
-            timestamp
-        );
-
-        // The node with maximum sequence.
-        let mut max_seq_node = top_node;
-        let mut max_seq = max_seq_node.current_batch().first_sequence().unwrap();
-        while let Some(mut next_node) = self.hot.pop() {
-            // Safety: Batches in the heap is not empty.
-            let next_first_ts = next_node.current_batch().first_timestamp().unwrap();
-            let next_first_seq = next_node.current_batch().first_sequence().unwrap();
-
-            if next_first_ts != timestamp {
-                // We are done, push the node with max seq.
-                self.cold.push(next_node);
-                break;
-            }
-
-            if max_seq < next_first_seq {
-                // The next node has larger seq.
-                max_seq_node.skip_rows(1, &mut self.metrics).await?;
-                self.metrics.num_duplicate_rows += 1;
-                if !max_seq_node.is_eof() {
-                    self.cold.push(max_seq_node);
-                }
-                max_seq_node = next_node;
-                max_seq = next_first_seq;
-            } else {
-                next_node.skip_rows(1, &mut self.metrics).await?;
-                self.metrics.num_duplicate_rows += 1;
-                if !next_node.is_eof() {
-                    // If the next node has smaller seq, skip that row.
-                    self.cold.push(next_node);
-                }
-            }
-        }
-        debug_assert!(!max_seq_node.is_eof());
-        self.cold.push(max_seq_node);
-
-        // The merge window is updated, we need to refill the hot heap.
-        self.refill_hot();
-
-        Ok(())
-    }
-
     /// Push the node popped from `hot` back to a proper heap.
     fn reheap(&mut self, node: Node) -> Result<()> {
         if node.is_eof() {
@@ -336,21 +225,8 @@ impl MergeReader {
-    /// If `filter_deleted` is set to true, removes deleted entries and sets the `batch` to the `output_batch`.
+    /// Sets the `batch` to the `output_batch`.
     ///
     /// Ignores the `batch` if it is empty.
-    fn maybe_output_batch(
-        mut batch: Batch,
-        output_batch: &mut Option<Batch>,
-        filter_deleted: bool,
-        metrics: &mut Metrics,
-    ) -> Result<()> {
+    fn maybe_output_batch(batch: Batch, output_batch: &mut Option<Batch>) -> Result<()> {
         debug_assert!(output_batch.is_none());
-
-        let num_rows = batch.num_rows();
-
-        if filter_deleted {
-            batch.filter_deleted()?;
-        }
-        // Update deleted rows metrics.
-        metrics.num_deleted_rows += num_rows - batch.num_rows();
         if batch.is_empty() {
             return Ok(());
         }
@@ -361,15 +237,12 @@ impl MergeReader {
 }
 
 /// Builder to build and initialize a [MergeReader].
+#[derive(Default)]
 pub struct MergeReaderBuilder {
     /// Input sources.
     ///
     /// All source must yield batches with the same schema.
     sources: Vec<Source>,
-    /// Remove duplicate timestamps. Default is true.
-    dedup: bool,
-    /// Remove deletion markers.
-    filter_deleted: bool,
 }
 
 impl MergeReaderBuilder {
@@ -379,16 +252,8 @@ impl MergeReaderBuilder {
     }
 
     /// Creates a builder from sources.
-    pub fn from_sources(
-        sources: Vec<Source>,
-        dedup: bool,
-        filter_deleted: bool,
-    ) -> MergeReaderBuilder {
-        MergeReaderBuilder {
-            sources,
-            dedup,
-            filter_deleted,
-        }
+    pub fn from_sources(sources: Vec<Source>) -> MergeReaderBuilder {
+        MergeReaderBuilder { sources }
     }
 
     /// Pushes a batch reader to sources.
@@ -406,17 +271,7 @@ impl MergeReaderBuilder {
     /// Builds and initializes the reader, then resets the builder.
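+    ///
+    /// A usage sketch, mirroring the tests below (`reader1` and `reader2` are
+    /// any [BatchReader] implementations):
+    ///
+    /// ```ignore
+    /// let mut reader = MergeReaderBuilder::new()
+    ///     .push_batch_reader(Box::new(reader1))
+    ///     .push_batch_reader(Box::new(reader2))
+    ///     .build()
+    ///     .await?;
+    /// ```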
     pub async fn build(&mut self) -> Result<MergeReader> {
         let sources = mem::take(&mut self.sources);
-        MergeReader::new(sources, self.dedup, self.filter_deleted).await
-    }
-}
-
-impl Default for MergeReaderBuilder {
-    fn default() -> Self {
-        MergeReaderBuilder {
-            sources: Vec::new(),
-            dedup: true,
-            filter_deleted: true,
-        }
+        MergeReader::new(sources).await
     }
 }
 
@@ -431,12 +286,8 @@ struct Metrics {
     num_fetch_by_rows: usize,
     /// Number of input rows.
     num_input_rows: usize,
-    /// Number of skipped duplicate rows.
-    num_duplicate_rows: usize,
     /// Number of output rows.
     num_output_rows: usize,
-    /// Number of deleted rows.
-    num_deleted_rows: usize,
     /// Cost to fetch batches from sources.
     fetch_cost: Duration,
 }
@@ -672,15 +523,26 @@ mod tests {
                     &[OpType::Put, OpType::Put],
                     &[24, 25],
                 ),
-                new_batch(b"k1", &[7], &[17], &[OpType::Put], &[27]),
-                new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]),
+                new_batch(
+                    b"k1",
+                    &[7, 8],
+                    &[17, 18],
+                    &[OpType::Put, OpType::Delete],
+                    &[27, 28],
+                ),
+                new_batch(
+                    b"k2",
+                    &[2, 3],
+                    &[12, 13],
+                    &[OpType::Delete, OpType::Put],
+                    &[22, 23],
+                ),
             ],
         )
         .await;
 
         assert_eq!(8, reader.metrics.num_input_rows);
-        assert_eq!(6, reader.metrics.num_output_rows);
-        assert_eq!(2, reader.metrics.num_deleted_rows);
+        assert_eq!(8, reader.metrics.num_output_rows);
     }
 
     #[tokio::test]
@@ -782,17 +644,24 @@ mod tests {
                 ),
                 new_batch(b"k1", &[3], &[10], &[OpType::Put], &[33]),
                 new_batch(b"k1", &[4], &[14], &[OpType::Put], &[24]),
+                new_batch(b"k1", &[4], &[10], &[OpType::Put], &[34]),
+                new_batch(b"k1", &[5], &[15], &[OpType::Delete], &[25]),
+                new_batch(b"k1", &[5], &[10], &[OpType::Put], &[35]),
                 new_batch(b"k2", &[1], &[11], &[OpType::Put], &[21]),
-                new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]),
+                new_batch(
+                    b"k2",
+                    &[2, 3],
+                    &[12, 13],
+                    &[OpType::Delete, OpType::Put],
+                    &[22, 23],
+                ),
                 new_batch(b"k2", &[10], &[20], &[OpType::Put], &[30]),
             ],
         )
         .await;
 
         assert_eq!(11, reader.metrics.num_input_rows);
-        assert_eq!(7, reader.metrics.num_output_rows);
-        assert_eq!(2, reader.metrics.num_deleted_rows);
-        assert_eq!(2, reader.metrics.num_duplicate_rows);
+        assert_eq!(11, reader.metrics.num_output_rows);
    }
 
     #[tokio::test]
@@ -828,7 +697,29 @@ mod tests {
         .unwrap();
         check_reader_result(
             &mut reader,
-            &[new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23])],
+            &[
+                new_batch(
+                    b"k1",
+                    &[1, 2],
+                    &[11, 12],
+                    &[OpType::Delete, OpType::Delete],
+                    &[21, 22],
+                ),
+                new_batch(
+                    b"k1",
+                    &[4, 5],
+                    &[14, 15],
+                    &[OpType::Delete, OpType::Delete],
+                    &[24, 25],
+                ),
+                new_batch(
+                    b"k2",
+                    &[2, 3],
+                    &[12, 13],
+                    &[OpType::Delete, OpType::Put],
+                    &[22, 23],
+                ),
+            ],
         )
         .await;
     }
@@ -842,7 +733,6 @@ mod tests {
             &[OpType::Put, OpType::Put],
             &[21, 22],
         )]);
-        // This reader will be empty after skipping the timestamp.
         let reader2 = VecBatchReader::new(&[new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33])]);
         let mut reader = MergeReaderBuilder::new()
             .push_batch_reader(Box::new(reader1))
@@ -852,20 +742,17 @@ mod tests {
             .unwrap();
         check_reader_result(
             &mut reader,
-            &[new_batch(
-                b"k1",
-                &[1, 2],
-                &[11, 12],
-                &[OpType::Put, OpType::Put],
-                &[21, 22],
-            )],
+            &[
+                new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21]),
+                new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33]),
+                new_batch(b"k1", &[2], &[12], &[OpType::Put], &[22]),
+            ],
         )
         .await;
     }
 
     #[tokio::test]
     async fn test_merge_top_node_empty() {
-        // This reader will be empty after skipping the timestamp 2.
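+        // The merge reader no longer dedups, so the rows with timestamp 2 from
+        // both readers are emitted.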
         let reader1 = VecBatchReader::new(&[new_batch(
             b"k1",
             &[1, 2],
             &[10, 10],
             &[OpType::Put, OpType::Put],
             &[21, 22],
         )]);
@@ -890,13 +777,9 @@ mod tests {
             &mut reader,
             &[
                 new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
-                new_batch(
-                    b"k1",
-                    &[2, 3],
-                    &[11, 11],
-                    &[OpType::Put, OpType::Put],
-                    &[32, 33],
-                ),
+                new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
+                new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]),
+                new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]),
             ],
         )
         .await;
     }
@@ -938,6 +821,7 @@ mod tests {
             &mut reader,
             &[
                 new_batch(b"k1", &[1], &[11], &[OpType::Put], &[31]),
+                new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
                 new_batch(
                     b"k1",
                     &[6, 8],
@@ -963,9 +847,13 @@ mod tests {
             builder.push_batch_reader(Box::new(reader));
         }
         let mut reader = builder.build().await.unwrap();
-        let expect: Vec<_> = (0..8)
-            .map(|ts| new_batch(b"k1", &[ts], &[9], &[OpType::Put], &[100]))
-            .collect();
+        let mut expect = Vec::with_capacity(80);
+        for ts in 0..8 {
+            for i in 0..10 {
+                let batch = new_batch(b"k1", &[ts], &[9 - i], &[OpType::Put], &[100]);
+                expect.push(batch);
+            }
+        }
         check_reader_result(&mut reader, &expect).await;
     }
 
@@ -989,7 +877,7 @@ mod tests {
             Source::Reader(Box::new(reader1)),
             Source::Iter(Box::new(reader2)),
         ];
-        let mut reader = MergeReaderBuilder::from_sources(sources, false, true)
+        let mut reader = MergeReaderBuilder::from_sources(sources)
             .build()
             .await
             .unwrap();
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index cc810ba0d8..f957f2b04c 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -34,7 +34,8 @@ use tokio::sync::Semaphore;
 
 use crate::error::Result;
 use crate::memtable::MemtableRef;
-use crate::read::merge::{MergeReader, MergeReaderBuilder};
+use crate::read::dedup::{DedupReader, LastRow};
+use crate::read::merge::MergeReaderBuilder;
 use crate::read::scan_region::{
     FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext,
 };
@@ -149,7 +150,7 @@ impl SeqScan {
         partition: Option<usize>,
         semaphore: Arc<Semaphore>,
         metrics: &mut ScannerMetrics,
-    ) -> Result<Option<MergeReader>> {
+    ) -> Result<Option<BoxedBatchReader>> {
         let mut parts = stream_ctx.parts.lock().await;
         maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
 
@@ -186,10 +187,20 @@ impl SeqScan {
                 .create_parallel_sources(sources, semaphore.clone())?;
         }
 
+        let mut builder = MergeReaderBuilder::from_sources(sources);
+        let reader = builder.build().await?;
+
         let dedup = !stream_ctx.input.append_mode;
-        let mut builder =
-            MergeReaderBuilder::from_sources(sources, dedup, stream_ctx.input.filter_deleted);
-        builder.build().await.map(Some)
+        if dedup {
+            let reader = Box::new(DedupReader::new(
+                reader,
+                LastRow::new(stream_ctx.input.filter_deleted),
+            ));
+            Ok(Some(reader))
+        } else {
+            let reader = Box::new(reader);
+            Ok(Some(reader))
+        }
     }
 
     /// Scans one partition or all partitions.