fix flat memtable and last row

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-19 12:43:50 +08:00
parent d7e64419c9
commit 733f406e42
6 changed files with 66 additions and 13 deletions

View File

@@ -18,7 +18,7 @@ use std::collections::VecDeque;
use std::sync::Arc;
use common_recordbatch::filter::SimpleFilterEvaluator;
use mito_codec::row_converter::{DensePrimaryKeyCodec, build_primary_key_codec};
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, build_primary_key_codec};
use parquet::file::metadata::ParquetMetaData;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
@@ -149,6 +149,10 @@ impl BulkIterContext {
&self.base.read_format
}
pub(crate) fn new_primary_key_filter(&self) -> Option<Box<dyn PrimaryKeyFilter>> {
self.base.new_primary_key_filter()
}
/// Returns the pre-filter mode.
pub(crate) fn pre_filter_mode(&self) -> PreFilterMode {
self.base.pre_filter_mode

View File

@@ -17,6 +17,7 @@ use std::time::Instant;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::row_converter::PrimaryKeyFilter;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use snafu::ResultExt;
@@ -46,6 +47,7 @@ pub struct EncodedBulkPartIter {
metrics: MemScanMetricsData,
/// Optional memory scan metrics to report to.
mem_scan_metrics: Option<MemScanMetrics>,
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
}
impl EncodedBulkPartIter {
@@ -58,6 +60,7 @@ impl EncodedBulkPartIter {
mem_scan_metrics: Option<MemScanMetrics>,
) -> error::Result<Self> {
assert!(context.read_format().as_flat().is_some());
let primary_key_filter = context.base.new_primary_key_filter();
let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
let data = encoded_part.data().clone();
@@ -91,6 +94,7 @@ impl EncodedBulkPartIter {
..Default::default()
},
mem_scan_metrics,
primary_key_filter,
})
}
@@ -115,6 +119,7 @@ impl EncodedBulkPartIter {
if let Some(batch) = apply_combined_filters(
&self.context,
&self.sequence,
&mut self.primary_key_filter,
batch,
self.current_skip_fields,
)? {
@@ -141,6 +146,7 @@ impl EncodedBulkPartIter {
if let Some(batch) = apply_combined_filters(
&self.context,
&self.sequence,
&mut self.primary_key_filter,
batch,
self.current_skip_fields,
)? {
@@ -210,6 +216,7 @@ pub struct BulkPartBatchIter {
metrics: MemScanMetricsData,
/// Optional memory scan metrics to report to.
mem_scan_metrics: Option<MemScanMetrics>,
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
}
impl BulkPartBatchIter {
@@ -222,6 +229,7 @@ impl BulkPartBatchIter {
mem_scan_metrics: Option<MemScanMetrics>,
) -> Self {
assert!(context.read_format().as_flat().is_some());
let primary_key_filter = context.base.new_primary_key_filter();
Self {
batches: VecDeque::from(batches),
@@ -232,6 +240,7 @@ impl BulkPartBatchIter {
..Default::default()
},
mem_scan_metrics,
primary_key_filter,
}
}
@@ -283,8 +292,13 @@ impl BulkPartBatchIter {
PreFilterMode::SkipFieldsOnDelete => true,
};
let Some(filtered_batch) =
apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)?
let Some(filtered_batch) = apply_combined_filters(
&self.context,
&self.sequence,
&mut self.primary_key_filter,
projected_batch,
skip_fields,
)?
else {
self.metrics.scan_cost += start.elapsed();
return Ok(None);
@@ -352,9 +366,20 @@ impl Drop for BulkPartBatchIter {
fn apply_combined_filters(
context: &BulkIterContext,
sequence: &Option<SequenceRange>,
primary_key_filter: &mut Option<Box<dyn PrimaryKeyFilter>>,
record_batch: RecordBatch,
skip_fields: bool,
) -> error::Result<Option<RecordBatch>> {
let record_batch = match primary_key_filter.as_mut() {
Some(primary_key_filter) => context
.base
.prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter.as_mut())?,
None => Some(record_batch),
};
let Some(record_batch) = record_batch else {
return Ok(None);
};
// Converts the format to the flat format first.
let format = context.read_format().as_flat().unwrap();
let record_batch = format.convert_batch(record_batch, None)?;

View File

@@ -21,6 +21,7 @@ use datatypes::arrow::array::{Array, BinaryArray};
use datatypes::arrow::compute::concat_batches;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::vectors::UInt32Vector;
use mito_codec::row_converter::PrimaryKeyFilter;
use snafu::ResultExt;
use store_api::storage::{FileId, TimeSeriesRowSelector};
@@ -311,6 +312,7 @@ impl FlatRowGroupLastRowCachedReader {
cache_strategy: CacheStrategy,
projection: &[usize],
reader: FlatRowGroupReader,
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
) -> Self {
let key = SelectorResultKey {
file_id,
@@ -324,10 +326,10 @@ impl FlatRowGroupLastRowCachedReader {
if is_flat && schema_matches {
Self::new_hit(value)
} else {
Self::new_miss(key, projection, reader, cache_strategy)
Self::new_miss(key, projection, reader, cache_strategy, primary_key_filter)
}
} else {
Self::new_miss(key, projection, reader, cache_strategy)
Self::new_miss(key, projection, reader, cache_strategy, primary_key_filter)
}
}
@@ -349,6 +351,7 @@ impl FlatRowGroupLastRowCachedReader {
projection: &[usize],
reader: FlatRowGroupReader,
cache_strategy: CacheStrategy,
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
) -> Self {
selector_result_cache_miss();
Self::Miss(FlatRowGroupLastRowReader::new(
@@ -356,6 +359,7 @@ impl FlatRowGroupLastRowCachedReader {
projection.to_vec(),
reader,
cache_strategy,
primary_key_filter,
))
}
}
@@ -429,6 +433,7 @@ impl BatchBuffer {
pub(crate) struct FlatRowGroupLastRowReader {
key: SelectorResultKey,
reader: FlatRowGroupReader,
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
selector: FlatLastTimestampSelector,
yielded_batches: Vec<RecordBatch>,
cache_strategy: CacheStrategy,
@@ -443,10 +448,12 @@ impl FlatRowGroupLastRowReader {
projection: Vec<usize>,
reader: FlatRowGroupReader,
cache_strategy: CacheStrategy,
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
) -> Self {
Self {
key,
reader,
primary_key_filter,
selector: FlatLastTimestampSelector::default(),
yielded_batches: vec![],
cache_strategy,
@@ -470,7 +477,11 @@ impl FlatRowGroupLastRowReader {
return self.flush_pending();
}
while let Some(batch) = self.reader.next_batch()? {
while let Some(raw_batch) = self.reader.next_raw_batch()? {
let Some(raw_batch) = self.prefilter_primary_keys(raw_batch)? else {
continue;
};
let batch = self.reader.convert_batch(raw_batch)?;
self.selector.on_next(batch, &mut self.pending)?;
if self.pending.is_full() {
return self.flush_pending();
@@ -491,6 +502,15 @@ impl FlatRowGroupLastRowReader {
Ok(None)
}
fn prefilter_primary_keys(&mut self, batch: RecordBatch) -> Result<Option<RecordBatch>> {
let Some(primary_key_filter) = self.primary_key_filter.as_mut() else {
return Ok(Some(batch));
};
self.reader
.prefilter_raw_batch_by_primary_key(batch, primary_key_filter.as_mut())
}
fn maybe_update_cache(&mut self) {
if self.yielded_batches.is_empty() {
return;

View File

@@ -300,7 +300,7 @@ impl FlatPruneReader {
skip_fields: bool,
) -> Self {
Self {
primary_key_filter: ctx.new_primary_key_filter(),
primary_key_filter: None,
context: ctx,
source: FlatSource::LastRow(reader),
metrics: Default::default(),

View File

@@ -285,6 +285,7 @@ impl FileRange {
self.context.reader_builder.cache_strategy().clone(),
self.context.read_format().projection_indices(),
flat_row_group_reader,
self.context.new_primary_key_filter(),
);
FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
} else {

View File

@@ -31,7 +31,7 @@ use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use mito_codec::row_converter::build_primary_key_codec;
use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec};
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
@@ -2203,11 +2203,14 @@ impl FlatRowGroupReader {
flat_format.convert_batch(record_batch, self.override_sequence.as_ref())
}
/// Returns the next converted flat RecordBatch.
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
self.next_raw_batch()?
.map(|record_batch| self.convert_batch(record_batch))
.transpose()
/// Applies the encoded primary-key prefilter to a raw parquet batch before flat conversion.
pub(crate) fn prefilter_raw_batch_by_primary_key(
&self,
record_batch: RecordBatch,
primary_key_filter: &mut dyn PrimaryKeyFilter,
) -> Result<Option<RecordBatch>> {
self.context
.prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter)
}
}