From e1e75b3ffe93cf6aa8745c25321eb1820cd9123e Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 25 May 2026 11:10:12 +0800 Subject: [PATCH] feat: implement a cache for the prefilter (#8102) * feat: cache parquet prefilter results Signed-off-by: evenyag * chore: set result cache size Signed-off-by: evenyag * refactor: rename is_stable to is_immutable and reject ScalarVariable Signed-off-by: evenyag * chore: typo Signed-off-by: evenyag * refactor: use capacity() for prefilter key memory accounting Signed-off-by: evenyag * feat: per filter cache Signed-off-by: evenyag * refactor: support other variants in MaybeFilter Signed-off-by: evenyag * refactor: split compute_projection_mask Signed-off-by: evenyag * refactor: build_prefilter_masks takes PrefilterEntry Signed-off-by: evenyag --------- Signed-off-by: evenyag --- config/config.md | 4 + config/datanode.example.toml | 10 + config/standalone.example.toml | 10 + src/cmd/src/datanode/objbench.rs | 2 + src/mito2/src/cache.rs | 300 ++++++++++- src/mito2/src/config.rs | 5 + src/mito2/src/sst/parquet/prefilter.rs | 699 +++++++++++++++++++------ src/mito2/src/sst/parquet/reader.rs | 117 ++++- src/mito2/src/worker.rs | 2 + tests-integration/tests/http.rs | 3 +- 10 files changed, 973 insertions(+), 179 deletions(-) diff --git a/config/config.md b/config/config.md index b1630d97ad..0fae0caaa4 100644 --- a/config/config.md +++ b/config/config.md @@ -155,6 +155,8 @@ | `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. | | `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.
If not set, it's default to 1/8 of OS memory. | | `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. | +| `region_engine.mito.range_result_cache_size` | String | Auto | Cache size for flat range scan results. Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. | +| `region_engine.mito.prefilter_result_cache_size` | String | Auto | Cache size for prefilter results. Setting it to 0 to disable the cache.
If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. | | `region_engine.mito.enable_write_cache` | Bool | `false` | Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. | | `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. | | `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. | @@ -543,6 +545,8 @@ | `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. | | `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.
If not set, it's default to 1/8 of OS memory. | | `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. | +| `region_engine.mito.range_result_cache_size` | String | Auto | Cache size for flat range scan results. Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. | +| `region_engine.mito.prefilter_result_cache_size` | String | Auto | Cache size for prefilter results. Setting it to 0 to disable the cache.
If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. | | `region_engine.mito.enable_write_cache` | Bool | `false` | Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. | | `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. | | `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 170045a090..d558918daf 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -480,6 +480,16 @@ auto_flush_interval = "1h" ## @toml2docs:none-default="Auto" #+ selector_result_cache_size = "512MB" +## Cache size for flat range scan results. Setting it to 0 to disable the cache. +## If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. +## @toml2docs:none-default="Auto" +#+ range_result_cache_size = "512MB" + +## Cache size for prefilter results. Setting it to 0 to disable the cache. +## If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. +## @toml2docs:none-default="Auto" +#+ prefilter_result_cache_size = "128MB" + ## Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. enable_write_cache = false diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 24249270b2..d5c42e744c 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -599,6 +599,16 @@ auto_flush_interval = "1h" ## @toml2docs:none-default="Auto" #+ selector_result_cache_size = "512MB" +## Cache size for flat range scan results. Setting it to 0 to disable the cache. +## If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. +## @toml2docs:none-default="Auto" +#+ range_result_cache_size = "512MB" + +## Cache size for prefilter results. Setting it to 0 to disable the cache. +## If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. +## @toml2docs:none-default="Auto" +#+ prefilter_result_cache_size = "128MB" + ## Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. enable_write_cache = false diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index a298430c83..65f194d19f 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -588,6 +588,8 @@ async fn build_cache_manager( .vector_cache_size(config.vector_cache_size.as_bytes()) .page_cache_size(config.page_cache_size.as_bytes()) .selector_result_cache_size(config.selector_result_cache_size.as_bytes()) + .range_result_cache_size(config.range_result_cache_size.as_bytes()) + .prefilter_result_cache_size(config.prefilter_result_cache_size.as_bytes()) .index_metadata_size(config.index.metadata_cache_size.as_bytes()) .index_content_size(config.index.content_cache_size.as_bytes()) .index_content_page_size(config.index.content_cache_page_size.as_bytes()) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index c05db5b989..eee1cfae0a 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -30,6 +30,7 @@ use std::sync::Arc; use bytes::Bytes; use common_base::readable_size::ReadableSize; use common_telemetry::warn; +use datatypes::arrow::buffer::BooleanBuffer; use datatypes::arrow::record_batch::RecordBatch; use datatypes::value::Value; use datatypes::vectors::VectorRef; @@ -38,8 +39,10 @@ use index::result_cache::IndexResultCache; use moka::notification::RemovalCause; use moka::sync::Cache; use object_store::ObjectStore; +use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData}; use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef}; +use smallvec::SmallVec; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector}; @@ -74,6 +77,8 @@ const INDEX_TYPE: &str = "index"; const SELECTOR_RESULT_TYPE: &str = "selector_result"; /// Metrics type key for range scan result cache. const RANGE_RESULT_TYPE: &str = "range_result"; +/// Metrics type key for prefilter result cache. +const PREFILTER_RESULT_TYPE: &str = "prefilter_result"; const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512); const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1); @@ -274,6 +279,117 @@ fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> Parq .build() } +fn removal_cause_str(cause: RemovalCause) -> &'static str { + match cause { + RemovalCause::Expired => "expired", + RemovalCause::Explicit => "explicit", + RemovalCause::Replaced => "replaced", + RemovalCause::Size => "size", + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) struct PrefilterRowSelector { + row_count: usize, + skip: bool, +} + +// `parquet::arrow::arrow_reader::RowSelector` does not implement `Hash`, but +// prefilter cache keys must hash the upstream row-selection snapshot. Keep a +// local hashable mirror of the two fields that define selector semantics. +// TODO(yingwen): Remove this mirror if upstream `RowSelector` implements `Hash`. +impl From<&RowSelector> for PrefilterRowSelector { + fn from(selector: &RowSelector) -> Self { + Self { + row_count: selector.row_count, + skip: selector.skip, + } + } +} + +/// Key for a cached prefilter result. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) struct PrefilterKey { + file_id: FileId, + row_group_idx: u32, + row_selection: Option>>, + schema_version: u64, + filter_exprs: SmallVec<[String; 1]>, + mem_usage: usize, +} + +impl PrefilterKey { + pub(crate) fn row_selection_snapshot( + row_selection: Option<&RowSelection>, + ) -> Option>> { + row_selection.map(|selection| { + Arc::new( + selection + .iter() + .map(PrefilterRowSelector::from) + .collect::>(), + ) + }) + } + + pub(crate) fn new( + file_id: FileId, + row_group_idx: u32, + row_selection: Option>>, + schema_version: u64, + filter_exprs: SmallVec<[String; 1]>, + ) -> Self { + let row_selection_bytes = row_selection + .as_ref() + .map(|selection| selection.len() * mem::size_of::()) + .unwrap_or(0); + let spilled_expr_bytes = if filter_exprs.spilled() { + filter_exprs.capacity() * mem::size_of::() + } else { + 0 + }; + let expr_bytes = filter_exprs.iter().map(|s| s.capacity()).sum::(); + + Self { + file_id, + row_group_idx, + row_selection, + schema_version, + filter_exprs, + mem_usage: mem::size_of::() + + row_selection_bytes + + spilled_expr_bytes + + expr_bytes, + } + } + + fn mem_usage(&self) -> usize { + self.mem_usage + } +} + +type PrefilterResultCache = Cache>; + +fn new_prefilter_result_cache(capacity: u64) -> PrefilterResultCache { + Cache::builder() + .max_capacity(capacity) + .weigher(prefilter_result_cache_weight) + .eviction_listener(|k, v, cause| { + let size = prefilter_result_cache_weight(&k, &v); + CACHE_BYTES + .with_label_values(&[PREFILTER_RESULT_TYPE]) + .sub(size.into()); + CACHE_EVICTION + .with_label_values(&[PREFILTER_RESULT_TYPE, removal_cause_str(cause)]) + .inc(); + }) + .build() +} + +fn prefilter_result_cache_weight(k: &PrefilterKey, v: &Arc) -> u32 { + (k.mem_usage() + mem::size_of::() + v.values().len()) as u32 +} + /// Cache strategies that may only enable a subset of caches. #[derive(Clone)] pub enum CacheStrategy { @@ -358,6 +474,23 @@ impl CacheStrategy { } } + /// Calls [CacheManager::get_prefilter_result()]. + /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. + pub(crate) fn get_prefilter_result(&self, key: &PrefilterKey) -> Option> { + match self { + CacheStrategy::EnableAll(cache_manager) => cache_manager.get_prefilter_result(key), + CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, + } + } + + /// Calls [CacheManager::put_prefilter_result()]. + /// It does nothing if the strategy isn't [CacheStrategy::EnableAll]. + pub(crate) fn put_prefilter_result(&self, key: PrefilterKey, result: Arc) { + if let CacheStrategy::EnableAll(cache_manager) = self { + cache_manager.put_prefilter_result(key, result); + } + } + /// Calls [CacheManager::remove_parquet_meta_data()]. pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) { match self { @@ -610,6 +743,8 @@ pub struct CacheManager { range_result_memory_limiter: Arc, /// Cache for index result. index_result_cache: Option, + /// Cache for prefilter result. + prefilter_result_cache: Option, } pub type CacheManagerRef = Arc; @@ -908,6 +1043,21 @@ impl CacheManager { pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> { self.index_result_cache.as_ref() } + + pub(crate) fn get_prefilter_result(&self, key: &PrefilterKey) -> Option> { + self.prefilter_result_cache + .as_ref() + .and_then(|cache| update_hit_miss(cache.get(key), PREFILTER_RESULT_TYPE)) + } + + pub(crate) fn put_prefilter_result(&self, key: PrefilterKey, result: Arc) { + if let Some(cache) = &self.prefilter_result_cache { + CACHE_BYTES + .with_label_values(&[PREFILTER_RESULT_TYPE]) + .add(prefilter_result_cache_weight(&key, &result).into()); + cache.insert(key, result); + } + } } /// Increases selector cache miss metrics. @@ -930,6 +1080,7 @@ pub struct CacheManagerBuilder { index_content_size: u64, index_content_page_size: u64, index_result_cache_size: u64, + prefilter_result_cache_size: u64, puffin_metadata_size: u64, write_cache: Option, selector_result_cache_size: u64, @@ -985,6 +1136,12 @@ impl CacheManagerBuilder { self } + /// Sets cache size for prefilter result. + pub fn prefilter_result_cache_size(mut self, bytes: u64) -> Self { + self.prefilter_result_cache_size = bytes; + self + } + /// Sets cache size for puffin metadata. pub fn puffin_metadata_size(mut self, bytes: u64) -> Self { self.puffin_metadata_size = bytes; @@ -1005,15 +1162,6 @@ impl CacheManagerBuilder { /// Builds the [CacheManager]. pub fn build(self) -> CacheManager { - fn to_str(cause: RemovalCause) -> &'static str { - match cause { - RemovalCause::Expired => "expired", - RemovalCause::Explicit => "explicit", - RemovalCause::Replaced => "replaced", - RemovalCause::Size => "size", - } - } - let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| { Cache::builder() .max_capacity(self.sst_meta_cache_size) @@ -1024,7 +1172,7 @@ impl CacheManagerBuilder { .with_label_values(&[SST_META_TYPE]) .sub(size.into()); CACHE_EVICTION - .with_label_values(&[SST_META_TYPE, to_str(cause)]) + .with_label_values(&[SST_META_TYPE, removal_cause_str(cause)]) .inc(); }) .build() @@ -1039,7 +1187,7 @@ impl CacheManagerBuilder { .with_label_values(&[VECTOR_TYPE]) .sub(size.into()); CACHE_EVICTION - .with_label_values(&[VECTOR_TYPE, to_str(cause)]) + .with_label_values(&[VECTOR_TYPE, removal_cause_str(cause)]) .inc(); }) .build() @@ -1052,7 +1200,7 @@ impl CacheManagerBuilder { let size = page_cache_weight(&k, &v); CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into()); CACHE_EVICTION - .with_label_values(&[PAGE_TYPE, to_str(cause)]) + .with_label_values(&[PAGE_TYPE, removal_cause_str(cause)]) .inc(); }) .build() @@ -1073,6 +1221,8 @@ impl CacheManagerBuilder { .then(|| Arc::new(VectorIndexCache::new(self.index_content_size))); let index_result_cache = (self.index_result_cache_size != 0) .then(|| IndexResultCache::new(self.index_result_cache_size)); + let prefilter_result_cache = (self.prefilter_result_cache_size != 0) + .then(|| new_prefilter_result_cache(self.prefilter_result_cache_size)); let puffin_metadata_cache = PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES); let selector_result_cache = (self.selector_result_cache_size != 0).then(|| { @@ -1085,7 +1235,7 @@ impl CacheManagerBuilder { .with_label_values(&[SELECTOR_RESULT_TYPE]) .sub(size.into()); CACHE_EVICTION - .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)]) + .with_label_values(&[SELECTOR_RESULT_TYPE, removal_cause_str(cause)]) .inc(); }) .build() @@ -1100,7 +1250,7 @@ impl CacheManagerBuilder { .with_label_values(&[RANGE_RESULT_TYPE]) .sub(size.into()); CACHE_EVICTION - .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)]) + .with_label_values(&[RANGE_RESULT_TYPE, removal_cause_str(cause)]) .inc(); }) .build() @@ -1123,6 +1273,7 @@ impl CacheManagerBuilder { RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize, )), index_result_cache, + prefilter_result_cache, } } } @@ -1551,6 +1702,127 @@ mod tests { assert!(cache.get_selector_result(&key).is_some()); } + #[test] + fn test_prefilter_result_cache() { + let disabled = CacheManager::builder().build(); + let file_id = FileId::random(); + let key = PrefilterKey::new( + file_id, + 0, + None, + 1, + SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()]), + ); + let selection = Arc::new(BooleanBuffer::new_set(3)); + + disabled.put_prefilter_result(key.clone(), selection.clone()); + assert!(disabled.get_prefilter_result(&key).is_none()); + + let cache = Arc::new( + CacheManager::builder() + .prefilter_result_cache_size(1000) + .build(), + ); + assert!(cache.get_prefilter_result(&key).is_none()); + cache.put_prefilter_result(key.clone(), selection.clone()); + assert_eq!( + cache.get_prefilter_result(&key).unwrap().as_ref(), + selection.as_ref() + ); + + let enable_all = CacheStrategy::EnableAll(cache.clone()); + assert!(enable_all.get_prefilter_result(&key).is_some()); + + let compaction = CacheStrategy::Compaction(cache.clone()); + assert!(compaction.get_prefilter_result(&key).is_none()); + compaction.put_prefilter_result(key.clone(), selection.clone()); + assert!(cache.get_prefilter_result(&key).is_some()); + + let disabled_strategy = CacheStrategy::Disabled; + assert!(disabled_strategy.get_prefilter_result(&key).is_none()); + disabled_strategy.put_prefilter_result(key.clone(), selection); + assert!(cache.get_prefilter_result(&key).is_some()); + } + + #[test] + fn test_prefilter_key_distinguishes_dimensions() { + let file_id = FileId::random(); + let row_selection = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(3)]); + let other_row_selection = + RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(2)]); + let row_selection = PrefilterKey::row_selection_snapshot(Some(&row_selection)); + let other_row_selection = PrefilterKey::row_selection_snapshot(Some(&other_row_selection)); + let base = PrefilterKey::new( + file_id, + 0, + row_selection.clone(), + 1, + SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()]), + ); + + assert_ne!( + base, + PrefilterKey::new( + FileId::random(), + 0, + row_selection.clone(), + 1, + SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()]) + ) + ); + assert_ne!( + base, + PrefilterKey::new( + file_id, + 1, + row_selection.clone(), + 1, + SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()]) + ) + ); + assert_ne!( + base, + PrefilterKey::new( + file_id, + 0, + other_row_selection, + 1, + SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()]) + ) + ); + assert_ne!( + base, + PrefilterKey::new( + file_id, + 0, + row_selection.clone(), + 1, + SmallVec::from_vec(vec!["tag_0 IN ([b])".to_string()]) + ) + ); + assert_ne!( + base, + PrefilterKey::new( + file_id, + 0, + row_selection.clone(), + 2, + SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()]) + ) + ); + let pk_group = PrefilterKey::new( + file_id, + 0, + row_selection, + 1, + SmallVec::from_vec(vec![ + "tag_0 IN ([a])".to_string(), + "tag_1 IN ([x])".to_string(), + ]), + ); + assert_ne!(base, pk_group); + } + #[test] fn test_range_result_cache() { let cache = Arc::new( diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 98e97fca85..3a85ff1c65 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -117,6 +117,8 @@ pub struct MitoConfig { pub selector_result_cache_size: ReadableSize, /// Cache size for flat range scan results. Setting it to 0 to disable the cache. pub range_result_cache_size: ReadableSize, + /// Cache size for prefilter results. Setting it to 0 to disable the cache. + pub prefilter_result_cache_size: ReadableSize, /// Whether to enable the write cache. pub enable_write_cache: bool, /// File system path for write cache dir's root, defaults to `{data_home}`. @@ -202,6 +204,7 @@ impl Default for MitoConfig { page_cache_size: ReadableSize::mb(512), selector_result_cache_size: ReadableSize::mb(512), range_result_cache_size: ReadableSize::mb(512), + prefilter_result_cache_size: ReadableSize::mb(128), enable_write_cache: false, write_cache_path: String::new(), write_cache_size: ReadableSize::gb(5), @@ -330,6 +333,8 @@ impl MitoConfig { self.page_cache_size = page_cache_size; self.selector_result_cache_size = mem_cache_size; self.range_result_cache_size = mem_cache_size; + // Use a smaller cache size because prefilter result usually should be small. + self.prefilter_result_cache_size = sst_meta_cache_size; self.index.adjust_buffer_and_cache_size(sys_memory); } diff --git a/src/mito2/src/sst/parquet/prefilter.rs b/src/mito2/src/sst/parquet/prefilter.rs index c98da1abac..7fb549e17d 100644 --- a/src/mito2/src/sst/parquet/prefilter.rs +++ b/src/mito2/src/sst/parquet/prefilter.rs @@ -26,17 +26,20 @@ use api::v1::SemanticType; use common_recordbatch::filter::SimpleFilterEvaluator; use datatypes::arrow::array::{Array, BinaryArray, BooleanArray, BooleanBufferBuilder}; use datatypes::arrow::buffer::BooleanBuffer; +use datatypes::arrow::datatypes::SchemaRef; use datatypes::arrow::record_batch::RecordBatch; use futures::StreamExt; use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter}; use parquet::arrow::ProjectionMask; use parquet::arrow::arrow_reader::RowSelection; use parquet::schema::types::SchemaDescriptor; +use smallvec::{SmallVec, smallvec}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME; use table::predicate::Predicate; +use crate::cache::PrefilterKey; use crate::error::{ ComputeArrowSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, UnexpectedSnafu, @@ -285,7 +288,6 @@ pub(crate) fn build_reader_filter_plan( expected_metadata: Option<&RegionMetadata>, pre_filter_mode: PreFilterMode, read_format: &FlatReadFormat, - parquet_schema: &SchemaDescriptor, codec: &Arc, ) -> ReaderFilterPlan { let Some(predicate) = predicate else { @@ -372,15 +374,27 @@ pub(crate) fn build_reader_filter_plan( } } + let pk_filter_expr_strs = (!pk_filter_contexts.is_empty()).then(|| { + let mut expr_strs = pk_filter_contexts + .iter() + .map(|filter_ctx| filter_ctx.expr_str().to_string()) + .collect::>(); + expr_strs.sort(); + SmallVec::from_vec(expr_strs) + }); let pk_filter_exprs = (!primary_key_filters.is_empty()).then_some(Arc::new(primary_key_filters)); + let schema_version = expected_metadata + .map(|metadata| metadata.schema_version) + .unwrap_or_else(|| read_format.metadata().schema_version); let prefilter_builder = PrefilterContextBuilder::new( read_format, codec, pk_filter_exprs, + pk_filter_expr_strs, prefilter_simple_filters.clone(), prefilter_physical_filters, - parquet_schema, + schema_version, ); if prefilter_builder.is_some() { @@ -402,8 +416,6 @@ pub(crate) fn build_reader_filter_plan( /// Context for prefiltering a row group. pub(crate) struct PrefilterContext { - /// Projection mask for reading prefilter columns. - projection: ProjectionMask, /// Optional PK filter for legacy primary-key-format parquet. pk_filter: Option>, /// Simple filters that can be evaluated directly from the prefilter batch. @@ -411,6 +423,12 @@ pub(crate) struct PrefilterContext { /// Physical filters that can be evaluated directly from the prefilter batch. /// Physical expressions are only applied in the prefilter phase. physical_filters: Vec, + /// Region schema version used in per-filter cache keys. + schema_version: u64, + /// Sorted expression strings for the encoded-PK filter group. + pk_filter_expr_strs: Option>, + /// Arrow schema used to build narrowed prefilter projections. + arrow_schema: SchemaRef, } /// Pre-built state for constructing [PrefilterContext] per row group. @@ -419,12 +437,14 @@ pub(crate) struct PrefilterContext { /// are computed once. A fresh [PrefilterContext] with its own mutable PK filter /// is created via [PrefilterContextBuilder::build()] for each row group. pub(crate) struct PrefilterContextBuilder { - projection: ProjectionMask, pk_filters: Option>>, + pk_filter_expr_strs: Option>, filters: Vec, physical_filters: Vec, codec: Arc, metadata: RegionMetadataRef, + schema_version: u64, + arrow_schema: SchemaRef, } impl PrefilterContextBuilder { @@ -438,9 +458,10 @@ impl PrefilterContextBuilder { read_format: &FlatReadFormat, codec: &Arc, primary_key_filters: Option>>, + primary_key_filter_expr_strs: Option>, filters: Vec, physical_filters: Vec, - parquet_schema: &SchemaDescriptor, + schema_version: u64, ) -> Option { let metadata = read_format.metadata(); let use_raw_tag_columns = read_format.batch_has_raw_pk_columns(); @@ -448,6 +469,10 @@ impl PrefilterContextBuilder { .then_some(primary_key_filters) .flatten() .filter(|filters| !filters.is_empty()); + let pk_filter_expr_strs = pk_filters + .is_some() + .then_some(primary_key_filter_expr_strs) + .flatten(); let mut prefilter_column_names = HashSet::new(); for filter_ctx in &filters { @@ -464,11 +489,8 @@ impl PrefilterContextBuilder { prefilter_column_names.insert(filter_ctx.column_name().to_string()); } - let (projection, prefilter_count) = compute_projection_mask( - &prefilter_column_names, - read_format.arrow_schema(), - parquet_schema, - ); + let prefilter_count = + compute_projection_count(&prefilter_column_names, read_format.arrow_schema()); if prefilter_count == 0 { return None; @@ -487,12 +509,14 @@ impl PrefilterContextBuilder { } Some(Self { - projection, pk_filters, + pk_filter_expr_strs, filters, physical_filters, codec: Arc::clone(codec), metadata: metadata.clone(), + schema_version, + arrow_schema: read_format.arrow_schema().clone(), }) } @@ -505,10 +529,12 @@ impl PrefilterContextBuilder { Box::new(CachedPrimaryKeyFilter::new(pk_filter)) as Box }); PrefilterContext { - projection: self.projection.clone(), pk_filter, filters: self.filters.clone(), physical_filters: self.physical_filters.clone(), + schema_version: self.schema_version, + pk_filter_expr_strs: self.pk_filter_expr_strs.clone(), + arrow_schema: self.arrow_schema.clone(), } } } @@ -532,18 +558,31 @@ fn compute_projection_mask( column_names: &HashSet, arrow_schema: &datatypes::arrow::datatypes::SchemaRef, parquet_schema: &SchemaDescriptor, -) -> (ProjectionMask, usize) { +) -> ProjectionMask { + ProjectionMask::roots( + parquet_schema, + projection_indices(column_names, arrow_schema), + ) +} + +fn compute_projection_count( + column_names: &HashSet, + arrow_schema: &datatypes::arrow::datatypes::SchemaRef, +) -> usize { + projection_indices(column_names, arrow_schema).len() +} + +fn projection_indices( + column_names: &HashSet, + arrow_schema: &datatypes::arrow::datatypes::SchemaRef, +) -> Vec { let mut projection_indices: Vec = column_names .iter() .filter_map(|name| arrow_schema.column_with_name(name).map(|(index, _)| index)) .collect(); projection_indices.sort_unstable(); projection_indices.dedup(); - let count = projection_indices.len(); - ( - ProjectionMask::roots(parquet_schema, projection_indices.iter().copied()), - count, - ) + projection_indices } fn should_use_prefilter( @@ -568,18 +607,121 @@ pub(crate) async fn execute_prefilter( reader_builder: &RowGroupReaderBuilder, build_ctx: &RowGroupBuildContext<'_>, ) -> Result { + let entries = build_prefilter_cache_entries(prefilter_ctx, reader_builder, build_ctx); + + if entries.is_empty() { + return execute_prefilter_by_reading_columns(prefilter_ctx, reader_builder, build_ctx) + .await; + } + + execute_prefilter_with_result_cache(prefilter_ctx, reader_builder, build_ctx, entries).await +} + +async fn execute_prefilter_with_result_cache( + prefilter_ctx: &mut PrefilterContext, + reader_builder: &RowGroupReaderBuilder, + build_ctx: &RowGroupBuildContext<'_>, + entries: Vec, +) -> Result { + let non_cacheable_physical = non_cacheable_physical_filters(prefilter_ctx); + let mut hit_mask: Option = None; + let mut misses = Vec::new(); + for entry in entries { + let Some(key) = &entry.key else { + misses.push(entry); + continue; + }; + + if let Some(mask) = reader_builder.cache_strategy().get_prefilter_result(key) { + hit_mask = Some(match hit_mask { + Some(hit_mask) => hit_mask.bitand(mask.as_ref()), + None => mask.as_ref().clone(), + }); + } else { + misses.push(entry); + } + } + + if misses.is_empty() && non_cacheable_physical.is_empty() { + let combined_mask = hit_mask.unwrap_or_else(|| BooleanBuffer::new_set(0)); + let refined_selection = + refined_selection_from_mask(&combined_mask, &build_ctx.row_selection); + let rows_before_filter = rows_before_filter(reader_builder, build_ctx); + let filtered_rows = rows_before_filter.saturating_sub(refined_selection.row_count()); + return Ok(PrefilterResult { + refined_selection, + filtered_rows, + }); + } + + let mut uncached_entries = misses; + uncached_entries.extend( + non_cacheable_physical + .iter() + .copied() + .map(|idx| PrefilterEntry::without_cache(PrefilterEntryKind::Physical(idx))), + ); + let (uncached_mask, read_rows) = + build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &uncached_entries).await?; + + let final_mask = match (hit_mask, uncached_mask) { + (Some(hit_mask), Some(uncached_mask)) => hit_mask.bitand(&uncached_mask), + (Some(hit_mask), None) => hit_mask, + (None, Some(uncached_mask)) => uncached_mask, + (None, None) => BooleanBuffer::new_set(read_rows), + }; + debug_assert_eq!(final_mask.len(), read_rows); + let rows_selected = final_mask.count_set_bits(); + let filtered_rows = read_rows.saturating_sub(rows_selected); + let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection); + + Ok(PrefilterResult { + refined_selection, + filtered_rows, + }) +} + +fn non_cacheable_physical_filters(prefilter_ctx: &PrefilterContext) -> Vec { + prefilter_ctx + .physical_filters + .iter() + .enumerate() + .filter_map(|(idx, filter)| (!filter.is_immutable()).then_some(idx)) + .collect() +} + +async fn build_prefilter_masks( + prefilter_ctx: &mut PrefilterContext, + reader_builder: &RowGroupReaderBuilder, + build_ctx: &RowGroupBuildContext<'_>, + entries: &[PrefilterEntry], +) -> Result<(Option, usize)> { + let prefilter_column_names = prefilter_column_names_for_entries(prefilter_ctx, entries); + let parquet_schema = reader_builder + .parquet_metadata() + .file_metadata() + .schema_descr(); + let projection = compute_projection_mask( + &prefilter_column_names, + &prefilter_ctx.arrow_schema, + parquet_schema, + ); + let mut stream = reader_builder .build_with_projection( build_ctx.row_group_idx, build_ctx.row_selection.clone(), - prefilter_ctx.projection.clone(), + projection, build_ctx.fetch_metrics, ) .await?; - let mut filter_arrays = Vec::new(); + let mut cache_builders = entries + .iter() + .map(|entry| entry.key.is_some().then(|| BooleanBufferBuilder::new(0))) + .collect::>(); + let mut combined_builder = (!entries.is_empty()).then(|| BooleanBufferBuilder::new(0)); let mut rows_before_filter = 0usize; - let mut rows_selected = 0usize; while let Some(batch_result) = stream.next().await { let batch = batch_result?; @@ -589,30 +731,78 @@ pub(crate) async fn execute_prefilter( } rows_before_filter += num_rows; - let batch_mask = match apply_filters_to_batch( - &batch, - &mut prefilter_ctx.pk_filter, - &prefilter_ctx.filters, - &prefilter_ctx.physical_filters, - reader_builder.file_path(), - )? { - Some(mask) => mask, - None => BooleanBuffer::new_unset(num_rows), - }; - rows_selected += batch_mask.count_set_bits(); - filter_arrays.push(BooleanArray::from(batch_mask)); + let mut batch_mask = BooleanBuffer::new_set(num_rows); + for (idx, entry) in entries.iter().enumerate() { + let mask = eval_entry_mask( + &batch, + prefilter_ctx, + entry.kind, + reader_builder.file_path(), + )?; + batch_mask = batch_mask.bitand(&mask); + if let Some(Some(builder)) = cache_builders.get_mut(idx) { + builder.append_buffer(&mask); + } + } + if let Some(builder) = &mut combined_builder { + builder.append_buffer(&batch_mask); + } } - let filtered_rows = rows_before_filter.saturating_sub(rows_selected); - let refined_selection = if filter_arrays.is_empty() || rows_selected == 0 { - RowSelection::from(vec![]) - } else { - let prefilter_selection = RowSelection::from_filters(&filter_arrays); - match &build_ctx.row_selection { - Some(original) => original.and_then(&prefilter_selection), - None => prefilter_selection, + for (entry, builder) in entries.iter().zip(cache_builders) { + if let (Some(key), Some(mut builder)) = (&entry.key, builder) { + reader_builder + .cache_strategy() + .put_prefilter_result(key.clone(), Arc::new(builder.finish())); } - }; + } + + Ok(( + combined_builder.map(|mut builder| builder.finish()), + rows_before_filter, + )) +} + +fn prefilter_column_names_for_entries( + prefilter_ctx: &PrefilterContext, + entries: &[PrefilterEntry], +) -> HashSet { + let mut prefilter_column_names = HashSet::new(); + for entry in entries { + match entry.kind { + PrefilterEntryKind::Simple(idx) => { + if let MaybeFilter::Filter(filter) = prefilter_ctx.filters[idx].filter() { + prefilter_column_names.insert(filter.column_name().to_string()); + } + } + PrefilterEntryKind::Physical(idx) => { + prefilter_column_names.insert( + prefilter_ctx.physical_filters[idx] + .column_name() + .to_string(), + ); + } + PrefilterEntryKind::PkGroup => { + prefilter_column_names.insert(PRIMARY_KEY_COLUMN_NAME.to_string()); + } + } + } + prefilter_column_names +} + +async fn execute_prefilter_by_reading_columns( + prefilter_ctx: &mut PrefilterContext, + reader_builder: &RowGroupReaderBuilder, + build_ctx: &RowGroupBuildContext<'_>, +) -> Result { + let entries = all_prefilter_entries(prefilter_ctx); + let (mask, rows_before_filter) = + build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &entries).await?; + + let final_mask = mask.unwrap_or_else(|| BooleanBuffer::new_set(rows_before_filter)); + let rows_selected = final_mask.count_set_bits(); + let filtered_rows = rows_before_filter.saturating_sub(rows_selected); + let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection); Ok(PrefilterResult { refined_selection, @@ -620,100 +810,243 @@ pub(crate) async fn execute_prefilter( }) } -fn apply_filters_to_batch( +fn all_prefilter_entries(prefilter_ctx: &PrefilterContext) -> Vec { + let mut entries = Vec::new(); + if prefilter_ctx.pk_filter.is_some() { + entries.push(PrefilterEntry::without_cache(PrefilterEntryKind::PkGroup)); + } + entries.extend( + prefilter_ctx + .filters + .iter() + .enumerate() + .map(|(idx, _)| PrefilterEntry::without_cache(PrefilterEntryKind::Simple(idx))), + ); + entries.extend( + prefilter_ctx + .physical_filters + .iter() + .enumerate() + .map(|(idx, _)| PrefilterEntry::without_cache(PrefilterEntryKind::Physical(idx))), + ); + entries +} + +#[derive(Clone, Copy)] +enum PrefilterEntryKind { + Simple(usize), + Physical(usize), + PkGroup, +} + +struct PrefilterEntry { + kind: PrefilterEntryKind, + key: Option, +} + +impl PrefilterEntry { + fn without_cache(kind: PrefilterEntryKind) -> Self { + Self { kind, key: None } + } +} + +fn build_prefilter_cache_entries( + prefilter_ctx: &PrefilterContext, + reader_builder: &RowGroupReaderBuilder, + build_ctx: &RowGroupBuildContext<'_>, +) -> Vec { + let row_selection = PrefilterKey::row_selection_snapshot(build_ctx.row_selection.as_ref()); + let file_id = reader_builder.file_handle().file_id().file_id(); + let row_group_idx = build_ctx.row_group_idx as u32; + let mut entries = Vec::new(); + + for (idx, filter_ctx) in prefilter_ctx.filters.iter().enumerate() { + entries.push(PrefilterEntry { + kind: PrefilterEntryKind::Simple(idx), + key: Some(PrefilterKey::new( + file_id, + row_group_idx, + row_selection.clone(), + prefilter_ctx.schema_version, + smallvec![filter_ctx.expr_str().to_string()], + )), + }); + } + + for (idx, filter_ctx) in prefilter_ctx.physical_filters.iter().enumerate() { + if !filter_ctx.is_immutable() { + continue; + } + entries.push(PrefilterEntry { + kind: PrefilterEntryKind::Physical(idx), + key: Some(PrefilterKey::new( + file_id, + row_group_idx, + row_selection.clone(), + prefilter_ctx.schema_version, + smallvec![filter_ctx.expr_str().to_string()], + )), + }); + } + + if prefilter_ctx.pk_filter.is_some() + && let Some(exprs) = &prefilter_ctx.pk_filter_expr_strs + { + entries.push(PrefilterEntry { + kind: PrefilterEntryKind::PkGroup, + key: Some(PrefilterKey::new( + file_id, + row_group_idx, + row_selection, + prefilter_ctx.schema_version, + exprs.clone(), + )), + }); + } + + entries +} + +fn rows_before_filter( + reader_builder: &RowGroupReaderBuilder, + build_ctx: &RowGroupBuildContext<'_>, +) -> usize { + build_ctx.row_selection.as_ref().map_or_else( + || { + reader_builder + .parquet_metadata() + .row_group(build_ctx.row_group_idx) + .num_rows() as usize + }, + RowSelection::row_count, + ) +} + +fn refined_selection_from_mask( + mask: &BooleanBuffer, + original_selection: &Option, +) -> RowSelection { + if mask.is_empty() || mask.count_set_bits() == 0 { + return RowSelection::from(vec![]); + } + + let prefilter_selection = RowSelection::from_filters(&[BooleanArray::from(mask.clone())]); + match original_selection { + Some(original) => original.and_then(&prefilter_selection), + None => prefilter_selection, + } +} + +fn eval_entry_mask( batch: &RecordBatch, - pk_filter: &mut Option>, - filters: &[SimpleFilterContext], - physical_filters: &[PhysicalFilterContext], + prefilter_ctx: &mut PrefilterContext, + kind: PrefilterEntryKind, file_path: &str, -) -> Result> { - let mut mask = BooleanBuffer::new_set(batch.num_rows()); - - if let Some(pk_filter) = pk_filter.as_mut() { - // Prefilter reads a reduced projection. For PK prefilter, the encoded - // primary key column is always appended as the last projected column, - // while `__sequence` and `__op_type` are not read. - let pk_column_index = batch.num_columns() - 1; - let matched_row_ranges = - matching_row_ranges_by_primary_key(batch, pk_column_index, pk_filter.as_mut())?; - let mut builder = BooleanBufferBuilder::new(batch.num_rows()); - builder.append_n(batch.num_rows(), false); - for range in matched_row_ranges { - for row in range { - builder.set_bit(row, true); - } +) -> Result { + match kind { + PrefilterEntryKind::Simple(idx) => { + eval_simple_filter_mask(batch, &prefilter_ctx.filters[idx], file_path) } - mask = mask.bitand(&builder.finish()); - } - - for filter_ctx in filters { - let filter = match filter_ctx.filter() { - MaybeFilter::Filter(filter) => filter, - MaybeFilter::Matched => continue, - MaybeFilter::Pruned => return Ok(None), - }; - - let (idx, _) = batch - .schema() - .column_with_name(filter.column_name()) - .with_context(|| UnexpectedSnafu { - reason: format!( - "Prefilter column '{}' (id {}) not found in batch for file {}", - filter.column_name(), - filter_ctx.column_id(), - file_path - ), - })?; - let column = batch.column(idx).clone(); - let result = filter.evaluate_array(&column).context(RecordBatchSnafu)?; - mask = mask.bitand(&result); - } - - for filter_ctx in physical_filters { - let filter = filter_ctx.filter(); - - let (idx, _) = batch - .schema() - .column_with_name(filter_ctx.column_name()) - .with_context(|| UnexpectedSnafu { - reason: format!( - "Prefilter physical column '{}' (id {}) not found in batch for file {}", - filter_ctx.column_name(), - filter_ctx.column_id(), - file_path - ), - })?; - let column = batch.column(idx).clone(); - - let record_batch = RecordBatch::try_new(filter_ctx.schema().clone(), vec![column]) - .context(NewRecordBatchSnafu)?; - let evaluated = filter - .evaluate(&record_batch) - .context(EvalPartitionFilterSnafu)?; - let array = evaluated - .into_array(record_batch.num_rows()) - .context(EvalPartitionFilterSnafu)?; - let boolean_array = - array - .as_any() - .downcast_ref::() - .context(UnexpectedSnafu { - reason: "Failed to downcast physical filter result to BooleanArray", - })?; - // Treat null results as false (filtered out); value bits are not guaranteed - // to be false for invalid entries. - let mut result = boolean_array.values().clone(); - if let Some(nulls) = boolean_array.nulls() { - result = result.bitand(nulls.inner()); + PrefilterEntryKind::Physical(idx) => { + eval_physical_filter_mask(batch, &prefilter_ctx.physical_filters[idx], file_path) + } + PrefilterEntryKind::PkGroup => { + let pk_filter = prefilter_ctx.pk_filter.as_mut().context(UnexpectedSnafu { + reason: "Missing primary key filter for prefilter cache entry", + })?; + eval_pk_group_mask(batch, pk_filter.as_mut()) } - mask = mask.bitand(&result); } +} - if mask.count_set_bits() == 0 { - Ok(None) - } else { - Ok(Some(mask)) +fn eval_pk_group_mask( + batch: &RecordBatch, + pk_filter: &mut dyn PrimaryKeyFilter, +) -> Result { + let (pk_column_index, _) = batch + .schema() + .column_with_name(PRIMARY_KEY_COLUMN_NAME) + .context(UnexpectedSnafu { + reason: "Primary key column not found in prefilter batch", + })?; + let matched_row_ranges = matching_row_ranges_by_primary_key(batch, pk_column_index, pk_filter)?; + let mut builder = BooleanBufferBuilder::new(batch.num_rows()); + builder.append_n(batch.num_rows(), false); + for range in matched_row_ranges { + for row in range { + builder.set_bit(row, true); + } } + Ok(builder.finish()) +} + +fn eval_simple_filter_mask( + batch: &RecordBatch, + filter_ctx: &SimpleFilterContext, + file_path: &str, +) -> Result { + let filter = match filter_ctx.filter() { + MaybeFilter::Filter(filter) => filter, + MaybeFilter::Matched => return Ok(BooleanBuffer::new_set(batch.num_rows())), + MaybeFilter::Pruned => return Ok(BooleanBuffer::new_unset(batch.num_rows())), + }; + + let (idx, _) = batch + .schema() + .column_with_name(filter.column_name()) + .with_context(|| UnexpectedSnafu { + reason: format!( + "Prefilter column '{}' (id {}) not found in batch for file {}", + filter.column_name(), + filter_ctx.column_id(), + file_path + ), + })?; + let column = batch.column(idx).clone(); + filter.evaluate_array(&column).context(RecordBatchSnafu) +} + +fn eval_physical_filter_mask( + batch: &RecordBatch, + filter_ctx: &PhysicalFilterContext, + file_path: &str, +) -> Result { + let filter = filter_ctx.filter(); + + let (idx, _) = batch + .schema() + .column_with_name(filter_ctx.column_name()) + .with_context(|| UnexpectedSnafu { + reason: format!( + "Prefilter physical column '{}' (id {}) not found in batch for file {}", + filter_ctx.column_name(), + filter_ctx.column_id(), + file_path + ), + })?; + let column = batch.column(idx).clone(); + + let record_batch = RecordBatch::try_new(filter_ctx.schema().clone(), vec![column]) + .context(NewRecordBatchSnafu)?; + let evaluated = filter + .evaluate(&record_batch) + .context(EvalPartitionFilterSnafu)?; + let array = evaluated + .into_array(record_batch.num_rows()) + .context(EvalPartitionFilterSnafu)?; + let boolean_array = array + .as_any() + .downcast_ref::() + .context(UnexpectedSnafu { + reason: "Failed to downcast physical filter result to BooleanArray", + })?; + // Treat null results as false (filtered out); value bits are not guaranteed + // to be false for invalid entries. + let mut result = boolean_array.values().clone(); + if let Some(nulls) = boolean_array.nulls() { + result = result.bitand(nulls.inner()); + } + Ok(result) } #[cfg(test)] @@ -728,7 +1061,6 @@ mod tests { }; use datatypes::arrow::datatypes::{Schema, UInt32Type}; use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec}; - use parquet::arrow::ArrowSchemaConverter; use store_api::codec::PrimaryKeyEncoding; use super::*; @@ -800,12 +1132,6 @@ mod tests { .collect() } - fn parquet_schema(read_format: &FlatReadFormat) -> SchemaDescriptor { - ArrowSchemaConverter::new() - .convert(read_format.arrow_schema()) - .unwrap() - } - fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch { assert_eq!(primary_keys.len(), field_values.len()); @@ -989,15 +1315,15 @@ mod tests { ) .unwrap(); let codec = build_primary_key_codec(metadata.as_ref()); - let parquet_schema = parquet_schema(&read_format); let builder = PrefilterContextBuilder::new( &read_format, &codec, None, + None, Vec::new(), Vec::new(), - &parquet_schema, + metadata.schema_version, ); assert!(builder.is_none()); } @@ -1091,7 +1417,6 @@ mod tests { true, ) .unwrap(); - let full_parquet_schema = parquet_schema(&full_read_format); let codec = build_primary_key_codec(metadata.as_ref()); let skip_fields_plan = build_reader_filter_plan( @@ -1102,7 +1427,6 @@ mod tests { None, PreFilterMode::SkipFields, &full_read_format, - &full_parquet_schema, &codec, ); assert!(skip_fields_plan.prefilter_builder.is_some()); @@ -1121,13 +1445,11 @@ mod tests { true, ) .unwrap(); - let projected_parquet_schema = parquet_schema(&projected_read_format); let pk_prefilter_plan = build_reader_filter_plan( Some(&Predicate::new(vec![col("tag_0").eq(lit("a"))])), None, PreFilterMode::All, &projected_read_format, - &projected_parquet_schema, &codec, ); assert!(pk_prefilter_plan.prefilter_builder.is_some()); @@ -1135,35 +1457,94 @@ mod tests { } #[test] - fn test_apply_filters_to_batch_uses_flat_tag_columns_directly() { + fn test_pk_filter_expr_strings_are_stable_under_expr_order() { + let metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding( + PrimaryKeyEncoding::Sparse, + )); + let read_format = FlatReadFormat::new( + metadata.clone(), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), + None, + "test", + false, + ) + .unwrap(); + let codec = build_primary_key_codec(metadata.as_ref()); + + let expr_a = col("tag_0").eq(lit("a")); + let expr_b = col("tag_1").eq(lit("x")); + let plan_ab = build_reader_filter_plan( + Some(&Predicate::new(vec![expr_a.clone(), expr_b.clone()])), + None, + PreFilterMode::All, + &read_format, + &codec, + ); + let plan_b_a = build_reader_filter_plan( + Some(&Predicate::new(vec![expr_b, expr_a])), + None, + PreFilterMode::All, + &read_format, + &codec, + ); + + let exprs_ab = plan_ab.prefilter_builder.unwrap().pk_filter_expr_strs; + let exprs_b_a = plan_b_a.prefilter_builder.unwrap().pk_filter_expr_strs; + assert!(exprs_ab.is_some()); + assert_eq!(exprs_ab, exprs_b_a); + } + + #[test] + fn test_simple_and_physical_contexts_preserve_expr_strings() { + let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); + let read_format = FlatReadFormat::new( + metadata.clone(), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), + None, + "test", + true, + ) + .unwrap(); + + let simple_expr = col("tag_0").eq(lit("a")); + let simple = SimpleFilterContext::new_opt(&metadata, None, &simple_expr).unwrap(); + assert_eq!(simple.expr_str(), format!("{simple_expr:?}")); + + let physical_expr = col("field_0").in_list(vec![lit(1_u64), lit(2_u64)], false); + let physical = + PhysicalFilterContext::new_opt(&metadata, None, &read_format, &physical_expr).unwrap(); + assert_eq!(physical.expr_str(), format!("{physical_expr:?}")); + } + + #[test] + fn test_eval_simple_filter_mask_uses_flat_tag_columns_directly() { let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]); let batch = new_record_batch_with_custom_sequence(&["a", "x"], 0, 4, 1); - let mut no_pk_filter = None; - let mask = apply_filters_to_batch(&batch, &mut no_pk_filter, &filters, &[], "test") - .unwrap() - .unwrap(); + let mask = eval_simple_filter_mask(&batch, &filters[0], "test").unwrap(); assert_eq!(mask.count_set_bits(), 4); } #[test] - fn test_apply_filters_to_batch_errors_on_missing_selected_column() { + fn test_eval_simple_filter_mask_errors_on_missing_selected_column() { let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]); let pk = new_primary_key(&["a", "x"]); let batch = new_raw_batch(&[pk.as_slice()], &[10]); - let mut no_pk_filter = None; - let err = - apply_filters_to_batch(&batch, &mut no_pk_filter, &filters, &[], "test").unwrap_err(); + let err = eval_simple_filter_mask(&batch, &filters[0], "test").unwrap_err(); let err = err.to_string(); assert!(err.contains("Prefilter column")); assert!(err.contains("tag_0")); } #[test] - fn test_apply_filters_to_batch_evaluates_physical_filters() { + fn test_eval_physical_filter_mask_evaluates_physical_filters() { let metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense)); let read_format = FlatReadFormat::new( @@ -1181,16 +1562,12 @@ mod tests { let pk = new_primary_key(&["a", "x"]); let batch = new_raw_batch(&[pk.as_slice(), pk.as_slice(), pk.as_slice()], &[9, 10, 11]); - let mut no_pk_filter = None; - let mask = - apply_filters_to_batch(&batch, &mut no_pk_filter, &[], &physical_filters, "test") - .unwrap() - .unwrap(); + let mask = eval_physical_filter_mask(&batch, &physical_filters[0], "test").unwrap(); assert_eq!(mask.count_set_bits(), 1); } #[test] - fn test_apply_filters_to_batch_uses_last_projected_column_for_pk_prefilter() { + fn test_eval_pk_group_mask_finds_pk_column_by_name() { let metadata = Arc::new(sst_region_metadata()); let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("a"))])); let mut pk_filter = Some(Box::new(CachedPrimaryKeyFilter::new( @@ -1208,9 +1585,7 @@ mod tests { &[10, 11, 12, 13], ); - let mask = apply_filters_to_batch(&batch, &mut pk_filter, &[], &[], "test") - .unwrap() - .unwrap(); + let mask = eval_pk_group_mask(&batch, pk_filter.as_mut().unwrap().as_mut()).unwrap(); assert_eq!(mask.count_set_bits(), 2); } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 0e1ce8d28b..5d812f6307 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -26,8 +26,9 @@ use api::v1::SemanticType; use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::{error, tracing, warn}; use datafusion::physical_plan::PhysicalExpr; -use datafusion_expr::Expr; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_expr::utils::expr_to_columns; +use datafusion_expr::{Expr, Volatility}; use datatypes::arrow::array::ArrayRef; use datatypes::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef}; use datatypes::arrow::record_batch::RecordBatch; @@ -458,7 +459,6 @@ impl ParquetReaderBuilder { self.expected_metadata.as_deref(), self.pre_filter_mode, &read_format, - parquet_meta.file_metadata().schema_descr(), &codec, ); @@ -1862,6 +1862,8 @@ impl MaybeFilter { pub(crate) struct SimpleFilterContext { /// Filter to evaluate. filter: MaybeFilter, + /// Debug string of the original logical expression. + expr_str: String, /// Id of the column to evaluate. column_id: ColumnId, /// Semantic type of the column. @@ -1879,6 +1881,7 @@ impl SimpleFilterContext { expr: &Expr, ) -> Option { let filter = SimpleFilterEvaluator::try_new(expr)?; + let expr_str = format!("{expr:?}"); let (column_metadata, maybe_filter) = match expected_meta { Some(meta) => { // Gets the column metadata from the expected metadata. @@ -1924,6 +1927,7 @@ impl SimpleFilterContext { Some(Self { filter: maybe_filter, + expr_str, column_id: column_metadata.column_id, semantic_type: column_metadata.semantic_type, }) @@ -1934,6 +1938,11 @@ impl SimpleFilterContext { &self.filter } + /// Returns the original logical expression string. + pub(crate) fn expr_str(&self) -> &str { + &self.expr_str + } + /// Returns the column id. pub(crate) fn column_id(&self) -> ColumnId { self.column_id @@ -1950,6 +1959,8 @@ impl SimpleFilterContext { pub(crate) struct PhysicalFilterContext { /// Filter to evaluate. filter: Arc, + /// Debug string of the original logical expression. + expr_str: String, /// Id of the column to evaluate. column_id: ColumnId, /// Name of the column to evaluate. @@ -1958,6 +1969,8 @@ pub(crate) struct PhysicalFilterContext { semantic_type: SemanticType, /// Schema containing only the referenced column. schema: SchemaRef, + /// Whether the original logical expression is immutable across queries. + immutable: bool, } impl PhysicalFilterContext { @@ -1974,6 +1987,7 @@ impl PhysicalFilterContext { if !Self::is_prefilter_candidate(expr) { return None; } + let expr_str = format!("{expr:?}"); let column_name = Self::single_column_name(expr)?; let column_metadata = match expected_meta { Some(meta) => { @@ -1998,13 +2012,16 @@ impl PhysicalFilterContext { error!(e; "Unable to build physical filter for {expr}, schema: {schema:?}"); }) .ok()?; + let immutable = expr_is_immutable(expr); Some(Self { filter: physical_expr, + expr_str, column_id: column_metadata.column_id, column_name, semantic_type: column_metadata.semantic_type, schema, + immutable, }) } @@ -2035,6 +2052,11 @@ impl PhysicalFilterContext { &self.filter } + /// Returns the original logical expression string. + pub(crate) fn expr_str(&self) -> &str { + &self.expr_str + } + /// Returns the column id. pub(crate) fn column_id(&self) -> ColumnId { self.column_id @@ -2054,6 +2076,29 @@ impl PhysicalFilterContext { pub(crate) fn schema(&self) -> &SchemaRef { &self.schema } + + /// Returns true if the original logical expression is immutable across queries. + pub(crate) fn is_immutable(&self) -> bool { + self.immutable + } +} + +fn expr_is_immutable(expr: &Expr) -> bool { + let mut is_immutable = true; + let _ = expr.apply(|expr| match expr { + Expr::ScalarFunction(function) + if function.func.signature().volatility != Volatility::Immutable => + { + is_immutable = false; + Ok(TreeNodeRecursion::Stop) + } + Expr::ScalarVariable(_, _) => { + is_immutable = false; + Ok(TreeNodeRecursion::Stop) + } + _ => Ok(TreeNodeRecursion::Continue), + }); + is_immutable } /// Prune a column by its default value. @@ -2335,6 +2380,74 @@ mod tests { assert!(!selection.is_empty()); } + #[test] + fn test_expr_is_immutable_checks_scalar_function_volatility() { + #[derive(Debug, PartialEq, Eq, Hash)] + struct TestVolatilityUdf { + name: String, + signature: Signature, + } + + impl TestVolatilityUdf { + fn new(name: &str, volatility: Volatility) -> Self { + Self { + name: name.to_string(), + signature: Signature::variadic_any(volatility), + } + } + } + + impl ScalarUDFImpl for TestVolatilityUdf { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + &self.name + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result { + Ok(DataType::Int64) + } + + fn invoke_with_args( + &self, + _args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1)))) + } + } + + let expr = |name: &str, volatility| { + Expr::ScalarFunction(ScalarFunction::new_udf( + Arc::new(ScalarUDF::new_from_impl(TestVolatilityUdf::new( + name, volatility, + ))), + vec![], + )) + }; + + assert!(expr_is_immutable(&expr( + "immutable_udf", + Volatility::Immutable + ))); + assert!(!expr_is_immutable(&expr("stable_udf", Volatility::Stable))); + assert!(!expr_is_immutable(&expr( + "volatile_udf", + Volatility::Volatile + ))); + + let scalar_variable = Expr::ScalarVariable( + Arc::new(Field::new("@@version", DataType::Utf8, false)), + vec!["@@version".to_string()], + ); + assert!(!expr_is_immutable(&scalar_variable)); + } + #[tokio::test(flavor = "current_thread")] async fn test_has_row_level_selection() { let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 6e9573a9bb..75279acf8b 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -208,6 +208,7 @@ impl WorkerGroup { .page_cache_size(config.page_cache_size.as_bytes()) .selector_result_cache_size(config.selector_result_cache_size.as_bytes()) .range_result_cache_size(config.range_result_cache_size.as_bytes()) + .prefilter_result_cache_size(config.prefilter_result_cache_size.as_bytes()) .index_metadata_size(config.index.metadata_cache_size.as_bytes()) .index_content_size(config.index.content_cache_size.as_bytes()) .index_content_page_size(config.index.content_cache_page_size.as_bytes()) @@ -423,6 +424,7 @@ impl WorkerGroup { .page_cache_size(config.page_cache_size.as_bytes()) .selector_result_cache_size(config.selector_result_cache_size.as_bytes()) .range_result_cache_size(config.range_result_cache_size.as_bytes()) + .prefilter_result_cache_size(config.prefilter_result_cache_size.as_bytes()) .write_cache(write_cache) .build(), ); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index f17b78a7e5..7f411cdec2 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1719,10 +1719,11 @@ fn drop_lines_with_inconsistent_results(input: String) -> String { "vector_cache_size =", "page_cache_size =", "selector_result_cache_size =", + "range_result_cache_size =", + "prefilter_result_cache_size =", "metadata_cache_size =", "content_cache_size =", "result_cache_size =", - "range_result_cache_size =", "name =", "recovery_parallelism =", "max_background_index_builds =",