diff --git a/config/config.md b/config/config.md
index b1630d97ad..0fae0caaa4 100644
--- a/config/config.md
+++ b/config/config.md
@@ -155,6 +155,8 @@
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.
If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
+| `region_engine.mito.range_result_cache_size` | String | Auto | Cache size for flat range scan results. Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
+| `region_engine.mito.prefilter_result_cache_size` | String | Auto | Cache size for prefilter results. Setting it to 0 to disable the cache.
If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. |
| `region_engine.mito.enable_write_cache` | Bool | `false` | Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
@@ -543,6 +545,8 @@
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.
If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
+| `region_engine.mito.range_result_cache_size` | String | Auto | Cache size for flat range scan results. Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
+| `region_engine.mito.prefilter_result_cache_size` | String | Auto | Cache size for prefilter results. Setting it to 0 to disable the cache.
If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. |
| `region_engine.mito.enable_write_cache` | Bool | `false` | Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 170045a090..d558918daf 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -480,6 +480,16 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB"
+## Cache size for flat range scan results. Setting it to 0 to disable the cache.
+## If not set, it's default to 1/16 of OS memory with a max limitation of 512MB.
+## @toml2docs:none-default="Auto"
+#+ range_result_cache_size = "512MB"
+
+## Cache size for prefilter results. Setting it to 0 to disable the cache.
+## If not set, it's default to 1/32 of OS memory with a max limitation of 128MB.
+## @toml2docs:none-default="Auto"
+#+ prefilter_result_cache_size = "128MB"
+
## Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_write_cache = false
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 24249270b2..d5c42e744c 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -599,6 +599,16 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB"
+## Cache size for flat range scan results. Setting it to 0 to disable the cache.
+## If not set, it's default to 1/16 of OS memory with a max limitation of 512MB.
+## @toml2docs:none-default="Auto"
+#+ range_result_cache_size = "512MB"
+
+## Cache size for prefilter results. Setting it to 0 to disable the cache.
+## If not set, it's default to 1/32 of OS memory with a max limitation of 128MB.
+## @toml2docs:none-default="Auto"
+#+ prefilter_result_cache_size = "128MB"
+
## Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_write_cache = false
diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs
index a298430c83..65f194d19f 100644
--- a/src/cmd/src/datanode/objbench.rs
+++ b/src/cmd/src/datanode/objbench.rs
@@ -588,6 +588,8 @@ async fn build_cache_manager(
.vector_cache_size(config.vector_cache_size.as_bytes())
.page_cache_size(config.page_cache_size.as_bytes())
.selector_result_cache_size(config.selector_result_cache_size.as_bytes())
+ .range_result_cache_size(config.range_result_cache_size.as_bytes())
+ .prefilter_result_cache_size(config.prefilter_result_cache_size.as_bytes())
.index_metadata_size(config.index.metadata_cache_size.as_bytes())
.index_content_size(config.index.content_cache_size.as_bytes())
.index_content_page_size(config.index.content_cache_page_size.as_bytes())
diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs
index c05db5b989..eee1cfae0a 100644
--- a/src/mito2/src/cache.rs
+++ b/src/mito2/src/cache.rs
@@ -30,6 +30,7 @@ use std::sync::Arc;
use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
+use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
@@ -38,8 +39,10 @@ use index::result_cache::IndexResultCache;
use moka::notification::RemovalCause;
use moka::sync::Cache;
use object_store::ObjectStore;
+use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
+use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
@@ -74,6 +77,8 @@ const INDEX_TYPE: &str = "index";
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics type key for range scan result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
+/// Metrics type key for prefilter result cache.
+const PREFILTER_RESULT_TYPE: &str = "prefilter_result";
const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
@@ -274,6 +279,117 @@ fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> Parq
.build()
}
+fn removal_cause_str(cause: RemovalCause) -> &'static str {
+ match cause {
+ RemovalCause::Expired => "expired",
+ RemovalCause::Explicit => "explicit",
+ RemovalCause::Replaced => "replaced",
+ RemovalCause::Size => "size",
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub(crate) struct PrefilterRowSelector {
+ row_count: usize,
+ skip: bool,
+}
+
+// `parquet::arrow::arrow_reader::RowSelector` does not implement `Hash`, but
+// prefilter cache keys must hash the upstream row-selection snapshot. Keep a
+// local hashable mirror of the two fields that define selector semantics.
+// TODO(yingwen): Remove this mirror if upstream `RowSelector` implements `Hash`.
+impl From<&RowSelector> for PrefilterRowSelector {
+ fn from(selector: &RowSelector) -> Self {
+ Self {
+ row_count: selector.row_count,
+ skip: selector.skip,
+ }
+ }
+}
+
+/// Key for a cached prefilter result.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub(crate) struct PrefilterKey {
+ file_id: FileId,
+ row_group_idx: u32,
+ row_selection: Option>>,
+ schema_version: u64,
+ filter_exprs: SmallVec<[String; 1]>,
+ mem_usage: usize,
+}
+
+impl PrefilterKey {
+ pub(crate) fn row_selection_snapshot(
+ row_selection: Option<&RowSelection>,
+ ) -> Option>> {
+ row_selection.map(|selection| {
+ Arc::new(
+ selection
+ .iter()
+ .map(PrefilterRowSelector::from)
+ .collect::>(),
+ )
+ })
+ }
+
+ pub(crate) fn new(
+ file_id: FileId,
+ row_group_idx: u32,
+ row_selection: Option>>,
+ schema_version: u64,
+ filter_exprs: SmallVec<[String; 1]>,
+ ) -> Self {
+ let row_selection_bytes = row_selection
+ .as_ref()
+ .map(|selection| selection.len() * mem::size_of::())
+ .unwrap_or(0);
+ let spilled_expr_bytes = if filter_exprs.spilled() {
+ filter_exprs.capacity() * mem::size_of::()
+ } else {
+ 0
+ };
+ let expr_bytes = filter_exprs.iter().map(|s| s.capacity()).sum::();
+
+ Self {
+ file_id,
+ row_group_idx,
+ row_selection,
+ schema_version,
+ filter_exprs,
+ mem_usage: mem::size_of::()
+ + row_selection_bytes
+ + spilled_expr_bytes
+ + expr_bytes,
+ }
+ }
+
+ fn mem_usage(&self) -> usize {
+ self.mem_usage
+ }
+}
+
+type PrefilterResultCache = Cache>;
+
+fn new_prefilter_result_cache(capacity: u64) -> PrefilterResultCache {
+ Cache::builder()
+ .max_capacity(capacity)
+ .weigher(prefilter_result_cache_weight)
+ .eviction_listener(|k, v, cause| {
+ let size = prefilter_result_cache_weight(&k, &v);
+ CACHE_BYTES
+ .with_label_values(&[PREFILTER_RESULT_TYPE])
+ .sub(size.into());
+ CACHE_EVICTION
+ .with_label_values(&[PREFILTER_RESULT_TYPE, removal_cause_str(cause)])
+ .inc();
+ })
+ .build()
+}
+
+fn prefilter_result_cache_weight(k: &PrefilterKey, v: &Arc) -> u32 {
+ (k.mem_usage() + mem::size_of::() + v.values().len()) as u32
+}
+
/// Cache strategies that may only enable a subset of caches.
#[derive(Clone)]
pub enum CacheStrategy {
@@ -358,6 +474,23 @@ impl CacheStrategy {
}
}
+ /// Calls [CacheManager::get_prefilter_result()].
+ /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
+ pub(crate) fn get_prefilter_result(&self, key: &PrefilterKey) -> Option> {
+ match self {
+ CacheStrategy::EnableAll(cache_manager) => cache_manager.get_prefilter_result(key),
+ CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
+ }
+ }
+
+ /// Calls [CacheManager::put_prefilter_result()].
+ /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
+ pub(crate) fn put_prefilter_result(&self, key: PrefilterKey, result: Arc) {
+ if let CacheStrategy::EnableAll(cache_manager) = self {
+ cache_manager.put_prefilter_result(key, result);
+ }
+ }
+
/// Calls [CacheManager::remove_parquet_meta_data()].
pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
match self {
@@ -610,6 +743,8 @@ pub struct CacheManager {
range_result_memory_limiter: Arc,
/// Cache for index result.
index_result_cache: Option,
+ /// Cache for prefilter result.
+ prefilter_result_cache: Option,
}
pub type CacheManagerRef = Arc;
@@ -908,6 +1043,21 @@ impl CacheManager {
pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
self.index_result_cache.as_ref()
}
+
+ pub(crate) fn get_prefilter_result(&self, key: &PrefilterKey) -> Option> {
+ self.prefilter_result_cache
+ .as_ref()
+ .and_then(|cache| update_hit_miss(cache.get(key), PREFILTER_RESULT_TYPE))
+ }
+
+ pub(crate) fn put_prefilter_result(&self, key: PrefilterKey, result: Arc) {
+ if let Some(cache) = &self.prefilter_result_cache {
+ CACHE_BYTES
+ .with_label_values(&[PREFILTER_RESULT_TYPE])
+ .add(prefilter_result_cache_weight(&key, &result).into());
+ cache.insert(key, result);
+ }
+ }
}
/// Increases selector cache miss metrics.
@@ -930,6 +1080,7 @@ pub struct CacheManagerBuilder {
index_content_size: u64,
index_content_page_size: u64,
index_result_cache_size: u64,
+ prefilter_result_cache_size: u64,
puffin_metadata_size: u64,
write_cache: Option,
selector_result_cache_size: u64,
@@ -985,6 +1136,12 @@ impl CacheManagerBuilder {
self
}
+ /// Sets cache size for prefilter result.
+ pub fn prefilter_result_cache_size(mut self, bytes: u64) -> Self {
+ self.prefilter_result_cache_size = bytes;
+ self
+ }
+
/// Sets cache size for puffin metadata.
pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
self.puffin_metadata_size = bytes;
@@ -1005,15 +1162,6 @@ impl CacheManagerBuilder {
/// Builds the [CacheManager].
pub fn build(self) -> CacheManager {
- fn to_str(cause: RemovalCause) -> &'static str {
- match cause {
- RemovalCause::Expired => "expired",
- RemovalCause::Explicit => "explicit",
- RemovalCause::Replaced => "replaced",
- RemovalCause::Size => "size",
- }
- }
-
let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.sst_meta_cache_size)
@@ -1024,7 +1172,7 @@ impl CacheManagerBuilder {
.with_label_values(&[SST_META_TYPE])
.sub(size.into());
CACHE_EVICTION
- .with_label_values(&[SST_META_TYPE, to_str(cause)])
+ .with_label_values(&[SST_META_TYPE, removal_cause_str(cause)])
.inc();
})
.build()
@@ -1039,7 +1187,7 @@ impl CacheManagerBuilder {
.with_label_values(&[VECTOR_TYPE])
.sub(size.into());
CACHE_EVICTION
- .with_label_values(&[VECTOR_TYPE, to_str(cause)])
+ .with_label_values(&[VECTOR_TYPE, removal_cause_str(cause)])
.inc();
})
.build()
@@ -1052,7 +1200,7 @@ impl CacheManagerBuilder {
let size = page_cache_weight(&k, &v);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
CACHE_EVICTION
- .with_label_values(&[PAGE_TYPE, to_str(cause)])
+ .with_label_values(&[PAGE_TYPE, removal_cause_str(cause)])
.inc();
})
.build()
@@ -1073,6 +1221,8 @@ impl CacheManagerBuilder {
.then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
let index_result_cache = (self.index_result_cache_size != 0)
.then(|| IndexResultCache::new(self.index_result_cache_size));
+ let prefilter_result_cache = (self.prefilter_result_cache_size != 0)
+ .then(|| new_prefilter_result_cache(self.prefilter_result_cache_size));
let puffin_metadata_cache =
PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
@@ -1085,7 +1235,7 @@ impl CacheManagerBuilder {
.with_label_values(&[SELECTOR_RESULT_TYPE])
.sub(size.into());
CACHE_EVICTION
- .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
+ .with_label_values(&[SELECTOR_RESULT_TYPE, removal_cause_str(cause)])
.inc();
})
.build()
@@ -1100,7 +1250,7 @@ impl CacheManagerBuilder {
.with_label_values(&[RANGE_RESULT_TYPE])
.sub(size.into());
CACHE_EVICTION
- .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
+ .with_label_values(&[RANGE_RESULT_TYPE, removal_cause_str(cause)])
.inc();
})
.build()
@@ -1123,6 +1273,7 @@ impl CacheManagerBuilder {
RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
)),
index_result_cache,
+ prefilter_result_cache,
}
}
}
@@ -1551,6 +1702,127 @@ mod tests {
assert!(cache.get_selector_result(&key).is_some());
}
+ #[test]
+ fn test_prefilter_result_cache() {
+ let disabled = CacheManager::builder().build();
+ let file_id = FileId::random();
+ let key = PrefilterKey::new(
+ file_id,
+ 0,
+ None,
+ 1,
+ SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()]),
+ );
+ let selection = Arc::new(BooleanBuffer::new_set(3));
+
+ disabled.put_prefilter_result(key.clone(), selection.clone());
+ assert!(disabled.get_prefilter_result(&key).is_none());
+
+ let cache = Arc::new(
+ CacheManager::builder()
+ .prefilter_result_cache_size(1000)
+ .build(),
+ );
+ assert!(cache.get_prefilter_result(&key).is_none());
+ cache.put_prefilter_result(key.clone(), selection.clone());
+ assert_eq!(
+ cache.get_prefilter_result(&key).unwrap().as_ref(),
+ selection.as_ref()
+ );
+
+ let enable_all = CacheStrategy::EnableAll(cache.clone());
+ assert!(enable_all.get_prefilter_result(&key).is_some());
+
+ let compaction = CacheStrategy::Compaction(cache.clone());
+ assert!(compaction.get_prefilter_result(&key).is_none());
+ compaction.put_prefilter_result(key.clone(), selection.clone());
+ assert!(cache.get_prefilter_result(&key).is_some());
+
+ let disabled_strategy = CacheStrategy::Disabled;
+ assert!(disabled_strategy.get_prefilter_result(&key).is_none());
+ disabled_strategy.put_prefilter_result(key.clone(), selection);
+ assert!(cache.get_prefilter_result(&key).is_some());
+ }
+
+ #[test]
+ fn test_prefilter_key_distinguishes_dimensions() {
+ let file_id = FileId::random();
+ let row_selection = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(3)]);
+ let other_row_selection =
+ RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(2)]);
+ let row_selection = PrefilterKey::row_selection_snapshot(Some(&row_selection));
+ let other_row_selection = PrefilterKey::row_selection_snapshot(Some(&other_row_selection));
+ let base = PrefilterKey::new(
+ file_id,
+ 0,
+ row_selection.clone(),
+ 1,
+ SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()]),
+ );
+
+ assert_ne!(
+ base,
+ PrefilterKey::new(
+ FileId::random(),
+ 0,
+ row_selection.clone(),
+ 1,
+ SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()])
+ )
+ );
+ assert_ne!(
+ base,
+ PrefilterKey::new(
+ file_id,
+ 1,
+ row_selection.clone(),
+ 1,
+ SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()])
+ )
+ );
+ assert_ne!(
+ base,
+ PrefilterKey::new(
+ file_id,
+ 0,
+ other_row_selection,
+ 1,
+ SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()])
+ )
+ );
+ assert_ne!(
+ base,
+ PrefilterKey::new(
+ file_id,
+ 0,
+ row_selection.clone(),
+ 1,
+ SmallVec::from_vec(vec!["tag_0 IN ([b])".to_string()])
+ )
+ );
+ assert_ne!(
+ base,
+ PrefilterKey::new(
+ file_id,
+ 0,
+ row_selection.clone(),
+ 2,
+ SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()])
+ )
+ );
+ let pk_group = PrefilterKey::new(
+ file_id,
+ 0,
+ row_selection,
+ 1,
+ SmallVec::from_vec(vec![
+ "tag_0 IN ([a])".to_string(),
+ "tag_1 IN ([x])".to_string(),
+ ]),
+ );
+ assert_ne!(base, pk_group);
+ }
+
#[test]
fn test_range_result_cache() {
let cache = Arc::new(
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 98e97fca85..3a85ff1c65 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -117,6 +117,8 @@ pub struct MitoConfig {
pub selector_result_cache_size: ReadableSize,
/// Cache size for flat range scan results. Setting it to 0 to disable the cache.
pub range_result_cache_size: ReadableSize,
+ /// Cache size for prefilter results. Setting it to 0 to disable the cache.
+ pub prefilter_result_cache_size: ReadableSize,
/// Whether to enable the write cache.
pub enable_write_cache: bool,
/// File system path for write cache dir's root, defaults to `{data_home}`.
@@ -202,6 +204,7 @@ impl Default for MitoConfig {
page_cache_size: ReadableSize::mb(512),
selector_result_cache_size: ReadableSize::mb(512),
range_result_cache_size: ReadableSize::mb(512),
+ prefilter_result_cache_size: ReadableSize::mb(128),
enable_write_cache: false,
write_cache_path: String::new(),
write_cache_size: ReadableSize::gb(5),
@@ -330,6 +333,8 @@ impl MitoConfig {
self.page_cache_size = page_cache_size;
self.selector_result_cache_size = mem_cache_size;
self.range_result_cache_size = mem_cache_size;
+ // Use a smaller cache size because prefilter result usually should be small.
+ self.prefilter_result_cache_size = sst_meta_cache_size;
self.index.adjust_buffer_and_cache_size(sys_memory);
}
diff --git a/src/mito2/src/sst/parquet/prefilter.rs b/src/mito2/src/sst/parquet/prefilter.rs
index c98da1abac..7fb549e17d 100644
--- a/src/mito2/src/sst/parquet/prefilter.rs
+++ b/src/mito2/src/sst/parquet/prefilter.rs
@@ -26,17 +26,20 @@ use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::arrow::array::{Array, BinaryArray, BooleanArray, BooleanBufferBuilder};
use datatypes::arrow::buffer::BooleanBuffer;
+use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::RowSelection;
use parquet::schema::types::SchemaDescriptor;
+use smallvec::{SmallVec, smallvec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use table::predicate::Predicate;
+use crate::cache::PrefilterKey;
use crate::error::{
ComputeArrowSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu,
RecordBatchSnafu, Result, UnexpectedSnafu,
@@ -285,7 +288,6 @@ pub(crate) fn build_reader_filter_plan(
expected_metadata: Option<&RegionMetadata>,
pre_filter_mode: PreFilterMode,
read_format: &FlatReadFormat,
- parquet_schema: &SchemaDescriptor,
codec: &Arc,
) -> ReaderFilterPlan {
let Some(predicate) = predicate else {
@@ -372,15 +374,27 @@ pub(crate) fn build_reader_filter_plan(
}
}
+ let pk_filter_expr_strs = (!pk_filter_contexts.is_empty()).then(|| {
+ let mut expr_strs = pk_filter_contexts
+ .iter()
+ .map(|filter_ctx| filter_ctx.expr_str().to_string())
+ .collect::>();
+ expr_strs.sort();
+ SmallVec::from_vec(expr_strs)
+ });
let pk_filter_exprs =
(!primary_key_filters.is_empty()).then_some(Arc::new(primary_key_filters));
+ let schema_version = expected_metadata
+ .map(|metadata| metadata.schema_version)
+ .unwrap_or_else(|| read_format.metadata().schema_version);
let prefilter_builder = PrefilterContextBuilder::new(
read_format,
codec,
pk_filter_exprs,
+ pk_filter_expr_strs,
prefilter_simple_filters.clone(),
prefilter_physical_filters,
- parquet_schema,
+ schema_version,
);
if prefilter_builder.is_some() {
@@ -402,8 +416,6 @@ pub(crate) fn build_reader_filter_plan(
/// Context for prefiltering a row group.
pub(crate) struct PrefilterContext {
- /// Projection mask for reading prefilter columns.
- projection: ProjectionMask,
/// Optional PK filter for legacy primary-key-format parquet.
pk_filter: Option>,
/// Simple filters that can be evaluated directly from the prefilter batch.
@@ -411,6 +423,12 @@ pub(crate) struct PrefilterContext {
/// Physical filters that can be evaluated directly from the prefilter batch.
/// Physical expressions are only applied in the prefilter phase.
physical_filters: Vec,
+ /// Region schema version used in per-filter cache keys.
+ schema_version: u64,
+ /// Sorted expression strings for the encoded-PK filter group.
+ pk_filter_expr_strs: Option>,
+ /// Arrow schema used to build narrowed prefilter projections.
+ arrow_schema: SchemaRef,
}
/// Pre-built state for constructing [PrefilterContext] per row group.
@@ -419,12 +437,14 @@ pub(crate) struct PrefilterContext {
/// are computed once. A fresh [PrefilterContext] with its own mutable PK filter
/// is created via [PrefilterContextBuilder::build()] for each row group.
pub(crate) struct PrefilterContextBuilder {
- projection: ProjectionMask,
pk_filters: Option>>,
+ pk_filter_expr_strs: Option>,
filters: Vec,
physical_filters: Vec,
codec: Arc,
metadata: RegionMetadataRef,
+ schema_version: u64,
+ arrow_schema: SchemaRef,
}
impl PrefilterContextBuilder {
@@ -438,9 +458,10 @@ impl PrefilterContextBuilder {
read_format: &FlatReadFormat,
codec: &Arc,
primary_key_filters: Option>>,
+ primary_key_filter_expr_strs: Option>,
filters: Vec,
physical_filters: Vec,
- parquet_schema: &SchemaDescriptor,
+ schema_version: u64,
) -> Option {
let metadata = read_format.metadata();
let use_raw_tag_columns = read_format.batch_has_raw_pk_columns();
@@ -448,6 +469,10 @@ impl PrefilterContextBuilder {
.then_some(primary_key_filters)
.flatten()
.filter(|filters| !filters.is_empty());
+ let pk_filter_expr_strs = pk_filters
+ .is_some()
+ .then_some(primary_key_filter_expr_strs)
+ .flatten();
let mut prefilter_column_names = HashSet::new();
for filter_ctx in &filters {
@@ -464,11 +489,8 @@ impl PrefilterContextBuilder {
prefilter_column_names.insert(filter_ctx.column_name().to_string());
}
- let (projection, prefilter_count) = compute_projection_mask(
- &prefilter_column_names,
- read_format.arrow_schema(),
- parquet_schema,
- );
+ let prefilter_count =
+ compute_projection_count(&prefilter_column_names, read_format.arrow_schema());
if prefilter_count == 0 {
return None;
@@ -487,12 +509,14 @@ impl PrefilterContextBuilder {
}
Some(Self {
- projection,
pk_filters,
+ pk_filter_expr_strs,
filters,
physical_filters,
codec: Arc::clone(codec),
metadata: metadata.clone(),
+ schema_version,
+ arrow_schema: read_format.arrow_schema().clone(),
})
}
@@ -505,10 +529,12 @@ impl PrefilterContextBuilder {
Box::new(CachedPrimaryKeyFilter::new(pk_filter)) as Box
});
PrefilterContext {
- projection: self.projection.clone(),
pk_filter,
filters: self.filters.clone(),
physical_filters: self.physical_filters.clone(),
+ schema_version: self.schema_version,
+ pk_filter_expr_strs: self.pk_filter_expr_strs.clone(),
+ arrow_schema: self.arrow_schema.clone(),
}
}
}
@@ -532,18 +558,31 @@ fn compute_projection_mask(
column_names: &HashSet,
arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
parquet_schema: &SchemaDescriptor,
-) -> (ProjectionMask, usize) {
+) -> ProjectionMask {
+ ProjectionMask::roots(
+ parquet_schema,
+ projection_indices(column_names, arrow_schema),
+ )
+}
+
+fn compute_projection_count(
+ column_names: &HashSet,
+ arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
+) -> usize {
+ projection_indices(column_names, arrow_schema).len()
+}
+
+fn projection_indices(
+ column_names: &HashSet,
+ arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
+) -> Vec {
let mut projection_indices: Vec = column_names
.iter()
.filter_map(|name| arrow_schema.column_with_name(name).map(|(index, _)| index))
.collect();
projection_indices.sort_unstable();
projection_indices.dedup();
- let count = projection_indices.len();
- (
- ProjectionMask::roots(parquet_schema, projection_indices.iter().copied()),
- count,
- )
+ projection_indices
}
fn should_use_prefilter(
@@ -568,18 +607,121 @@ pub(crate) async fn execute_prefilter(
reader_builder: &RowGroupReaderBuilder,
build_ctx: &RowGroupBuildContext<'_>,
) -> Result {
+ let entries = build_prefilter_cache_entries(prefilter_ctx, reader_builder, build_ctx);
+
+ if entries.is_empty() {
+ return execute_prefilter_by_reading_columns(prefilter_ctx, reader_builder, build_ctx)
+ .await;
+ }
+
+ execute_prefilter_with_result_cache(prefilter_ctx, reader_builder, build_ctx, entries).await
+}
+
+async fn execute_prefilter_with_result_cache(
+ prefilter_ctx: &mut PrefilterContext,
+ reader_builder: &RowGroupReaderBuilder,
+ build_ctx: &RowGroupBuildContext<'_>,
+ entries: Vec,
+) -> Result {
+ let non_cacheable_physical = non_cacheable_physical_filters(prefilter_ctx);
+ let mut hit_mask: Option = None;
+ let mut misses = Vec::new();
+ for entry in entries {
+ let Some(key) = &entry.key else {
+ misses.push(entry);
+ continue;
+ };
+
+ if let Some(mask) = reader_builder.cache_strategy().get_prefilter_result(key) {
+ hit_mask = Some(match hit_mask {
+ Some(hit_mask) => hit_mask.bitand(mask.as_ref()),
+ None => mask.as_ref().clone(),
+ });
+ } else {
+ misses.push(entry);
+ }
+ }
+
+ if misses.is_empty() && non_cacheable_physical.is_empty() {
+ let combined_mask = hit_mask.unwrap_or_else(|| BooleanBuffer::new_set(0));
+ let refined_selection =
+ refined_selection_from_mask(&combined_mask, &build_ctx.row_selection);
+ let rows_before_filter = rows_before_filter(reader_builder, build_ctx);
+ let filtered_rows = rows_before_filter.saturating_sub(refined_selection.row_count());
+ return Ok(PrefilterResult {
+ refined_selection,
+ filtered_rows,
+ });
+ }
+
+ let mut uncached_entries = misses;
+ uncached_entries.extend(
+ non_cacheable_physical
+ .iter()
+ .copied()
+ .map(|idx| PrefilterEntry::without_cache(PrefilterEntryKind::Physical(idx))),
+ );
+ let (uncached_mask, read_rows) =
+ build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &uncached_entries).await?;
+
+ let final_mask = match (hit_mask, uncached_mask) {
+ (Some(hit_mask), Some(uncached_mask)) => hit_mask.bitand(&uncached_mask),
+ (Some(hit_mask), None) => hit_mask,
+ (None, Some(uncached_mask)) => uncached_mask,
+ (None, None) => BooleanBuffer::new_set(read_rows),
+ };
+ debug_assert_eq!(final_mask.len(), read_rows);
+ let rows_selected = final_mask.count_set_bits();
+ let filtered_rows = read_rows.saturating_sub(rows_selected);
+ let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection);
+
+ Ok(PrefilterResult {
+ refined_selection,
+ filtered_rows,
+ })
+}
+
+fn non_cacheable_physical_filters(prefilter_ctx: &PrefilterContext) -> Vec {
+ prefilter_ctx
+ .physical_filters
+ .iter()
+ .enumerate()
+ .filter_map(|(idx, filter)| (!filter.is_immutable()).then_some(idx))
+ .collect()
+}
+
+async fn build_prefilter_masks(
+ prefilter_ctx: &mut PrefilterContext,
+ reader_builder: &RowGroupReaderBuilder,
+ build_ctx: &RowGroupBuildContext<'_>,
+ entries: &[PrefilterEntry],
+) -> Result<(Option, usize)> {
+ let prefilter_column_names = prefilter_column_names_for_entries(prefilter_ctx, entries);
+ let parquet_schema = reader_builder
+ .parquet_metadata()
+ .file_metadata()
+ .schema_descr();
+ let projection = compute_projection_mask(
+ &prefilter_column_names,
+ &prefilter_ctx.arrow_schema,
+ parquet_schema,
+ );
+
let mut stream = reader_builder
.build_with_projection(
build_ctx.row_group_idx,
build_ctx.row_selection.clone(),
- prefilter_ctx.projection.clone(),
+ projection,
build_ctx.fetch_metrics,
)
.await?;
- let mut filter_arrays = Vec::new();
+ let mut cache_builders = entries
+ .iter()
+ .map(|entry| entry.key.is_some().then(|| BooleanBufferBuilder::new(0)))
+ .collect::>();
+ let mut combined_builder = (!entries.is_empty()).then(|| BooleanBufferBuilder::new(0));
let mut rows_before_filter = 0usize;
- let mut rows_selected = 0usize;
while let Some(batch_result) = stream.next().await {
let batch = batch_result?;
@@ -589,30 +731,78 @@ pub(crate) async fn execute_prefilter(
}
rows_before_filter += num_rows;
- let batch_mask = match apply_filters_to_batch(
- &batch,
- &mut prefilter_ctx.pk_filter,
- &prefilter_ctx.filters,
- &prefilter_ctx.physical_filters,
- reader_builder.file_path(),
- )? {
- Some(mask) => mask,
- None => BooleanBuffer::new_unset(num_rows),
- };
- rows_selected += batch_mask.count_set_bits();
- filter_arrays.push(BooleanArray::from(batch_mask));
+ let mut batch_mask = BooleanBuffer::new_set(num_rows);
+ for (idx, entry) in entries.iter().enumerate() {
+ let mask = eval_entry_mask(
+ &batch,
+ prefilter_ctx,
+ entry.kind,
+ reader_builder.file_path(),
+ )?;
+ batch_mask = batch_mask.bitand(&mask);
+ if let Some(Some(builder)) = cache_builders.get_mut(idx) {
+ builder.append_buffer(&mask);
+ }
+ }
+ if let Some(builder) = &mut combined_builder {
+ builder.append_buffer(&batch_mask);
+ }
}
- let filtered_rows = rows_before_filter.saturating_sub(rows_selected);
- let refined_selection = if filter_arrays.is_empty() || rows_selected == 0 {
- RowSelection::from(vec![])
- } else {
- let prefilter_selection = RowSelection::from_filters(&filter_arrays);
- match &build_ctx.row_selection {
- Some(original) => original.and_then(&prefilter_selection),
- None => prefilter_selection,
+ for (entry, builder) in entries.iter().zip(cache_builders) {
+ if let (Some(key), Some(mut builder)) = (&entry.key, builder) {
+ reader_builder
+ .cache_strategy()
+ .put_prefilter_result(key.clone(), Arc::new(builder.finish()));
}
- };
+ }
+
+ Ok((
+ combined_builder.map(|mut builder| builder.finish()),
+ rows_before_filter,
+ ))
+}
+
+fn prefilter_column_names_for_entries(
+ prefilter_ctx: &PrefilterContext,
+ entries: &[PrefilterEntry],
+) -> HashSet {
+ let mut prefilter_column_names = HashSet::new();
+ for entry in entries {
+ match entry.kind {
+ PrefilterEntryKind::Simple(idx) => {
+ if let MaybeFilter::Filter(filter) = prefilter_ctx.filters[idx].filter() {
+ prefilter_column_names.insert(filter.column_name().to_string());
+ }
+ }
+ PrefilterEntryKind::Physical(idx) => {
+ prefilter_column_names.insert(
+ prefilter_ctx.physical_filters[idx]
+ .column_name()
+ .to_string(),
+ );
+ }
+ PrefilterEntryKind::PkGroup => {
+ prefilter_column_names.insert(PRIMARY_KEY_COLUMN_NAME.to_string());
+ }
+ }
+ }
+ prefilter_column_names
+}
+
+async fn execute_prefilter_by_reading_columns(
+ prefilter_ctx: &mut PrefilterContext,
+ reader_builder: &RowGroupReaderBuilder,
+ build_ctx: &RowGroupBuildContext<'_>,
+) -> Result {
+ let entries = all_prefilter_entries(prefilter_ctx);
+ let (mask, rows_before_filter) =
+ build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &entries).await?;
+
+ let final_mask = mask.unwrap_or_else(|| BooleanBuffer::new_set(rows_before_filter));
+ let rows_selected = final_mask.count_set_bits();
+ let filtered_rows = rows_before_filter.saturating_sub(rows_selected);
+ let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection);
Ok(PrefilterResult {
refined_selection,
@@ -620,100 +810,243 @@ pub(crate) async fn execute_prefilter(
})
}
-fn apply_filters_to_batch(
+fn all_prefilter_entries(prefilter_ctx: &PrefilterContext) -> Vec {
+ let mut entries = Vec::new();
+ if prefilter_ctx.pk_filter.is_some() {
+ entries.push(PrefilterEntry::without_cache(PrefilterEntryKind::PkGroup));
+ }
+ entries.extend(
+ prefilter_ctx
+ .filters
+ .iter()
+ .enumerate()
+ .map(|(idx, _)| PrefilterEntry::without_cache(PrefilterEntryKind::Simple(idx))),
+ );
+ entries.extend(
+ prefilter_ctx
+ .physical_filters
+ .iter()
+ .enumerate()
+ .map(|(idx, _)| PrefilterEntry::without_cache(PrefilterEntryKind::Physical(idx))),
+ );
+ entries
+}
+
+#[derive(Clone, Copy)]
+enum PrefilterEntryKind {
+ Simple(usize),
+ Physical(usize),
+ PkGroup,
+}
+
+struct PrefilterEntry {
+ kind: PrefilterEntryKind,
+ key: Option,
+}
+
+impl PrefilterEntry {
+ fn without_cache(kind: PrefilterEntryKind) -> Self {
+ Self { kind, key: None }
+ }
+}
+
+fn build_prefilter_cache_entries(
+ prefilter_ctx: &PrefilterContext,
+ reader_builder: &RowGroupReaderBuilder,
+ build_ctx: &RowGroupBuildContext<'_>,
+) -> Vec {
+ let row_selection = PrefilterKey::row_selection_snapshot(build_ctx.row_selection.as_ref());
+ let file_id = reader_builder.file_handle().file_id().file_id();
+ let row_group_idx = build_ctx.row_group_idx as u32;
+ let mut entries = Vec::new();
+
+ for (idx, filter_ctx) in prefilter_ctx.filters.iter().enumerate() {
+ entries.push(PrefilterEntry {
+ kind: PrefilterEntryKind::Simple(idx),
+ key: Some(PrefilterKey::new(
+ file_id,
+ row_group_idx,
+ row_selection.clone(),
+ prefilter_ctx.schema_version,
+ smallvec![filter_ctx.expr_str().to_string()],
+ )),
+ });
+ }
+
+ for (idx, filter_ctx) in prefilter_ctx.physical_filters.iter().enumerate() {
+ if !filter_ctx.is_immutable() {
+ continue;
+ }
+ entries.push(PrefilterEntry {
+ kind: PrefilterEntryKind::Physical(idx),
+ key: Some(PrefilterKey::new(
+ file_id,
+ row_group_idx,
+ row_selection.clone(),
+ prefilter_ctx.schema_version,
+ smallvec![filter_ctx.expr_str().to_string()],
+ )),
+ });
+ }
+
+ if prefilter_ctx.pk_filter.is_some()
+ && let Some(exprs) = &prefilter_ctx.pk_filter_expr_strs
+ {
+ entries.push(PrefilterEntry {
+ kind: PrefilterEntryKind::PkGroup,
+ key: Some(PrefilterKey::new(
+ file_id,
+ row_group_idx,
+ row_selection,
+ prefilter_ctx.schema_version,
+ exprs.clone(),
+ )),
+ });
+ }
+
+ entries
+}
+
+fn rows_before_filter(
+ reader_builder: &RowGroupReaderBuilder,
+ build_ctx: &RowGroupBuildContext<'_>,
+) -> usize {
+ build_ctx.row_selection.as_ref().map_or_else(
+ || {
+ reader_builder
+ .parquet_metadata()
+ .row_group(build_ctx.row_group_idx)
+ .num_rows() as usize
+ },
+ RowSelection::row_count,
+ )
+}
+
+fn refined_selection_from_mask(
+ mask: &BooleanBuffer,
+ original_selection: &Option,
+) -> RowSelection {
+ if mask.is_empty() || mask.count_set_bits() == 0 {
+ return RowSelection::from(vec![]);
+ }
+
+ let prefilter_selection = RowSelection::from_filters(&[BooleanArray::from(mask.clone())]);
+ match original_selection {
+ Some(original) => original.and_then(&prefilter_selection),
+ None => prefilter_selection,
+ }
+}
+
+fn eval_entry_mask(
batch: &RecordBatch,
- pk_filter: &mut Option>,
- filters: &[SimpleFilterContext],
- physical_filters: &[PhysicalFilterContext],
+ prefilter_ctx: &mut PrefilterContext,
+ kind: PrefilterEntryKind,
file_path: &str,
-) -> Result