diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index a223e06178..135ba7b390 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -48,6 +48,7 @@ mod stats;
 pub mod time_partition;
 pub mod time_series;
 pub(crate) mod version;
+mod encoder;
 
 /// Id for memtables.
 ///
diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs
index e2fd61144e..88cdabd48f 100644
--- a/src/mito2/src/memtable/bulk/part.rs
+++ b/src/mito2/src/memtable/bulk/part.rs
@@ -38,6 +38,7 @@ use parquet::data_type::AsBytes;
 use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::WriterProperties;
 use snafu::ResultExt;
+use datatypes::value::ValueRef;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::{ColumnId, SequenceNumber};
 use table::predicate::Predicate;
@@ -48,7 +49,8 @@ use crate::memtable::bulk::context::BulkIterContextRef;
 use crate::memtable::bulk::part_reader::BulkPartIter;
 use crate::memtable::key_values::KeyValuesRef;
 use crate::memtable::BoxedBatchIterator;
-use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
+use crate::memtable::encoder::{FieldWithId, SparseEncoder};
+use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SortField, SparsePrimaryKeyCodec};
 use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
 use crate::sst::parquet::helper::parse_parquet_metadata;
 use crate::sst::to_sst_arrow_schema;
@@ -108,7 +110,7 @@ pub struct BulkPartMeta {
 
 pub struct BulkPartEncoder {
     metadata: RegionMetadataRef,
-    pk_encoder: DensePrimaryKeyCodec,
+    pk_encoder: SparseEncoder,
     row_group_size: usize,
     dedup: bool,
     writer_props: Option<WriterProperties>,
@@ -120,7 +122,7 @@ impl BulkPartEncoder {
         dedup: bool,
         row_group_size: usize,
     ) -> BulkPartEncoder {
-        let codec = DensePrimaryKeyCodec::new(&metadata);
+        let encoder = SparseEncoder::new(&metadata);
         let writer_props = Some(
             WriterProperties::builder()
                 .set_write_batch_size(row_group_size)
@@ -129,7 +131,7 @@ impl BulkPartEncoder {
         );
         Self {
             metadata,
-            pk_encoder: codec,
+            pk_encoder: encoder,
             row_group_size,
             dedup,
             writer_props,
@@ -141,7 +143,7 @@ impl BulkPartEncoder {
     /// Encodes mutations to a [BulkPart], returns true if encoded data has been written to `dest`.
     pub(crate) fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<BulkPart>> {
         let Some((arrow_record_batch, min_ts, max_ts)) =
-            mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
+            mutations_to_record_batch(mutations, &self.metadata, self.dedup)?
         else {
             return Ok(None);
         };
@@ -179,7 +181,6 @@
 fn mutations_to_record_batch(
     mutations: &[Mutation],
     metadata: &RegionMetadataRef,
-    pk_encoder: &DensePrimaryKeyCodec,
     dedup: bool,
 ) -> Result<Option<(RecordBatch, i64, i64)>> {
     let total_rows: usize = mutations
@@ -206,16 +207,18 @@ fn mutations_to_record_batch(
         .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
         .collect();
 
-    let mut pk_buffer = vec![];
     for m in mutations {
         let Some(key_values) = KeyValuesRef::new(metadata, m) else {
             continue;
         };
 
         for row in key_values.iter() {
-            pk_buffer.clear();
-            pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?;
-            pk_builder.append_value(pk_buffer.as_bytes());
+            assert_eq!(1, row.num_primary_keys());
+            assert_eq!(1, row.num_fields());
+            let ValueRef::Binary(encoded_primary_keys) = row.primary_keys().next().unwrap() else {
+                unreachable!("Primary key must be encoded binary type");
+            };
+            pk_builder.append_value(encoded_primary_keys);
             ts_vector.push_value_ref(row.timestamp());
             sequence_builder.append_value(row.sequence());
             op_type_builder.append_value(row.op_type() as u8);
@@ -543,9 +546,9 @@ mod tests {
             .map(|r| r.rows.len())
             .sum();
 
-        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);
+        let pk_encoder = SparseEncoder::new(&metadata);
 
-        let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
+        let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, dedup)
             .unwrap()
             .unwrap();
         let read_format = ReadFormat::new_with_all_columns(metadata.clone());
@@ -562,7 +565,7 @@ mod tests {
         let batch_values = batches
             .into_iter()
            .map(|b| {
-                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
+                let pk_values = pk_encoder.decode(b.primary_key()).unwrap();
                 let timestamps = b
                     .timestamps()
                     .as_any()
diff --git a/src/mito2/src/memtable/encoder.rs b/src/mito2/src/memtable/encoder.rs
new file mode 100644
index 0000000000..99597ff27c
--- /dev/null
+++ b/src/mito2/src/memtable/encoder.rs
@@ -0,0 +1,89 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Sparse primary key encoder.
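+//!
+//! Each non-null primary key column is written as a `(column_id, value)` pair using
+//! memcomparable encoding, so null columns take up no space in the encoded key. An
+//! illustrative sketch of the layout (not a normative spec):
+//!
+//! ```text
+//! column_id (u32) | value | column_id (u32) | value | ...
+//! ```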
+
+use std::collections::HashMap;
+
+use datatypes::prelude::ValueRef;
+use datatypes::value::Value;
+use memcomparable::{Deserializer, Serializer};
+use serde::{Deserialize, Serialize};
+use snafu::ResultExt;
+use store_api::metadata::RegionMetadataRef;
+use store_api::storage::ColumnId;
+
+use crate::error;
+use crate::error::{DeserializeFieldSnafu, SerializeFieldSnafu};
+use crate::row_converter::SortField;
+
+pub(crate) struct FieldWithId {
+    pub(crate) field: SortField,
+    pub(crate) column_id: ColumnId,
+}
+
+pub(crate) struct SparseEncoder {
+    pub(crate) columns: Vec<FieldWithId>,
+    pub(crate) column_id_to_field: HashMap<ColumnId, (SortField, usize)>,
+}
+
+impl SparseEncoder {
+    pub(crate) fn new(metadata: &RegionMetadataRef) -> Self {
+        let mut columns = Vec::with_capacity(metadata.primary_key.len());
+        let mut column_id_to_field = HashMap::with_capacity(metadata.primary_key.len());
+        for (idx, c) in metadata.primary_key_columns().enumerate() {
+            let sort_field = SortField::new(c.column_schema.data_type.clone());
+            let field = FieldWithId {
+                field: sort_field.clone(),
+                column_id: c.column_id,
+            };
+            columns.push(field);
+            column_id_to_field.insert(c.column_id, (sort_field, idx));
+        }
+        Self {
+            columns,
+            column_id_to_field,
+        }
+    }
+
+    /// Encodes a row of primary key values, writing a `(column_id, value)` pair
+    /// for every non-null value and skipping null ones.
+    pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> error::Result<()>
+    where
+        I: Iterator<Item = ValueRef<'a>>,
+    {
+        let mut serializer = Serializer::new(buffer);
+        for (value, field) in row.zip(self.columns.iter()) {
+            if !value.is_null() {
+                field
+                    .column_id
+                    .serialize(&mut serializer)
+                    .context(SerializeFieldSnafu)?;
+                field.field.serialize(&mut serializer, &value)?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Decodes an encoded key into a dense vector of values ordered by primary key
+    /// position; columns absent from `bytes` are filled with [Value::Null].
+    pub fn decode(&self, bytes: &[u8]) -> error::Result<Vec<Value>> {
+        let mut deserializer = Deserializer::new(bytes);
+        let mut values = vec![Value::Null; self.columns.len()];
+
+        while deserializer.has_remaining() {
+            let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
+            // Keys produced by `encode_to_vec` only ever contain known column ids.
+            let (field, idx) = self.column_id_to_field.get(&column_id).unwrap();
+            let value = field.deserialize(&mut deserializer)?;
+            values[*idx] = value;
+        }
+        Ok(values)
+    }
+}
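For review context, a minimal round-trip sketch of the new encoder. It is illustrative only: `metadata` is assumed to be a `RegionMetadataRef` whose primary key is two nullable string tags `host` and `dc` with column ids 0 and 1; none of this setup is part of the patch.

```rust
use datatypes::value::{Value, ValueRef};

let encoder = SparseEncoder::new(&metadata);

// `dc` is null, so only the (column_id, value) pair for `host` is written.
let mut buffer = Vec::new();
encoder
    .encode_to_vec(
        [ValueRef::String("web-01"), ValueRef::Null].into_iter(),
        &mut buffer,
    )
    .unwrap();

// Decoding yields values in primary key order; the absent `dc` column
// comes back as Value::Null, with no separate densification step.
let values = encoder.decode(&buffer).unwrap();
assert_eq!(Value::String("web-01".into()), values[0]);
assert_eq!(Value::Null, values[1]);
```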
diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs
index 89551f6264..a5b2ab6544 100644
--- a/src/mito2/src/memtable/key_values.rs
+++ b/src/mito2/src/memtable/key_values.rs
@@ -19,8 +19,8 @@
 use datatypes::prelude::ConcreteDataType;
 use datatypes::value::ValueRef;
 use memcomparable::Deserializer;
 use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
-use store_api::metadata::RegionMetadata;
-use store_api::storage::SequenceNumber;
+use store_api::metadata::{RegionMetadata, RegionMetadataRef};
+use store_api::storage::{ColumnId, SequenceNumber};
 
 use crate::row_converter::{SortField, COLUMN_ID_ENCODE_SIZE};
diff --git a/src/mito2/src/memtable/partition_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs
index df50c8934d..0ba6b79def 100644
--- a/src/mito2/src/memtable/partition_tree/tree.rs
+++ b/src/mito2/src/memtable/partition_tree/tree.rs
@@ -23,16 +23,14 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
 use common_time::Timestamp;
 use datafusion_common::ScalarValue;
 use datatypes::prelude::ValueRef;
-use memcomparable::Serializer;
-use serde::Serialize;
-use snafu::{ensure, ResultExt};
+use snafu::ensure;
 use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::{ColumnId, SequenceNumber};
 use table::predicate::Predicate;
 
 use crate::error::{
-    EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu,
+    EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result,
 };
 use crate::flush::WriteBufferManagerRef;
 use crate::memtable::key_values::KeyValue;
@@ -42,11 +40,12 @@ use crate::memtable::partition_tree::partition::{
 use crate::memtable::partition_tree::PartitionTreeConfig;
 use crate::memtable::stats::WriteMetrics;
 use crate::memtable::{BoxedBatchIterator, KeyValues};
+use crate::memtable::encoder::SparseEncoder;
 use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
 use crate::read::dedup::LastNonNullIter;
 use crate::read::Batch;
 use crate::region::options::MergeMode;
-use crate::row_converter::{PrimaryKeyCodec, SortField};
+use crate::row_converter::PrimaryKeyCodec;
 
 /// The partition tree.
 pub struct PartitionTree {
@@ -73,15 +72,7 @@ impl PartitionTree {
         config: &PartitionTreeConfig,
         write_buffer_manager: Option<WriteBufferManagerRef>,
     ) -> Self {
-        let sparse_encoder = SparseEncoder {
-            fields: metadata
-                .primary_key_columns()
-                .map(|c| FieldWithId {
-                    field: SortField::new(c.column_schema.data_type.clone()),
-                    column_id: c.column_id,
-                })
-                .collect(),
-        };
+        let sparse_encoder = SparseEncoder::new(&metadata);
         let is_partitioned = Partition::has_multi_partitions(&metadata);
         let mut config = config.clone();
         if config.merge_mode == MergeMode::LastNonNull {
@@ -436,34 +427,6 @@ impl PartitionTree {
     }
 }
 
-struct FieldWithId {
-    field: SortField,
-    column_id: ColumnId,
-}
-
-struct SparseEncoder {
-    fields: Vec<FieldWithId>,
-}
-
-impl SparseEncoder {
-    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
-    where
-        I: Iterator<Item = ValueRef<'a>>,
-    {
-        let mut serializer = Serializer::new(buffer);
-        for (value, field) in row.zip(self.fields.iter()) {
-            if !value.is_null() {
-                field
-                    .column_id
-                    .serialize(&mut serializer)
-                    .context(SerializeFieldSnafu)?;
-                field.field.serialize(&mut serializer, &value)?;
-            }
-        }
-        Ok(())
-    }
-}
-
 #[derive(Default)]
 struct TreeIterMetrics {
     iter_elapsed: Duration,
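Reviewer note on the new invariant in `mutations_to_record_batch`: rows are now expected to arrive with their tags already sparse-encoded into a single binary primary key (hence the two `assert_eq!` checks and the `ValueRef::Binary` destructuring), and that binary value is appended to the `PrimaryKeyArray` unchanged. A hypothetical upstream write path would therefore do roughly:

```rust
// Hypothetical caller-side sketch; `encoder` and `tag_values` are
// illustrative names, not part of this patch.
let mut pk = Vec::new();
encoder.encode_to_vec(tag_values.into_iter(), &mut pk).unwrap();
// The mutation row then carries ValueRef::Binary(&pk) as its only primary
// key value, which BulkPartEncoder writes through without re-encoding.
```

This also explains the test change above: `SparseEncoder::decode` already returns dense, positionally ordered values, so the `.into_dense()` call is dropped.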