Mirror of https://github.com/GreptimeTeam/greptimedb.git
feat: Implements merge_mode region options (#4208)
* feat: add update_mode to region options
* test: add test
* feat: last not null iter
* feat: time series last not null
* feat: partition tree update mode
* feat: partition tree
* fix: last not null iter slice
* test: add test for compaction
* test: use second resolution
* style: fix clippy
* chore: merge two lines

Co-authored-by: Jeremyhi <jiachun_feng@proton.me>

* chore: address CR comments
* refactor: UpdateMode -> MergeMode
* refactor: LastNotNull -> LastNonNull
* chore: return None earlier
* feat: validate region options; make merge mode optional and use the default while it is None
* test: fix tests

---------

Co-authored-by: Jeremyhi <jiachun_feng@proton.me>
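In short, a region now accepts a `merge_mode` option: `last_row` (the default) keeps the whole last row for a duplicate key, while `last_non_null` keeps the last non-null value of each field, so a null in a newer write no longer clobbers an older non-null value. A minimal sketch of exercising the option, lifted from the new merge_mode_test.rs in this diff (`CreateRequestBuilder` and the engine handle are mito2 test utilities, not a public API):

    let request = CreateRequestBuilder::new()
        .field_num(2)
        .insert_option("merge_mode", "last_non_null")
        .build();
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();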
@@ -24,6 +24,7 @@ use datatypes::schema::ColumnSchema;
use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable};
use mito2::memtable::time_series::TimeSeriesMemtable;
use mito2::memtable::{KeyValues, Memtable};
use mito2::region::options::MergeMode;
use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
use rand::rngs::ThreadRng;
use rand::seq::SliceRandom;
@@ -51,7 +52,7 @@ fn write_rows(c: &mut Criterion) {
        });
    });
    group.bench_function("time_series", |b| {
        let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true);
        let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true, MergeMode::LastRow);
        let kvs =
            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
        b.iter(|| {
@@ -83,7 +84,7 @@ fn full_scan(c: &mut Criterion) {
        });
    });
    group.bench_function("time_series", |b| {
        let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true);
        let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true, MergeMode::LastRow);
        for kvs in generator.iter() {
            memtable.write(&kvs).unwrap();
        }
@@ -121,7 +122,7 @@ fn filter_1_host(c: &mut Criterion) {
        });
    });
    group.bench_function("time_series", |b| {
        let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true);
        let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true, MergeMode::LastRow);
        for kvs in generator.iter() {
            memtable.write(&kvs).unwrap();
        }
@@ -54,6 +54,7 @@ use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::ScanInput;
use crate::read::seq_scan::SeqScan;
use crate::read::BoxedBatchReader;
use crate::region::options::MergeMode;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::region::ManifestContextRef;
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
@@ -453,31 +454,39 @@ pub struct SerializedCompactionOutput {
    output_time_range: Option<TimestampRange>,
}

/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
async fn build_sst_reader(
/// Builder to create a [BoxedBatchReader] for compaction.
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: Option<CacheManagerRef>,
    inputs: &[FileHandle],
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
) -> Result<BoxedBatchReader> {
    let mut scan_input = ScanInput::new(sst_layer, ProjectionMapper::all(&metadata)?)
        .with_files(inputs.to_vec())
        .with_append_mode(append_mode)
        .with_cache(cache)
        .with_filter_deleted(filter_deleted)
        // We ignore file not found errors during compaction.
        .with_ignore_file_not_found(true);
    merge_mode: MergeMode,
}

    // This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/3944
    // by converting time ranges into a predicate.
    if let Some(time_range) = time_range {
        scan_input = scan_input.with_predicate(time_range_to_predicate(time_range, &metadata)?);
impl<'a> CompactionSstReaderBuilder<'a> {
    /// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?)
            .with_files(self.inputs.to_vec())
            .with_append_mode(self.append_mode)
            .with_cache(self.cache)
            .with_filter_deleted(self.filter_deleted)
            // We ignore file not found errors during compaction.
            .with_ignore_file_not_found(true)
            .with_merge_mode(self.merge_mode);

        // This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/3944
        // by converting time ranges into a predicate.
        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        SeqScan::new(scan_input).build_reader().await
    }

    SeqScan::new(scan_input).build_reader().await
}

/// Converts time range to predicates so that rows outside the range will be filtered.
@@ -26,8 +26,8 @@ use store_api::storage::RegionId;

use crate::access_layer::{AccessLayer, AccessLayerRef, SstWriteRequest};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::build_sst_reader;
use crate::compaction::picker::{new_picker, PickerOutput};
use crate::compaction::CompactionSstReaderBuilder;
use crate::config::MitoConfig;
use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
@@ -137,7 +137,8 @@ pub async fn open_compaction_region(
    let memtable_builder = MemtableBuilderProvider::new(None, Arc::new(mito_config.clone()))
        .builder_for_options(
            req.region_options.memtable.as_ref(),
            !req.region_options.append_mode,
            req.region_options.need_dedup(),
            req.region_options.merge_mode(),
        );

    // Initial memtable id is 0.
@@ -282,16 +283,19 @@ impl Compactor for DefaultCompactor {
                .index_options
                .clone();
            let append_mode = compaction_region.current_version.options.append_mode;
            let merge_mode = compaction_region.current_version.options.merge_mode();
            futs.push(async move {
                let reader = build_sst_reader(
                    region_metadata.clone(),
                    sst_layer.clone(),
                    Some(cache_manager.clone()),
                    &output.inputs,
                let reader = CompactionSstReaderBuilder {
                    metadata: region_metadata.clone(),
                    sst_layer: sst_layer.clone(),
                    cache: Some(cache_manager.clone()),
                    inputs: &output.inputs,
                    append_mode,
                    output.filter_deleted,
                    output.output_time_range,
                )
                    filter_deleted: output.filter_deleted,
                    time_range: output.output_time_range,
                    merge_mode,
                }
                .build_sst_reader()
                .await?;
                let file_meta_opt = sst_layer
                    .write_sst(
@@ -260,6 +260,7 @@ mod tests {
                wal_options: Default::default(),
                index_options: Default::default(),
                memtable: None,
                merge_mode: None,
            },
        })
    }

@@ -39,6 +39,8 @@ mod flush_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
mod merge_mode_test;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod parallel_test;
@@ -113,7 +113,7 @@ async fn test_append_mode_compaction() {
        .await
        .unwrap();

    // Flush 2 SSTs for compaction.
    // Flush 3 SSTs for compaction.
    // a, field 1, 2
    let rows = Rows {
        schema: column_schemas.clone(),
208 src/mito2/src/engine/merge_mode_test.rs Normal file
@@ -0,0 +1,208 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Tests for merge mode.

use api::v1::Rows;
use common_recordbatch::RecordBatches;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCompactRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::test_util::batch_util::sort_batches_and_print;
use crate::test_util::{
    build_delete_rows_for_key, build_rows_with_fields, delete_rows, delete_rows_schema,
    flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv,
};

#[tokio::test]
async fn test_merge_mode_write_query() {
    common_telemetry::init_default_ut_logging();

    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    let region_id = RegionId::new(1, 1);
    let request = CreateRequestBuilder::new()
        .field_num(2)
        .insert_option("merge_mode", "last_non_null")
        .build();

    let column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    let rows = build_rows_with_fields(
        "a",
        &[1, 2, 3],
        &[(Some(1), None), (None, None), (None, Some(3))],
    );
    let rows = Rows {
        schema: column_schemas.clone(),
        rows,
    };
    put_rows(&engine, region_id, rows).await;

    let rows = build_rows_with_fields("a", &[2, 3], &[(Some(12), None), (Some(13), None)]);
    let rows = Rows {
        schema: column_schemas.clone(),
        rows,
    };
    put_rows(&engine, region_id, rows).await;

    let rows = build_rows_with_fields("a", &[1, 2], &[(Some(11), None), (Some(22), Some(222))]);
    let rows = Rows {
        schema: column_schemas,
        rows,
    };
    put_rows(&engine, region_id, rows).await;

    let request = ScanRequest::default();
    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    let expected = "\
+-------+---------+---------+---------------------+
| tag_0 | field_0 | field_1 | ts                  |
+-------+---------+---------+---------------------+
| a     | 11.0    |         | 1970-01-01T00:00:01 |
| a     | 22.0    | 222.0   | 1970-01-01T00:00:02 |
| a     | 13.0    | 3.0     | 1970-01-01T00:00:03 |
+-------+---------+---------+---------------------+";
    assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_merge_mode_compaction() {
    common_telemetry::init_default_ut_logging();

    let mut env = TestEnv::new();
    let engine = env
        .create_engine(MitoConfig {
            scan_parallelism: 2,
            ..Default::default()
        })
        .await;
    let region_id = RegionId::new(1, 1);

    let request = CreateRequestBuilder::new()
        .field_num(2)
        .insert_option("compaction.type", "twcs")
        .insert_option("compaction.twcs.max_active_window_files", "2")
        .insert_option("compaction.twcs.max_inactive_window_files", "2")
        .insert_option("merge_mode", "last_non_null")
        .build();
    let region_dir = request.region_dir.clone();
    let region_opts = request.options.clone();
    let delete_schema = delete_rows_schema(&request);
    let column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    // Flush 3 SSTs for compaction.
    // a, 1 => (1, null), 2 => (null, null), 3 => (null, 3), 4 => (4, 4)
    let rows = build_rows_with_fields(
        "a",
        &[1, 2, 3, 4],
        &[
            (Some(1), None),
            (None, None),
            (None, Some(3)),
            (Some(4), Some(4)),
        ],
    );
    let rows = Rows {
        schema: column_schemas.clone(),
        rows,
    };
    put_rows(&engine, region_id, rows).await;
    flush_region(&engine, region_id, None).await;

    // a, 1 => (null, 11), 2 => (2, null), 3 => (null, 13)
    let rows = build_rows_with_fields(
        "a",
        &[1, 2, 3],
        &[(None, Some(11)), (Some(2), None), (None, Some(13))],
    );
    let rows = Rows {
        schema: column_schemas.clone(),
        rows,
    };
    put_rows(&engine, region_id, rows).await;
    flush_region(&engine, region_id, None).await;

    // Delete a, 4
    let rows = Rows {
        schema: delete_schema.clone(),
        rows: build_delete_rows_for_key("a", 4, 5),
    };
    delete_rows(&engine, region_id, rows).await;
    flush_region(&engine, region_id, None).await;

    let output = engine
        .handle_request(
            region_id,
            RegionRequest::Compact(RegionCompactRequest::default()),
        )
        .await
        .unwrap();
    assert_eq!(output.affected_rows, 0);

    // a, 1 => (21, null), 2 => (22, null)
    let rows = build_rows_with_fields("a", &[1, 2], &[(Some(21), None), (Some(22), None)]);
    let rows = Rows {
        schema: column_schemas.clone(),
        rows,
    };
    put_rows(&engine, region_id, rows).await;

    let expected = "\
+-------+---------+---------+---------------------+
| tag_0 | field_0 | field_1 | ts                  |
+-------+---------+---------+---------------------+
| a     | 21.0    | 11.0    | 1970-01-01T00:00:01 |
| a     | 22.0    |         | 1970-01-01T00:00:02 |
| a     |         | 13.0    | 1970-01-01T00:00:03 |
+-------+---------+---------+---------------------+";
    // Scans in parallel.
    let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
    assert_eq!(1, scanner.num_files());
    assert_eq!(1, scanner.num_memtables());
    let stream = scanner.scan().await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));

    // Reopens engine with parallelism 1.
    let engine = env
        .reopen_engine(
            engine,
            MitoConfig {
                scan_parallelism: 1,
                ..Default::default()
            },
        )
        .await;
    // Reopens the region.
    reopen_region(&engine, region_id, region_dir, false, region_opts).await;
    let stream = engine
        .scan_to_stream(region_id, ScanRequest::default())
        .await
        .unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
}
@@ -34,7 +34,7 @@ use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::region::options::MemtableOptions;
use crate::region::options::{MemtableOptions, MergeMode};

pub mod bulk;
pub mod key_values;
@@ -251,11 +251,13 @@ impl MemtableBuilderProvider {
        &self,
        options: Option<&MemtableOptions>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match options {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
@@ -264,15 +266,16 @@ impl MemtableBuilderProvider {
                    data_freeze_threshold: opts.data_freeze_threshold,
                    fork_dictionary_bytes: opts.fork_dictionary_bytes,
                    dedup,
                    merge_mode,
                },
                self.write_buffer_manager.clone(),
            ))
            }
            None => self.default_memtable_builder(dedup),
            None => self.default_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_memtable_builder(&self, dedup: bool) -> MemtableBuilderRef {
    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
@@ -285,6 +288,7 @@ impl MemtableBuilderProvider {
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }

@@ -43,6 +43,7 @@ use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
    MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
};
use crate::region::options::MergeMode;

/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
@@ -80,6 +81,9 @@ pub struct PartitionTreeConfig {
    pub dedup: bool,
    /// Total bytes of dictionary to keep in fork.
    pub fork_dictionary_bytes: ReadableSize,
    /// Merge mode of the tree.
    #[serde(skip_deserializing)]
    pub merge_mode: MergeMode,
}

impl Default for PartitionTreeConfig {
@@ -98,6 +102,7 @@ impl Default for PartitionTreeConfig {
            data_freeze_threshold: 131072,
            dedup: true,
            fork_dictionary_bytes,
            merge_mode: MergeMode::LastRow,
        }
    }
}

@@ -40,7 +40,9 @@ use crate::memtable::partition_tree::partition::{
use crate::memtable::partition_tree::PartitionTreeConfig;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::Batch;
use crate::region::options::MergeMode;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};

/// The partition tree.
@@ -83,9 +85,13 @@ impl PartitionTree {
            .collect(),
        };
        let is_partitioned = Partition::has_multi_partitions(&metadata);
        let mut config = config.clone();
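        // Editor's note: with last_non_null, dedup inside the tree is disabled so
        // duplicate rows survive until LastNonNullIter merges their fields at read
        // time (see the iter() change below).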
        if config.merge_mode == MergeMode::LastNonNull {
            config.dedup = false;
        }

        PartitionTree {
            config: config.clone(),
            config,
            metadata,
            row_codec: Arc::new(row_codec),
            partitions: Default::default(),
@@ -237,7 +243,13 @@ impl PartitionTree {
        iter.fetch_next_partition(context)?;

        iter.metrics.iter_elapsed += start.elapsed();
        Ok(Box::new(iter))

        if self.config.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    /// Returns true if the tree is empty.

@@ -47,7 +47,9 @@ use crate::memtable::{
    MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};

/// Initial vector builder capacity.
@@ -58,14 +60,20 @@ const INITIAL_BUILDER_CAPACITY: usize = 0;
pub struct TimeSeriesMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl TimeSeriesMemtableBuilder {
    /// Creates a new builder with specific `write_buffer_manager`.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>, dedup: bool) -> Self {
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            dedup,
            merge_mode,
        }
    }
}
@@ -77,6 +85,7 @@ impl MemtableBuilder for TimeSeriesMemtableBuilder {
            id,
            self.write_buffer_manager.clone(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
@@ -91,6 +100,7 @@ pub struct TimeSeriesMemtable {
    max_timestamp: AtomicI64,
    min_timestamp: AtomicI64,
    dedup: bool,
    merge_mode: MergeMode,
}

impl TimeSeriesMemtable {
@@ -99,6 +109,7 @@ impl TimeSeriesMemtable {
        id: MemtableId,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let row_codec = Arc::new(McmpRowCodec::new(
            region_metadata
@@ -107,6 +118,11 @@ impl TimeSeriesMemtable {
                .collect(),
        ));
        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
        let dedup = if merge_mode == MergeMode::LastNonNull {
            false
        } else {
            dedup
        };
        Self {
            id,
            region_metadata,
@@ -116,6 +132,7 @@ impl TimeSeriesMemtable {
            max_timestamp: AtomicI64::new(i64::MIN),
            min_timestamp: AtomicI64::new(i64::MAX),
            dedup,
            merge_mode,
        }
    }

@@ -251,7 +268,13 @@ impl Memtable for TimeSeriesMemtable {
        let iter = self
            .series_set
            .iter_series(projection, filters, self.dedup)?;
        Ok(Box::new(iter))

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    fn ranges(
@@ -272,6 +295,7 @@ impl Memtable for TimeSeriesMemtable {
            projection,
            predicate,
            dedup: self.dedup,
            merge_mode: self.merge_mode,
        });
        let context = Arc::new(MemtableRangeContext::new(self.id, builder));

@@ -320,6 +344,7 @@ impl Memtable for TimeSeriesMemtable {
            id,
            self.alloc_tracker.write_buffer_manager(),
            self.dedup,
            self.merge_mode,
        ))
    }
}
@@ -856,6 +881,7 @@ struct TimeSeriesIterBuilder {
    projection: HashSet<ColumnId>,
    predicate: Option<Predicate>,
    dedup: bool,
    merge_mode: MergeMode,
}

impl IterBuilder for TimeSeriesIterBuilder {
@@ -865,7 +891,13 @@ impl IterBuilder for TimeSeriesIterBuilder {
            self.predicate.clone(),
            self.dedup,
        )?;
        Ok(Box::new(iter))

        if self.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }
}

@@ -1234,7 +1266,7 @@ mod tests {
    fn check_memtable_dedup(dedup: bool) {
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();
        memtable.write(&kvs).unwrap();

@@ -1283,7 +1315,7 @@ mod tests {
        common_telemetry::init_default_ut_logging();
        let schema = schema_for_test();
        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true);
        let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
        memtable.write(&kvs).unwrap();

        let iter = memtable.iter(Some(&[3]), None).unwrap();

@@ -317,9 +317,9 @@ impl LastFieldsBuilder {
        self.contains_null = self.last_fields.iter().any(Value::is_null);
    }

    /// Merges last not null fields, builds a new batch and resets the builder.
    /// Merges last non-null fields, builds a new batch and resets the builder.
    /// It may overwrite the last row of the `buffer`.
    fn merge_last_not_null(
    fn merge_last_non_null(
        &mut self,
        buffer: Batch,
        metrics: &mut DedupMetrics,
@@ -380,20 +380,20 @@ impl LastFieldsBuilder {
    }
}

/// Dedup strategy that keeps the last not null field for the same key.
/// Dedup strategy that keeps the last non-null field for the same key.
///
/// It assumes that batches from files and memtables don't contain duplicate rows
/// and the merge reader never concatenates batches from different sources.
///
/// We might implement a new strategy if we need to process files with duplicate rows.
pub(crate) struct LastNotNull {
pub(crate) struct LastNonNull {
    /// Buffered batch whose last row's fields may still be updated.
    buffer: Option<Batch>,
    /// Fields that overlap with the last row of the `buffer`.
    last_fields: LastFieldsBuilder,
}

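// Editor's illustration (not part of the original source): under this strategy,
// rows with the same key and timestamp are merged field by field, keeping the
// latest non-null value of each field:
//   newer row: (ts=1, field_0=Some(1), field_1=None)
//   older row: (ts=1, field_0=None,    field_1=Some(2))
//   => merged: (ts=1, field_0=Some(1), field_1=Some(2))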
impl LastNotNull {
impl LastNonNull {
    /// Creates a new strategy with the given `filter_deleted` flag.
    #[allow(dead_code)]
    pub(crate) fn new(filter_deleted: bool) -> Self {
@@ -404,7 +404,7 @@ impl LastNotNull {
    }
}

impl DedupStrategy for LastNotNull {
impl DedupStrategy for LastNonNull {
    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
        if batch.is_empty() {
            return Ok(None);
@@ -422,14 +422,14 @@ impl DedupStrategy for LastNotNull {
        if buffer.primary_key() != batch.primary_key() {
            // Next key is different.
            let buffer = std::mem::replace(buffer, batch);
            let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
            let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
            return Ok(merged);
        }

        if buffer.last_timestamp() != batch.first_timestamp() {
            // The next batch has a different timestamp.
            let buffer = std::mem::replace(buffer, batch);
            let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
            let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
            return Ok(merged);
        }

@@ -449,7 +449,7 @@ impl DedupStrategy for LastNotNull {
        // Moves the remaining rows to the buffer.
        let batch = batch.slice(1, batch.num_rows() - 1);
        let buffer = std::mem::replace(buffer, batch);
        let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
        let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;

        Ok(merged)
    }
@@ -462,12 +462,107 @@ impl DedupStrategy for LastNotNull {
        // Initializes last fields with the first buffer.
        self.last_fields.maybe_init(&buffer);

        let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
        let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;

        Ok(merged)
    }
}

/// An iterator that dedups rows by the [LastNonNull] strategy.
/// The input iterator must return sorted batches.
pub(crate) struct LastNonNullIter<I> {
    /// Inner iterator that returns sorted batches.
    iter: Option<I>,
    /// Dedup strategy.
    strategy: LastNonNull,
    /// Dedup metrics.
    metrics: DedupMetrics,
    /// The current batch returned by the iterator. If it is None, we need to
    /// fetch a new batch.
    /// The batch is always non-empty.
    current_batch: Option<Batch>,
}

impl<I> LastNonNullIter<I> {
    /// Creates a new iterator with the given inner iterator.
    pub(crate) fn new(iter: I) -> Self {
        Self {
            iter: Some(iter),
            // We only use the iter in memtables. Memtables never filter deleted.
            strategy: LastNonNull::new(false),
            metrics: DedupMetrics::default(),
            current_batch: None,
        }
    }

    /// Finds the index of the first row that has the same timestamp as the next row.
    /// Returns None if there are no duplicate rows.
    fn find_split_index(batch: &Batch) -> Option<usize> {
        if batch.num_rows() < 2 {
            return None;
        }

        // Safety: The batch is not empty.
        let timestamps = batch.timestamps_native().unwrap();
        timestamps.windows(2).position(|t| t[0] == t[1])
    }
}

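// Editor's illustration (not part of the original source): for sorted timestamps
// [10, 10, 11], find_split_index returns Some(0); next_batch_for_merge then
// slices the batch into [10] and [10, 11], so the strategy sees the duplicate
// rows in separate batches and can merge their fields.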
impl<I: Iterator<Item = Result<Batch>>> LastNonNullIter<I> {
    /// Fetches the next batch from the inner iterator. It will slice the batch if it
    /// contains duplicate rows.
    fn next_batch_for_merge(&mut self) -> Result<Option<Batch>> {
        if self.current_batch.is_none() {
            // No current batch. Fetches a new batch from the inner iterator.
            let Some(iter) = self.iter.as_mut() else {
                // The iterator is exhausted.
                return Ok(None);
            };

            self.current_batch = iter.next().transpose()?;
            if self.current_batch.is_none() {
                // The iterator is exhausted.
                self.iter = None;
                return Ok(None);
            }
        }

        if let Some(batch) = &self.current_batch {
            let Some(index) = Self::find_split_index(batch) else {
                // No duplicate rows in the current batch.
                return Ok(self.current_batch.take());
            };

            let first = batch.slice(0, index + 1);
            let batch = batch.slice(index + 1, batch.num_rows() - index - 1);
            // `index` being Some indicates that the batch has at least one row remaining.
            debug_assert!(!batch.is_empty());
            self.current_batch = Some(batch);
            return Ok(Some(first));
        }

        Ok(None)
    }

    fn next_batch(&mut self) -> Result<Option<Batch>> {
        while let Some(batch) = self.next_batch_for_merge()? {
            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
                return Ok(Some(batch));
            }
        }

        self.strategy.finish(&mut self.metrics)
    }
}

impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_batch().transpose()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
@@ -506,9 +601,9 @@ mod tests {
        assert_eq!(0, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);

        // Test last not null.
        // Test last non-null.
        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNotNull::new(true));
        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
        check_reader_result(&mut reader, &input).await;
        assert_eq!(0, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
@@ -640,7 +735,7 @@ mod tests {
    }

    #[tokio::test]
    async fn test_last_not_null_merge() {
    async fn test_last_non_null_merge() {
        let input = [
            new_batch_multi_fields(
                b"k1",
@@ -688,7 +783,7 @@ mod tests {

        // Filter deleted.
        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNotNull::new(true));
        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
        check_reader_result(
            &mut reader,
            &[
@@ -722,7 +817,7 @@ mod tests {

        // Does not filter deleted.
        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNotNull::new(false));
        let mut reader = DedupReader::new(reader, LastNonNull::new(false));
        check_reader_result(
            &mut reader,
            &[
@@ -762,7 +857,7 @@ mod tests {
    }

    #[tokio::test]
    async fn test_last_not_null_skip_merge_single() {
    async fn test_last_non_null_skip_merge_single() {
        let input = [new_batch_multi_fields(
            b"k1",
            &[1, 2, 3],
@@ -772,7 +867,7 @@ mod tests {
        )];

        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNotNull::new(true));
        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
        check_reader_result(
            &mut reader,
            &[new_batch_multi_fields(
@@ -788,14 +883,14 @@ mod tests {
        assert_eq!(1, reader.metrics().num_deleted_rows);

        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNotNull::new(false));
        let mut reader = DedupReader::new(reader, LastNonNull::new(false));
        check_reader_result(&mut reader, &input).await;
        assert_eq!(0, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }

    #[tokio::test]
    async fn test_last_not_null_skip_merge_no_null() {
    async fn test_last_non_null_skip_merge_no_null() {
        let input = [
            new_batch_multi_fields(
                b"k1",
@@ -815,7 +910,7 @@ mod tests {
        ];

        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNotNull::new(true));
        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
        check_reader_result(
            &mut reader,
            &[
@@ -835,7 +930,7 @@ mod tests {
    }

    #[tokio::test]
    async fn test_last_not_null_merge_null() {
    async fn test_last_non_null_merge_null() {
        let input = [
            new_batch_multi_fields(
                b"k1",
@@ -849,7 +944,7 @@ mod tests {
        ];

        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNotNull::new(true));
        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
        check_reader_result(
            &mut reader,
            &[
@@ -884,7 +979,7 @@ mod tests {
    }

    #[test]
    fn test_last_not_null_strategy_delete_last() {
    fn test_last_non_null_strategy_delete_last() {
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(
@@ -905,7 +1000,7 @@ mod tests {
            new_batch_multi_fields(b"k2", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
        ];

        let mut strategy = LastNotNull::new(true);
        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(
            &input,
            &mut strategy,
@@ -918,13 +1013,13 @@ mod tests {
    }

    #[test]
    fn test_last_not_null_strategy_delete_one() {
    fn test_last_non_null_strategy_delete_one() {
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
        ];

        let mut strategy = LastNotNull::new(true);
        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(
            &input,
            &mut strategy,
@@ -939,18 +1034,18 @@ mod tests {
    }

    #[test]
    fn test_last_not_null_strategy_delete_all() {
    fn test_last_non_null_strategy_delete_all() {
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Delete], &[(Some(11), None)]),
        ];

        let mut strategy = LastNotNull::new(true);
        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(&input, &mut strategy, &[]);
    }

    #[test]
    fn test_last_not_null_strategy_same_batch() {
    fn test_last_non_null_strategy_same_batch() {
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(
@@ -971,7 +1066,7 @@ mod tests {
            new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
        ];

        let mut strategy = LastNotNull::new(true);
        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(
            &input,
            &mut strategy,
@@ -982,4 +1077,92 @@ mod tests {
            ],
        );
    }

    #[test]
    fn test_last_non_null_iter_on_batch() {
        let input = [new_batch_multi_fields(
            b"k1",
            &[1, 1, 2],
            &[13, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[(None, None), (Some(1), None), (Some(2), Some(22))],
        )];
        let iter = input.into_iter().map(Ok);
        let iter = LastNonNullIter::new(iter);
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        let expect = [
            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
        ];
        assert_eq!(&expect, &actual[..]);
    }

    #[test]
    fn test_last_non_null_iter_same_row() {
        let input = [
            new_batch_multi_fields(
                b"k1",
                &[1, 1, 1],
                &[13, 12, 11],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(11), None)],
            ),
            new_batch_multi_fields(
                b"k1",
                &[1, 1],
                &[10, 9],
                &[OpType::Put, OpType::Put],
                &[(None, Some(11)), (Some(21), Some(31))],
            ),
        ];
        let iter = input.into_iter().map(Ok);
        let iter = LastNonNullIter::new(iter);
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        let expect = [new_batch_multi_fields(
            b"k1",
            &[1],
            &[13],
            &[OpType::Put],
            &[(Some(1), Some(11))],
        )];
        assert_eq!(&expect, &actual[..]);
    }

    #[test]
    fn test_last_non_null_iter_multi_batch() {
        let input = [
            new_batch_multi_fields(
                b"k1",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(2), Some(22))],
            ),
            new_batch_multi_fields(
                b"k1",
                &[2, 3],
                &[12, 13],
                &[OpType::Put, OpType::Delete],
                &[(None, Some(12)), (None, None)],
            ),
            new_batch_multi_fields(
                b"k2",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(2), Some(22))],
            ),
        ];
        let iter = input.into_iter().map(Ok);
        let iter = LastNonNullIter::new(iter);
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        let expect = [
            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
            new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k2", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
        ];
        assert_eq!(&expect, &actual[..]);
    }
}

@@ -42,6 +42,7 @@ use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::file::{overlaps, FileHandle, FileMeta};
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
@@ -295,7 +296,8 @@ impl ScanRegion {
            .with_parallelism(self.parallelism)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(filter_deleted);
            .with_filter_deleted(filter_deleted)
            .with_merge_mode(self.version.options.merge_mode());
        Ok(input)
    }

@@ -398,6 +400,8 @@ pub(crate) struct ScanInput {
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
}

impl ScanInput {
@@ -418,6 +422,7 @@ impl ScanInput {
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
        }
    }

@@ -497,6 +502,13 @@ impl ScanInput {
        self
    }

    /// Sets the merge mode.
    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    /// Scans sources in parallel.
    ///
    /// # Panics if the input doesn't allow parallel scan.

@@ -35,12 +35,13 @@ use tokio::sync::Semaphore;

use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::memtable::MemtableRef;
use crate::read::dedup::{DedupReader, LastRow};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::merge::MergeReaderBuilder;
use crate::read::scan_region::{
    FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext,
};
use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
use crate::region::options::MergeMode;
use crate::sst::file::FileMeta;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;
@@ -210,10 +211,16 @@ impl SeqScan {

        let dedup = !stream_ctx.input.append_mode;
        if dedup {
            let reader = Box::new(DedupReader::new(
                reader,
                LastRow::new(stream_ctx.input.filter_deleted),
            ));
            let reader = match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::new(DedupReader::new(
                    reader,
                    LastRow::new(stream_ctx.input.filter_deleted),
                )) as _,
                MergeMode::LastNonNull => Box::new(DedupReader::new(
                    reader,
                    LastNonNull::new(stream_ctx.input.filter_deleted),
                )) as _,
            };
            Ok(Some(reader))
        } else {
            let reader = Box::new(reader);

@@ -119,9 +119,10 @@ impl RegionOpener {
    }

    /// Sets options for the region.
    pub(crate) fn options(mut self, options: RegionOptions) -> Self {
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        self
        Ok(self)
    }

    /// Sets the cache manager for the region.
@@ -192,9 +193,11 @@ impl RegionOpener {
        )
        .await?;

        let memtable_builder = self
            .memtable_builder_provider
            .builder_for_options(options.memtable.as_ref(), !options.append_mode);
        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            options.memtable.as_ref(),
            options.need_dedup(),
            options.merge_mode(),
        );
        // Initial memtable id is 0.
        let part_duration = options.compaction.time_window();
        let mutable = Arc::new(TimePartitions::new(
@@ -323,7 +326,8 @@ impl RegionOpener {
        ));
        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            region_options.memtable.as_ref(),
            !region_options.append_mode,
            region_options.need_dedup(),
            region_options.merge_mode(),
        );
        // Initial memtable id is 0.
        let part_duration = region_options.compaction.time_window();

@@ -24,15 +24,28 @@ use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use serde::de::Error as _;
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value;
use serde_with::{serde_as, with_prefix, DisplayFromStr};
use serde_with::{serde_as, with_prefix, DisplayFromStr, NoneAsEmptyString};
use snafu::{ensure, ResultExt};
use store_api::storage::ColumnId;
use strum::EnumString;

use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};

const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;

/// Mode to handle duplicate rows while merging.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum MergeMode {
    /// Keeps the last row.
    #[default]
    LastRow,
    /// Keeps the last non-null field for each row.
    LastNonNull,
}

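Since MergeMode derives EnumString with snake_case serialization, user-supplied option strings parse directly into variants. A minimal sketch (the asserts are illustrative, not part of the diff):

    use std::str::FromStr;

    assert_eq!(MergeMode::LastNonNull, MergeMode::from_str("last_non_null").unwrap());
    assert_eq!(MergeMode::LastRow, MergeMode::default());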
/// Options that affect the entire region.
///
/// Users need to specify the options while creating/opening a region.
@@ -54,6 +67,34 @@ pub struct RegionOptions {
    pub index_options: IndexOptions,
    /// Memtable options.
    pub memtable: Option<MemtableOptions>,
    /// The mode to merge duplicate rows.
    /// Only takes effect when `append_mode` is `false`.
    pub merge_mode: Option<MergeMode>,
}

impl RegionOptions {
    /// Validates options.
    pub fn validate(&self) -> Result<()> {
        if self.append_mode {
            ensure!(
                self.merge_mode.is_none(),
                InvalidRegionOptionsSnafu {
                    reason: "merge_mode is not allowed when append_mode is enabled",
                }
            );
        }
        Ok(())
    }

    /// Returns `true` if deduplication is needed.
    pub fn need_dedup(&self) -> bool {
        !self.append_mode
    }

    /// Returns the `merge_mode` if it is set, otherwise returns the default `MergeMode`.
    pub fn merge_mode(&self) -> MergeMode {
        self.merge_mode.unwrap_or_default()
    }
}

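Because TryFrom runs validate() before returning (see below), combining append_mode with an explicit merge_mode fails at option-parsing time. A hedged sketch of that behavior (the map construction is illustrative):

    use std::collections::HashMap;

    let map: HashMap<String, String> = [
        ("append_mode".to_string(), "true".to_string()),
        ("merge_mode".to_string(), "last_non_null".to_string()),
    ]
    .into_iter()
    .collect();
    // validate() rejects the combination, so parsing the options fails.
    assert!(RegionOptions::try_from(&map).is_err());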
impl TryFrom<&HashMap<String, String>> for RegionOptions {
@@ -89,7 +130,7 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions {
            None
        };

        Ok(RegionOptions {
        let opts = RegionOptions {
            ttl: options.ttl,
            compaction,
            storage: options.storage,
@@ -97,7 +138,11 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions {
            wal_options,
            index_options,
            memtable,
        })
            merge_mode: options.merge_mode,
        };
        opts.validate()?;

        Ok(opts)
    }
}

@@ -179,6 +224,8 @@ struct RegionOptionsWithoutEnum {
    storage: Option<String>,
    #[serde_as(as = "DisplayFromStr")]
    append_mode: bool,
    #[serde_as(as = "NoneAsEmptyString")]
    merge_mode: Option<MergeMode>,
}

impl Default for RegionOptionsWithoutEnum {
@@ -188,6 +235,7 @@ impl Default for RegionOptionsWithoutEnum {
            ttl: options.ttl,
            storage: options.storage,
            append_mode: options.append_mode,
            merge_mode: options.merge_mode,
        }
    }
}
@@ -477,6 +525,21 @@ mod tests {
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_with_merge_mode() {
        let map = make_map(&[("merge_mode", "last_row")]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(MergeMode::LastRow, options.merge_mode());

        let map = make_map(&[("merge_mode", "last_non_null")]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(MergeMode::LastNonNull, options.merge_mode());

        let map = make_map(&[("merge_mode", "unknown")]);
        let err = RegionOptions::try_from(&map).unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_with_all() {
        let wal_options = WalOptions::Kafka(KafkaWalOptions {
@@ -489,7 +552,7 @@ mod tests {
            ("compaction.twcs.time_window", "2h"),
            ("compaction.type", "twcs"),
            ("storage", "S3"),
            ("append_mode", "true"),
            ("append_mode", "false"),
            ("index.inverted_index.ignore_column_ids", "1,2,3"),
            ("index.inverted_index.segment_row_count", "512"),
            (
@@ -500,6 +563,7 @@ mod tests {
            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
            ("memtable.partition_tree.data_freeze_threshold", "2048"),
            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
            ("merge_mode", "last_non_null"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
@@ -510,7 +574,7 @@ mod tests {
                time_window: Some(Duration::from_secs(3600 * 2)),
            }),
            storage: Some("S3".to_string()),
            append_mode: true,
            append_mode: false,
            wal_options,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
@@ -523,6 +587,7 @@ mod tests {
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
            })),
            merge_mode: Some(MergeMode::LastNonNull),
        };
        assert_eq!(expect, options);
    }

@@ -920,6 +920,38 @@ pub fn build_rows(start: usize, end: usize) -> Vec<Row> {
        .collect()
}

/// Builds rows with schema (string, f64, f64, ts_millis).
/// - `key`: A string key that is common across all rows.
/// - `timestamps`: Array of timestamp values.
/// - `fields`: Array of tuples where each tuple contains two optional i64 values, representing two optional float fields.
/// Returns a vector of `Row`, each containing the key, two optional float fields, and a timestamp.
pub fn build_rows_with_fields(
    key: &str,
    timestamps: &[i64],
    fields: &[(Option<i64>, Option<i64>)],
) -> Vec<Row> {
    timestamps
        .iter()
        .zip(fields.iter())
        .map(|(ts, (field1, field2))| api::v1::Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(ValueData::StringValue(key.to_string())),
                },
                api::v1::Value {
                    value_data: field1.map(|v| ValueData::F64Value(v as f64)),
                },
                api::v1::Value {
                    value_data: field2.map(|v| ValueData::F64Value(v as f64)),
                },
                api::v1::Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(*ts * 1000)),
                },
            ],
        })
        .collect()
}

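A short usage sketch of the helper (values are illustrative): key "a" at seconds 1 and 2, each row carrying two optional f64 fields; the helper multiplies the timestamps by 1000 to get milliseconds.

    let rows = build_rows_with_fields("a", &[1, 2], &[(Some(1), None), (None, Some(2))]);
    assert_eq!(2, rows.len());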
/// Get column schemas for rows.
pub fn rows_schema(request: &RegionCreateRequest) -> Vec<api::v1::ColumnSchema> {
    request

@@ -57,7 +57,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
            self.intermediate_manager.clone(),
        )
        .cache(Some(self.cache_manager.clone()))
        .options(region.version().options.clone())
        .options(region.version().options.clone())?
        .skip_wal_replay(true)
        .open(&self.config, &self.wal)
        .await?,