Gate it; only run when necessary

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date: 2026-03-19 05:32:04 +08:00
parent 680581103d
commit 1c7e2bb382
3 changed files with 97 additions and 18 deletions

View File

@@ -314,18 +314,21 @@ impl FlatPruneReader {
}
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
while let Some(raw_batch) = {
loop {
let start = std::time::Instant::now();
let batch = self.source.next_raw_batch()?;
self.metrics.scan_cost += start.elapsed();
batch
} {
// Update metrics for the received batch
let Some(raw_batch) = self.source.next_raw_batch()? else {
return Ok(None);
};
// Account rows as soon as parquet yields a raw batch. The scan timer spans
// the raw read, encoded primary-key prefilter, and flat conversion so
// `scan_cost` keeps the same meaning after splitting `next_raw_batch()`
// from `convert_batch()`.
self.metrics.num_rows += raw_batch.num_rows();
self.metrics.num_batches += 1;
let num_rows_before_prefilter = raw_batch.num_rows();
let Some(prefiltered_batch) = self.prefilter_primary_keys(raw_batch)? else {
self.metrics.scan_cost += start.elapsed();
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_prefilter;
continue;
};
@@ -333,7 +336,10 @@ impl FlatPruneReader {
self.metrics.filter_metrics.rows_precise_filtered += prefiltered_rows;
let record_batch = self.source.convert_batch(prefiltered_batch)?;
self.metrics.scan_cost += start.elapsed();
// `num_batches` counts decoded flat batches, not raw parquet batches.
self.metrics.num_batches += 1;
match self.prune_flat(record_batch)? {
Some(filtered_batch) => {
return Ok(Some(filtered_batch));
@@ -343,8 +349,6 @@ impl FlatPruneReader {
}
}
}
Ok(None)
}
fn prefilter_primary_keys(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {

View File

@@ -29,6 +29,7 @@ use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::Schema;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
@@ -491,12 +492,31 @@ impl TagDecodeState {
}
impl RangeBase {
fn has_usable_primary_key_filter(&self) -> bool {
let Some(filters) = &self.primary_key_filters else {
return false;
};
let metadata = self.read_format.metadata();
filters.iter().any(|filter| {
!is_partition_column(filter.column_name())
&& metadata
.column_by_name(filter.column_name())
.is_some_and(|column| {
column.semantic_type == SemanticType::Tag
&& metadata.primary_key_index(column.column_id).is_some()
})
})
}
/// Builds an encoded primary-key filter for flat scan pre-filtering.
pub(crate) fn new_primary_key_filter(&self) -> Option<Box<dyn PrimaryKeyFilter>> {
let filters = self.primary_key_filters.as_ref()?;
if self.read_format.metadata().primary_key.is_empty() {
if self.read_format.metadata().primary_key.is_empty()
|| !self.has_usable_primary_key_filter()
{
return None;
}
let filters = self.primary_key_filters.as_ref()?;
Some(
self.codec
@@ -1071,12 +1091,19 @@ mod tests {
};
use datatypes::arrow::datatypes::UInt32Type;
use mito_codec::row_converter::build_primary_key_codec;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadata;
use super::*;
use crate::test_util::sst_util::{new_primary_key, sst_region_metadata};
use crate::test_util::sst_util::{
new_primary_key, new_sparse_primary_key, sst_region_metadata,
sst_region_metadata_with_encoding,
};
fn new_test_range_base(exprs: &[Expr]) -> RangeBase {
let metadata = Arc::new(sst_region_metadata());
fn new_test_range_base_with_metadata(
metadata: Arc<RegionMetadata>,
exprs: &[Expr],
) -> RangeBase {
let read_format = ReadFormat::new_flat(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
@@ -1106,10 +1133,17 @@ mod tests {
}
}
fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
fn new_test_range_base(exprs: &[Expr]) -> RangeBase {
new_test_range_base_with_metadata(Arc::new(sst_region_metadata()), exprs)
}
fn new_raw_batch_with_metadata(
metadata: Arc<RegionMetadata>,
primary_keys: &[&[u8]],
field_values: &[u64],
) -> RecordBatch {
assert_eq!(primary_keys.len(), field_values.len());
let metadata = Arc::new(sst_region_metadata());
let schema = ReadFormat::new_flat(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
@@ -1154,6 +1188,10 @@ mod tests {
.unwrap()
}
fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
new_raw_batch_with_metadata(Arc::new(sst_region_metadata()), primary_keys, field_values)
}
fn field_values(batch: &RecordBatch) -> Vec<u64> {
batch
.column(0)
@@ -1164,6 +1202,13 @@ mod tests {
.to_vec()
}
#[test]
fn test_new_primary_key_filter_skips_non_tag_filters() {
let base = new_test_range_base(&[col("field_0").eq(lit(1_u64)), col("ts").gt(lit(0_i64))]);
assert!(base.new_primary_key_filter().is_none());
}
#[test]
fn test_prefilter_primary_key_drops_single_dictionary_batch() {
let pk_a = new_primary_key(&["a", "x"]);
@@ -1239,4 +1284,33 @@ mod tests {
assert_eq!(filtered.num_rows(), 4);
assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]);
}
#[test]
fn test_prefilter_primary_key_sparse_path() {
let metadata = Arc::new(sst_region_metadata_with_encoding(
PrimaryKeyEncoding::Sparse,
));
let pk_a = new_sparse_primary_key(&["a", "x"], &metadata, 1, 11);
let pk_b = new_sparse_primary_key(&["b", "x"], &metadata, 1, 22);
let batch = new_raw_batch_with_metadata(
metadata.clone(),
&[
pk_a.as_slice(),
pk_a.as_slice(),
pk_b.as_slice(),
pk_b.as_slice(),
],
&[10, 11, 12, 13],
);
let base = new_test_range_base_with_metadata(metadata, &[col("tag_0").eq(lit("b"))]);
let mut primary_key_filter = base.new_primary_key_filter().unwrap();
let filtered = base
.prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut())
.unwrap()
.unwrap();
assert_eq!(filtered.num_rows(), 2);
assert_eq!(field_values(&filtered), vec![12, 13]);
}
}

View File

@@ -1633,11 +1633,12 @@ pub struct ReaderMetrics {
pub(crate) filter_metrics: ReaderFilterMetrics,
/// Duration to build the parquet reader.
pub(crate) build_cost: Duration,
/// Duration to scan the reader.
/// Duration to scan the reader, including parquet fetches and decoding work
/// needed to materialize output batches.
pub(crate) scan_cost: Duration,
/// Number of record batches read.
pub(crate) num_record_batches: usize,
/// Number of batches decoded.
/// Number of decoded output batches materialized from parquet data.
pub(crate) num_batches: usize,
/// Number of rows read.
pub(crate) num_rows: usize,