feat: decode pk eagerly (#7350)

* feat: decode pk eagerly

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* merge primary_key_codec and decode_primary_key_values

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-12-05 17:11:51 +08:00
committed by GitHub
parent 1ebcef4794
commit edb1f6086f
4 changed files with 59 additions and 3 deletions

View File

@@ -84,6 +84,14 @@ impl ProjectionMapper {
}
}
/// Returns true if the projection includes any tag columns.
pub(crate) fn has_tags(&self) -> bool {
match self {
ProjectionMapper::PrimaryKey(m) => m.has_tags(),
ProjectionMapper::Flat(_) => false,
}
}
/// Returns ids of projected columns that we need to read
/// from memtables and SSTs.
pub(crate) fn column_ids(&self) -> &[ColumnId] {
@@ -257,6 +265,11 @@ impl PrimaryKeyProjectionMapper {
&self.metadata
}
/// Returns true if the projection includes any tag columns.
pub(crate) fn has_tags(&self) -> bool {
self.has_tags
}
/// Returns ids of projected columns that we need to read
/// from memtables and SSTs.
pub(crate) fn column_ids(&self) -> &[ColumnId] {

View File

@@ -958,6 +958,7 @@ impl ScanInput {
) -> Result<FileRangeBuilder> {
let predicate = self.predicate_for_file(file);
let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
let decode_pk_values = !self.compaction && self.mapper.has_tags();
let res = self
.access_layer
.read_sst(file.clone())
@@ -971,6 +972,7 @@ impl ScanInput {
.flat_format(self.flat_format)
.compaction(self.compaction)
.pre_filter_mode(filter_mode)
.decode_primary_key_values(decode_pk_values)
.build_reader_input(reader_metrics)
.await;
let (mut file_range_ctx, selection) = match res {

View File

@@ -40,7 +40,10 @@ use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use mito_codec::row_converter::{SortField, build_primary_key_codec_with_fields};
use mito_codec::row_converter::{
CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
build_primary_key_codec_with_fields,
};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{OptionExt, ResultExt, ensure};
@@ -48,7 +51,8 @@ use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{
ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu,
NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
@@ -386,6 +390,13 @@ impl ReadFormat {
}
}
/// Enables or disables eager decoding of primary key values into batches.
pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
if let ReadFormat::PrimaryKey(format) = self {
format.set_decode_primary_key_values(decode);
}
}
/// Creates a sequence array to override.
pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
match self {
@@ -411,6 +422,8 @@ pub struct PrimaryKeyReadFormat {
field_id_to_projected_index: HashMap<ColumnId, usize>,
/// Sequence number to override the sequence read from the SST.
override_sequence: Option<SequenceNumber>,
/// Codec used to decode primary key values if eager decoding is enabled.
primary_key_codec: Option<Arc<dyn PrimaryKeyCodec>>,
}
impl PrimaryKeyReadFormat {
@@ -439,6 +452,7 @@ impl PrimaryKeyReadFormat {
projection_indices: format_projection.projection_indices,
field_id_to_projected_index: format_projection.column_id_to_projected_index,
override_sequence: None,
primary_key_codec: None,
}
}
@@ -447,6 +461,15 @@ impl PrimaryKeyReadFormat {
self.override_sequence = sequence;
}
/// Enables or disables eager decoding of primary key values into batches.
pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
self.primary_key_codec = if decode {
Some(build_primary_key_codec(&self.metadata))
} else {
None
};
}
/// Gets the arrow schema of the SST file.
///
/// This schema is computed from the region metadata but should be the same
@@ -561,7 +584,12 @@ impl PrimaryKeyReadFormat {
});
}
let batch = builder.build()?;
let mut batch = builder.build()?;
if let Some(codec) = &self.primary_key_codec {
let pk_values: CompositeValues =
codec.decode(batch.primary_key()).context(DecodeSnafu)?;
batch.set_pk_values(pk_values);
}
batches.push_back(batch);
}

View File

@@ -127,6 +127,8 @@ pub struct ParquetReaderBuilder {
compaction: bool,
/// Mode to pre-filter columns.
pre_filter_mode: PreFilterMode,
/// Whether to decode primary key values eagerly when reading primary key format SSTs.
decode_primary_key_values: bool,
}
impl ParquetReaderBuilder {
@@ -152,6 +154,7 @@ impl ParquetReaderBuilder {
flat_format: false,
compaction: false,
pre_filter_mode: PreFilterMode::All,
decode_primary_key_values: false,
}
}
@@ -236,6 +239,13 @@ impl ParquetReaderBuilder {
self
}
/// Decodes primary key values eagerly when reading primary key format SSTs.
#[must_use]
pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
self.decode_primary_key_values = decode;
self
}
/// Builds a [ParquetReader].
///
/// This needs to perform IO operation.
@@ -292,6 +302,9 @@ impl ParquetReaderBuilder {
self.compaction,
)?
};
if self.decode_primary_key_values {
read_format.set_decode_primary_key_values(true);
}
if need_override_sequence(&parquet_meta) {
read_format
.set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));