From 10b7a3d24d65e5849946c248068eb719b44e8b26 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Thu, 27 Jun 2024 15:52:58 +0800 Subject: [PATCH] feat: Implements `merge_mode` region options (#4208) * feat: add update_mode to region options * test: add test * feat: last not null iter * feat: time series last not null * feat: partition tree update mode * feat: partition tree * fix: last not null iter slice * test: add test for compaction * test: use second resolution * style: fix clippy * chore: merge two lines Co-authored-by: Jeremyhi * chore: address CR comments * refactor: UpdateMode -> MergeMode * refactor: LastNotNull -> LastNonNull * chore: return None earlier * feat: validate region options make merge mode optional and use default while it is None * test: fix tests --------- Co-authored-by: Jeremyhi --- src/mito2/benches/memtable_bench.rs | 7 +- src/mito2/src/compaction.rs | 43 ++-- src/mito2/src/compaction/compactor.rs | 24 +- src/mito2/src/compaction/window.rs | 1 + src/mito2/src/engine.rs | 2 + src/mito2/src/engine/append_mode_test.rs | 2 +- src/mito2/src/engine/merge_mode_test.rs | 208 +++++++++++++++ src/mito2/src/memtable.rs | 10 +- src/mito2/src/memtable/partition_tree.rs | 5 + src/mito2/src/memtable/partition_tree/tree.rs | 16 +- src/mito2/src/memtable/time_series.rs | 42 ++- src/mito2/src/read/dedup.rs | 243 +++++++++++++++--- src/mito2/src/read/scan_region.rs | 14 +- src/mito2/src/read/seq_scan.rs | 17 +- src/mito2/src/region/opener.rs | 16 +- src/mito2/src/region/options.rs | 75 +++++- src/mito2/src/test_util.rs | 32 +++ src/mito2/src/worker/handle_catchup.rs | 2 +- 18 files changed, 670 insertions(+), 89 deletions(-) create mode 100644 src/mito2/src/engine/merge_mode_test.rs diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index f50d3e32dd..4309520cdd 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -24,6 +24,7 @@ use datatypes::schema::ColumnSchema; use 
mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable}; use mito2::memtable::time_series::TimeSeriesMemtable; use mito2::memtable::{KeyValues, Memtable}; +use mito2::region::options::MergeMode; use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema}; use rand::rngs::ThreadRng; use rand::seq::SliceRandom; @@ -51,7 +52,7 @@ fn write_rows(c: &mut Criterion) { }); }); group.bench_function("time_series", |b| { - let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true); + let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true, MergeMode::LastRow); let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 42, ×tamps, 1); b.iter(|| { @@ -83,7 +84,7 @@ fn full_scan(c: &mut Criterion) { }); }); group.bench_function("time_series", |b| { - let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true); + let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true, MergeMode::LastRow); for kvs in generator.iter() { memtable.write(&kvs).unwrap(); } @@ -121,7 +122,7 @@ fn filter_1_host(c: &mut Criterion) { }); }); group.bench_function("time_series", |b| { - let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true); + let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true, MergeMode::LastRow); for kvs in generator.iter() { memtable.write(&kvs).unwrap(); } diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 6ee21b736e..0106f2ead3 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -54,6 +54,7 @@ use crate::read::projection::ProjectionMapper; use crate::read::scan_region::ScanInput; use crate::read::seq_scan::SeqScan; use crate::read::BoxedBatchReader; +use crate::region::options::MergeMode; use crate::region::version::{VersionControlRef, VersionRef}; use crate::region::ManifestContextRef; use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; @@ -453,31 +454,39 @@ pub struct 
SerializedCompactionOutput { output_time_range: Option, } -/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order. -async fn build_sst_reader( +/// Builders to create [BoxedBatchReader] for compaction. +struct CompactionSstReaderBuilder<'a> { metadata: RegionMetadataRef, sst_layer: AccessLayerRef, cache: Option, - inputs: &[FileHandle], + inputs: &'a [FileHandle], append_mode: bool, filter_deleted: bool, time_range: Option, -) -> Result { - let mut scan_input = ScanInput::new(sst_layer, ProjectionMapper::all(&metadata)?) - .with_files(inputs.to_vec()) - .with_append_mode(append_mode) - .with_cache(cache) - .with_filter_deleted(filter_deleted) - // We ignore file not found error during compaction. - .with_ignore_file_not_found(true); + merge_mode: MergeMode, +} - // This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944 - // by converting time ranges into predicate. - if let Some(time_range) = time_range { - scan_input = scan_input.with_predicate(time_range_to_predicate(time_range, &metadata)?); +impl<'a> CompactionSstReaderBuilder<'a> { + /// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order. + async fn build_sst_reader(self) -> Result { + let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?) + .with_files(self.inputs.to_vec()) + .with_append_mode(self.append_mode) + .with_cache(self.cache) + .with_filter_deleted(self.filter_deleted) + // We ignore file not found error during compaction. + .with_ignore_file_not_found(true) + .with_merge_mode(self.merge_mode); + + // This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944 + // by converting time ranges into predicate. 
+ if let Some(time_range) = self.time_range { + scan_input = + scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?); + } + + SeqScan::new(scan_input).build_reader().await } - - SeqScan::new(scan_input).build_reader().await } /// Converts time range to predicates so that rows outside the range will be filtered. diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 827aba80cc..216dff88e7 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -26,8 +26,8 @@ use store_api::storage::RegionId; use crate::access_layer::{AccessLayer, AccessLayerRef, SstWriteRequest}; use crate::cache::{CacheManager, CacheManagerRef}; -use crate::compaction::build_sst_reader; use crate::compaction::picker::{new_picker, PickerOutput}; +use crate::compaction::CompactionSstReaderBuilder; use crate::config::MitoConfig; use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; @@ -137,7 +137,8 @@ pub async fn open_compaction_region( let memtable_builder = MemtableBuilderProvider::new(None, Arc::new(mito_config.clone())) .builder_for_options( req.region_options.memtable.as_ref(), - !req.region_options.append_mode, + req.region_options.need_dedup(), + req.region_options.merge_mode(), ); // Initial memtable id is 0. 
@@ -282,16 +283,19 @@ impl Compactor for DefaultCompactor { .index_options .clone(); let append_mode = compaction_region.current_version.options.append_mode; + let merge_mode = compaction_region.current_version.options.merge_mode(); futs.push(async move { - let reader = build_sst_reader( - region_metadata.clone(), - sst_layer.clone(), - Some(cache_manager.clone()), - &output.inputs, + let reader = CompactionSstReaderBuilder { + metadata: region_metadata.clone(), + sst_layer: sst_layer.clone(), + cache: Some(cache_manager.clone()), + inputs: &output.inputs, append_mode, - output.filter_deleted, - output.output_time_range, - ) + filter_deleted: output.filter_deleted, + time_range: output.output_time_range, + merge_mode, + } + .build_sst_reader() .await?; let file_meta_opt = sst_layer .write_sst( diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index 1683d28f9a..cf5f172155 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -260,6 +260,7 @@ mod tests { wal_options: Default::default(), index_options: Default::default(), memtable: None, + merge_mode: None, }, }) } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 86324ec9f0..7af21da298 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -39,6 +39,8 @@ mod flush_test; #[cfg(any(test, feature = "test"))] pub mod listener; #[cfg(test)] +mod merge_mode_test; +#[cfg(test)] mod open_test; #[cfg(test)] mod parallel_test; diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index ed9d64ee2c..548a205f39 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -113,7 +113,7 @@ async fn test_append_mode_compaction() { .await .unwrap(); - // Flush 2 SSTs for compaction. + // Flush 3 SSTs for compaction. 
// a, field 1, 2 let rows = Rows { schema: column_schemas.clone(), diff --git a/src/mito2/src/engine/merge_mode_test.rs b/src/mito2/src/engine/merge_mode_test.rs new file mode 100644 index 0000000000..1adf51d12f --- /dev/null +++ b/src/mito2/src/engine/merge_mode_test.rs @@ -0,0 +1,208 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Tests for merge mode. + +use api::v1::Rows; +use common_recordbatch::RecordBatches; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{RegionCompactRequest, RegionRequest}; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::MitoConfig; +use crate::test_util::batch_util::sort_batches_and_print; +use crate::test_util::{ + build_delete_rows_for_key, build_rows_with_fields, delete_rows, delete_rows_schema, + flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv, +}; + +#[tokio::test] +async fn test_merge_mode_write_query() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .field_num(2) + .insert_option("merge_mode", "last_non_null") + .build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = build_rows_with_fields( + "a", + &[1,
2, 3], + &[(Some(1), None), (None, None), (None, Some(3))], + ); + let rows = Rows { + schema: column_schemas.clone(), + rows, + }; + put_rows(&engine, region_id, rows).await; + + let rows = build_rows_with_fields("a", &[2, 3], &[(Some(12), None), (Some(13), None)]); + let rows = Rows { + schema: column_schemas.clone(), + rows, + }; + put_rows(&engine, region_id, rows).await; + + let rows = build_rows_with_fields("a", &[1, 2], &[(Some(11), None), (Some(22), Some(222))]); + let rows = Rows { + schema: column_schemas, + rows, + }; + put_rows(&engine, region_id, rows).await; + + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------+---------------------+ +| tag_0 | field_0 | field_1 | ts | ++-------+---------+---------+---------------------+ +| a | 11.0 | | 1970-01-01T00:00:01 | +| a | 22.0 | 222.0 | 1970-01-01T00:00:02 | +| a | 13.0 | 3.0 | 1970-01-01T00:00:03 | ++-------+---------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_merge_mode_compaction() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new(); + let engine = env + .create_engine(MitoConfig { + scan_parallelism: 2, + ..Default::default() + }) + .await; + let region_id = RegionId::new(1, 1); + + let request = CreateRequestBuilder::new() + .field_num(2) + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.max_active_window_files", "2") + .insert_option("compaction.twcs.max_inactive_window_files", "2") + .insert_option("merge_mode", "last_non_null") + .build(); + let region_dir = request.region_dir.clone(); + let region_opts = request.options.clone(); + let delete_schema = delete_rows_schema(&request); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, 
RegionRequest::Create(request)) + .await + .unwrap(); + + // Flush 3 SSTs for compaction. + // a, 1 => (1, null), 2 => (null, null), 3 => (null, 3), 4 => (4, 4) + let rows = build_rows_with_fields( + "a", + &[1, 2, 3, 4], + &[ + (Some(1), None), + (None, None), + (None, Some(3)), + (Some(4), Some(4)), + ], + ); + let rows = Rows { + schema: column_schemas.clone(), + rows, + }; + put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + + // a, 1 => (null, 11), 2 => (2, null), 3 => (null, 13) + let rows = build_rows_with_fields( + "a", + &[1, 2, 3], + &[(None, Some(11)), (Some(2), None), (None, Some(13))], + ); + let rows = Rows { + schema: column_schemas.clone(), + rows, + }; + put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + + // Delete a, 4 + let rows = Rows { + schema: delete_schema.clone(), + rows: build_delete_rows_for_key("a", 4, 5), + }; + delete_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + + let output = engine + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) + .await + .unwrap(); + assert_eq!(output.affected_rows, 0); + + // a, 1 => (21, null), 2 => (22, null) + let rows = build_rows_with_fields("a", &[1, 2], &[(Some(21), None), (Some(22), None)]); + let rows = Rows { + schema: column_schemas.clone(), + rows, + }; + put_rows(&engine, region_id, rows).await; + + let expected = "\ ++-------+---------+---------+---------------------+ +| tag_0 | field_0 | field_1 | ts | ++-------+---------+---------+---------------------+ +| a | 21.0 | 11.0 | 1970-01-01T00:00:01 | +| a | 22.0 | | 1970-01-01T00:00:02 | +| a | | 13.0 | 1970-01-01T00:00:03 | ++-------+---------+---------+---------------------+"; + // Scans in parallel. 
+ let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!(1, scanner.num_files()); + assert_eq!(1, scanner.num_memtables()); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); + + // Reopens engine with parallelism 1. + let engine = env + .reopen_engine( + engine, + MitoConfig { + scan_parallelism: 1, + ..Default::default() + }, + ) + .await; + // Reopens the region. + reopen_region(&engine, region_id, region_dir, false, region_opts).await; + let stream = engine + .scan_to_stream(region_id, ScanRequest::default()) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); +} diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index b807197f09..3cc497b254 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -34,7 +34,7 @@ use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable use crate::memtable::time_series::TimeSeriesMemtableBuilder; use crate::metrics::WRITE_BUFFER_BYTES; use crate::read::Batch; -use crate::region::options::MemtableOptions; +use crate::region::options::{MemtableOptions, MergeMode}; pub mod bulk; pub mod key_values; @@ -251,11 +251,13 @@ impl MemtableBuilderProvider { &self, options: Option<&MemtableOptions>, dedup: bool, + merge_mode: MergeMode, ) -> MemtableBuilderRef { match options { Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new( self.write_buffer_manager.clone(), dedup, + merge_mode, )), Some(MemtableOptions::PartitionTree(opts)) => { Arc::new(PartitionTreeMemtableBuilder::new( @@ -264,15 +266,16 @@ impl MemtableBuilderProvider { data_freeze_threshold: opts.data_freeze_threshold, fork_dictionary_bytes: opts.fork_dictionary_bytes, dedup, + merge_mode, }, 
self.write_buffer_manager.clone(), )) } - None => self.default_memtable_builder(dedup), + None => self.default_memtable_builder(dedup, merge_mode), } } - fn default_memtable_builder(&self, dedup: bool) -> MemtableBuilderRef { + fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef { match &self.config.memtable { MemtableConfig::PartitionTree(config) => { let mut config = config.clone(); @@ -285,6 +288,7 @@ impl MemtableBuilderProvider { MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new( self.write_buffer_manager.clone(), dedup, + merge_mode, )), } } diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index af3b1e3437..0d902aaa85 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -43,6 +43,7 @@ use crate::memtable::{ AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats, }; +use crate::region::options::MergeMode; /// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size. pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8; @@ -80,6 +81,9 @@ pub struct PartitionTreeConfig { pub dedup: bool, /// Total bytes of dictionary to keep in fork. pub fork_dictionary_bytes: ReadableSize, + /// Merge mode of the tree. 
+ #[serde(skip_deserializing)] + pub merge_mode: MergeMode, } impl Default for PartitionTreeConfig { @@ -98,6 +102,7 @@ impl Default for PartitionTreeConfig { data_freeze_threshold: 131072, dedup: true, fork_dictionary_bytes, + merge_mode: MergeMode::LastRow, } } } diff --git a/src/mito2/src/memtable/partition_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs index 110032a68e..3df106f7da 100644 --- a/src/mito2/src/memtable/partition_tree/tree.rs +++ b/src/mito2/src/memtable/partition_tree/tree.rs @@ -40,7 +40,9 @@ use crate::memtable::partition_tree::partition::{ use crate::memtable::partition_tree::PartitionTreeConfig; use crate::memtable::{BoxedBatchIterator, KeyValues}; use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; +use crate::read::dedup::LastNonNullIter; use crate::read::Batch; +use crate::region::options::MergeMode; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// The partition tree. @@ -83,9 +85,13 @@ impl PartitionTree { .collect(), }; let is_partitioned = Partition::has_multi_partitions(&metadata); + let mut config = config.clone(); + if config.merge_mode == MergeMode::LastNonNull { + config.dedup = false; + } PartitionTree { - config: config.clone(), + config, metadata, row_codec: Arc::new(row_codec), partitions: Default::default(), @@ -237,7 +243,13 @@ impl PartitionTree { iter.fetch_next_partition(context)?; iter.metrics.iter_elapsed += start.elapsed(); - Ok(Box::new(iter)) + + if self.config.merge_mode == MergeMode::LastNonNull { + let iter = LastNonNullIter::new(iter); + Ok(Box::new(iter)) + } else { + Ok(Box::new(iter)) + } } /// Returns true if the tree is empty. 
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index ac3965f6f1..4c7a114566 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -47,7 +47,9 @@ use crate::memtable::{ MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats, }; use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; +use crate::read::dedup::LastNonNullIter; use crate::read::{Batch, BatchBuilder, BatchColumn}; +use crate::region::options::MergeMode; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// Initial vector builder capacity. @@ -58,14 +60,20 @@ const INITIAL_BUILDER_CAPACITY: usize = 0; pub struct TimeSeriesMemtableBuilder { write_buffer_manager: Option, dedup: bool, + merge_mode: MergeMode, } impl TimeSeriesMemtableBuilder { /// Creates a new builder with specific `write_buffer_manager`. - pub fn new(write_buffer_manager: Option, dedup: bool) -> Self { + pub fn new( + write_buffer_manager: Option, + dedup: bool, + merge_mode: MergeMode, + ) -> Self { Self { write_buffer_manager, dedup, + merge_mode, } } } @@ -77,6 +85,7 @@ impl MemtableBuilder for TimeSeriesMemtableBuilder { id, self.write_buffer_manager.clone(), self.dedup, + self.merge_mode, )) } } @@ -91,6 +100,7 @@ pub struct TimeSeriesMemtable { max_timestamp: AtomicI64, min_timestamp: AtomicI64, dedup: bool, + merge_mode: MergeMode, } impl TimeSeriesMemtable { @@ -99,6 +109,7 @@ impl TimeSeriesMemtable { id: MemtableId, write_buffer_manager: Option, dedup: bool, + merge_mode: MergeMode, ) -> Self { let row_codec = Arc::new(McmpRowCodec::new( region_metadata @@ -107,6 +118,11 @@ impl TimeSeriesMemtable { .collect(), )); let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone()); + let dedup = if merge_mode == MergeMode::LastNonNull { + false + } else { + dedup + }; Self { id, region_metadata, @@ -116,6 +132,7 @@ impl TimeSeriesMemtable { max_timestamp: AtomicI64::new(i64::MIN), min_timestamp: 
AtomicI64::new(i64::MAX), dedup, + merge_mode, } } @@ -251,7 +268,13 @@ impl Memtable for TimeSeriesMemtable { let iter = self .series_set .iter_series(projection, filters, self.dedup)?; - Ok(Box::new(iter)) + + if self.merge_mode == MergeMode::LastNonNull { + let iter = LastNonNullIter::new(iter); + Ok(Box::new(iter)) + } else { + Ok(Box::new(iter)) + } } fn ranges( @@ -272,6 +295,7 @@ impl Memtable for TimeSeriesMemtable { projection, predicate, dedup: self.dedup, + merge_mode: self.merge_mode, }); let context = Arc::new(MemtableRangeContext::new(self.id, builder)); @@ -320,6 +344,7 @@ impl Memtable for TimeSeriesMemtable { id, self.alloc_tracker.write_buffer_manager(), self.dedup, + self.merge_mode, )) } } @@ -856,6 +881,7 @@ struct TimeSeriesIterBuilder { projection: HashSet, predicate: Option, dedup: bool, + merge_mode: MergeMode, } impl IterBuilder for TimeSeriesIterBuilder { @@ -865,7 +891,13 @@ impl IterBuilder for TimeSeriesIterBuilder { self.predicate.clone(), self.dedup, )?; - Ok(Box::new(iter)) + + if self.merge_mode == MergeMode::LastNonNull { + let iter = LastNonNullIter::new(iter); + Ok(Box::new(iter)) + } else { + Ok(Box::new(iter)) + } } } @@ -1234,7 +1266,7 @@ mod tests { fn check_memtable_dedup(dedup: bool) { let schema = schema_for_test(); let kvs = build_key_values(&schema, "hello".to_string(), 42, 100); - let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup); + let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup, MergeMode::LastRow); memtable.write(&kvs).unwrap(); memtable.write(&kvs).unwrap(); @@ -1283,7 +1315,7 @@ mod tests { common_telemetry::init_default_ut_logging(); let schema = schema_for_test(); let kvs = build_key_values(&schema, "hello".to_string(), 42, 100); - let memtable = TimeSeriesMemtable::new(schema, 42, None, true); + let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow); memtable.write(&kvs).unwrap(); let iter = memtable.iter(Some(&[3]), None).unwrap(); diff --git 
a/src/mito2/src/read/dedup.rs b/src/mito2/src/read/dedup.rs index c8709edcd9..52ff05fd12 100644 --- a/src/mito2/src/read/dedup.rs +++ b/src/mito2/src/read/dedup.rs @@ -317,9 +317,9 @@ impl LastFieldsBuilder { self.contains_null = self.last_fields.iter().any(Value::is_null); } - /// Merges last not null fields, builds a new batch and resets the builder. + /// Merges last non-null fields, builds a new batch and resets the builder. /// It may overwrites the last row of the `buffer`. - fn merge_last_not_null( + fn merge_last_non_null( &mut self, buffer: Batch, metrics: &mut DedupMetrics, @@ -380,20 +380,20 @@ impl LastFieldsBuilder { } } -/// Dedup strategy that keeps the last not null field for the same key. +/// Dedup strategy that keeps the last non-null field for the same key. /// /// It assumes that batches from files and memtables don't contain duplicate rows /// and the merge reader never concatenates batches from different source. /// /// We might implement a new strategy if we need to process files with duplicate rows. -pub(crate) struct LastNotNull { +pub(crate) struct LastNonNull { /// Buffered batch that fields in the last row may be updated. buffer: Option, /// Fields that overlaps with the last row of the `buffer`. last_fields: LastFieldsBuilder, } -impl LastNotNull { +impl LastNonNull { /// Creates a new strategy with the given `filter_deleted` flag. #[allow(dead_code)] pub(crate) fn new(filter_deleted: bool) -> Self { @@ -404,7 +404,7 @@ impl LastNotNull { } } -impl DedupStrategy for LastNotNull { +impl DedupStrategy for LastNonNull { fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result> { if batch.is_empty() { return Ok(None); @@ -422,14 +422,14 @@ impl DedupStrategy for LastNotNull { if buffer.primary_key() != batch.primary_key() { // Next key is different. 
let buffer = std::mem::replace(buffer, batch); - let merged = self.last_fields.merge_last_not_null(buffer, metrics)?; + let merged = self.last_fields.merge_last_non_null(buffer, metrics)?; return Ok(merged); } if buffer.last_timestamp() != batch.first_timestamp() { // The next batch has a different timestamp. let buffer = std::mem::replace(buffer, batch); - let merged = self.last_fields.merge_last_not_null(buffer, metrics)?; + let merged = self.last_fields.merge_last_non_null(buffer, metrics)?; return Ok(merged); } @@ -449,7 +449,7 @@ impl DedupStrategy for LastNotNull { // Moves the remaining rows to the buffer. let batch = batch.slice(1, batch.num_rows() - 1); let buffer = std::mem::replace(buffer, batch); - let merged = self.last_fields.merge_last_not_null(buffer, metrics)?; + let merged = self.last_fields.merge_last_non_null(buffer, metrics)?; Ok(merged) } @@ -462,12 +462,107 @@ impl DedupStrategy for LastNotNull { // Initializes last fields with the first buffer. self.last_fields.maybe_init(&buffer); - let merged = self.last_fields.merge_last_not_null(buffer, metrics)?; + let merged = self.last_fields.merge_last_non_null(buffer, metrics)?; Ok(merged) } } +/// An iterator that dedup rows by [LastNonNull] strategy. +/// The input iterator must returns sorted batches. +pub(crate) struct LastNonNullIter { + /// Inner iterator that returns sorted batches. + iter: Option, + /// Dedup strategy. + strategy: LastNonNull, + /// Dedup metrics. + metrics: DedupMetrics, + /// The current batch returned by the iterator. If it is None, we need to + /// fetch a new batch. + /// The batch is always not empty. + current_batch: Option, +} + +impl LastNonNullIter { + /// Creates a new iterator with the given inner iterator. + pub(crate) fn new(iter: I) -> Self { + Self { + iter: Some(iter), + // We only use the iter in memtables. Memtables never filter deleted. 
+ strategy: LastNonNull::new(false), + metrics: DedupMetrics::default(), + current_batch: None, + } + } + + /// Finds the index of the first row that has the same timestamp with the next row. + /// If no duplicate rows, returns None. + fn find_split_index(batch: &Batch) -> Option { + if batch.num_rows() < 2 { + return None; + } + + // Safety: The batch is not empty. + let timestamps = batch.timestamps_native().unwrap(); + timestamps.windows(2).position(|t| t[0] == t[1]) + } +} + +impl>> LastNonNullIter { + /// Fetches the next batch from the inner iterator. It will slice the batch if it + /// contains duplicate rows. + fn next_batch_for_merge(&mut self) -> Result> { + if self.current_batch.is_none() { + // No current batch. Fetches a new batch from the inner iterator. + let Some(iter) = self.iter.as_mut() else { + // The iterator is exhausted. + return Ok(None); + }; + + self.current_batch = iter.next().transpose()?; + if self.current_batch.is_none() { + // The iterator is exhausted. + self.iter = None; + return Ok(None); + } + } + + if let Some(batch) = &self.current_batch { + let Some(index) = Self::find_split_index(batch) else { + // No duplicate rows in the current batch. + return Ok(self.current_batch.take()); + }; + + let first = batch.slice(0, index + 1); + let batch = batch.slice(index + 1, batch.num_rows() - index - 1); + // `index` is Some indicates that the batch has at least one row remaining. + debug_assert!(!batch.is_empty()); + self.current_batch = Some(batch); + return Ok(Some(first)); + } + + Ok(None) + } + + fn next_batch(&mut self) -> Result> { + while let Some(batch) = self.next_batch_for_merge()? { + if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? 
{ + return Ok(Some(batch)); + } + } + + self.strategy.finish(&mut self.metrics) + } +} + +impl>> Iterator for LastNonNullIter { + type Item = Result; + + fn next(&mut self) -> Option { + self.next_batch().transpose() + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -506,9 +601,9 @@ mod tests { assert_eq!(0, reader.metrics().num_unselected_rows); assert_eq!(0, reader.metrics().num_deleted_rows); - // Test last not null. + // Test last non-null. let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNotNull::new(true)); + let mut reader = DedupReader::new(reader, LastNonNull::new(true)); check_reader_result(&mut reader, &input).await; assert_eq!(0, reader.metrics().num_unselected_rows); assert_eq!(0, reader.metrics().num_deleted_rows); @@ -640,7 +735,7 @@ mod tests { } #[tokio::test] - async fn test_last_not_null_merge() { + async fn test_last_non_null_merge() { let input = [ new_batch_multi_fields( b"k1", @@ -688,7 +783,7 @@ mod tests { // Filter deleted. let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNotNull::new(true)); + let mut reader = DedupReader::new(reader, LastNonNull::new(true)); check_reader_result( &mut reader, &[ @@ -722,7 +817,7 @@ mod tests { // Does not filter deleted. 
let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNotNull::new(false)); + let mut reader = DedupReader::new(reader, LastNonNull::new(false)); check_reader_result( &mut reader, &[ @@ -762,7 +857,7 @@ mod tests { } #[tokio::test] - async fn test_last_not_null_skip_merge_single() { + async fn test_last_non_null_skip_merge_single() { let input = [new_batch_multi_fields( b"k1", &[1, 2, 3], @@ -772,7 +867,7 @@ mod tests { )]; let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNotNull::new(true)); + let mut reader = DedupReader::new(reader, LastNonNull::new(true)); check_reader_result( &mut reader, &[new_batch_multi_fields( @@ -788,14 +883,14 @@ mod tests { assert_eq!(1, reader.metrics().num_deleted_rows); let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNotNull::new(false)); + let mut reader = DedupReader::new(reader, LastNonNull::new(false)); check_reader_result(&mut reader, &input).await; assert_eq!(0, reader.metrics().num_unselected_rows); assert_eq!(0, reader.metrics().num_deleted_rows); } #[tokio::test] - async fn test_last_not_null_skip_merge_no_null() { + async fn test_last_non_null_skip_merge_no_null() { let input = [ new_batch_multi_fields( b"k1", @@ -815,7 +910,7 @@ mod tests { ]; let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNotNull::new(true)); + let mut reader = DedupReader::new(reader, LastNonNull::new(true)); check_reader_result( &mut reader, &[ @@ -835,7 +930,7 @@ mod tests { } #[tokio::test] - async fn test_last_not_null_merge_null() { + async fn test_last_non_null_merge_null() { let input = [ new_batch_multi_fields( b"k1", @@ -849,7 +944,7 @@ mod tests { ]; let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNotNull::new(true)); + let mut reader = DedupReader::new(reader, LastNonNull::new(true)); check_reader_result( &mut reader, &[ @@ -884,7 +979,7 @@ 
mod tests { } #[test] - fn test_last_not_null_strategy_delete_last() { + fn test_last_non_null_strategy_delete_last() { let input = [ new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]), new_batch_multi_fields( @@ -905,7 +1000,7 @@ mod tests { new_batch_multi_fields(b"k2", &[3], &[3], &[OpType::Put], &[(None, Some(3))]), ]; - let mut strategy = LastNotNull::new(true); + let mut strategy = LastNonNull::new(true); check_dedup_strategy( &input, &mut strategy, @@ -918,13 +1013,13 @@ mod tests { } #[test] - fn test_last_not_null_strategy_delete_one() { + fn test_last_non_null_strategy_delete_one() { let input = [ new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]), new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Put], &[(Some(11), None)]), ]; - let mut strategy = LastNotNull::new(true); + let mut strategy = LastNonNull::new(true); check_dedup_strategy( &input, &mut strategy, @@ -939,18 +1034,18 @@ mod tests { } #[test] - fn test_last_not_null_strategy_delete_all() { + fn test_last_non_null_strategy_delete_all() { let input = [ new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]), new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Delete], &[(Some(11), None)]), ]; - let mut strategy = LastNotNull::new(true); + let mut strategy = LastNonNull::new(true); check_dedup_strategy(&input, &mut strategy, &[]); } #[test] - fn test_last_not_null_strategy_same_batch() { + fn test_last_non_null_strategy_same_batch() { let input = [ new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]), new_batch_multi_fields( @@ -971,7 +1066,7 @@ mod tests { new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(None, Some(3))]), ]; - let mut strategy = LastNotNull::new(true); + let mut strategy = LastNonNull::new(true); check_dedup_strategy( &input, &mut strategy, @@ -982,4 +1077,92 @@ mod tests { ], ); } + + #[test] + fn test_last_non_null_iter_on_batch() { + let input = 
[new_batch_multi_fields( + b"k1", + &[1, 1, 2], + &[13, 12, 13], + &[OpType::Put, OpType::Put, OpType::Put], + &[(None, None), (Some(1), None), (Some(2), Some(22))], + )]; + let iter = input.into_iter().map(Ok); + let iter = LastNonNullIter::new(iter); + let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); + let expect = [ + new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]), + new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]), + ]; + assert_eq!(&expect, &actual[..]); + } + + #[test] + fn test_last_non_null_iter_same_row() { + let input = [ + new_batch_multi_fields( + b"k1", + &[1, 1, 1], + &[13, 12, 11], + &[OpType::Put, OpType::Put, OpType::Put], + &[(None, None), (Some(1), None), (Some(11), None)], + ), + new_batch_multi_fields( + b"k1", + &[1, 1], + &[10, 9], + &[OpType::Put, OpType::Put], + &[(None, Some(11)), (Some(21), Some(31))], + ), + ]; + let iter = input.into_iter().map(Ok); + let iter = LastNonNullIter::new(iter); + let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); + let expect = [new_batch_multi_fields( + b"k1", + &[1], + &[13], + &[OpType::Put], + &[(Some(1), Some(11))], + )]; + assert_eq!(&expect, &actual[..]); + } + + #[test] + fn test_last_non_null_iter_multi_batch() { + let input = [ + new_batch_multi_fields( + b"k1", + &[1, 1, 2], + &[13, 12, 13], + &[OpType::Put, OpType::Put, OpType::Put], + &[(None, None), (Some(1), None), (Some(2), Some(22))], + ), + new_batch_multi_fields( + b"k1", + &[2, 3], + &[12, 13], + &[OpType::Put, OpType::Delete], + &[(None, Some(12)), (None, None)], + ), + new_batch_multi_fields( + b"k2", + &[1, 1, 2], + &[13, 12, 13], + &[OpType::Put, OpType::Put, OpType::Put], + &[(None, None), (Some(1), None), (Some(2), Some(22))], + ), + ]; + let iter = input.into_iter().map(Ok); + let iter = LastNonNullIter::new(iter); + let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); + let expect = [ + new_batch_multi_fields(b"k1", &[1], 
&[13], &[OpType::Put], &[(Some(1), None)]), + new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]), + new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Delete], &[(None, None)]), + new_batch_multi_fields(b"k2", &[1], &[13], &[OpType::Put], &[(Some(1), None)]), + new_batch_multi_fields(b"k2", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]), + ]; + assert_eq!(&expect, &actual[..]); + } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 4fe783ce9e..e29b1611a2 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -42,6 +42,7 @@ use crate::read::projection::ProjectionMapper; use crate::read::seq_scan::SeqScan; use crate::read::unordered_scan::UnorderedScan; use crate::read::{Batch, Source}; +use crate::region::options::MergeMode; use crate::region::version::VersionRef; use crate::sst::file::{overlaps, FileHandle, FileMeta}; use crate::sst::index::applier::builder::SstIndexApplierBuilder; @@ -295,7 +296,8 @@ impl ScanRegion { .with_parallelism(self.parallelism) .with_start_time(self.start_time) .with_append_mode(self.version.options.append_mode) - .with_filter_deleted(filter_deleted); + .with_filter_deleted(filter_deleted) + .with_merge_mode(self.version.options.merge_mode()); Ok(input) } @@ -398,6 +400,8 @@ pub(crate) struct ScanInput { pub(crate) append_mode: bool, /// Whether to remove deletion markers. pub(crate) filter_deleted: bool, + /// Mode to merge duplicate rows. + pub(crate) merge_mode: MergeMode, } impl ScanInput { @@ -418,6 +422,7 @@ impl ScanInput { query_start: None, append_mode: false, filter_deleted: true, + merge_mode: MergeMode::default(), } } @@ -497,6 +502,13 @@ impl ScanInput { self } + /// Sets the merge mode. + #[must_use] + pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self { + self.merge_mode = merge_mode; + self + } + /// Scans sources in parallel. /// /// # Panics if the input doesn't allow parallel scan. 
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 17151b624d..9a0038135f 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -35,12 +35,13 @@ use tokio::sync::Semaphore; use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::memtable::MemtableRef; -use crate::read::dedup::{DedupReader, LastRow}; +use crate::read::dedup::{DedupReader, LastNonNull, LastRow}; use crate::read::merge::MergeReaderBuilder; use crate::read::scan_region::{ FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext, }; use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source}; +use crate::region::options::MergeMode; use crate::sst::file::FileMeta; use crate::sst::parquet::file_range::FileRange; use crate::sst::parquet::reader::ReaderMetrics; @@ -210,10 +211,16 @@ impl SeqScan { let dedup = !stream_ctx.input.append_mode; if dedup { - let reader = Box::new(DedupReader::new( - reader, - LastRow::new(stream_ctx.input.filter_deleted), - )); + let reader = match stream_ctx.input.merge_mode { + MergeMode::LastRow => Box::new(DedupReader::new( + reader, + LastRow::new(stream_ctx.input.filter_deleted), + )) as _, + MergeMode::LastNonNull => Box::new(DedupReader::new( + reader, + LastNonNull::new(stream_ctx.input.filter_deleted), + )) as _, + }; Ok(Some(reader)) } else { let reader = Box::new(reader); diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index e20a00d35a..50aa7c68cd 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -119,9 +119,10 @@ impl RegionOpener { } /// Sets options for the region. - pub(crate) fn options(mut self, options: RegionOptions) -> Self { + pub(crate) fn options(mut self, options: RegionOptions) -> Result { + options.validate()?; self.options = Some(options); - self + Ok(self) } /// Sets the cache manager for the region. 
@@ -192,9 +193,11 @@ impl RegionOpener { ) .await?; - let memtable_builder = self - .memtable_builder_provider - .builder_for_options(options.memtable.as_ref(), !options.append_mode); + let memtable_builder = self.memtable_builder_provider.builder_for_options( + options.memtable.as_ref(), + options.need_dedup(), + options.merge_mode(), + ); // Initial memtable id is 0. let part_duration = options.compaction.time_window(); let mutable = Arc::new(TimePartitions::new( @@ -323,7 +326,8 @@ impl RegionOpener { )); let memtable_builder = self.memtable_builder_provider.builder_for_options( region_options.memtable.as_ref(), - !region_options.append_mode, + region_options.need_dedup(), + region_options.merge_mode(), ); // Initial memtable id is 0. let part_duration = region_options.compaction.time_window(); diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index d4a27fca3e..6806a4545b 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -24,15 +24,28 @@ use common_wal::options::{WalOptions, WAL_OPTIONS_KEY}; use serde::de::Error as _; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value; -use serde_with::{serde_as, with_prefix, DisplayFromStr}; +use serde_with::{serde_as, with_prefix, DisplayFromStr, NoneAsEmptyString}; use snafu::{ensure, ResultExt}; use store_api::storage::ColumnId; +use strum::EnumString; use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result}; use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD}; const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024; +/// Mode to handle duplicate rows while merging. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum MergeMode { + /// Keeps the last row. + #[default] + LastRow, + /// Keeps the last non-null field for each row. 
+ LastNonNull, +} + /// Options that affect the entire region. /// /// Users need to specify the options while creating/opening a region. @@ -54,6 +67,34 @@ pub struct RegionOptions { pub index_options: IndexOptions, /// Memtable options. pub memtable: Option, + /// The mode to merge duplicate rows. + /// Only takes effect when `append_mode` is `false`. + pub merge_mode: Option, +} + +impl RegionOptions { + /// Validates options. + pub fn validate(&self) -> Result<()> { + if self.append_mode { + ensure!( + self.merge_mode.is_none(), + InvalidRegionOptionsSnafu { + reason: "merge_mode is not allowed when append_mode is enabled", + } + ); + } + Ok(()) + } + + /// Returns `true` if deduplication is needed. + pub fn need_dedup(&self) -> bool { + !self.append_mode + } + + /// Returns the `merge_mode` if it is set, otherwise returns the default `MergeMode`. + pub fn merge_mode(&self) -> MergeMode { + self.merge_mode.unwrap_or_default() + } } impl TryFrom<&HashMap> for RegionOptions { @@ -89,7 +130,7 @@ impl TryFrom<&HashMap> for RegionOptions { None }; - Ok(RegionOptions { + let opts = RegionOptions { ttl: options.ttl, compaction, storage: options.storage, @@ -97,7 +138,11 @@ impl TryFrom<&HashMap> for RegionOptions { wal_options, index_options, memtable, - }) + merge_mode: options.merge_mode, + }; + opts.validate()?; + + Ok(opts) } } @@ -179,6 +224,8 @@ struct RegionOptionsWithoutEnum { storage: Option, #[serde_as(as = "DisplayFromStr")] append_mode: bool, + #[serde_as(as = "NoneAsEmptyString")] + merge_mode: Option, } impl Default for RegionOptionsWithoutEnum { @@ -188,6 +235,7 @@ impl Default for RegionOptionsWithoutEnum { ttl: options.ttl, storage: options.storage, append_mode: options.append_mode, + merge_mode: options.merge_mode, } } } @@ -477,6 +525,21 @@ mod tests { assert_eq!(StatusCode::InvalidArguments, err.status_code()); } + #[test] + fn test_with_merge_mode() { + let map = make_map(&[("merge_mode", "last_row")]); + let options = 
RegionOptions::try_from(&map).unwrap(); + assert_eq!(MergeMode::LastRow, options.merge_mode()); + + let map = make_map(&[("merge_mode", "last_non_null")]); + let options = RegionOptions::try_from(&map).unwrap(); + assert_eq!(MergeMode::LastNonNull, options.merge_mode()); + + let map = make_map(&[("merge_mode", "unknown")]); + let err = RegionOptions::try_from(&map).unwrap_err(); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); + } + #[test] fn test_with_all() { let wal_options = WalOptions::Kafka(KafkaWalOptions { @@ -489,7 +552,7 @@ mod tests { ("compaction.twcs.time_window", "2h"), ("compaction.type", "twcs"), ("storage", "S3"), - ("append_mode", "true"), + ("append_mode", "false"), ("index.inverted_index.ignore_column_ids", "1,2,3"), ("index.inverted_index.segment_row_count", "512"), ( @@ -500,6 +563,7 @@ mod tests { ("memtable.partition_tree.index_max_keys_per_shard", "2048"), ("memtable.partition_tree.data_freeze_threshold", "2048"), ("memtable.partition_tree.fork_dictionary_bytes", "128M"), + ("merge_mode", "last_non_null"), ]); let options = RegionOptions::try_from(&map).unwrap(); let expect = RegionOptions { @@ -510,7 +574,7 @@ mod tests { time_window: Some(Duration::from_secs(3600 * 2)), }), storage: Some("S3".to_string()), - append_mode: true, + append_mode: false, wal_options, index_options: IndexOptions { inverted_index: InvertedIndexOptions { @@ -523,6 +587,7 @@ mod tests { data_freeze_threshold: 2048, fork_dictionary_bytes: ReadableSize::mb(128), })), + merge_mode: Some(MergeMode::LastNonNull), }; assert_eq!(expect, options); } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index ac5f34f789..374e7548b0 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -920,6 +920,38 @@ pub fn build_rows(start: usize, end: usize) -> Vec { .collect() } +/// Build rows with schema (string, f64, f64, ts_millis). +/// - `key`: A string key that is common across all rows. 
+/// - `timestamps`: Array of timestamp values. +/// - `fields`: Array of tuples where each tuple contains two optional i64 values, representing two optional float fields. +/// Returns a vector of `Row` each containing the key, two optional float fields, and a timestamp. +pub fn build_rows_with_fields( + key: &str, + timestamps: &[i64], + fields: &[(Option, Option)], +) -> Vec { + timestamps + .iter() + .zip(fields.iter()) + .map(|(ts, (field1, field2))| api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue(key.to_string())), + }, + api::v1::Value { + value_data: field1.map(|v| ValueData::F64Value(v as f64)), + }, + api::v1::Value { + value_data: field2.map(|v| ValueData::F64Value(v as f64)), + }, + api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(*ts * 1000)), + }, + ], + }) + .collect() +} + /// Get column schemas for rows. pub fn rows_schema(request: &RegionCreateRequest) -> Vec { request diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 93a84b92e2..a4353fe529 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -57,7 +57,7 @@ impl RegionWorkerLoop { self.intermediate_manager.clone(), ) .cache(Some(self.cache_manager.clone())) - .options(region.version().options.clone()) + .options(region.version().options.clone())? .skip_wal_replay(true) .open(&self.config, &self.wal) .await?,