test: add a unit test to scan data from memtable in append mode (#7193)

* test: add tests for scanning append mode before flush

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: extract a function maybe_dedup_one

Signed-off-by: evenyag <realevenyag@gmail.com>

* ci: add flat format to docs.yml so we can make it required later

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-11-06 14:11:58 +08:00
Committed by: GitHub
Parent: 4a77167138
Commit: 82812ff19e
3 changed files with 147 additions and 15 deletions

@@ -92,5 +92,6 @@ jobs:
        mode:
          - name: "Basic"
          - name: "Remote WAL"
          - name: "Flat format"
    steps:
      - run: 'echo "No action required"'

@@ -220,3 +220,126 @@ async fn test_append_mode_compaction_with_format(flat_format: bool) {
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
}

#[tokio::test]
async fn test_put_single_range() {
    test_put_single_range_with_format(false).await;
    test_put_single_range_with_format(true).await;
}

async fn test_put_single_range_with_format(flat_format: bool) {
    let mut env = TestEnv::new().await;
    let engine = env
        .create_engine(MitoConfig {
            default_experimental_flat_format: flat_format,
            ..Default::default()
        })
        .await;

    let region_id = RegionId::new(1, 1);
    env.get_schema_metadata_manager()
        .register_region_table_info(
            region_id.table_id(),
            "test_table",
            "test_catalog",
            "test_schema",
            None,
            env.get_kv_backend(),
        )
        .await;

    let request = CreateRequestBuilder::new()
        .insert_option("compaction.type", "twcs")
        .insert_option("append_mode", "true")
        .build();
    let table_dir = request.table_dir.clone();
    let region_opts = request.options.clone();

    let column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    // a, field 1, 2
    let rows = Rows {
        schema: column_schemas.clone(),
        rows: build_rows_for_key("a", 1, 3, 1),
    };
    put_rows(&engine, region_id, rows).await;
    // a, field 0, 1
    let rows = Rows {
        schema: column_schemas.clone(),
        rows: build_rows_for_key("a", 0, 2, 0),
    };
    put_rows(&engine, region_id, rows).await;
    // b, field 0, 1
    let rows = Rows {
        schema: column_schemas.clone(),
        rows: build_rows_for_key("b", 0, 2, 0),
    };
    put_rows(&engine, region_id, rows).await;
    // a, field 2, 3
    let rows = Rows {
        schema: column_schemas,
        rows: build_rows_for_key("a", 2, 4, 2),
    };
    put_rows(&engine, region_id, rows).await;

    let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts                  |
+-------+---------+---------------------+
| a     | 0.0     | 1970-01-01T00:00:00 |
| a     | 1.0     | 1970-01-01T00:00:01 |
| a     | 1.0     | 1970-01-01T00:00:01 |
| a     | 2.0     | 1970-01-01T00:00:02 |
| a     | 2.0     | 1970-01-01T00:00:02 |
| a     | 3.0     | 1970-01-01T00:00:03 |
| b     | 0.0     | 1970-01-01T00:00:00 |
| b     | 1.0     | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";
    // Scans in parallel.
    let mut scanner = engine
        .scanner(region_id, ScanRequest::default())
        .await
        .unwrap();
    assert_eq!(0, scanner.num_files());
    assert_eq!(1, scanner.num_memtables());
    scanner.set_target_partitions(2);
    let stream = scanner.scan().await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));

    // Flushes and scans.
    flush_region(&engine, region_id, None).await;
    let mut scanner = engine
        .scanner(region_id, ScanRequest::default())
        .await
        .unwrap();
    assert_eq!(1, scanner.num_files());
    assert_eq!(0, scanner.num_memtables());
    scanner.set_target_partitions(2);
    let stream = scanner.scan().await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));

    // Reopens engine.
    let engine = env
        .reopen_engine(
            engine,
            MitoConfig {
                default_experimental_flat_format: flat_format,
                ..Default::default()
            },
        )
        .await;
    // Reopens the region.
    reopen_region(&engine, region_id, table_dir, false, region_opts).await;
    let stream = engine
        .scan_to_stream(region_id, ScanRequest::default())
        .await
        .unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
}
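
The duplicates at ts 1 and ts 2 for tag "a" in the expected output come from the overlapping put ranges, not from the engine: the region is created with append_mode = "true", so the scan keeps every written row instead of merging rows that share the same primary key and timestamp. Below is a minimal, self-contained sketch of that row arithmetic, using plain tuples as stand-ins for the test's rows (the semantics of build_rows_for_key are inferred from the comments and the expected table above):

fn main() {
    // Mirror the four puts from the test as (tag, ts, field) tuples.
    // build_rows_for_key("a", 1, 3, 1) is assumed to yield ts 1 and 2 with
    // field values 1.0 and 2.0, matching the "// a, field 1, 2" comment.
    let puts: Vec<(&str, i64, f64)> = vec![
        ("a", 1, 1.0),
        ("a", 2, 2.0), // a, field 1, 2
        ("a", 0, 0.0),
        ("a", 1, 1.0), // a, field 0, 1
        ("b", 0, 0.0),
        ("b", 1, 1.0), // b, field 0, 1
        ("a", 2, 2.0),
        ("a", 3, 3.0), // a, field 2, 3
    ];

    // Append mode: sort by (tag, ts) but keep every row, so the overlapping
    // puts leave duplicates at ("a", 1) and ("a", 2): 8 rows in total,
    // matching the expected table in the test.
    let mut rows = puts.clone();
    rows.sort_by(|l, r| (l.0, l.1).cmp(&(r.0, r.1)));
    assert_eq!(8, rows.len());

    // The default (non-append) mode would keep one row per (tag, ts).
    // dedup_by_key keeps the first of each consecutive run, which is enough
    // here because the duplicate rows are identical; that leaves 6 rows.
    let mut deduped = rows.clone();
    deduped.dedup_by_key(|row| (row.0, row.1));
    assert_eq!(6, deduped.len());
}

The same 8-row table is asserted three times: against the memtable alone (1 memtable, 0 files), after the flush produces a single SST (0 memtables, 1 file), and again after the engine and region are reopened, and the whole test runs under both the default and the experimental flat format.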

@@ -743,7 +743,6 @@ struct FlatSources {
    encoded: SmallVec<[EncodedRange; 4]>,
}
// TODO(yingwen): Flushes into multiple files in parallel.
/// Returns the max sequence and [FlatSource] for the given memtable.
fn memtable_flat_sources(
    schema: SchemaRef,
@@ -767,20 +766,7 @@ fn memtable_flat_sources(
            let iter = only_range.build_record_batch_iter(None)?;
            // Dedup according to append mode and merge mode.
            // Even single range may have duplicate rows.
            let iter = if options.append_mode {
                // No dedup in append mode
                Box::new(iter) as _
            } else {
                match options.merge_mode() {
                    MergeMode::LastRow => {
                        Box::new(FlatDedupIterator::new(iter, FlatLastRow::new(false))) as _
                    }
                    MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                        iter,
                        FlatLastNonNull::new(field_column_start, false),
                    )) as _,
                }
            };
            let iter = maybe_dedup_one(options, field_column_start, iter);
            flat_sources.sources.push(FlatSource::Iter(iter));
        };
    } else {
@@ -848,6 +834,28 @@ fn merge_and_dedup(
    Ok(maybe_dedup)
}

fn maybe_dedup_one(
    options: &RegionOptions,
    field_column_start: usize,
    input_iter: BoxedRecordBatchIterator,
) -> BoxedRecordBatchIterator {
    if options.append_mode {
        // No dedup in append mode
        input_iter
    } else {
        // Dedup according to merge mode.
        match options.merge_mode() {
            MergeMode::LastRow => {
                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
            }
            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                input_iter,
                FlatLastNonNull::new(field_column_start, false),
            )),
        }
    }
}

/// Manages background flushes of a worker.
pub(crate) struct FlushScheduler {
    /// Tracks regions need to flush.
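
For readers outside the codebase, the shape of the extracted helper is easy to follow with plain iterators. The sketch below mirrors only the dispatch in maybe_dedup_one (append mode passes the input through untouched, otherwise a dedup adapter is chosen by the merge mode); Options, Row, BoxedRowIter, and dedup are illustrative stand-ins, not the engine's RegionOptions, BoxedRecordBatchIterator, or FlatDedupIterator:

/// Simplified stand-in types; the real helper works on RegionOptions and
/// BoxedRecordBatchIterator from the mito2 crate.
#[derive(Clone, Copy)]
enum MergeMode {
    LastRow,
    LastNonNull,
}

struct Options {
    append_mode: bool,
    merge_mode: MergeMode,
}

/// (primary key, timestamp, field); rows are assumed sorted by (key, ts).
type Row = (String, i64, Option<f64>);
type BoxedRowIter = Box<dyn Iterator<Item = Row>>;

/// Same dispatch shape as maybe_dedup_one: append mode returns the input
/// unchanged, otherwise the iterator is wrapped in a dedup adapter picked
/// by the merge mode.
fn maybe_dedup(options: &Options, input: BoxedRowIter) -> BoxedRowIter {
    if options.append_mode {
        // No dedup in append mode.
        input
    } else {
        match options.merge_mode {
            MergeMode::LastRow => Box::new(dedup(input, false)),
            MergeMode::LastNonNull => Box::new(dedup(input, true)),
        }
    }
}

/// Keeps one row per (key, ts). With keep_non_null, a null field from a
/// later duplicate does not overwrite an earlier non-null value.
fn dedup(input: BoxedRowIter, keep_non_null: bool) -> impl Iterator<Item = Row> {
    let mut out: Vec<Row> = Vec::new();
    for row in input {
        let duplicate = out
            .last()
            .map(|last| last.0 == row.0 && last.1 == row.1)
            .unwrap_or(false);
        if duplicate {
            let last = out.last_mut().unwrap();
            if !keep_non_null || row.2.is_some() {
                last.2 = row.2;
            }
        } else {
            out.push(row);
        }
    }
    out.into_iter()
}

fn main() {
    let rows: Vec<Row> = vec![
        ("a".to_string(), 1, Some(1.0)),
        ("a".to_string(), 1, None), // duplicate (key, ts), null field
        ("a".to_string(), 2, Some(2.0)),
    ];

    // Append mode keeps all three rows.
    let opts = Options { append_mode: true, merge_mode: MergeMode::LastNonNull };
    assert_eq!(3, maybe_dedup(&opts, Box::new(rows.clone().into_iter())).count());

    // Last-non-null dedup collapses the duplicate but keeps the 1.0 field.
    let opts = Options { append_mode: false, merge_mode: MergeMode::LastNonNull };
    let deduped: Vec<Row> = maybe_dedup(&opts, Box::new(rows.into_iter())).collect();
    assert_eq!(2, deduped.len());
    assert_eq!(Some(1.0), deduped[0].2);
}

The real FlatLastNonNull::new(field_column_start, false) applies the last-non-null rule per field column starting at field_column_start, and the real adapters operate on record batches rather than single rows, but the append-mode short-circuit and the merge-mode match are the same as in the extracted function above.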