refactor(mito2): reshape extension range API (#8004)

* refactor(mito2): reshape extension range API

Thread per-range read options into the extension range flat reader so
it can honor PreFilterMode like other range kinds, and drop the unused
legacy Batch reader surface.

- extension.rs: introduce ExtensionRangeReadOptions { pre_filter_mode };
  flat_reader() takes it; remove reader(), ExtensionRangeReader, and
  BoxedExtensionRangeReader (no in-tree or out-of-tree call sites use
  the Batch-format reader path anymore).
- scan_util.rs: plumb options through maybe_scan_flat_other_ranges and
  scan_flat_extension_range.
- seq_scan.rs / unordered_scan.rs: build options at the call site via
  StreamContext::range_pre_filter_mode.

Using an options struct (rather than a bare PreFilterMode argument)
keeps future additions additive for out-of-tree ExtensionRange impls.

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor(mito2): hoist ExtensionRangeReadOptions out of row-group loops

Build ext_options once per partition range instead of per row-group index in
build_flat_sources and scan_flat_partition_range. Both inputs (part_range and
range_pre_filter_mode) are loop-invariant.

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix compiler error without enterprise feature

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-04-27 15:41:54 +08:00
committed by GitHub
parent 0c942fc23a
commit 9b75d8b734
4 changed files with 33 additions and 24 deletions

View File

@@ -8,14 +8,23 @@ use common_time::range::TimestampRange;
use store_api::storage::{ScanRequest, SequenceNumber};
use crate::error::Result;
use crate::read::BoxedRecordBatchStream;
use crate::read::range::RowGroupIndex;
use crate::read::scan_region::StreamContext;
use crate::read::scan_util::PartitionMetrics;
use crate::read::{BoxedBatchStream, BoxedRecordBatchStream};
use crate::region::MitoRegionRef;
use crate::sst::parquet::file_range::PreFilterMode;
pub type InclusiveTimeRange = (Timestamp, Timestamp);
/// Per-range read options passed to [`ExtensionRange::flat_reader`].
#[derive(Debug, Clone, Copy)]
pub struct ExtensionRangeReadOptions {
/// How aggressively to pre-filter columns before merging with other sources
/// in the same partition range.
pub pre_filter_mode: PreFilterMode,
}
/// [`ExtensionRange`] is used to represent a scannable "range" for mito engine, just like the
/// memtable range and sst file range, but resides on the outside.
/// It can be scanned side by side as other ranges to produce the final result, so it's very useful
@@ -30,29 +39,16 @@ pub trait ExtensionRange: Debug + Send + Sync {
/// The row groups number in this range.
fn num_row_groups(&self) -> u64;
/// Create the reader for reading this range.
fn reader(&self, context: &StreamContext) -> BoxedExtensionRangeReader;
/// Create the flat reader for reading this range in flat format.
fn flat_reader(&self, context: &StreamContext) -> BoxedExtensionFlatRangeReader;
fn flat_reader(
&self,
context: &StreamContext,
options: ExtensionRangeReadOptions,
) -> BoxedExtensionFlatRangeReader;
}
pub type BoxedExtensionRange = Box<dyn ExtensionRange>;
/// The reader to read an extension range.
#[async_trait]
pub trait ExtensionRangeReader: Send {
/// Read the extension range by creating a stream that produces [`Batch`].
async fn read(
self: Box<Self>,
context: Arc<StreamContext>,
metrics: PartitionMetrics,
index: RowGroupIndex,
) -> Result<BoxedBatchStream, BoxedError>;
}
pub type BoxedExtensionRangeReader = Box<dyn ExtensionRangeReader>;
/// The reader to read an extension range in flat format (producing [`RecordBatch`]).
#[async_trait]
pub trait ExtensionFlatRangeReader: Send {

View File

@@ -47,7 +47,7 @@ use crate::sst::file::{FileTimeRange, RegionFileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::file_range::{FileRange, PreFilterMode};
use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
@@ -1577,11 +1577,12 @@ pub(crate) async fn scan_flat_extension_range(
context: Arc<StreamContext>,
index: RowGroupIndex,
partition_metrics: PartitionMetrics,
options: crate::extension::ExtensionRangeReadOptions,
) -> Result<BoxedRecordBatchStream> {
use snafu::ResultExt;
let range = context.input.extension_range(index.index);
let reader = range.flat_reader(context.as_ref());
let reader = range.flat_reader(context.as_ref(), options);
let stream = reader
.read(context, partition_metrics, index)
.await
@@ -1593,10 +1594,12 @@ pub(crate) async fn maybe_scan_flat_other_ranges(
context: &Arc<StreamContext>,
index: RowGroupIndex,
metrics: &PartitionMetrics,
pre_filter_mode: PreFilterMode,
) -> Result<BoxedRecordBatchStream> {
#[cfg(feature = "enterprise")]
{
scan_flat_extension_range(context.clone(), index, metrics.clone()).await
let options = crate::extension::ExtensionRangeReadOptions { pre_filter_mode };
scan_flat_extension_range(context.clone(), index, metrics.clone(), options).await
}
#[cfg(not(feature = "enterprise"))]
@@ -1604,6 +1607,7 @@ pub(crate) async fn maybe_scan_flat_other_ranges(
let _ = context;
let _ = index;
let _ = metrics;
let _ = pre_filter_mode;
crate::error::UnexpectedSnafu {
reason: "no other ranges scannable in flat format",

View File

@@ -649,6 +649,7 @@ pub(crate) async fn build_flat_sources(
let mut ordered_sources = Vec::with_capacity(num_indices);
ordered_sources.resize_with(num_indices, || None);
let mut file_scan_tasks = Vec::new();
let pre_filter_mode = stream_ctx.range_pre_filter_mode(part_range);
for (position, index) in range_meta.row_group_indices.iter().enumerate() {
if stream_ctx.is_mem_range_index(*index) {
@@ -692,8 +693,13 @@ pub(crate) async fn build_flat_sources(
ordered_sources[position] = Some(Box::pin(stream) as _);
}
} else {
let stream =
scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
let stream = scan_util::maybe_scan_flat_other_ranges(
stream_ctx,
*index,
part_metrics,
pre_filter_mode,
)
.await?;
ordered_sources[position] = Some(stream);
}
}

View File

@@ -119,6 +119,8 @@ impl UnorderedScan {
try_stream! {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range_id];
let part_range = range_meta.new_partition_range(part_range_id);
let pre_filter_mode = stream_ctx.range_pre_filter_mode(&part_range);
for index in &range_meta.row_group_indices {
if stream_ctx.is_mem_range_index(*index) {
let stream = scan_flat_mem_ranges(
@@ -146,6 +148,7 @@ impl UnorderedScan {
&stream_ctx,
*index,
&part_metrics,
pre_filter_mode,
).await?;
for await record_batch in stream {
yield record_batch?;