From 5af87baeb0b2fed644bde99a79d47fc79fa3d87f Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 16 Apr 2024 14:34:41 +0800 Subject: [PATCH] feat: add `filter_deleted` option to avoid removing deletion markers (#3707) * feat: add `filter_deleted` scan option to avoid removing deletion markers. * refactor: move sort_batches_and_print to test_util --- src/mito2/src/engine.rs | 2 + src/mito2/src/engine/append_mode_test.rs | 32 +----- src/mito2/src/engine/filter_deleted_test.rs | 102 ++++++++++++++++++++ src/mito2/src/read/merge.rs | 53 +++++++--- src/mito2/src/read/scan_region.rs | 27 +++++- src/mito2/src/read/seq_scan.rs | 6 +- src/mito2/src/test_util.rs | 1 + src/mito2/src/test_util/batch_util.rs | 47 +++++++++ 8 files changed, 219 insertions(+), 51 deletions(-) create mode 100644 src/mito2/src/engine/filter_deleted_test.rs create mode 100644 src/mito2/src/test_util/batch_util.rs diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 54df1b1b6e..955aa9c7ee 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -31,6 +31,8 @@ mod create_test; #[cfg(test)] mod drop_test; #[cfg(test)] +mod filter_deleted_test; +#[cfg(test)] mod flush_test; #[cfg(any(test, feature = "test"))] pub mod listener; diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index 05d7dad1d6..d1fc417390 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -16,14 +16,12 @@ use api::v1::Rows; use common_recordbatch::RecordBatches; -use datatypes::arrow::compute::{self, SortColumn}; -use datatypes::arrow::record_batch::RecordBatch; -use datatypes::arrow::util::pretty; use store_api::region_engine::RegionEngine; use store_api::region_request::{RegionCompactRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; +use crate::test_util::batch_util::sort_batches_and_print; use crate::test_util::{ build_rows, build_rows_for_key, flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv, @@ -191,31 +189,3 @@ async fn test_append_mode_compaction() { let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); } - -/// Sorts `batches` by column `names`. -fn sort_batches_and_print(batches: &RecordBatches, names: &[&str]) -> String { - let schema = batches.schema(); - let record_batches = batches.iter().map(|batch| batch.df_record_batch()); - let record_batch = compute::concat_batches(schema.arrow_schema(), record_batches).unwrap(); - let columns: Vec<_> = names - .iter() - .map(|name| { - let array = record_batch.column_by_name(name).unwrap(); - SortColumn { - values: array.clone(), - options: None, - } - }) - .collect(); - let indices = compute::lexsort_to_indices(&columns, None).unwrap(); - let columns = record_batch - .columns() - .iter() - .map(|array| compute::take(&array, &indices, None).unwrap()) - .collect(); - let record_batch = RecordBatch::try_new(record_batch.schema(), columns).unwrap(); - - pretty::pretty_format_batches(&[record_batch]) - .unwrap() - .to_string() -} diff --git a/src/mito2/src/engine/filter_deleted_test.rs b/src/mito2/src/engine/filter_deleted_test.rs new file mode 100644 index 0000000000..c3c35f9ba0 --- /dev/null +++ b/src/mito2/src/engine/filter_deleted_test.rs @@ -0,0 +1,102 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::Rows; +use common_recordbatch::RecordBatches; +use store_api::region_engine::RegionEngine; +use store_api::region_request::RegionRequest; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::MitoConfig; +use crate::test_util::batch_util::sort_batches_and_print; +use crate::test_util::{ + build_rows, delete_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv, +}; + +#[tokio::test] +async fn test_scan_without_filtering_deleted() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.max_active_window_files", "10") + .build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // put 1, 2, 3, 4 and flush + put_rows( + &engine, + region_id, + Rows { + schema: column_schemas.clone(), + rows: build_rows(1, 5), + }, + ) + .await; + flush_region(&engine, region_id, None).await; + + // delete 2, 3 and flush + delete_rows( + &engine, + region_id, + Rows { + schema: column_schemas.clone(), + rows: build_rows(2, 4), + }, + ) + .await; + flush_region(&engine, region_id, None).await; + + // scan + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 4 | 4.0 | 1970-01-01T00:00:04 | ++-------+---------+---------------------+"; + assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); + + // Tries to use seq scan to test it under append mode. + let scan = engine + .scan_region(region_id, ScanRequest::default()) + .unwrap(); + + let seq_scan = scan.scan_without_filter_deleted().unwrap(); + + let stream = seq_scan.build_stream().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | +| 3 | 3.0 | 1970-01-01T00:00:03 | +| 4 | 4.0 | 1970-01-01T00:00:04 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index ca758d2825..f8ba260645 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -51,6 +51,8 @@ pub struct MergeReader { output_batch: Option, /// Remove duplicate timestamps. dedup: bool, + /// Remove deletion markers + filter_deleted: bool, /// Local metrics. metrics: Metrics, } @@ -101,7 +103,11 @@ impl Drop for MergeReader { impl MergeReader { /// Creates and initializes a new [MergeReader]. - pub async fn new(sources: Vec, dedup: bool) -> Result { + pub async fn new( + sources: Vec, + dedup: bool, + filter_deleted: bool, + ) -> Result { let start = Instant::now(); let mut metrics = Metrics::default(); @@ -115,11 +121,15 @@ impl MergeReader { } } + // If dedup is false, we don't expect delete happens and we skip filtering deletion markers. + let filter_deleted = filter_deleted && dedup; + let mut reader = MergeReader { hot, cold, output_batch: None, dedup, + filter_deleted, metrics, }; // Initializes the reader. @@ -154,7 +164,12 @@ impl MergeReader { let mut hottest = self.hot.pop().unwrap(); let batch = hottest.fetch_batch(&mut self.metrics).await?; - Self::maybe_output_batch(batch, &mut self.output_batch, self.dedup, &mut self.metrics)?; + Self::maybe_output_batch( + batch, + &mut self.output_batch, + self.filter_deleted, + &mut self.metrics, + )?; self.reheap(hottest) } @@ -188,7 +203,7 @@ impl MergeReader { Self::maybe_output_batch( top.slice(0, pos), &mut self.output_batch, - self.dedup, + self.filter_deleted, &mut self.metrics, )?; top_node.skip_rows(pos, &mut self.metrics).await?; @@ -203,7 +218,7 @@ impl MergeReader { Self::maybe_output_batch( top.slice(0, duplicate_pos), &mut self.output_batch, - self.dedup, + self.filter_deleted, &mut self.metrics, )?; // This keep the duplicate timestamp in the node. @@ -228,7 +243,7 @@ impl MergeReader { Self::maybe_output_batch( top.slice(0, output_end), &mut self.output_batch, - self.dedup, + self.filter_deleted, &mut self.metrics, )?; top_node.skip_rows(output_end, &mut self.metrics).await?; @@ -318,21 +333,20 @@ impl MergeReader { Ok(()) } - /// Removeds deleted entries and sets the `batch` to the `output_batch`. + /// If `filter_deleted` is set to true, removes deleted entries and sets the `batch` to the `output_batch`. /// /// Ignores the `batch` if it is empty. fn maybe_output_batch( mut batch: Batch, output_batch: &mut Option, - dedup: bool, + filter_deleted: bool, metrics: &mut Metrics, ) -> Result<()> { debug_assert!(output_batch.is_none()); let num_rows = batch.num_rows(); - // If dedup is false, we don't expect delete happens and we skip checking whether there - // is any deleted entry. - if dedup { + + if filter_deleted { batch.filter_deleted()?; } // Update deleted rows metrics. @@ -354,6 +368,8 @@ pub struct MergeReaderBuilder { sources: Vec, /// Remove duplicate timestamps. Default is true. dedup: bool, + /// Remove deletion markers. + filter_deleted: bool, } impl MergeReaderBuilder { @@ -363,8 +379,16 @@ impl MergeReaderBuilder { } /// Creates a builder from sources. - pub fn from_sources(sources: Vec, dedup: bool) -> MergeReaderBuilder { - MergeReaderBuilder { sources, dedup } + pub fn from_sources( + sources: Vec, + dedup: bool, + filter_deleted: bool, + ) -> MergeReaderBuilder { + MergeReaderBuilder { + sources, + dedup, + filter_deleted, + } } /// Pushes a batch reader to sources. @@ -382,7 +406,7 @@ impl MergeReaderBuilder { /// Builds and initializes the reader, then resets the builder. pub async fn build(&mut self) -> Result { let sources = mem::take(&mut self.sources); - MergeReader::new(sources, self.dedup).await + MergeReader::new(sources, self.dedup, self.filter_deleted).await } } @@ -391,6 +415,7 @@ impl Default for MergeReaderBuilder { MergeReaderBuilder { sources: Vec::new(), dedup: true, + filter_deleted: true, } } } @@ -964,7 +989,7 @@ mod tests { Source::Reader(Box::new(reader1)), Source::Iter(Box::new(reader2)), ]; - let mut reader = MergeReaderBuilder::from_sources(sources, false) + let mut reader = MergeReaderBuilder::from_sources(sources, false, true) .build() .await .unwrap(); diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 111c737d5e..0ba6290c69 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -204,7 +204,7 @@ impl ScanRegion { /// Scan sequentially. pub(crate) fn seq_scan(self) -> Result { - let input = self.scan_input()?; + let input = self.scan_input(true)?; let seq_scan = SeqScan::new(input); Ok(seq_scan) @@ -212,14 +212,21 @@ impl ScanRegion { /// Unordered scan. pub(crate) fn unordered_scan(self) -> Result { - let input = self.scan_input()?; + let input = self.scan_input(true)?; let scan = UnorderedScan::new(input); Ok(scan) } + #[cfg(test)] + pub(crate) fn scan_without_filter_deleted(self) -> Result { + let input = self.scan_input(false)?; + let scan = SeqScan::new(input); + Ok(scan) + } + /// Creates a scan input. - fn scan_input(self) -> Result { + fn scan_input(self, filter_deleted: bool) -> Result { let time_range = self.build_time_range_predicate(); let ssts = &self.version.ssts; @@ -278,7 +285,8 @@ impl ScanRegion { .with_index_applier(index_applier) .with_parallelism(self.parallelism) .with_start_time(self.start_time) - .with_append_mode(self.version.options.append_mode); + .with_append_mode(self.version.options.append_mode) + .with_filter_deleted(filter_deleted); Ok(input) } @@ -383,6 +391,8 @@ pub(crate) struct ScanInput { pub(crate) query_start: Option, /// The region is using append mode. pub(crate) append_mode: bool, + /// Whether to remove deletion markers. + pub(crate) filter_deleted: bool, } impl ScanInput { @@ -402,6 +412,7 @@ impl ScanInput { index_applier: None, query_start: None, append_mode: false, + filter_deleted: true, } } @@ -474,6 +485,14 @@ impl ScanInput { self } + /// Sets whether to remove deletion markers during scan. + #[allow(unused)] + #[must_use] + pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self { + self.filter_deleted = filter_deleted; + self + } + /// Builds and returns sources to read. pub(crate) async fn build_sources(&self) -> Result> { let mut sources = Vec::with_capacity(self.memtables.len() + self.files.len()); diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index e77097dc42..2277a8df32 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -105,7 +105,8 @@ impl SeqScan { // Scans all memtables and SSTs. Builds a merge reader to merge results. let sources = self.input.build_sources().await?; let dedup = !self.input.append_mode; - let mut builder = MergeReaderBuilder::from_sources(sources, dedup); + let mut builder = + MergeReaderBuilder::from_sources(sources, dedup, self.input.filter_deleted); let reader = builder.build().await?; Ok(Box::new(reader)) } @@ -114,7 +115,8 @@ impl SeqScan { async fn build_parallel_reader(&self) -> Result { let sources = self.input.build_parallel_sources().await?; let dedup = !self.input.append_mode; - let mut builder = MergeReaderBuilder::from_sources(sources, dedup); + let mut builder = + MergeReaderBuilder::from_sources(sources, dedup, self.input.filter_deleted); let reader = builder.build().await?; Ok(Box::new(reader)) } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 5efa9dae89..59debe15ac 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -14,6 +14,7 @@ //! Utilities for testing. +pub mod batch_util; pub mod memtable_util; pub mod meta_util; pub mod scheduler_util; diff --git a/src/mito2/src/test_util/batch_util.rs b/src/mito2/src/test_util/batch_util.rs new file mode 100644 index 0000000000..3c4e89b98f --- /dev/null +++ b/src/mito2/src/test_util/batch_util.rs @@ -0,0 +1,47 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_recordbatch::RecordBatches; +use datatypes::arrow::array::RecordBatch; +use datatypes::arrow::compute; +use datatypes::arrow::compute::SortColumn; +use datatypes::arrow::util::pretty; + +/// Sorts `batches` by column `names`. +pub fn sort_batches_and_print(batches: &RecordBatches, names: &[&str]) -> String { + let schema = batches.schema(); + let record_batches = batches.iter().map(|batch| batch.df_record_batch()); + let record_batch = compute::concat_batches(schema.arrow_schema(), record_batches).unwrap(); + let columns: Vec<_> = names + .iter() + .map(|name| { + let array = record_batch.column_by_name(name).unwrap(); + SortColumn { + values: array.clone(), + options: None, + } + }) + .collect(); + let indices = compute::lexsort_to_indices(&columns, None).unwrap(); + let columns = record_batch + .columns() + .iter() + .map(|array| compute::take(&array, &indices, None).unwrap()) + .collect(); + let record_batch = RecordBatch::try_new(record_batch.schema(), columns).unwrap(); + + pretty::pretty_format_batches(&[record_batch]) + .unwrap() + .to_string() +}