From 733f406e42c73cd4ecfb00cf80b07afb6fc6f0a9 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 19 Mar 2026 12:43:50 +0800 Subject: [PATCH] fix flat memtable and last row Signed-off-by: Ruihang Xia --- src/mito2/src/memtable/bulk/context.rs | 6 ++++- src/mito2/src/memtable/bulk/part_reader.rs | 29 ++++++++++++++++++++-- src/mito2/src/read/last_row.rs | 26 ++++++++++++++++--- src/mito2/src/read/prune.rs | 2 +- src/mito2/src/sst/parquet/file_range.rs | 1 + src/mito2/src/sst/parquet/reader.rs | 15 ++++++----- 6 files changed, 66 insertions(+), 13 deletions(-) diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index f291a9dcd5..999ea23eb3 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -18,7 +18,7 @@ use std::collections::VecDeque; use std::sync::Arc; use common_recordbatch::filter::SimpleFilterEvaluator; -use mito_codec::row_converter::{DensePrimaryKeyCodec, build_primary_key_codec}; +use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, build_primary_key_codec}; use parquet::file::metadata::ParquetMetaData; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; @@ -149,6 +149,10 @@ impl BulkIterContext { &self.base.read_format } + pub(crate) fn new_primary_key_filter(&self) -> Option> { + self.base.new_primary_key_filter() + } + /// Returns the pre-filter mode. pub(crate) fn pre_filter_mode(&self) -> PreFilterMode { self.base.pre_filter_mode diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index 1e9d955321..63ec0fab3d 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -17,6 +17,7 @@ use std::time::Instant; use datatypes::arrow::array::BooleanArray; use datatypes::arrow::record_batch::RecordBatch; +use mito_codec::row_converter::PrimaryKeyFilter; use parquet::arrow::ProjectionMask; use parquet::arrow::arrow_reader::ParquetRecordBatchReader; use snafu::ResultExt; @@ -46,6 +47,7 @@ pub struct EncodedBulkPartIter { metrics: MemScanMetricsData, /// Optional memory scan metrics to report to. mem_scan_metrics: Option, + primary_key_filter: Option>, } impl EncodedBulkPartIter { @@ -58,6 +60,7 @@ impl EncodedBulkPartIter { mem_scan_metrics: Option, ) -> error::Result { assert!(context.read_format().as_flat().is_some()); + let primary_key_filter = context.base.new_primary_key_filter(); let parquet_meta = encoded_part.metadata().parquet_metadata.clone(); let data = encoded_part.data().clone(); @@ -91,6 +94,7 @@ impl EncodedBulkPartIter { ..Default::default() }, mem_scan_metrics, + primary_key_filter, }) } @@ -115,6 +119,7 @@ impl EncodedBulkPartIter { if let Some(batch) = apply_combined_filters( &self.context, &self.sequence, + &mut self.primary_key_filter, batch, self.current_skip_fields, )? { @@ -141,6 +146,7 @@ impl EncodedBulkPartIter { if let Some(batch) = apply_combined_filters( &self.context, &self.sequence, + &mut self.primary_key_filter, batch, self.current_skip_fields, )? { @@ -210,6 +216,7 @@ pub struct BulkPartBatchIter { metrics: MemScanMetricsData, /// Optional memory scan metrics to report to. mem_scan_metrics: Option, + primary_key_filter: Option>, } impl BulkPartBatchIter { @@ -222,6 +229,7 @@ impl BulkPartBatchIter { mem_scan_metrics: Option, ) -> Self { assert!(context.read_format().as_flat().is_some()); + let primary_key_filter = context.base.new_primary_key_filter(); Self { batches: VecDeque::from(batches), @@ -232,6 +240,7 @@ impl BulkPartBatchIter { ..Default::default() }, mem_scan_metrics, + primary_key_filter, } } @@ -283,8 +292,13 @@ impl BulkPartBatchIter { PreFilterMode::SkipFieldsOnDelete => true, }; - let Some(filtered_batch) = - apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)? + let Some(filtered_batch) = apply_combined_filters( + &self.context, + &self.sequence, + &mut self.primary_key_filter, + projected_batch, + skip_fields, + )? else { self.metrics.scan_cost += start.elapsed(); return Ok(None); @@ -352,9 +366,20 @@ impl Drop for BulkPartBatchIter { fn apply_combined_filters( context: &BulkIterContext, sequence: &Option, + primary_key_filter: &mut Option>, record_batch: RecordBatch, skip_fields: bool, ) -> error::Result> { + let record_batch = match primary_key_filter.as_mut() { + Some(primary_key_filter) => context + .base + .prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter.as_mut())?, + None => Some(record_batch), + }; + let Some(record_batch) = record_batch else { + return Ok(None); + }; + // Converts the format to the flat format first. let format = context.read_format().as_flat().unwrap(); let record_batch = format.convert_batch(record_batch, None)?; diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index c2336f218d..0a4ff7a1fc 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -21,6 +21,7 @@ use datatypes::arrow::array::{Array, BinaryArray}; use datatypes::arrow::compute::concat_batches; use datatypes::arrow::record_batch::RecordBatch; use datatypes::vectors::UInt32Vector; +use mito_codec::row_converter::PrimaryKeyFilter; use snafu::ResultExt; use store_api::storage::{FileId, TimeSeriesRowSelector}; @@ -311,6 +312,7 @@ impl FlatRowGroupLastRowCachedReader { cache_strategy: CacheStrategy, projection: &[usize], reader: FlatRowGroupReader, + primary_key_filter: Option>, ) -> Self { let key = SelectorResultKey { file_id, @@ -324,10 +326,10 @@ impl FlatRowGroupLastRowCachedReader { if is_flat && schema_matches { Self::new_hit(value) } else { - Self::new_miss(key, projection, reader, cache_strategy) + Self::new_miss(key, projection, reader, cache_strategy, primary_key_filter) } } else { - Self::new_miss(key, projection, reader, cache_strategy) + Self::new_miss(key, projection, reader, cache_strategy, primary_key_filter) } } @@ -349,6 +351,7 @@ impl FlatRowGroupLastRowCachedReader { projection: &[usize], reader: FlatRowGroupReader, cache_strategy: CacheStrategy, + primary_key_filter: Option>, ) -> Self { selector_result_cache_miss(); Self::Miss(FlatRowGroupLastRowReader::new( @@ -356,6 +359,7 @@ impl FlatRowGroupLastRowCachedReader { projection.to_vec(), reader, cache_strategy, + primary_key_filter, )) } } @@ -429,6 +433,7 @@ impl BatchBuffer { pub(crate) struct FlatRowGroupLastRowReader { key: SelectorResultKey, reader: FlatRowGroupReader, + primary_key_filter: Option>, selector: FlatLastTimestampSelector, yielded_batches: Vec, cache_strategy: CacheStrategy, @@ -443,10 +448,12 @@ impl FlatRowGroupLastRowReader { projection: Vec, reader: FlatRowGroupReader, cache_strategy: CacheStrategy, + primary_key_filter: Option>, ) -> Self { Self { key, reader, + primary_key_filter, selector: FlatLastTimestampSelector::default(), yielded_batches: vec![], cache_strategy, @@ -470,7 +477,11 @@ impl FlatRowGroupLastRowReader { return self.flush_pending(); } - while let Some(batch) = self.reader.next_batch()? { + while let Some(raw_batch) = self.reader.next_raw_batch()? { + let Some(raw_batch) = self.prefilter_primary_keys(raw_batch)? else { + continue; + }; + let batch = self.reader.convert_batch(raw_batch)?; self.selector.on_next(batch, &mut self.pending)?; if self.pending.is_full() { return self.flush_pending(); @@ -491,6 +502,15 @@ impl FlatRowGroupLastRowReader { Ok(None) } + fn prefilter_primary_keys(&mut self, batch: RecordBatch) -> Result> { + let Some(primary_key_filter) = self.primary_key_filter.as_mut() else { + return Ok(Some(batch)); + }; + + self.reader + .prefilter_raw_batch_by_primary_key(batch, primary_key_filter.as_mut()) + } + fn maybe_update_cache(&mut self) { if self.yielded_batches.is_empty() { return; diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 18177c8edd..b08f6c55d8 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -300,7 +300,7 @@ impl FlatPruneReader { skip_fields: bool, ) -> Self { Self { - primary_key_filter: ctx.new_primary_key_filter(), + primary_key_filter: None, context: ctx, source: FlatSource::LastRow(reader), metrics: Default::default(), diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 3682e26fc8..b7b8a79ea5 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -285,6 +285,7 @@ impl FileRange { self.context.reader_builder.cache_strategy().clone(), self.context.read_format().projection_indices(), flat_row_group_reader, + self.context.new_primary_key_filter(), ); FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields) } else { diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 1f80d1ee02..5dbe026b23 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -31,7 +31,7 @@ use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::DataType; -use mito_codec::row_converter::build_primary_key_codec; +use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec}; use object_store::ObjectStore; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels}; @@ -2203,11 +2203,14 @@ impl FlatRowGroupReader { flat_format.convert_batch(record_batch, self.override_sequence.as_ref()) } - /// Returns the next converted flat RecordBatch. - pub(crate) fn next_batch(&mut self) -> Result> { - self.next_raw_batch()? - .map(|record_batch| self.convert_batch(record_batch)) - .transpose() + /// Applies the encoded primary-key prefilter to a raw parquet batch before flat conversion. + pub(crate) fn prefilter_raw_batch_by_primary_key( + &self, + record_batch: RecordBatch, + primary_key_filter: &mut dyn PrimaryKeyFilter, + ) -> Result> { + self.context + .prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter) } }