fix(mito): append mode in flat format not working (#7186)

* mito2: add unit test for flat single-range append_mode dedup behavior

Verify memtable_flat_sources skips dedup when append_mode is true and
performs dedup otherwise for single-range flat memtables, preventing
regressions in the new append_mode path.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/flat-source-merge:
 ### Improve Column Metadata Extraction Logic

 - **File**: `src/common/meta/src/ddl/utils.rs`
   - Modified the `extract_column_metadatas` function to take the first schema with `swap_remove` and to compare the decoded column metadata rather than the raw bytes. This ensures that the extension map is considered during
 verification, making the metadata consistency checks across datanodes more robust.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-11-06 11:19:39 +08:00
committed by GitHub
parent fb92e4d0b2
commit 934df46f53
2 changed files with 144 additions and 21 deletions

View File

@@ -442,7 +442,7 @@ pub fn extract_column_metadatas(
results: &mut [RegionResponse],
key: &str,
) -> Result<Option<Vec<ColumnMetadata>>> {
let schemas = results
let mut schemas = results
.iter_mut()
.map(|r| r.extensions.remove(key))
.collect::<Vec<_>>();
@@ -454,20 +454,24 @@ pub fn extract_column_metadatas(
// Verify all the physical schemas are the same
// Safety: previous check ensures this vec is not empty
let first = schemas.first().unwrap();
ensure!(
schemas.iter().all(|x| x == first),
MetadataCorruptionSnafu {
err_msg: "The table column metadata schemas from datanodes are not the same."
}
);
let first_column_metadatas = schemas
.swap_remove(0)
.map(|first_bytes| ColumnMetadata::decode_list(&first_bytes).context(DecodeJsonSnafu))
.transpose()?;
if let Some(first) = first {
let column_metadatas = ColumnMetadata::decode_list(first).context(DecodeJsonSnafu)?;
Ok(Some(column_metadatas))
} else {
Ok(None)
for s in schemas {
// check decoded column metadata instead of bytes because it contains extension map.
let column_metadata = s
.map(|bytes| ColumnMetadata::decode_list(&bytes).context(DecodeJsonSnafu))
.transpose()?;
ensure!(
column_metadata == first_column_metadatas,
MetadataCorruptionSnafu {
err_msg: "The table column metadata schemas from datanodes are not the same."
}
);
}
Ok(first_column_metadatas)
}
#[cfg(test)]

View File

@@ -765,16 +765,21 @@ fn memtable_flat_sources(
flat_sources.encoded.push(encoded);
} else {
let iter = only_range.build_record_batch_iter(None)?;
// Dedup according to merge mode.
// Dedup according to append mode and merge mode.
// Even single range may have duplicate rows.
let iter = match options.merge_mode() {
MergeMode::LastRow => {
Box::new(FlatDedupIterator::new(iter, FlatLastRow::new(false))) as _
let iter = if options.append_mode {
// No dedup in append mode
Box::new(iter) as _
} else {
match options.merge_mode() {
MergeMode::LastRow => {
Box::new(FlatDedupIterator::new(iter, FlatLastRow::new(false))) as _
}
MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
iter,
FlatLastNonNull::new(field_column_start, false),
)) as _,
}
MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
iter,
FlatLastNonNull::new(field_column_start, false),
)) as _,
};
flat_sources.sources.push(FlatSource::Iter(iter));
};
@@ -1176,11 +1181,16 @@ impl FlushStatus {
#[cfg(test)]
mod tests {
use mito_codec::row_converter::build_primary_key_codec;
use tokio::sync::oneshot;
use super::*;
use crate::cache::CacheManager;
use crate::memtable::bulk::part::BulkPartConverter;
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::memtable::{Memtable, RangesOptions};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
@@ -1364,4 +1374,113 @@ mod tests {
assert_eq!(output, 0);
}
}
/// Checks the single-range flat flush path against `append_mode`: duplicate rows are
/// collapsed when it is disabled and kept as-is when it is enabled.
#[test]
fn test_memtable_flat_sources_single_range_append_mode_behavior() {
    // Test metadata and the flat arrow schema derived from it.
    let metadata = metadata_for_test();
    let flat_options = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
    let schema = to_flat_sst_arrow_schema(&metadata, &flat_options);

    // A bulk part with two rows sharing the same key and timestamp (ts = 1000)
    // but carrying different field values.
    let capacity = 16;
    let codec = build_primary_key_codec(&metadata);
    let mut converter = BulkPartConverter::new(&metadata, schema.clone(), capacity, codec, true);
    let timestamps = vec![1000i64, 1000i64];
    let field_values = vec![Some(1.0f64), Some(2.0f64)];
    let key_values = build_key_values_with_ts_seq_values(
        &metadata,
        "dup_key".to_string(),
        1,
        timestamps.into_iter(),
        field_values.into_iter(),
        1,
    );
    converter.append_key_values(&key_values).unwrap();
    let part = converter.convert().unwrap();

    // Builds `MemtableRanges` from a fresh bulk memtable holding the part above.
    // BulkMemtable is used directly because it produces record batch iterators.
    let build_ranges = |append_mode: bool| -> MemtableRanges {
        let bulk = crate::memtable::bulk::BulkMemtable::new(
            1,
            metadata.clone(),
            None,
            None,
            append_mode,
            MergeMode::LastRow,
        );
        bulk.write_bulk(part.clone()).unwrap();
        bulk.ranges(None, RangesOptions::for_flush()).unwrap()
    };

    // Case 1: append_mode = false => rows are deduplicated, exactly one row remains.
    {
        let mem_ranges = build_ranges(false);
        assert_eq!(1, mem_ranges.ranges.len());

        let options = RegionOptions {
            append_mode: false,
            merge_mode: Some(MergeMode::LastRow),
            ..Default::default()
        };
        let num_pk_columns = metadata.primary_key.len();
        let flat_sources =
            memtable_flat_sources(schema.clone(), mem_ranges, &options, num_pk_columns).unwrap();
        assert!(flat_sources.encoded.is_empty());
        assert_eq!(1, flat_sources.sources.len());

        // Drain every source and add up the rows of all yielded batches.
        let total_rows: usize = flat_sources
            .sources
            .into_iter()
            .map(|source| match source {
                crate::read::FlatSource::Iter(batches) => batches
                    .into_iter()
                    .map(|batch| batch.unwrap().num_rows())
                    .sum::<usize>(),
                crate::read::FlatSource::Stream(_) => unreachable!(),
            })
            .sum();
        assert_eq!(1, total_rows, "dedup should keep a single row");
    }

    // Case 2: append_mode = true => no dedup, both rows survive.
    {
        let mem_ranges = build_ranges(true);
        assert_eq!(1, mem_ranges.ranges.len());

        let options = RegionOptions {
            append_mode: true,
            ..Default::default()
        };
        let num_pk_columns = metadata.primary_key.len();
        let flat_sources =
            memtable_flat_sources(schema, mem_ranges, &options, num_pk_columns).unwrap();
        assert!(flat_sources.encoded.is_empty());
        assert_eq!(1, flat_sources.sources.len());

        let total_rows: usize = flat_sources
            .sources
            .into_iter()
            .map(|source| match source {
                crate::read::FlatSource::Iter(batches) => batches
                    .into_iter()
                    .map(|batch| batch.unwrap().num_rows())
                    .sum::<usize>(),
                crate::read::FlatSource::Stream(_) => unreachable!(),
            })
            .sum();
        assert_eq!(2, total_rows, "append_mode should preserve duplicates");
    }
}
}