fix: reordered write causes incorrect kv (#6345)

* fix/reordered-write-cause-incorrect-kv:
 - **Enhance Testing in `partition_tree.rs`**: Added test helpers `kv_region_metadata`, `key_values`, and `collect_kvs`, plus a regression test that checks the `PartitionTreeMemtable` still returns correct key-value pairs after reordered writes.
 - **Improve Key Handling in `dict.rs`**: `KeyDictBuilder` now maps the full primary key to the same `PkIndex` as its sparse key when the dictionary is finished, so a lookup by either representation finds the same entry. Added `test_builder_finish_with_sparse_key` to validate this; a minimal sketch of the idea follows.
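
 A minimal sketch of the mapping fix; `remap` and the concrete types are illustrative stand-ins, not the actual `KeyDictBuilder` internals (see the `dict.rs` diff below for the real change):

```rust
use std::collections::BTreeMap;

type PkIndex = u16;

/// Keys are re-indexed in sorted order when the dictionary is finished.
/// The fix records *both* representations of each key, so a lookup by the
/// sparse key or by the full primary key resolves to the same `PkIndex`.
fn remap(
    pk_to_index: BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>,
) -> BTreeMap<Vec<u8>, PkIndex> {
    let mut pk_to_index_map = BTreeMap::new();
    for (i, (full_pk, (_old_index, sparse_key))) in pk_to_index.into_iter().enumerate() {
        if let Some(sparse_key) = sparse_key {
            pk_to_index_map.insert(sparse_key, i as PkIndex);
        }
        // Previously the full key was dropped at this point; a later lookup
        // by the full primary key then missed the dictionary entry.
        pk_to_index_map.insert(full_pk, i as PkIndex);
    }
    pk_to_index_map
}

fn main() {
    let mut keys = BTreeMap::new();
    keys.insert(b"table_a".to_vec(), (0u16, Some(vec![42u8])));
    let remapped = remap(keys);
    // Both representations resolve to the same index.
    assert_eq!(remapped[&b"table_a".to_vec()], remapped[&vec![42u8]]);
}
```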

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/reordered-write-cause-incorrect-kv:
 ### Refactor `partition_tree.rs` for Improved Key Handling

 - **Refactored Key Handling**: Simplified the `key_values` helper to accept any iterator of keys, removing hardcoded key-value pairs; tests can now generate key sets on the fly (see the short illustration after this list).
 - **Updated Test Cases**: Modified test cases to use the new `key_values` function signature, ensuring they iterate over keys dynamically rather than relying on predefined lists.
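
 A self-contained toy of the new signature pattern; `row_labels` is a hypothetical helper, while the real `key_values` in the diff below applies the same `impl Iterator<Item = T>` bound to build a `KeyValues` mutation:

```rust
/// Accept any iterator of string-like keys instead of a hardcoded list.
fn row_labels<T: AsRef<str>>(keys: impl Iterator<Item = T>) -> Vec<String> {
    keys.map(|k| k.as_ref().to_string()).collect()
}

fn main() {
    // A char range, a slice of &str, or owned Strings all work.
    assert_eq!(row_labels(('a'..'d').map(|c| c.to_string())), ["a", "b", "c"]);
    assert_eq!(row_labels(["c", "f", "i"].iter()), ["c", "f", "i"]);
}
```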

 Files affected:
 - `src/mito2/src/memtable/partition_tree.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/reordered-write-cause-incorrect-kv:
 Enhance Testing in `partition_tree.rs`

 - Added assertions to verify key-value collection after `memtable` and `forked` operations.
 - Refactored the key-value writing logic in the `forked` operations for clarity; the resulting write/freeze/assert pattern is excerpted below.
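
 The pattern behind those assertions, abridged from `test_reorder_insert_key_values` in the diff below (`forked`, `keys`, `metadata`, and the helpers are all defined there):

```rust
// Write a key set out of order, freeze, then verify the iterator yields
// exactly the expected (key, value) map.
forked.write(&key_values(&metadata, keys.iter())).unwrap();
forked.freeze().unwrap();
assert_eq!(
    collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
    keys.iter().map(|c| (c.to_string(), c.to_string())).collect()
);
```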

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Author:  Lei, HUANG (committed via GitHub)
Date:    2025-06-19 14:32:40 +08:00
Commit:  6ece560f8c (parent: 2ab08a8f93)
Stats:   2 changed files with 168 additions and 3 deletions

`src/mito2/src/memtable/partition_tree.rs`

@@ -359,16 +359,19 @@ impl IterBuilder for PartitionTreeIterBuilder {
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::value::ValueData;
-    use api::v1::{Row, Rows, SemanticType};
+    use api::v1::{Mutation, OpType, Row, Rows, SemanticType};
    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Operator};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::prelude::Vector;
    use datatypes::scalars::ScalarVector;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
-    use datatypes::vectors::Int64Vector;
+    use datatypes::vectors::{Int64Vector, StringVector};
    use mito_codec::row_converter::DensePrimaryKeyCodec;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
@@ -789,4 +792,148 @@ mod tests {
            unreachable!()
        }
    }
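
    // Region schema: ts (millisecond timestamp), k (string tag, the only
    // primary key column), and v (string field).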
    fn kv_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let region_metadata = builder.build().unwrap();
        Arc::new(region_metadata)
    }
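
    // gRPC column schemas matching `kv_region_metadata` above.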
    fn kv_column_schemas() -> Vec<api::v1::ColumnSchema> {
        vec![
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
                semantic_type: SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "k".to_string(),
                datatype: api::v1::ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v".to_string(),
                datatype: api::v1::ColumnDataType::String as i32,
                semantic_type: SemanticType::Field as i32,
                ..Default::default()
            },
        ]
    }
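
    // Builds a single `Put` mutation with one row per key; tag `k` and
    // field `v` both carry the key string.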
    fn key_values<T: AsRef<str>>(
        metadata: &RegionMetadataRef,
        keys: impl Iterator<Item = T>,
    ) -> KeyValues {
        let rows = keys
            .map(|c| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(0)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(c.as_ref().to_string())),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue(c.as_ref().to_string())),
                    },
                ],
            })
            .collect();
        let mutation = Mutation {
            op_type: OpType::Put as i32,
            sequence: 0,
            rows: Some(Rows {
                schema: kv_column_schemas(),
                rows,
            }),
            write_hint: None,
        };
        KeyValues::new(metadata, mutation).unwrap()
    }
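
    // Drains the iterator, decoding each batch's primary key and collecting
    // the resulting (k, v) pairs into a map.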
    fn collect_kvs(
        iter: BoxedBatchIterator,
        region_meta: &RegionMetadataRef,
    ) -> HashMap<String, String> {
        let decoder = DensePrimaryKeyCodec::new(region_meta);
        let mut res = HashMap::new();
        for v in iter {
            let batch = v.unwrap();
            let values = decoder.decode(batch.primary_key()).unwrap().into_dense();
            let field_vector = batch.fields()[0]
                .data
                .as_any()
                .downcast_ref::<StringVector>()
                .unwrap();
            for row in 0..batch.num_rows() {
                res.insert(
                    values[0].as_string().unwrap(),
                    field_vector.get(row).as_string().unwrap(),
                );
            }
        }
        res
    }
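
    // Regression test for #6345: keys written out of order after a fork must
    // still map to their own values.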
    #[test]
    fn test_reorder_insert_key_values() {
        let metadata = kv_region_metadata();
        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
            .build(1, &metadata);
        memtable
            .write(&key_values(&metadata, ('a'..'h').map(|c| c.to_string())))
            .unwrap();
        memtable.freeze().unwrap();
        assert_eq!(
            collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata),
            ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
        );
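
        // Fork, then write a different key set out of order; before the fix,
        // a reordered write could pair keys with the wrong values.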
        let forked = memtable.fork(2, &metadata);
        let keys = ["c", "f", "i", "h", "b", "e", "g"];
        forked.write(&key_values(&metadata, keys.iter())).unwrap();
        forked.freeze().unwrap();
        assert_eq!(
            collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
            keys.iter()
                .map(|c| (c.to_string(), c.to_string()))
                .collect()
        );
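
        // Fork a second time and reorder once more.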
        let forked2 = forked.fork(3, &metadata);
        let keys = ["g", "e", "a", "f", "b", "c", "h"];
        forked2.write(&key_values(&metadata, keys.iter())).unwrap();
        let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata);
        let expected = keys
            .iter()
            .map(|c| (c.to_string(), c.to_string()))
            .collect::<HashMap<_, _>>();
        assert_eq!(kvs, expected);
    }
}

`src/mito2/src/memtable/partition_tree/dict.rs`

@@ -133,7 +133,7 @@ impl KeyDictBuilder {
        // Computes key position and then alter pk index.
        let mut key_positions = vec![0; self.pk_to_index.len()];
-        for (i, (_full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index))
+        for (i, (full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index))
            .into_iter()
            .enumerate()
        {
@@ -142,6 +142,7 @@ impl KeyDictBuilder {
            if let Some(sparse_key) = sparse_key {
                pk_to_index_map.insert(sparse_key, i as PkIndex);
            }
+            pk_to_index_map.insert(full_pk, i as PkIndex);
        }
        self.num_keys = 0;
@@ -472,4 +473,21 @@ mod tests {
        assert!(!builder.is_full());
        assert_eq!(0, builder.insert_key(b"a0", None, &mut metrics));
    }
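
    // After `finish`, the sparse key and the full key must resolve to the
    // same pk id.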
    #[test]
    fn test_builder_finish_with_sparse_key() {
        let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 2).into());
        let mut metrics = WriteMetrics::default();
        let full_key = "42".to_string();
        let sparse_key = &[42u8];
        builder.insert_key(full_key.as_bytes(), Some(sparse_key), &mut metrics);
        let (dict, pk_to_pk_id) = builder.finish().unwrap();
        assert_eq!(dict.key_positions.len(), 1);
        assert_eq!(dict.dict_blocks.len(), 1);
        assert_eq!(
            pk_to_pk_id.get(sparse_key.as_slice()),
            pk_to_pk_id.get(full_key.as_bytes())
        );
    }
}