feat: add filter_deleted option to avoid removing deletion markers (#3707)

* feat: add `filter_deleted` scan option to avoid removing deletion markers.

* refactor: move sort_batches_and_print to test_util
Lei, HUANG
2024-04-16 14:34:41 +08:00
committed by GitHub
parent d5a948a0a6
commit 5af87baeb0
8 changed files with 219 additions and 51 deletions
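
In short: scans filter deletion markers by default, and the new option lets a seq scan keep them. A minimal sketch of the two call paths, using only names introduced in this diff (region setup as in the test below; note `scan_without_filter_deleted` is `#[cfg(test)]`, so in this commit it is only callable from tests):

    // Default path: deletion markers are removed before results are returned.
    let stream = engine.handle_query(region_id, ScanRequest::default()).await?;

    // New path: build a seq scan that keeps deletion markers.
    let scan = engine.scan_region(region_id, ScanRequest::default())?;
    let seq_scan = scan.scan_without_filter_deleted()?;
    let stream = seq_scan.build_stream().await?;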


@@ -31,6 +31,8 @@ mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;


@@ -16,14 +16,12 @@
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use datatypes::arrow::compute::{self, SortColumn};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow::util::pretty;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCompactRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::test_util::batch_util::sort_batches_and_print;
use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, reopen_region, rows_schema,
CreateRequestBuilder, TestEnv,
@@ -191,31 +189,3 @@ async fn test_append_mode_compaction() {
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
}
/// Sorts `batches` by column `names`.
fn sort_batches_and_print(batches: &RecordBatches, names: &[&str]) -> String {
    let schema = batches.schema();
    let record_batches = batches.iter().map(|batch| batch.df_record_batch());
    let record_batch = compute::concat_batches(schema.arrow_schema(), record_batches).unwrap();
    let columns: Vec<_> = names
        .iter()
        .map(|name| {
            let array = record_batch.column_by_name(name).unwrap();
            SortColumn {
                values: array.clone(),
                options: None,
            }
        })
        .collect();
    let indices = compute::lexsort_to_indices(&columns, None).unwrap();
    let columns = record_batch
        .columns()
        .iter()
        .map(|array| compute::take(&array, &indices, None).unwrap())
        .collect();
    let record_batch = RecordBatch::try_new(record_batch.schema(), columns).unwrap();
    pretty::pretty_format_batches(&[record_batch])
        .unwrap()
        .to_string()
}


@@ -0,0 +1,102 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::test_util::batch_util::sort_batches_and_print;
use crate::test_util::{
build_rows, delete_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};
#[tokio::test]
async fn test_scan_without_filtering_deleted() {
    common_telemetry::init_default_ut_logging();
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    let region_id = RegionId::new(1, 1);
    let request = CreateRequestBuilder::new()
        .insert_option("compaction.type", "twcs")
        .insert_option("compaction.twcs.max_active_window_files", "10")
        .build();

    let column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    // Put keys 1, 2, 3, 4 and flush.
    put_rows(
        &engine,
        region_id,
        Rows {
            schema: column_schemas.clone(),
            rows: build_rows(1, 5),
        },
    )
    .await;
    flush_region(&engine, region_id, None).await;

    // Delete keys 2, 3 and flush.
    delete_rows(
        &engine,
        region_id,
        Rows {
            schema: column_schemas.clone(),
            rows: build_rows(2, 4),
        },
    )
    .await;
    flush_region(&engine, region_id, None).await;

    // Default scan: deletion markers are filtered out.
    let request = ScanRequest::default();
    let stream = engine.handle_query(region_id, request).await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts                  |
+-------+---------+---------------------+
| 1     | 1.0     | 1970-01-01T00:00:01 |
| 4     | 4.0     | 1970-01-01T00:00:04 |
+-------+---------+---------------------+";
    assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));

    // Use a seq scan that keeps deletion markers.
    let scan = engine
        .scan_region(region_id, ScanRequest::default())
        .unwrap();
    let seq_scan = scan.scan_without_filter_deleted().unwrap();

    let stream = seq_scan.build_stream().await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts                  |
+-------+---------+---------------------+
| 1     | 1.0     | 1970-01-01T00:00:01 |
| 2     | 2.0     | 1970-01-01T00:00:02 |
| 3     | 3.0     | 1970-01-01T00:00:03 |
| 4     | 4.0     | 1970-01-01T00:00:04 |
+-------+---------+---------------------+";
    assert_eq!(expected, batches.pretty_print().unwrap());
}
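
A note on the helpers used above: per the comments, build_rows(start, end) generates rows for keys in the half-open range [start, end), so build_rows(1, 5) puts keys 1 through 4 and build_rows(2, 4) deletes keys 2 and 3. The two scans then exercise both sides of the option:

    put keys:                      1, 2, 3, 4
    delete keys:                   2, 3
    default scan:                  1, 4         (markers removed)
    scan_without_filter_deleted:   1, 2, 3, 4   (markers kept)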


@@ -51,6 +51,8 @@ pub struct MergeReader {
output_batch: Option<Batch>,
/// Remove duplicate timestamps.
dedup: bool,
/// Remove deletion markers.
filter_deleted: bool,
/// Local metrics.
metrics: Metrics,
}
@@ -101,7 +103,11 @@ impl Drop for MergeReader {
impl MergeReader {
/// Creates and initializes a new [MergeReader].
pub async fn new(sources: Vec<Source>, dedup: bool) -> Result<MergeReader> {
pub async fn new(
sources: Vec<Source>,
dedup: bool,
filter_deleted: bool,
) -> Result<MergeReader> {
let start = Instant::now();
let mut metrics = Metrics::default();
@@ -115,11 +121,15 @@ impl MergeReader {
}
}
// If dedup is false, we don't expect deletes to happen, so we skip filtering deletion markers.
let filter_deleted = filter_deleted && dedup;
let mut reader = MergeReader {
hot,
cold,
output_batch: None,
dedup,
filter_deleted,
metrics,
};
// Initializes the reader.
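
The `filter_deleted && dedup` guard above encodes an invariant rather than a preference: when dedup is off (as in append mode), deletes are not expected at all, so filtering is disabled no matter what the caller requested. The effective setting:

    requested filter_deleted | dedup | effective filter_deleted
    true                     | true  | true
    true                     | false | false
    false                    | any   | false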
@@ -154,7 +164,12 @@ impl MergeReader {
let mut hottest = self.hot.pop().unwrap();
let batch = hottest.fetch_batch(&mut self.metrics).await?;
Self::maybe_output_batch(batch, &mut self.output_batch, self.dedup, &mut self.metrics)?;
Self::maybe_output_batch(
batch,
&mut self.output_batch,
self.filter_deleted,
&mut self.metrics,
)?;
self.reheap(hottest)
}
@@ -188,7 +203,7 @@ impl MergeReader {
Self::maybe_output_batch(
top.slice(0, pos),
&mut self.output_batch,
self.dedup,
self.filter_deleted,
&mut self.metrics,
)?;
top_node.skip_rows(pos, &mut self.metrics).await?;
@@ -203,7 +218,7 @@ impl MergeReader {
Self::maybe_output_batch(
top.slice(0, duplicate_pos),
&mut self.output_batch,
self.dedup,
self.filter_deleted,
&mut self.metrics,
)?;
// This keeps the duplicate timestamp in the node.
@@ -228,7 +243,7 @@ impl MergeReader {
Self::maybe_output_batch(
top.slice(0, output_end),
&mut self.output_batch,
self.dedup,
self.filter_deleted,
&mut self.metrics,
)?;
top_node.skip_rows(output_end, &mut self.metrics).await?;
@@ -318,21 +333,20 @@ impl MergeReader {
Ok(())
}
/// Removes deleted entries and sets the `batch` to the `output_batch`.
/// If `filter_deleted` is set to true, removes deleted entries and sets the `batch` to the `output_batch`.
///
/// Ignores the `batch` if it is empty.
fn maybe_output_batch(
mut batch: Batch,
output_batch: &mut Option<Batch>,
dedup: bool,
filter_deleted: bool,
metrics: &mut Metrics,
) -> Result<()> {
debug_assert!(output_batch.is_none());
let num_rows = batch.num_rows();
// If dedup is false, we don't expect deletes to happen, so we skip checking
// whether there is any deleted entry.
if dedup {
if filter_deleted {
batch.filter_deleted()?;
}
// Update deleted rows metrics.
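
To make the renamed flag concrete, here is a self-contained sketch of what filtering deletion markers amounts to. It is illustrative only: the Entry struct is a hypothetical stand-in for mito2's columnar Batch, whose filter_deleted works on per-row op types stored in Arrow arrays.

    /// Hypothetical stand-in for one entry of a batch; mito2 keeps op types
    /// in Arrow arrays inside `Batch`, not in a `Vec` of structs.
    struct Entry {
        key: u32,
        /// True if this entry is a deletion marker.
        is_delete: bool,
    }

    /// Mirrors the branch in `maybe_output_batch`: drop deletion markers when
    /// the flag is set, pass everything through otherwise.
    fn maybe_filter(entries: Vec<Entry>, filter_deleted: bool) -> Vec<Entry> {
        if filter_deleted {
            entries.into_iter().filter(|e| !e.is_delete).collect()
        } else {
            entries
        }
    }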
@@ -354,6 +368,8 @@ pub struct MergeReaderBuilder {
sources: Vec<Source>,
/// Remove duplicate timestamps. Default is true.
dedup: bool,
/// Remove deletion markers.
filter_deleted: bool,
}
impl MergeReaderBuilder {
@@ -363,8 +379,16 @@ impl MergeReaderBuilder {
}
/// Creates a builder from sources.
pub fn from_sources(sources: Vec<Source>, dedup: bool) -> MergeReaderBuilder {
MergeReaderBuilder { sources, dedup }
pub fn from_sources(
sources: Vec<Source>,
dedup: bool,
filter_deleted: bool,
) -> MergeReaderBuilder {
MergeReaderBuilder {
sources,
dedup,
filter_deleted,
}
}
/// Pushes a batch reader to sources.
@@ -382,7 +406,7 @@ impl MergeReaderBuilder {
/// Builds and initializes the reader, then resets the builder.
pub async fn build(&mut self) -> Result<MergeReader> {
let sources = mem::take(&mut self.sources);
MergeReader::new(sources, self.dedup).await
MergeReader::new(sources, self.dedup, self.filter_deleted).await
}
}
@@ -391,6 +415,7 @@ impl Default for MergeReaderBuilder {
MergeReaderBuilder {
sources: Vec::new(),
dedup: true,
filter_deleted: true,
}
}
}
@@ -964,7 +989,7 @@ mod tests {
Source::Reader(Box::new(reader1)),
Source::Iter(Box::new(reader2)),
];
let mut reader = MergeReaderBuilder::from_sources(sources, false)
let mut reader = MergeReaderBuilder::from_sources(sources, false, true)
.build()
.await
.unwrap();
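
Call sites now choose the flag explicitly. A sketch of the two configurations, using the from_sources signature introduced above (construction of sources elided):

    // Normal read path: dedup on, deletion markers removed.
    let reader = MergeReaderBuilder::from_sources(sources, true, true)
        .build()
        .await?;

    // Keep deletion markers; only effective when dedup is also true, because
    // of the `filter_deleted && dedup` guard in `MergeReader::new`.
    let reader = MergeReaderBuilder::from_sources(sources, true, false)
        .build()
        .await?;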


@@ -204,7 +204,7 @@ impl ScanRegion {
/// Scan sequentially.
pub(crate) fn seq_scan(self) -> Result<SeqScan> {
let input = self.scan_input()?;
let input = self.scan_input(true)?;
let seq_scan = SeqScan::new(input);
Ok(seq_scan)
@@ -212,14 +212,21 @@ impl ScanRegion {
/// Unordered scan.
pub(crate) fn unordered_scan(self) -> Result<UnorderedScan> {
let input = self.scan_input()?;
let input = self.scan_input(true)?;
let scan = UnorderedScan::new(input);
Ok(scan)
}
#[cfg(test)]
pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
let input = self.scan_input(false)?;
let scan = SeqScan::new(input);
Ok(scan)
}
/// Creates a scan input.
fn scan_input(self) -> Result<ScanInput> {
fn scan_input(self, filter_deleted: bool) -> Result<ScanInput> {
let time_range = self.build_time_range_predicate();
let ssts = &self.version.ssts;
@@ -278,7 +285,8 @@ impl ScanRegion {
.with_index_applier(index_applier)
.with_parallelism(self.parallelism)
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode);
.with_append_mode(self.version.options.append_mode)
.with_filter_deleted(filter_deleted);
Ok(input)
}
@@ -383,6 +391,8 @@ pub(crate) struct ScanInput {
pub(crate) query_start: Option<Instant>,
/// The region is using append mode.
pub(crate) append_mode: bool,
/// Whether to remove deletion markers.
pub(crate) filter_deleted: bool,
}
impl ScanInput {
@@ -402,6 +412,7 @@ impl ScanInput {
index_applier: None,
query_start: None,
append_mode: false,
filter_deleted: true,
}
}
@@ -474,6 +485,14 @@ impl ScanInput {
self
}
/// Sets whether to remove deletion markers during scan.
#[allow(unused)]
#[must_use]
pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
self.filter_deleted = filter_deleted;
self
}
/// Builds and returns sources to read.
pub(crate) async fn build_sources(&self) -> Result<Vec<Source>> {
let mut sources = Vec::with_capacity(self.memtables.len() + self.files.len());


@@ -105,7 +105,8 @@ impl SeqScan {
// Scans all memtables and SSTs. Builds a merge reader to merge results.
let sources = self.input.build_sources().await?;
let dedup = !self.input.append_mode;
let mut builder = MergeReaderBuilder::from_sources(sources, dedup);
let mut builder =
MergeReaderBuilder::from_sources(sources, dedup, self.input.filter_deleted);
let reader = builder.build().await?;
Ok(Box::new(reader))
}
@@ -114,7 +115,8 @@ impl SeqScan {
async fn build_parallel_reader(&self) -> Result<BoxedBatchReader> {
let sources = self.input.build_parallel_sources().await?;
let dedup = !self.input.append_mode;
let mut builder = MergeReaderBuilder::from_sources(sources, dedup);
let mut builder =
MergeReaderBuilder::from_sources(sources, dedup, self.input.filter_deleted);
let reader = builder.build().await?;
Ok(Box::new(reader))
}
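
Note how the two flags compose here: append mode turns dedup off, and MergeReader::new then forces filter_deleted off as well, so append-mode scans never pay the cost of deletion filtering even on the default path.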


@@ -14,6 +14,7 @@
//! Utilities for testing.
pub mod batch_util;
pub mod memtable_util;
pub mod meta_util;
pub mod scheduler_util;


@@ -0,0 +1,47 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_recordbatch::RecordBatches;
use datatypes::arrow::array::RecordBatch;
use datatypes::arrow::compute;
use datatypes::arrow::compute::SortColumn;
use datatypes::arrow::util::pretty;
/// Sorts `batches` by column `names`.
pub fn sort_batches_and_print(batches: &RecordBatches, names: &[&str]) -> String {
    let schema = batches.schema();
    let record_batches = batches.iter().map(|batch| batch.df_record_batch());
    let record_batch = compute::concat_batches(schema.arrow_schema(), record_batches).unwrap();
    let columns: Vec<_> = names
        .iter()
        .map(|name| {
            let array = record_batch.column_by_name(name).unwrap();
            SortColumn {
                values: array.clone(),
                options: None,
            }
        })
        .collect();
    let indices = compute::lexsort_to_indices(&columns, None).unwrap();
    let columns = record_batch
        .columns()
        .iter()
        .map(|array| compute::take(&array, &indices, None).unwrap())
        .collect();
    let record_batch = RecordBatch::try_new(record_batch.schema(), columns).unwrap();
    pretty::pretty_format_batches(&[record_batch])
        .unwrap()
        .to_string()
}
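
Usage mirrors the tests above; a minimal sketch assuming batches was collected from a scan stream:

    // Sort by tag then timestamp before snapshotting, so the expected string
    // is stable no matter what order the batches arrived in.
    let actual = sort_batches_and_print(&batches, &["tag_0", "ts"]);
    assert_eq!(expected, actual);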