From bec8245e75c043bf44c049a3cafcab3ab5db3a67 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 10 Feb 2025 14:38:11 +0800 Subject: [PATCH] poc-write-path: Enhance Memtable Handling with Primary Key Encoding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Introduced PrimaryKeyEncoding to differentiate between dense and sparse primary key encodings. • Updated BulkMemtableBuilder to conditionally create memtables based on primary key encoding. • Integrated PartitionTreeMemtableBuilder as a fallback for dense encodings. • Modified RegionWriteCtx to handle mutations differently based on primary key encoding. • Adjusted RegionWorkerLoop to skip bulk encoding for dense primary key mutations. • Refactored SparseEncoder to support conditional compilation for testing purposes. --- src/mito2/src/memtable/bulk.rs | 28 ++++++++++++----- src/mito2/src/memtable/bulk/part.rs | 21 +++++++++---- src/mito2/src/memtable/encoder.rs | 39 +++++++++++++---------- src/mito2/src/memtable/key_values.rs | 4 +-- src/mito2/src/region_write_ctx.rs | 47 ++++++++++++++++++---------- src/mito2/src/worker/handle_write.rs | 4 +++ 6 files changed, 95 insertions(+), 48 deletions(-) diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 971bf44511..ac0d739714 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -17,6 +17,7 @@ use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; +use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; @@ -25,6 +26,7 @@ use crate::error::Result; use crate::flush::WriteBufferManagerRef; use crate::memtable::bulk::part::BulkPart; use crate::memtable::key_values::KeyValue; +use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder}; use crate::memtable::{ AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges, MemtableRef, MemtableStats, @@ -44,17 +46,23 @@ pub struct BulkMemtableBuilder { write_buffer_manager: Option, dedup: bool, merge_mode: MergeMode, + fallback_builder: PartitionTreeMemtableBuilder, } impl MemtableBuilder for BulkMemtableBuilder { fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef { - Arc::new(BulkMemtable::new( - metadata.clone(), - id, - self.write_buffer_manager.clone(), - self.dedup, - self.merge_mode, - )) + //todo(hl): create different memtables according to region type (metadata/physical) + if metadata.primary_key_encoding == PrimaryKeyEncoding::Dense { + self.fallback_builder.build(id, metadata) + } else { + Arc::new(BulkMemtable::new( + metadata.clone(), + id, + self.write_buffer_manager.clone(), + self.dedup, + self.merge_mode, + )) as MemtableRef + } } } @@ -64,10 +72,16 @@ impl BulkMemtableBuilder { dedup: bool, merge_mode: MergeMode, ) -> Self { + let builder = PartitionTreeMemtableBuilder::new( + PartitionTreeConfig::default(), + write_buffer_manager.clone(), + ); + Self { write_buffer_manager, dedup, merge_mode, + fallback_builder: builder, } } } diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 88cdabd48f..a3bea1abfa 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use api::v1::Mutation; use bytes::Bytes; +use common_telemetry::error; use common_time::timestamp::TimeUnit; use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder}; use datatypes::arrow; @@ -32,13 +33,13 @@ use datatypes::arrow::datatypes::SchemaRef; use datatypes::arrow_array::BinaryArray; use datatypes::data_type::DataType; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector}; +use datatypes::value::ValueRef; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::ArrowWriter; use parquet::data_type::AsBytes; use parquet::file::metadata::ParquetMetaData; use parquet::file::properties::WriterProperties; use snafu::ResultExt; -use datatypes::value::ValueRef; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; @@ -47,10 +48,10 @@ use crate::error; use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result}; use crate::memtable::bulk::context::BulkIterContextRef; use crate::memtable::bulk::part_reader::BulkPartIter; +use crate::memtable::encoder::{FieldWithId, SparseEncoder}; use crate::memtable::key_values::KeyValuesRef; use crate::memtable::BoxedBatchIterator; -use crate::memtable::encoder::{FieldWithId, SparseEncoder}; -use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SortField, SparsePrimaryKeyCodec}; +use crate::row_converter::{PrimaryKeyCodec, PrimaryKeyCodecExt}; use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat}; use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::to_sst_arrow_schema; @@ -215,10 +216,18 @@ fn mutations_to_record_batch( for row in key_values.iter() { assert_eq!(1, row.num_primary_keys()); assert_eq!(1, row.num_fields()); - let ValueRef::Binary(encoded_primary_keys) = row.primary_keys().next().unwrap()else{ - unreachable!("Primary key must be encoded binary type"); + let first_primary_key_col = row.primary_keys().next().unwrap(); + + let bytes = match first_primary_key_col { + ValueRef::Binary(b) => b, + _ => { + unreachable!( + "Primary key must be encoded binary type, found: {:?}", + first_primary_key_col + ); + } }; - pk_builder.append_value(encoded_primary_keys); + pk_builder.append_value(bytes); ts_vector.push_value_ref(row.timestamp()); sequence_builder.append_value(row.sequence()); op_type_builder.append_value(row.op_type() as u8); diff --git a/src/mito2/src/memtable/encoder.rs b/src/mito2/src/memtable/encoder.rs index 99597ff27c..407485d85b 100644 --- a/src/mito2/src/memtable/encoder.rs +++ b/src/mito2/src/memtable/encoder.rs @@ -14,17 +14,15 @@ //! Sparse primary key encoder; -use std::collections::HashMap; use datatypes::prelude::ValueRef; -use memcomparable::{Deserializer, Serializer}; -use serde::{Deserialize, Serialize}; +use memcomparable::Serializer; +use serde::Serialize; use snafu::ResultExt; -use datatypes::value::Value; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; -use crate::error; -use crate::error::{DeserializeFieldSnafu, SerializeFieldSnafu}; -use crate::row_converter::{SortField, SparseValues}; + +use crate::error::SerializeFieldSnafu; +use crate::row_converter::SortField; pub(crate) struct FieldWithId { pub(crate) field: SortField, @@ -33,15 +31,17 @@ pub(crate) struct FieldWithId { pub(crate) struct SparseEncoder { pub(crate) columns: Vec, - pub(crate) column_id_to_field: HashMap, + #[cfg(test)] + pub(crate) column_id_to_field: std::collections::HashMap, } impl SparseEncoder { pub(crate) fn new(metadata: &RegionMetadataRef) -> Self { let mut columns = Vec::with_capacity(metadata.primary_key.len()); - let mut column_id_to_field = HashMap::with_capacity(metadata.primary_key.len()); - for (idx, c) in metadata - .primary_key_columns().enumerate() { + #[cfg(test)] + let mut column_id_to_field = + std::collections::HashMap::with_capacity(metadata.primary_key.len()); + for (_idx, c) in metadata.primary_key_columns().enumerate() { let sort_field = SortField::new(c.column_schema.data_type.clone()); let field = FieldWithId { @@ -49,17 +49,19 @@ impl SparseEncoder { column_id: c.column_id, }; columns.push(field); - column_id_to_field.insert(c.column_id, (sort_field, idx)); + #[cfg(test)] + column_id_to_field.insert(c.column_id, (sort_field, _idx)); } Self { columns, + #[cfg(test)] column_id_to_field, } } pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec) -> crate::error::Result<()> where - I: Iterator>, + I: Iterator>, { let mut serializer = Serializer::new(buffer); for (value, field) in row.zip(self.columns.iter()) { @@ -74,12 +76,15 @@ impl SparseEncoder { Ok(()) } - pub fn decode(&self, bytes: &[u8]) -> error::Result> { - let mut deserializer = Deserializer::new(bytes); - let mut values = vec![Value::Null; self.columns.len()]; + #[cfg(test)] + pub fn decode(&self, bytes: &[u8]) -> crate::error::Result> { + use serde::Deserialize; + let mut deserializer = memcomparable::Deserializer::new(bytes); + let mut values = vec![datatypes::value::Value::Null; self.columns.len()]; while deserializer.has_remaining() { - let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?; + let column_id = + u32::deserialize(&mut deserializer).context(crate::error::DeserializeFieldSnafu)?; let (field, idx) = self.column_id_to_field.get(&column_id).unwrap(); let value = field.deserialize(&mut deserializer)?; values[*idx] = value; diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index a5b2ab6544..89551f6264 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -19,8 +19,8 @@ use datatypes::prelude::ConcreteDataType; use datatypes::value::ValueRef; use memcomparable::Deserializer; use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding}; -use store_api::metadata::{RegionMetadata, RegionMetadataRef}; -use store_api::storage::{ColumnId, SequenceNumber}; +use store_api::metadata::RegionMetadata; +use store_api::storage::SequenceNumber; use crate::row_converter::{SortField, COLUMN_ID_ENCODE_SIZE}; diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 316a021806..eafe994e0c 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -18,13 +18,14 @@ use std::sync::Arc; use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint}; use futures::future::try_join_all; use snafu::ResultExt; +use store_api::codec::PrimaryKeyEncoding; use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; use store_api::storage::{RegionId, SequenceNumber}; use crate::error::{Error, JoinSnafu, Result, WriteGroupSnafu}; use crate::memtable::bulk::part::BulkPartEncoder; -use crate::memtable::BulkPart; +use crate::memtable::{BulkPart, KeyValues}; use crate::region::version::{VersionControlData, VersionControlRef, VersionRef}; use crate::request::OptionOutputTx; use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE; @@ -213,22 +214,36 @@ impl RegionWriteCtx { } let mutable = &self.version.memtables.mutable; - // Takes mutations from the wal entry. - let bulk_parts = mem::take(&mut self.bulk_parts); - for (bulk_part, notify) in bulk_parts.into_iter().zip(&mut self.notifiers) { - // Write mutation to the memtable. - let Some(bulk_part) = bulk_part else { - continue; - }; - if let Err(e) = mutable.write_bulk(bulk_part) { - notify.err = Some(Arc::new(e)); + + if self.version().metadata.primary_key_encoding == PrimaryKeyEncoding::Dense { + let mutations = mem::take(&mut self.wal_entry.mutations); + for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) { + // Write mutation to the memtable. + let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else { + continue; + }; + if let Err(e) = mutable.write(&kvs) { + notify.err = Some(Arc::new(e)); + } + } + } else { + // Takes mutations from the wal entry. + let bulk_parts = mem::take(&mut self.bulk_parts); + for (bulk_part, notify) in bulk_parts.into_iter().zip(&mut self.notifiers) { + // Write mutation to the memtable. + let Some(bulk_part) = bulk_part else { + continue; + }; + if let Err(e) = mutable.write_bulk(bulk_part) { + notify.err = Some(Arc::new(e)); + } + // let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else { + // continue; + // }; + // if let Err(e) = mutable.write(&kvs) { + // notify.err = Some(Arc::new(e)); + // } } - // let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else { - // continue; - // }; - // if let Err(e) = mutable.write(&kvs) { - // notify.err = Some(Arc::new(e)); - // } } // Updates region sequence and entry id. Since we stores last sequence and entry id in region, we need diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 7ac9cd1076..f5b2080202 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -75,6 +75,10 @@ impl RegionWorkerLoop { .with_label_values(&["encode_bulk"]) .start_timer(); for region_ctx in region_ctxs.values_mut() { + // Avoid encoding to bulk part when mutations are dense encoded. + if region_ctx.version().metadata.primary_key_encoding == PrimaryKeyEncoding::Dense { + continue; + } // TODO(yingwen): We don't do region level parallelism as we only test // one region now. if let Err(e) = region_ctx.encode_bulks().await.map_err(Arc::new) {