diff --git a/src/mito2/src/extension.rs b/src/mito2/src/extension.rs index 944344b1cf..b31d006c27 100644 --- a/src/mito2/src/extension.rs +++ b/src/mito2/src/extension.rs @@ -8,14 +8,23 @@ use common_time::range::TimestampRange; use store_api::storage::{ScanRequest, SequenceNumber}; use crate::error::Result; +use crate::read::BoxedRecordBatchStream; use crate::read::range::RowGroupIndex; use crate::read::scan_region::StreamContext; use crate::read::scan_util::PartitionMetrics; -use crate::read::{BoxedBatchStream, BoxedRecordBatchStream}; use crate::region::MitoRegionRef; +use crate::sst::parquet::file_range::PreFilterMode; pub type InclusiveTimeRange = (Timestamp, Timestamp); +/// Per-range read options passed to [`ExtensionRange::flat_reader`]. +#[derive(Debug, Clone, Copy)] +pub struct ExtensionRangeReadOptions { + /// How aggressively to pre-filter columns before merging with other sources + /// in the same partition range. + pub pre_filter_mode: PreFilterMode, +} + /// [`ExtensionRange`] is used to represent a scannable "range" for mito engine, just like the /// memtable range and sst file range, but resides on the outside. /// It can be scanned side by side as other ranges to produce the final result, so it's very useful @@ -30,29 +39,16 @@ pub trait ExtensionRange: Debug + Send + Sync { /// The row groups number in this range. fn num_row_groups(&self) -> u64; - /// Create the reader for reading this range. - fn reader(&self, context: &StreamContext) -> BoxedExtensionRangeReader; - /// Create the flat reader for reading this range in flat format. - fn flat_reader(&self, context: &StreamContext) -> BoxedExtensionFlatRangeReader; + fn flat_reader( + &self, + context: &StreamContext, + options: ExtensionRangeReadOptions, + ) -> BoxedExtensionFlatRangeReader; } pub type BoxedExtensionRange = Box; -/// The reader to read an extension range. -#[async_trait] -pub trait ExtensionRangeReader: Send { - /// Read the extension range by creating a stream that produces [`Batch`]. - async fn read( - self: Box, - context: Arc, - metrics: PartitionMetrics, - index: RowGroupIndex, - ) -> Result; -} - -pub type BoxedExtensionRangeReader = Box; - /// The reader to read an extension range in flat format (producing [`RecordBatch`]). #[async_trait] pub trait ExtensionFlatRangeReader: Send { diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index af19f42262..1a94c274be 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -47,7 +47,7 @@ use crate::sst::file::{FileTimeRange, RegionFileId}; use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics; use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics; use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics; -use crate::sst::parquet::file_range::FileRange; +use crate::sst::parquet::file_range::{FileRange, PreFilterMode}; use crate::sst::parquet::flat_format::time_index_column_index; use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics}; use crate::sst::parquet::row_group::ParquetFetchMetrics; @@ -1577,11 +1577,12 @@ pub(crate) async fn scan_flat_extension_range( context: Arc, index: RowGroupIndex, partition_metrics: PartitionMetrics, + options: crate::extension::ExtensionRangeReadOptions, ) -> Result { use snafu::ResultExt; let range = context.input.extension_range(index.index); - let reader = range.flat_reader(context.as_ref()); + let reader = range.flat_reader(context.as_ref(), options); let stream = reader .read(context, partition_metrics, index) .await @@ -1593,10 +1594,12 @@ pub(crate) async fn maybe_scan_flat_other_ranges( context: &Arc, index: RowGroupIndex, metrics: &PartitionMetrics, + pre_filter_mode: PreFilterMode, ) -> Result { #[cfg(feature = "enterprise")] { - scan_flat_extension_range(context.clone(), index, metrics.clone()).await + let options = crate::extension::ExtensionRangeReadOptions { pre_filter_mode }; + scan_flat_extension_range(context.clone(), index, metrics.clone(), options).await } #[cfg(not(feature = "enterprise"))] @@ -1604,6 +1607,7 @@ pub(crate) async fn maybe_scan_flat_other_ranges( let _ = context; let _ = index; let _ = metrics; + let _ = pre_filter_mode; crate::error::UnexpectedSnafu { reason: "no other ranges scannable in flat format", diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 39bd0ce842..432099dbcf 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -649,6 +649,7 @@ pub(crate) async fn build_flat_sources( let mut ordered_sources = Vec::with_capacity(num_indices); ordered_sources.resize_with(num_indices, || None); let mut file_scan_tasks = Vec::new(); + let pre_filter_mode = stream_ctx.range_pre_filter_mode(part_range); for (position, index) in range_meta.row_group_indices.iter().enumerate() { if stream_ctx.is_mem_range_index(*index) { @@ -692,8 +693,13 @@ pub(crate) async fn build_flat_sources( ordered_sources[position] = Some(Box::pin(stream) as _); } } else { - let stream = - scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?; + let stream = scan_util::maybe_scan_flat_other_ranges( + stream_ctx, + *index, + part_metrics, + pre_filter_mode, + ) + .await?; ordered_sources[position] = Some(stream); } } diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index cb6e850439..a48a7048bd 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -119,6 +119,8 @@ impl UnorderedScan { try_stream! { // Gets range meta. let range_meta = &stream_ctx.ranges[part_range_id]; + let part_range = range_meta.new_partition_range(part_range_id); + let pre_filter_mode = stream_ctx.range_pre_filter_mode(&part_range); for index in &range_meta.row_group_indices { if stream_ctx.is_mem_range_index(*index) { let stream = scan_flat_mem_ranges( @@ -146,6 +148,7 @@ impl UnorderedScan { &stream_ctx, *index, &part_metrics, + pre_filter_mode, ).await?; for await record_batch in stream { yield record_batch?;