From 82812ff19ea8432e3e0b2e47d1b682f2a320be50 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Thu, 6 Nov 2025 14:11:58 +0800 Subject: [PATCH] test: add a unit test to scan data from memtable in append mode (#7193) * test: add tests for scanning append mode before flush Signed-off-by: evenyag * refactor: extract a function maybe_dedup_one Signed-off-by: evenyag * ci: add flat format to docs.yml so we can make it required later Signed-off-by: evenyag --------- Signed-off-by: evenyag --- .github/workflows/docs.yml | 1 + src/mito2/src/engine/append_mode_test.rs | 123 +++++++++++++++++++++++ src/mito2/src/flush.rs | 38 ++++--- 3 files changed, 147 insertions(+), 15 deletions(-) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index fc472e2d8b..71812a35bf 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -92,5 +92,6 @@ jobs: mode: - name: "Basic" - name: "Remote WAL" + - name: "Flat format" steps: - run: 'echo "No action required"' diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index ccdcbb3372..85d4f24fe3 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -220,3 +220,126 @@ async fn test_append_mode_compaction_with_format(flat_format: bool) { let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); } + +#[tokio::test] +async fn test_put_single_range() { + test_put_single_range_with_format(false).await; + test_put_single_range_with_format(true).await; +} + +async fn test_put_single_range_with_format(flat_format: bool) { + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + let region_id = RegionId::new(1, 1); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", 
"test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("append_mode", "true") + .build(); + let table_dir = request.table_dir.clone(); + let region_opts = request.options.clone(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // 1st put: key a, field_0 = 1, 2 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 1, 3, 1), + }; + put_rows(&engine, region_id, rows).await; + // 2nd put: key a, field_0 = 0, 1 (the row with field_0 = 1 duplicates one from the 1st put) + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + // 3rd put: key b, field_0 = 0, 1 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + // 4th put: key a, field_0 = 2, 3 (the row with field_0 = 2 duplicates one from the 1st put) + let rows = Rows { + schema: column_schemas, + rows: build_rows_for_key("a", 2, 4, 2), + }; + put_rows(&engine, region_id, rows).await; + + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| a | 0.0 | 1970-01-01T00:00:00 | +| a | 1.0 | 1970-01-01T00:00:01 | +| a | 1.0 | 1970-01-01T00:00:01 | +| a | 2.0 | 1970-01-01T00:00:02 | +| a | 2.0 | 1970-01-01T00:00:02 | +| a | 3.0 | 1970-01-01T00:00:03 | +| b | 0.0 | 1970-01-01T00:00:00 | +| b | 1.0 | 1970-01-01T00:00:01 | ++-------+---------+---------------------+"; + // Scans the unflushed data (0 files, 1 memtable) with 2 target partitions; the duplicate rows must be kept. 
+ let mut scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); + assert_eq!(0, scanner.num_files()); + assert_eq!(1, scanner.num_memtables()); + scanner.set_target_partitions(2); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); + + // Flushes and scans again: the scanner must now see 1 file and 0 memtables, with the same rows. + flush_region(&engine, region_id, None).await; + let mut scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); + assert_eq!(1, scanner.num_files()); + assert_eq!(0, scanner.num_memtables()); + scanner.set_target_partitions(2); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); + + // Reopens the engine with the same flat format setting. + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; + // Reopens the region and checks that the same rows are still returned. + reopen_region(&engine, region_id, table_dir, false, region_opts).await; + let stream = engine + .scan_to_stream(region_id, ScanRequest::default()) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); +} diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index bb1e848196..819a227e4b 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -743,7 +743,6 @@ struct FlatSources { encoded: SmallVec<[EncodedRange; 4]>, } -// TODO(yingwen): Flushes into multiple files in parallel. /// Returns the max sequence and [FlatSource] for the given memtable. fn memtable_flat_sources( schema: SchemaRef, @@ -767,20 +766,7 @@ fn memtable_flat_sources( let iter = only_range.build_record_batch_iter(None)?; // Dedup according to append mode and merge mode. // Even single range may have duplicate rows. 
- let iter = if options.append_mode { - // No dedup in append mode - Box::new(iter) as _ - } else { - match options.merge_mode() { - MergeMode::LastRow => { - Box::new(FlatDedupIterator::new(iter, FlatLastRow::new(false))) as _ - } - MergeMode::LastNonNull => Box::new(FlatDedupIterator::new( - iter, - FlatLastNonNull::new(field_column_start, false), - )) as _, - } - }; + let iter = maybe_dedup_one(options, field_column_start, iter); flat_sources.sources.push(FlatSource::Iter(iter)); }; } else { @@ -848,6 +834,28 @@ fn merge_and_dedup( Ok(maybe_dedup) } +fn maybe_dedup_one( + options: &RegionOptions, + field_column_start: usize, + input_iter: BoxedRecordBatchIterator, +) -> BoxedRecordBatchIterator { + if options.append_mode { + // Append mode skips dedup, so the input iterator is returned untouched. + input_iter + } else { + // Otherwise wraps the input in a FlatDedupIterator whose strategy follows the merge mode. + match options.merge_mode() { + MergeMode::LastRow => { + Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false))) + } + MergeMode::LastNonNull => Box::new(FlatDedupIterator::new( + input_iter, + FlatLastNonNull::new(field_column_start, false), + )), + } + } +} + /// Manages background flushes of a worker. pub(crate) struct FlushScheduler { /// Tracks regions need to flush.