From 6ece560f8cd67e62796c517ed96b766658fa94b8 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Thu, 19 Jun 2025 14:32:40 +0800 Subject: [PATCH] fix: reordered write cause incorrect kv (#6345) * fix/reordered-write-cause-incorrect-kv: - **Enhance Testing in `partition_tree.rs`**: Added comprehensive test functions such as `kv_region_metadata`, `key_values`, and `collect_kvs` to improve the robustness of key-value operations and ensure correct behavior of the `PartitionTreeMemtable`. - **Improve Key Handling in `dict.rs`**: Modified `KeyDictBuilder` to handle both full and sparse keys, ensuring correct mapping and insertion. Added a new test `test_builder_finish_with_sparse_key` to validate the handling of sparse keys. Signed-off-by: Lei, HUANG * fix/reordered-write-cause-incorrect-kv: ### Refactor `partition_tree.rs` for Improved Key Handling - **Refactored Key Handling**: Simplified the `key_values` function to accept an iterator of keys, removing hardcoded key-value pairs. This change enhances flexibility and reduces redundancy in key management. - **Updated Test Cases**: Modified test cases to use the new `key_values` function signature, ensuring they iterate over keys dynamically rather than relying on predefined lists. Files affected: - `src/mito2/src/memtable/partition_tree.rs` Signed-off-by: Lei, HUANG * fix/reordered-write-cause-incorrect-kv: Enhance Testing in `partition_tree.rs` - Added assertions to verify key-value collection after `memtable` and `forked` operations. - Refactored key-value writing logic for clarity in `forked` operations. Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- src/mito2/src/memtable/partition_tree.rs | 151 +++++++++++++++++- src/mito2/src/memtable/partition_tree/dict.rs | 20 ++- 2 files changed, 168 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 8307f84980..807c632dca 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -359,16 +359,19 @@ impl IterBuilder for PartitionTreeIterBuilder { #[cfg(test)] mod tests { + use std::collections::HashMap; + use api::v1::value::ValueData; - use api::v1::{Row, Rows, SemanticType}; + use api::v1::{Mutation, OpType, Row, Rows, SemanticType}; use common_time::Timestamp; use datafusion_common::{Column, ScalarValue}; use datafusion_expr::{BinaryExpr, Expr, Operator}; use datatypes::data_type::ConcreteDataType; + use datatypes::prelude::Vector; use datatypes::scalars::ScalarVector; use datatypes::schema::ColumnSchema; use datatypes::value::Value; - use datatypes::vectors::Int64Vector; + use datatypes::vectors::{Int64Vector, StringVector}; use mito_codec::row_converter::DensePrimaryKeyCodec; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::storage::RegionId; @@ -789,4 +792,148 @@ mod tests { unreachable!() } } + + fn kv_region_metadata() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 0, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("k", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("v", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .primary_key(vec![1]); + let region_metadata = builder.build().unwrap(); + Arc::new(region_metadata) + } + + fn kv_column_schemas() -> Vec { + vec![ + api::v1::ColumnSchema { + column_name: "ts".to_string(), + datatype: api::v1::ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }, + api::v1::ColumnSchema { + column_name: "k".to_string(), + datatype: api::v1::ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + ..Default::default() + }, + api::v1::ColumnSchema { + column_name: "v".to_string(), + datatype: api::v1::ColumnDataType::String as i32, + semantic_type: SemanticType::Field as i32, + ..Default::default() + }, + ] + } + + fn key_values>( + metadata: &RegionMetadataRef, + keys: impl Iterator, + ) -> KeyValues { + let rows = keys + .map(|c| Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(c.as_ref().to_string())), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(c.as_ref().to_string())), + }, + ], + }) + .collect(); + let mutation = Mutation { + op_type: OpType::Put as i32, + sequence: 0, + rows: Some(Rows { + schema: kv_column_schemas(), + rows, + }), + write_hint: None, + }; + KeyValues::new(metadata, mutation).unwrap() + } + + fn collect_kvs( + iter: BoxedBatchIterator, + region_meta: &RegionMetadataRef, + ) -> HashMap { + let decoder = DensePrimaryKeyCodec::new(region_meta); + let mut res = HashMap::new(); + for v in iter { + let batch = v.unwrap(); + let values = decoder.decode(batch.primary_key()).unwrap().into_dense(); + let field_vector = batch.fields()[0] + .data + .as_any() + .downcast_ref::() + .unwrap(); + for row in 0..batch.num_rows() { + res.insert( + values[0].as_string().unwrap(), + field_vector.get(row).as_string().unwrap(), + ); + } + } + res + } + + #[test] + fn test_reorder_insert_key_values() { + let metadata = kv_region_metadata(); + let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None) + .build(1, &metadata); + + memtable + .write(&key_values(&metadata, ('a'..'h').map(|c| c.to_string()))) + .unwrap(); + memtable.freeze().unwrap(); + assert_eq!( + collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata), + ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect() + ); + let forked = memtable.fork(2, &metadata); + + let keys = ["c", "f", "i", "h", "b", "e", "g"]; + forked.write(&key_values(&metadata, keys.iter())).unwrap(); + forked.freeze().unwrap(); + assert_eq!( + collect_kvs(forked.iter(None, None, None).unwrap(), &metadata), + keys.iter() + .map(|c| (c.to_string(), c.to_string())) + .collect() + ); + + let forked2 = forked.fork(3, &metadata); + + let keys = ["g", "e", "a", "f", "b", "c", "h"]; + forked2.write(&key_values(&metadata, keys.iter())).unwrap(); + + let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata); + let expected = keys + .iter() + .map(|c| (c.to_string(), c.to_string())) + .collect::>(); + assert_eq!(kvs, expected); + } } diff --git a/src/mito2/src/memtable/partition_tree/dict.rs b/src/mito2/src/memtable/partition_tree/dict.rs index 51100158a9..bdc22a0916 100644 --- a/src/mito2/src/memtable/partition_tree/dict.rs +++ b/src/mito2/src/memtable/partition_tree/dict.rs @@ -133,7 +133,7 @@ impl KeyDictBuilder { // Computes key position and then alter pk index. let mut key_positions = vec![0; self.pk_to_index.len()]; - for (i, (_full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index)) + for (i, (full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index)) .into_iter() .enumerate() { @@ -142,6 +142,7 @@ impl KeyDictBuilder { if let Some(sparse_key) = sparse_key { pk_to_index_map.insert(sparse_key, i as PkIndex); } + pk_to_index_map.insert(full_pk, i as PkIndex); } self.num_keys = 0; @@ -472,4 +473,21 @@ mod tests { assert!(!builder.is_full()); assert_eq!(0, builder.insert_key(b"a0", None, &mut metrics)); } + + #[test] + fn test_builder_finish_with_sparse_key() { + let mut builder = KeyDictBuilder::new((MAX_KEYS_PER_BLOCK * 2).into()); + let mut metrics = WriteMetrics::default(); + let full_key = "42".to_string(); + let sparse_key = &[42u8]; + + builder.insert_key(full_key.as_bytes(), Some(sparse_key), &mut metrics); + let (dict, pk_to_pk_id) = builder.finish().unwrap(); + assert_eq!(dict.key_positions.len(), 1); + assert_eq!(dict.dict_blocks.len(), 1); + assert_eq!( + pk_to_pk_id.get(sparse_key.as_slice()), + pk_to_pk_id.get(full_key.as_bytes()) + ); + } }