From edb1f6086f62d99f588476af1142455303b71695 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 5 Dec 2025 17:11:51 +0800 Subject: [PATCH] feat: decode pk eagerly (#7350) * feat: decode pk eagerly Signed-off-by: Ruihang Xia * merge primary_key_codec and decode_primary_key_values Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/mito2/src/read/projection.rs | 13 +++++++++++ src/mito2/src/read/scan_region.rs | 2 ++ src/mito2/src/sst/parquet/format.rs | 34 ++++++++++++++++++++++++++--- src/mito2/src/sst/parquet/reader.rs | 13 +++++++++++ 4 files changed, 59 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index d7171e7f60..e4a3af5831 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -84,6 +84,14 @@ impl ProjectionMapper { } } + /// Returns true if the projection includes any tag columns. + pub(crate) fn has_tags(&self) -> bool { + match self { + ProjectionMapper::PrimaryKey(m) => m.has_tags(), + ProjectionMapper::Flat(_) => false, + } + } + /// Returns ids of projected columns that we need to read /// from memtables and SSTs. pub(crate) fn column_ids(&self) -> &[ColumnId] { @@ -257,6 +265,11 @@ impl PrimaryKeyProjectionMapper { &self.metadata } + /// Returns true if the projection includes any tag columns. + pub(crate) fn has_tags(&self) -> bool { + self.has_tags + } + /// Returns ids of projected columns that we need to read /// from memtables and SSTs. pub(crate) fn column_ids(&self) -> &[ColumnId] { diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index babdd43b0b..0fa9a0eef9 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -958,6 +958,7 @@ impl ScanInput { ) -> Result { let predicate = self.predicate_for_file(file); let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode); + let decode_pk_values = !self.compaction && self.mapper.has_tags(); let res = self .access_layer .read_sst(file.clone()) @@ -971,6 +972,7 @@ impl ScanInput { .flat_format(self.flat_format) .compaction(self.compaction) .pre_filter_mode(filter_mode) + .decode_primary_key_values(decode_pk_values) .build_reader_input(reader_metrics) .await; let (mut file_range_ctx, selection) = match res { diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index f6b9c4272b..4d66292696 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -40,7 +40,10 @@ use datatypes::arrow::datatypes::{SchemaRef, UInt32Type}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::DataType; use datatypes::vectors::{Helper, Vector}; -use mito_codec::row_converter::{SortField, build_primary_key_codec_with_fields}; +use mito_codec::row_converter::{ + CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec, + build_primary_key_codec_with_fields, +}; use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use parquet::file::statistics::Statistics; use snafu::{OptionExt, ResultExt, ensure}; @@ -48,7 +51,8 @@ use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; use store_api::storage::{ColumnId, SequenceNumber}; use crate::error::{ - ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result, + ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, + NewRecordBatchSnafu, Result, }; use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::sst::file::{FileMeta, FileTimeRange}; @@ -386,6 +390,13 @@ impl ReadFormat { } } + /// Enables or disables eager decoding of primary key values into batches. + pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) { + if let ReadFormat::PrimaryKey(format) = self { + format.set_decode_primary_key_values(decode); + } + } + /// Creates a sequence array to override. pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option { match self { @@ -411,6 +422,8 @@ pub struct PrimaryKeyReadFormat { field_id_to_projected_index: HashMap, /// Sequence number to override the sequence read from the SST. override_sequence: Option, + /// Codec used to decode primary key values if eager decoding is enabled. + primary_key_codec: Option>, } impl PrimaryKeyReadFormat { @@ -439,6 +452,7 @@ impl PrimaryKeyReadFormat { projection_indices: format_projection.projection_indices, field_id_to_projected_index: format_projection.column_id_to_projected_index, override_sequence: None, + primary_key_codec: None, } } @@ -447,6 +461,15 @@ impl PrimaryKeyReadFormat { self.override_sequence = sequence; } + /// Enables or disables eager decoding of primary key values into batches. + pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) { + self.primary_key_codec = if decode { + Some(build_primary_key_codec(&self.metadata)) + } else { + None + }; + } + /// Gets the arrow schema of the SST file. /// /// This schema is computed from the region metadata but should be the same @@ -561,7 +584,12 @@ impl PrimaryKeyReadFormat { }); } - let batch = builder.build()?; + let mut batch = builder.build()?; + if let Some(codec) = &self.primary_key_codec { + let pk_values: CompositeValues = + codec.decode(batch.primary_key()).context(DecodeSnafu)?; + batch.set_pk_values(pk_values); + } batches.push_back(batch); } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 769bb0fd2b..3fbb656471 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -127,6 +127,8 @@ pub struct ParquetReaderBuilder { compaction: bool, /// Mode to pre-filter columns. pre_filter_mode: PreFilterMode, + /// Whether to decode primary key values eagerly when reading primary key format SSTs. + decode_primary_key_values: bool, } impl ParquetReaderBuilder { @@ -152,6 +154,7 @@ impl ParquetReaderBuilder { flat_format: false, compaction: false, pre_filter_mode: PreFilterMode::All, + decode_primary_key_values: false, } } @@ -236,6 +239,13 @@ impl ParquetReaderBuilder { self } + /// Decodes primary key values eagerly when reading primary key format SSTs. + #[must_use] + pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self { + self.decode_primary_key_values = decode; + self + } + /// Builds a [ParquetReader]. /// /// This needs to perform IO operation. @@ -292,6 +302,9 @@ impl ParquetReaderBuilder { self.compaction, )? }; + if self.decode_primary_key_values { + read_format.set_decode_primary_key_values(true); + } if need_override_sequence(&parquet_meta) { read_format .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));