poc-write-path:

### Implement Sparse Primary Key Encoding

 - **Added `SparseEncoder`**: Introduced a new module `encoder.rs` to implement sparse primary key encoding, replacing the previous dense encoding approach.
 - **Updated `BulkPartEncoder`**: Modified `BulkPartEncoder` in `bulk/part.rs` to utilize `SparseEncoder` for encoding primary keys.
 - **Refactored `PartitionTree`**: Updated `partition_tree/tree.rs` to use the new `SparseEncoder` for primary key encoding.
 - **Code Adjustments**: Removed redundant code and adjusted imports in `key_values.rs` and `partition_tree/tree.rs` to align with the new encoding strategy.
This commit is contained in:
Lei, HUANG
2025-02-08 08:10:16 +00:00
parent d10c207371
commit 3cb2343f7f
5 changed files with 113 additions and 57 deletions

View File

@@ -48,6 +48,7 @@ mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;
mod encoder;
/// Id for memtables.
///

View File

@@ -38,6 +38,7 @@ use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use datatypes::value::ValueRef;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
@@ -48,7 +49,8 @@ use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::key_values::KeyValuesRef;
use crate::memtable::BoxedBatchIterator;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::memtable::encoder::{FieldWithId, SparseEncoder};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SortField, SparsePrimaryKeyCodec};
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
@@ -108,7 +110,7 @@ pub struct BulkPartMeta {
pub struct BulkPartEncoder {
metadata: RegionMetadataRef,
pk_encoder: DensePrimaryKeyCodec,
pk_encoder: SparseEncoder,
row_group_size: usize,
dedup: bool,
writer_props: Option<WriterProperties>,
@@ -120,7 +122,7 @@ impl BulkPartEncoder {
dedup: bool,
row_group_size: usize,
) -> BulkPartEncoder {
let codec = DensePrimaryKeyCodec::new(&metadata);
let encoder = SparseEncoder::new(&metadata);
let writer_props = Some(
WriterProperties::builder()
.set_write_batch_size(row_group_size)
@@ -129,7 +131,7 @@ impl BulkPartEncoder {
);
Self {
metadata,
pk_encoder: codec,
pk_encoder: encoder,
row_group_size,
dedup,
writer_props,
@@ -141,7 +143,7 @@ impl BulkPartEncoder {
/// Encodes mutations to a [BulkPart], returns true if encoded data has been written to `dest`.
pub(crate) fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<BulkPart>> {
let Some((arrow_record_batch, min_ts, max_ts)) =
mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
mutations_to_record_batch(mutations, &self.metadata, self.dedup)?
else {
return Ok(None);
};
@@ -179,7 +181,6 @@ impl BulkPartEncoder {
fn mutations_to_record_batch(
mutations: &[Mutation],
metadata: &RegionMetadataRef,
pk_encoder: &DensePrimaryKeyCodec,
dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
let total_rows: usize = mutations
@@ -206,16 +207,18 @@ fn mutations_to_record_batch(
.map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
.collect();
let mut pk_buffer = vec![];
for m in mutations {
let Some(key_values) = KeyValuesRef::new(metadata, m) else {
continue;
};
for row in key_values.iter() {
pk_buffer.clear();
pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?;
pk_builder.append_value(pk_buffer.as_bytes());
assert_eq!(1, row.num_primary_keys());
assert_eq!(1, row.num_fields());
let ValueRef::Binary(encoded_primary_keys) = row.primary_keys().next().unwrap()else{
unreachable!("Primary key must be encoded binary type");
};
pk_builder.append_value(encoded_primary_keys);
ts_vector.push_value_ref(row.timestamp());
sequence_builder.append_value(row.sequence());
op_type_builder.append_value(row.op_type() as u8);
@@ -543,9 +546,9 @@ mod tests {
.map(|r| r.rows.len())
.sum();
let pk_encoder = DensePrimaryKeyCodec::new(&metadata);
let pk_encoder = SparseEncoder::new(&metadata);
let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, dedup)
.unwrap()
.unwrap();
let read_format = ReadFormat::new_with_all_columns(metadata.clone());
@@ -562,7 +565,7 @@ mod tests {
let batch_values = batches
.into_iter()
.map(|b| {
let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
let pk_values = pk_encoder.decode(b.primary_key()).unwrap();
let timestamps = b
.timestamps()
.as_any()

View File

@@ -0,0 +1,89 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Sparse primary key encoder;
use std::collections::HashMap;
use datatypes::prelude::ValueRef;
use memcomparable::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use datatypes::value::Value;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use crate::error;
use crate::error::{DeserializeFieldSnafu, SerializeFieldSnafu};
use crate::row_converter::{SortField, SparseValues};
pub(crate) struct FieldWithId {
pub(crate) field: SortField,
pub(crate) column_id: ColumnId,
}
pub(crate) struct SparseEncoder {
pub(crate) columns: Vec<FieldWithId>,
pub(crate) column_id_to_field: HashMap<ColumnId, (SortField,usize)>,
}
impl SparseEncoder {
pub(crate) fn new(metadata: &RegionMetadataRef) -> Self {
let mut columns = Vec::with_capacity(metadata.primary_key.len());
let mut column_id_to_field = HashMap::with_capacity(metadata.primary_key.len());
for (idx, c) in metadata
.primary_key_columns().enumerate() {
let sort_field = SortField::new(c.column_schema.data_type.clone());
let field = FieldWithId {
field: sort_field.clone(),
column_id: c.column_id,
};
columns.push(field);
column_id_to_field.insert(c.column_id, (sort_field, idx));
}
Self {
columns,
column_id_to_field,
}
}
pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> crate::error::Result<()>
where
I: Iterator<Item=ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.columns.iter()) {
if !value.is_null() {
field
.column_id
.serialize(&mut serializer)
.context(SerializeFieldSnafu)?;
field.field.serialize(&mut serializer, &value)?;
}
}
Ok(())
}
pub fn decode(&self, bytes: &[u8]) -> error::Result<Vec<Value>> {
let mut deserializer = Deserializer::new(bytes);
let mut values = vec![Value::Null; self.columns.len()];
while deserializer.has_remaining() {
let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
let (field, idx) = self.column_id_to_field.get(&column_id).unwrap();
let value = field.deserialize(&mut deserializer)?;
values[*idx] = value;
}
Ok(values)
}
}

View File

@@ -19,8 +19,8 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::value::ValueRef;
use memcomparable::Deserializer;
use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
use store_api::metadata::RegionMetadata;
use store_api::storage::SequenceNumber;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};
use crate::row_converter::{SortField, COLUMN_ID_ENCODE_SIZE};

View File

@@ -23,16 +23,14 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::prelude::ValueRef;
use memcomparable::Serializer;
use serde::Serialize;
use snafu::{ensure, ResultExt};
use snafu::{ensure, };
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::{
EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu,
EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
@@ -42,11 +40,12 @@ use crate::memtable::partition_tree::partition::{
use crate::memtable::partition_tree::PartitionTreeConfig;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::memtable::encoder::{ SparseEncoder};
use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::Batch;
use crate::region::options::MergeMode;
use crate::row_converter::{PrimaryKeyCodec, SortField};
use crate::row_converter::{PrimaryKeyCodec};
/// The partition tree.
pub struct PartitionTree {
@@ -73,15 +72,7 @@ impl PartitionTree {
config: &PartitionTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
let sparse_encoder = SparseEncoder {
fields: metadata
.primary_key_columns()
.map(|c| FieldWithId {
field: SortField::new(c.column_schema.data_type.clone()),
column_id: c.column_id,
})
.collect(),
};
let sparse_encoder = SparseEncoder::new(&metadata);
let is_partitioned = Partition::has_multi_partitions(&metadata);
let mut config = config.clone();
if config.merge_mode == MergeMode::LastNonNull {
@@ -436,34 +427,6 @@ impl PartitionTree {
}
}
struct FieldWithId {
field: SortField,
column_id: ColumnId,
}
struct SparseEncoder {
fields: Vec<FieldWithId>,
}
impl SparseEncoder {
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.fields.iter()) {
if !value.is_null() {
field
.column_id
.serialize(&mut serializer)
.context(SerializeFieldSnafu)?;
field.field.serialize(&mut serializer, &value)?;
}
}
Ok(())
}
}
#[derive(Default)]
struct TreeIterMetrics {
iter_elapsed: Duration,