poc-write-path: Enhance Memtable Handling with Primary Key Encoding

• Introduced PrimaryKeyEncoding to differentiate between dense and sparse primary key encodings.
• Updated BulkMemtableBuilder to conditionally create memtables based on primary key encoding (see the dispatch sketch below).
• Integrated PartitionTreeMemtableBuilder as a fallback for dense encodings.
• Modified RegionWriteCtx to handle mutations differently based on primary key encoding.
• Adjusted RegionWorkerLoop to skip bulk encoding for dense primary key mutations.
• Refactored SparseEncoder to support conditional compilation for testing purposes.
Lei, HUANG
2025-02-10 14:38:11 +08:00
parent 3cb2343f7f
commit bec8245e75
6 changed files with 95 additions and 48 deletions
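
In short, memtable construction now dispatches on the region's primary key encoding: sparse-encoded regions keep the bulk memtable, while dense-encoded regions fall back to the partition tree memtable. A minimal, self-contained sketch of that dispatch follows; ToyMetadata, PartitionTreeBuilder, and BulkBuilder are hypothetical stand-ins, not the mito2 types shown in the diffs below.

#[derive(PartialEq)]
enum PrimaryKeyEncoding { Dense, Sparse }

struct ToyMetadata { primary_key_encoding: PrimaryKeyEncoding }

// Fallback builder used for dense-encoded regions.
struct PartitionTreeBuilder;
impl PartitionTreeBuilder {
    fn build(&self, _metadata: &ToyMetadata) -> String {
        "partition tree memtable".to_string()
    }
}

// Bulk builder that delegates to the fallback when the region uses dense encoding,
// mirroring the shape of BulkMemtableBuilder::build in the first file below.
struct BulkBuilder { fallback: PartitionTreeBuilder }
impl BulkBuilder {
    fn build(&self, metadata: &ToyMetadata) -> String {
        if metadata.primary_key_encoding == PrimaryKeyEncoding::Dense {
            self.fallback.build(metadata)
        } else {
            "bulk memtable".to_string()
        }
    }
}

fn main() {
    let builder = BulkBuilder { fallback: PartitionTreeBuilder };
    let dense = ToyMetadata { primary_key_encoding: PrimaryKeyEncoding::Dense };
    let sparse = ToyMetadata { primary_key_encoding: PrimaryKeyEncoding::Sparse };
    assert_eq!(builder.build(&dense), "partition tree memtable");
    assert_eq!(builder.build(&sparse), "bulk memtable");
}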

View File

@@ -17,6 +17,7 @@
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
@@ -25,6 +26,7 @@ use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::{
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRanges, MemtableRef, MemtableStats,
@@ -44,17 +46,23 @@ pub struct BulkMemtableBuilder {
write_buffer_manager: Option<WriteBufferManagerRef>,
dedup: bool,
merge_mode: MergeMode,
fallback_builder: PartitionTreeMemtableBuilder,
}
impl MemtableBuilder for BulkMemtableBuilder {
fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(BulkMemtable::new(
metadata.clone(),
id,
self.write_buffer_manager.clone(),
self.dedup,
self.merge_mode,
))
//todo(hl): create different memtables according to region type (metadata/physical)
if metadata.primary_key_encoding == PrimaryKeyEncoding::Dense {
self.fallback_builder.build(id, metadata)
} else {
Arc::new(BulkMemtable::new(
metadata.clone(),
id,
self.write_buffer_manager.clone(),
self.dedup,
self.merge_mode,
)) as MemtableRef
}
}
}
@@ -64,10 +72,16 @@ impl BulkMemtableBuilder {
dedup: bool,
merge_mode: MergeMode,
) -> Self {
let builder = PartitionTreeMemtableBuilder::new(
PartitionTreeConfig::default(),
write_buffer_manager.clone(),
);
Self {
write_buffer_manager,
dedup,
merge_mode,
fallback_builder: builder,
}
}
}

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use api::v1::Mutation;
use bytes::Bytes;
use common_telemetry::error;
use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow;
@@ -32,13 +33,13 @@ use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::ValueRef;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use datatypes::value::ValueRef;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
@@ -47,10 +48,10 @@ use crate::error;
use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::encoder::{FieldWithId, SparseEncoder};
use crate::memtable::key_values::KeyValuesRef;
use crate::memtable::BoxedBatchIterator;
use crate::memtable::encoder::{FieldWithId, SparseEncoder};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SortField, SparsePrimaryKeyCodec};
use crate::row_converter::{PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
@@ -215,10 +216,18 @@ fn mutations_to_record_batch(
for row in key_values.iter() {
assert_eq!(1, row.num_primary_keys());
assert_eq!(1, row.num_fields());
let ValueRef::Binary(encoded_primary_keys) = row.primary_keys().next().unwrap() else {
unreachable!("Primary key must be encoded binary type");
let first_primary_key_col = row.primary_keys().next().unwrap();
let bytes = match first_primary_key_col {
ValueRef::Binary(b) => b,
_ => {
unreachable!(
"Primary key must be encoded binary type, found: {:?}",
first_primary_key_col
);
}
};
pk_builder.append_value(encoded_primary_keys);
pk_builder.append_value(bytes);
ts_vector.push_value_ref(row.timestamp());
sequence_builder.append_value(row.sequence());
op_type_builder.append_value(row.op_type() as u8);

View File

@@ -14,17 +14,15 @@
//! Sparse primary key encoder;
use std::collections::HashMap;
use datatypes::prelude::ValueRef;
use memcomparable::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use memcomparable::Serializer;
use serde::Serialize;
use snafu::ResultExt;
use datatypes::value::Value;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use crate::error;
use crate::error::{DeserializeFieldSnafu, SerializeFieldSnafu};
use crate::row_converter::{SortField, SparseValues};
use crate::error::SerializeFieldSnafu;
use crate::row_converter::SortField;
pub(crate) struct FieldWithId {
pub(crate) field: SortField,
@@ -33,15 +31,17 @@ pub(crate) struct FieldWithId {
pub(crate) struct SparseEncoder {
pub(crate) columns: Vec<FieldWithId>,
pub(crate) column_id_to_field: HashMap<ColumnId, (SortField,usize)>,
#[cfg(test)]
pub(crate) column_id_to_field: std::collections::HashMap<ColumnId, (SortField, usize)>,
}
impl SparseEncoder {
pub(crate) fn new(metadata: &RegionMetadataRef) -> Self {
let mut columns = Vec::with_capacity(metadata.primary_key.len());
let mut column_id_to_field = HashMap::with_capacity(metadata.primary_key.len());
for (idx, c) in metadata
.primary_key_columns().enumerate() {
#[cfg(test)]
let mut column_id_to_field =
std::collections::HashMap::with_capacity(metadata.primary_key.len());
for (_idx, c) in metadata.primary_key_columns().enumerate() {
let sort_field = SortField::new(c.column_schema.data_type.clone());
let field = FieldWithId {
@@ -49,17 +49,19 @@ impl SparseEncoder {
column_id: c.column_id,
};
columns.push(field);
column_id_to_field.insert(c.column_id, (sort_field, idx));
#[cfg(test)]
column_id_to_field.insert(c.column_id, (sort_field, _idx));
}
Self {
columns,
#[cfg(test)]
column_id_to_field,
}
}
pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> crate::error::Result<()>
where
I: Iterator<Item=ValueRef<'a>>,
I: Iterator<Item = ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.columns.iter()) {
@@ -74,12 +76,15 @@ impl SparseEncoder {
Ok(())
}
pub fn decode(&self, bytes: &[u8]) -> error::Result<Vec<Value>> {
let mut deserializer = Deserializer::new(bytes);
let mut values = vec![Value::Null; self.columns.len()];
#[cfg(test)]
pub fn decode(&self, bytes: &[u8]) -> crate::error::Result<Vec<datatypes::value::Value>> {
use serde::Deserialize;
let mut deserializer = memcomparable::Deserializer::new(bytes);
let mut values = vec![datatypes::value::Value::Null; self.columns.len()];
while deserializer.has_remaining() {
let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
let column_id =
u32::deserialize(&mut deserializer).context(crate::error::DeserializeFieldSnafu)?;
let (field, idx) = self.column_id_to_field.get(&column_id).unwrap();
let value = field.deserialize(&mut deserializer)?;
values[*idx] = value;
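
For context, the test-only decode above walks a buffer of (column id, value) pairs: read a column id, look up its field, deserialize the value, repeat until the buffer is empty. The toy below illustrates only that walk, using fixed-width big-endian integers instead of the real memcomparable encoding and SortField codecs; toy_encode and toy_decode are hypothetical helpers, not part of the crate.

use std::collections::HashMap;

// Toy layout only: fixed-width big-endian bytes instead of memcomparable encoding.
fn toy_encode(pairs: &[(u32, i64)], buffer: &mut Vec<u8>) {
    for (column_id, value) in pairs {
        // Column id prefix, then the encoded value for that column.
        buffer.extend_from_slice(&column_id.to_be_bytes());
        buffer.extend_from_slice(&value.to_be_bytes());
    }
}

fn toy_decode(bytes: &[u8]) -> HashMap<u32, i64> {
    let mut values = HashMap::new();
    let mut offset = 0;
    // Keep reading (column id, value) pairs until the buffer is exhausted,
    // mirroring the `while deserializer.has_remaining()` loop above.
    while offset + 12 <= bytes.len() {
        let column_id = u32::from_be_bytes(bytes[offset..offset + 4].try_into().unwrap());
        let value = i64::from_be_bytes(bytes[offset + 4..offset + 12].try_into().unwrap());
        values.insert(column_id, value);
        offset += 12;
    }
    values
}

fn main() {
    let mut buf = Vec::new();
    toy_encode(&[(1, 42), (7, -5)], &mut buf);
    let decoded = toy_decode(&buf);
    assert_eq!(decoded[&1], 42);
    assert_eq!(decoded[&7], -5);
}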

View File

@@ -19,8 +19,8 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::value::ValueRef;
use memcomparable::Deserializer;
use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};
use store_api::metadata::RegionMetadata;
use store_api::storage::SequenceNumber;
use crate::row_converter::{SortField, COLUMN_ID_ENCODE_SIZE};

View File

@@ -18,13 +18,14 @@ use std::sync::Arc;
use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint};
use futures::future::try_join_all;
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::storage::{RegionId, SequenceNumber};
use crate::error::{Error, JoinSnafu, Result, WriteGroupSnafu};
use crate::memtable::bulk::part::BulkPartEncoder;
use crate::memtable::BulkPart;
use crate::memtable::{BulkPart, KeyValues};
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::OptionOutputTx;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
@@ -213,22 +214,36 @@ impl RegionWriteCtx {
}
let mutable = &self.version.memtables.mutable;
// Takes mutations from the wal entry.
let bulk_parts = mem::take(&mut self.bulk_parts);
for (bulk_part, notify) in bulk_parts.into_iter().zip(&mut self.notifiers) {
// Write mutation to the memtable.
let Some(bulk_part) = bulk_part else {
continue;
};
if let Err(e) = mutable.write_bulk(bulk_part) {
notify.err = Some(Arc::new(e));
if self.version().metadata.primary_key_encoding == PrimaryKeyEncoding::Dense {
let mutations = mem::take(&mut self.wal_entry.mutations);
for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) {
// Write mutation to the memtable.
let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else {
continue;
};
if let Err(e) = mutable.write(&kvs) {
notify.err = Some(Arc::new(e));
}
}
} else {
// Takes mutations from the wal entry.
let bulk_parts = mem::take(&mut self.bulk_parts);
for (bulk_part, notify) in bulk_parts.into_iter().zip(&mut self.notifiers) {
// Write mutation to the memtable.
let Some(bulk_part) = bulk_part else {
continue;
};
if let Err(e) = mutable.write_bulk(bulk_part) {
notify.err = Some(Arc::new(e));
}
// let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else {
// continue;
// };
// if let Err(e) = mutable.write(&kvs) {
// notify.err = Some(Arc::new(e));
// }
}
// let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else {
// continue;
// };
// if let Err(e) = mutable.write(&kvs) {
// notify.err = Some(Arc::new(e));
// }
}
// Updates region sequence and entry id. Since we store the last sequence and entry id in the region, we need
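
The net routing in RegionWriteCtx follows the same split: dense regions write decoded row mutations into the memtable, while other regions write pre-encoded bulk parts. A compact sketch with hypothetical toy types (ToyMemtable, RowMutation, and ToyBulkPart are illustrative stand-ins, not the mito2 API):

#[derive(PartialEq)]
enum Encoding { Dense, Sparse }

struct RowMutation;   // stand-in for a decoded KeyValues mutation
struct ToyBulkPart;   // stand-in for an already-encoded BulkPart

#[derive(Default)]
struct ToyMemtable { rows: usize, bulk_parts: usize }

impl ToyMemtable {
    fn write(&mut self, _kvs: &RowMutation) { self.rows += 1; }
    fn write_bulk(&mut self, _part: ToyBulkPart) { self.bulk_parts += 1; }
}

// Route mutations by encoding: dense regions take the row path, others the bulk path.
fn write_to_memtable(
    encoding: Encoding,
    mutations: Vec<RowMutation>,
    bulk_parts: Vec<ToyBulkPart>,
    memtable: &mut ToyMemtable,
) {
    if encoding == Encoding::Dense {
        for kvs in &mutations {
            memtable.write(kvs);
        }
    } else {
        for part in bulk_parts {
            memtable.write_bulk(part);
        }
    }
}

fn main() {
    let mut memtable = ToyMemtable::default();
    write_to_memtable(Encoding::Dense, vec![RowMutation, RowMutation], vec![], &mut memtable);
    assert_eq!(memtable.rows, 2);
    write_to_memtable(Encoding::Sparse, vec![], vec![ToyBulkPart], &mut memtable);
    assert_eq!(memtable.bulk_parts, 1);
}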

View File

@@ -75,6 +75,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.with_label_values(&["encode_bulk"])
.start_timer();
for region_ctx in region_ctxs.values_mut() {
// Avoid encoding to bulk part when mutations are dense encoded.
if region_ctx.version().metadata.primary_key_encoding == PrimaryKeyEncoding::Dense {
continue;
}
// TODO(yingwen): We don't do region level parallelism as we only test
// one region now.
if let Err(e) = region_ctx.encode_bulks().await.map_err(Arc::new) {