feat: implement a cache for the prefilter (#8102)

* feat: cache parquet prefilter results

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: set result cache size

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename is_stable to is_immutable and reject ScalarVariable

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: typo

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: use capacity() for prefilter key memory accounting

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: per filter cache

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: support other variants in MaybeFilter

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: split compute_projection_mask

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: build_prefilter_masks takes PrefilterEntry

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-05-25 11:10:12 +08:00
committed by GitHub
parent 8c267f3844
commit e1e75b3ffe
10 changed files with 973 additions and 179 deletions

View File

@@ -155,6 +155,8 @@
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.range_result_cache_size` | String | Auto | Cache size for flat range scan results. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.prefilter_result_cache_size` | String | Auto | Cache size for prefilter results. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. |
| `region_engine.mito.enable_write_cache` | Bool | `false` | Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
@@ -543,6 +545,8 @@
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.range_result_cache_size` | String | Auto | Cache size for flat range scan results. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.prefilter_result_cache_size` | String | Auto | Cache size for prefilter results. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. |
| `region_engine.mito.enable_write_cache` | Bool | `false` | Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |

View File

@@ -480,6 +480,16 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB"
## Cache size for flat range scan results. Setting it to 0 to disable the cache.
## If not set, it's default to 1/16 of OS memory with a max limitation of 512MB.
## @toml2docs:none-default="Auto"
#+ range_result_cache_size = "512MB"
## Cache size for prefilter results. Setting it to 0 to disable the cache.
## If not set, it's default to 1/32 of OS memory with a max limitation of 128MB.
## @toml2docs:none-default="Auto"
#+ prefilter_result_cache_size = "128MB"
## Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_write_cache = false

View File

@@ -599,6 +599,16 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB"
## Cache size for flat range scan results. Setting it to 0 to disable the cache.
## If not set, it's default to 1/16 of OS memory with a max limitation of 512MB.
## @toml2docs:none-default="Auto"
#+ range_result_cache_size = "512MB"
## Cache size for prefilter results. Setting it to 0 to disable the cache.
## If not set, it's default to 1/32 of OS memory with a max limitation of 128MB.
## @toml2docs:none-default="Auto"
#+ prefilter_result_cache_size = "128MB"
## Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_write_cache = false

View File

@@ -588,6 +588,8 @@ async fn build_cache_manager(
.vector_cache_size(config.vector_cache_size.as_bytes())
.page_cache_size(config.page_cache_size.as_bytes())
.selector_result_cache_size(config.selector_result_cache_size.as_bytes())
.range_result_cache_size(config.range_result_cache_size.as_bytes())
.prefilter_result_cache_size(config.prefilter_result_cache_size.as_bytes())
.index_metadata_size(config.index.metadata_cache_size.as_bytes())
.index_content_size(config.index.content_cache_size.as_bytes())
.index_content_page_size(config.index.content_cache_page_size.as_bytes())

View File

@@ -30,6 +30,7 @@ use std::sync::Arc;
use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
@@ -38,8 +39,10 @@ use index::result_cache::IndexResultCache;
use moka::notification::RemovalCause;
use moka::sync::Cache;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
@@ -74,6 +77,8 @@ const INDEX_TYPE: &str = "index";
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics type key for range scan result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
/// Metrics type key for prefilter result cache.
const PREFILTER_RESULT_TYPE: &str = "prefilter_result";
const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
@@ -274,6 +279,117 @@ fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> Parq
.build()
}
/// Maps a moka [RemovalCause] to the static label value used when recording
/// cache eviction metrics.
fn removal_cause_str(cause: RemovalCause) -> &'static str {
    match cause {
        RemovalCause::Size => "size",
        RemovalCause::Replaced => "replaced",
        RemovalCause::Explicit => "explicit",
        RemovalCause::Expired => "expired",
    }
}
/// Hashable counterpart of a parquet [RowSelector], stored inside [PrefilterKey].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct PrefilterRowSelector {
/// Number of rows covered by this selector (copied from `RowSelector::row_count`).
row_count: usize,
/// Whether the covered rows are skipped (copied from `RowSelector::skip`).
skip: bool,
}
// `parquet::arrow::arrow_reader::RowSelector` does not implement `Hash`, but
// prefilter cache keys must hash the upstream row-selection snapshot. Keep a
// local hashable mirror of the two fields that define selector semantics.
// TODO(yingwen): Remove this mirror if upstream `RowSelector` implements `Hash`.
impl From<&RowSelector> for PrefilterRowSelector {
fn from(selector: &RowSelector) -> Self {
Self {
row_count: selector.row_count,
skip: selector.skip,
}
}
}
/// Key for a cached prefilter result.
///
/// Two keys are equal when all identifying fields match. The precomputed
/// `mem_usage` is bookkeeping derived from buffer capacities, so it is left
/// out of equality and hashing; otherwise logically identical keys whose
/// buffers were allocated with different capacities would never hit the cache.
#[derive(Debug, Clone)]
pub(crate) struct PrefilterKey {
    /// Id of the file the cached result was computed for.
    file_id: FileId,
    /// Index of the row group inside the file.
    row_group_idx: u32,
    /// Snapshot of the row selection that was active when the result was computed.
    row_selection: Option<Arc<Vec<PrefilterRowSelector>>>,
    /// Schema version the filter expressions were evaluated against.
    schema_version: u64,
    /// String form of the filter expression(s) the result belongs to.
    filter_exprs: SmallVec<[String; 1]>,
    /// Estimated memory footprint of this key in bytes, computed once in [PrefilterKey::new].
    mem_usage: usize,
}

impl PartialEq for PrefilterKey {
    fn eq(&self, other: &Self) -> bool {
        self.file_id == other.file_id
            && self.row_group_idx == other.row_group_idx
            && self.schema_version == other.schema_version
            && self.row_selection == other.row_selection
            && self.filter_exprs == other.filter_exprs
    }
}

impl Eq for PrefilterKey {}

impl std::hash::Hash for PrefilterKey {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // Must hash exactly the fields compared in `eq`.
        std::hash::Hash::hash(&self.file_id, state);
        std::hash::Hash::hash(&self.row_group_idx, state);
        std::hash::Hash::hash(&self.schema_version, state);
        std::hash::Hash::hash(&self.row_selection, state);
        std::hash::Hash::hash(&self.filter_exprs, state);
    }
}

impl PrefilterKey {
    /// Copies an optional parquet [RowSelection] into a shareable, hashable snapshot.
    ///
    /// Returns `None` when there is no row selection.
    pub(crate) fn row_selection_snapshot(
        row_selection: Option<&RowSelection>,
    ) -> Option<Arc<Vec<PrefilterRowSelector>>> {
        row_selection.map(|selection| {
            Arc::new(
                selection
                    .iter()
                    .map(PrefilterRowSelector::from)
                    .collect::<Vec<_>>(),
            )
        })
    }

    /// Creates a key and precomputes its estimated memory usage.
    ///
    /// The estimate covers the key struct itself, the selector buffer behind
    /// `row_selection`, the spilled `filter_exprs` buffer (if any) and the heap
    /// buffers of the expression strings. All heap buffers are measured by
    /// capacity, since that is what is actually allocated.
    pub(crate) fn new(
        file_id: FileId,
        row_group_idx: u32,
        row_selection: Option<Arc<Vec<PrefilterRowSelector>>>,
        schema_version: u64,
        filter_exprs: SmallVec<[String; 1]>,
    ) -> Self {
        let row_selection_bytes = row_selection.as_ref().map_or(0, |selection| {
            selection.capacity() * mem::size_of::<PrefilterRowSelector>()
        });
        // Inline storage is already part of `size_of::<Self>()`; only a spilled
        // buffer adds heap usage.
        let spilled_expr_bytes = if filter_exprs.spilled() {
            filter_exprs.capacity() * mem::size_of::<String>()
        } else {
            0
        };
        let expr_bytes = filter_exprs.iter().map(|s| s.capacity()).sum::<usize>();

        Self {
            file_id,
            row_group_idx,
            row_selection,
            schema_version,
            filter_exprs,
            mem_usage: mem::size_of::<Self>()
                + row_selection_bytes
                + spilled_expr_bytes
                + expr_bytes,
        }
    }

    /// Returns the estimated memory usage of this key in bytes.
    fn mem_usage(&self) -> usize {
        self.mem_usage
    }
}
type PrefilterResultCache = Cache<PrefilterKey, Arc<BooleanBuffer>>;
/// Builds the prefilter result cache with the given maximum weighted capacity.
///
/// Entries are weighed by [prefilter_result_cache_weight]. When an entry is
/// removed, its weight is subtracted from the cache-bytes gauge and the
/// eviction counter for the removal cause is incremented.
fn new_prefilter_result_cache(capacity: u64) -> PrefilterResultCache {
    let builder = Cache::builder()
        .max_capacity(capacity)
        .weigher(prefilter_result_cache_weight);

    builder
        .eviction_listener(|key, mask, cause| {
            let released = prefilter_result_cache_weight(&key, &mask);
            let cause_label = removal_cause_str(cause);
            CACHE_BYTES
                .with_label_values(&[PREFILTER_RESULT_TYPE])
                .sub(released.into());
            CACHE_EVICTION
                .with_label_values(&[PREFILTER_RESULT_TYPE, cause_label])
                .inc();
        })
        .build()
}
/// Estimates the weight of a prefilter cache entry in bytes.
///
/// The weight is the key's memory usage plus the size of the [BooleanBuffer]
/// struct and the bytes backing the mask.
fn prefilter_result_cache_weight(k: &PrefilterKey, v: &Arc<BooleanBuffer>) -> u32 {
    let bytes = k
        .mem_usage()
        .saturating_add(mem::size_of::<BooleanBuffer>())
        .saturating_add(v.values().len());
    // The weight is a `u32`; saturate instead of silently wrapping so an
    // oversized entry is never reported as a tiny one.
    u32::try_from(bytes).unwrap_or(u32::MAX)
}
/// Cache strategies that may only enable a subset of caches.
#[derive(Clone)]
pub enum CacheStrategy {
@@ -358,6 +474,23 @@ impl CacheStrategy {
}
}
/// Looks up a prefilter result through [CacheManager::get_prefilter_result()].
/// Only [CacheStrategy::EnableAll] consults the cache; [CacheStrategy::Compaction]
/// and [CacheStrategy::Disabled] always return None.
pub(crate) fn get_prefilter_result(&self, key: &PrefilterKey) -> Option<Arc<BooleanBuffer>> {
    match self {
        CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
        CacheStrategy::EnableAll(manager) => manager.get_prefilter_result(key),
    }
}
/// Stores a prefilter result through [CacheManager::put_prefilter_result()].
/// Every strategy other than [CacheStrategy::EnableAll] ignores the call.
pub(crate) fn put_prefilter_result(&self, key: PrefilterKey, result: Arc<BooleanBuffer>) {
    match self {
        CacheStrategy::EnableAll(manager) => manager.put_prefilter_result(key, result),
        CacheStrategy::Compaction(_) | CacheStrategy::Disabled => {}
    }
}
/// Calls [CacheManager::remove_parquet_meta_data()].
pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
match self {
@@ -610,6 +743,8 @@ pub struct CacheManager {
range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
/// Cache for index result.
index_result_cache: Option<IndexResultCache>,
/// Cache for prefilter result.
prefilter_result_cache: Option<PrefilterResultCache>,
}
pub type CacheManagerRef = Arc<CacheManager>;
@@ -908,6 +1043,21 @@ impl CacheManager {
pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
self.index_result_cache.as_ref()
}
/// Gets a cached prefilter result and records the hit or miss.
///
/// Returns None without touching the metrics when the prefilter cache is disabled.
pub(crate) fn get_prefilter_result(&self, key: &PrefilterKey) -> Option<Arc<BooleanBuffer>> {
    let cache = self.prefilter_result_cache.as_ref()?;
    update_hit_miss(cache.get(key), PREFILTER_RESULT_TYPE)
}
/// Puts a prefilter result into the cache and adds its weight to the
/// cache-bytes gauge. Does nothing when the prefilter cache is disabled.
pub(crate) fn put_prefilter_result(&self, key: PrefilterKey, result: Arc<BooleanBuffer>) {
    let Some(cache) = &self.prefilter_result_cache else {
        return;
    };

    let weight = prefilter_result_cache_weight(&key, &result);
    CACHE_BYTES
        .with_label_values(&[PREFILTER_RESULT_TYPE])
        .add(weight.into());
    cache.insert(key, result);
}
}
/// Increases selector cache miss metrics.
@@ -930,6 +1080,7 @@ pub struct CacheManagerBuilder {
index_content_size: u64,
index_content_page_size: u64,
index_result_cache_size: u64,
prefilter_result_cache_size: u64,
puffin_metadata_size: u64,
write_cache: Option<WriteCacheRef>,
selector_result_cache_size: u64,
@@ -985,6 +1136,12 @@ impl CacheManagerBuilder {
self
}
/// Sets cache size for prefilter result.
pub fn prefilter_result_cache_size(mut self, bytes: u64) -> Self {
self.prefilter_result_cache_size = bytes;
self
}
/// Sets cache size for puffin metadata.
pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
self.puffin_metadata_size = bytes;
@@ -1005,15 +1162,6 @@ impl CacheManagerBuilder {
/// Builds the [CacheManager].
pub fn build(self) -> CacheManager {
fn to_str(cause: RemovalCause) -> &'static str {
match cause {
RemovalCause::Expired => "expired",
RemovalCause::Explicit => "explicit",
RemovalCause::Replaced => "replaced",
RemovalCause::Size => "size",
}
}
let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.sst_meta_cache_size)
@@ -1024,7 +1172,7 @@ impl CacheManagerBuilder {
.with_label_values(&[SST_META_TYPE])
.sub(size.into());
CACHE_EVICTION
.with_label_values(&[SST_META_TYPE, to_str(cause)])
.with_label_values(&[SST_META_TYPE, removal_cause_str(cause)])
.inc();
})
.build()
@@ -1039,7 +1187,7 @@ impl CacheManagerBuilder {
.with_label_values(&[VECTOR_TYPE])
.sub(size.into());
CACHE_EVICTION
.with_label_values(&[VECTOR_TYPE, to_str(cause)])
.with_label_values(&[VECTOR_TYPE, removal_cause_str(cause)])
.inc();
})
.build()
@@ -1052,7 +1200,7 @@ impl CacheManagerBuilder {
let size = page_cache_weight(&k, &v);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
CACHE_EVICTION
.with_label_values(&[PAGE_TYPE, to_str(cause)])
.with_label_values(&[PAGE_TYPE, removal_cause_str(cause)])
.inc();
})
.build()
@@ -1073,6 +1221,8 @@ impl CacheManagerBuilder {
.then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
let index_result_cache = (self.index_result_cache_size != 0)
.then(|| IndexResultCache::new(self.index_result_cache_size));
let prefilter_result_cache = (self.prefilter_result_cache_size != 0)
.then(|| new_prefilter_result_cache(self.prefilter_result_cache_size));
let puffin_metadata_cache =
PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
@@ -1085,7 +1235,7 @@ impl CacheManagerBuilder {
.with_label_values(&[SELECTOR_RESULT_TYPE])
.sub(size.into());
CACHE_EVICTION
.with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
.with_label_values(&[SELECTOR_RESULT_TYPE, removal_cause_str(cause)])
.inc();
})
.build()
@@ -1100,7 +1250,7 @@ impl CacheManagerBuilder {
.with_label_values(&[RANGE_RESULT_TYPE])
.sub(size.into());
CACHE_EVICTION
.with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
.with_label_values(&[RANGE_RESULT_TYPE, removal_cause_str(cause)])
.inc();
})
.build()
@@ -1123,6 +1273,7 @@ impl CacheManagerBuilder {
RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
)),
index_result_cache,
prefilter_result_cache,
}
}
}
@@ -1551,6 +1702,127 @@ mod tests {
assert!(cache.get_selector_result(&key).is_some());
}
// Checks put/get behavior of the prefilter result cache through both
// `CacheManager` and each `CacheStrategy` variant.
#[test]
fn test_prefilter_result_cache() {
// A manager built without a prefilter cache size ignores puts.
let disabled = CacheManager::builder().build();
let file_id = FileId::random();
let key = PrefilterKey::new(
file_id,
0,
None,
1,
SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()]),
);
let selection = Arc::new(BooleanBuffer::new_set(3));
disabled.put_prefilter_result(key.clone(), selection.clone());
assert!(disabled.get_prefilter_result(&key).is_none());
// With the cache enabled, a stored result can be read back.
let cache = Arc::new(
CacheManager::builder()
.prefilter_result_cache_size(1000)
.build(),
);
assert!(cache.get_prefilter_result(&key).is_none());
cache.put_prefilter_result(key.clone(), selection.clone());
assert_eq!(
cache.get_prefilter_result(&key).unwrap().as_ref(),
selection.as_ref()
);
// `EnableAll` reads through to the manager.
let enable_all = CacheStrategy::EnableAll(cache.clone());
assert!(enable_all.get_prefilter_result(&key).is_some());
// `Compaction` never returns a result; the entry stored above stays in the manager.
let compaction = CacheStrategy::Compaction(cache.clone());
assert!(compaction.get_prefilter_result(&key).is_none());
compaction.put_prefilter_result(key.clone(), selection.clone());
assert!(cache.get_prefilter_result(&key).is_some());
// `Disabled` never returns a result either.
let disabled_strategy = CacheStrategy::Disabled;
assert!(disabled_strategy.get_prefilter_result(&key).is_none());
disabled_strategy.put_prefilter_result(key.clone(), selection);
assert!(cache.get_prefilter_result(&key).is_some());
}
// Verifies that changing any single dimension of a `PrefilterKey` yields a
// key that differs from the base key.
#[test]
fn test_prefilter_key_distinguishes_dimensions() {
    const EXPR_A: &str = "tag_0 IN ([a])";

    // Shorthand for building a key from borrowed expression strings.
    fn key_of(
        file_id: FileId,
        row_group_idx: u32,
        row_selection: &Option<Arc<Vec<PrefilterRowSelector>>>,
        schema_version: u64,
        exprs: &[&str],
    ) -> PrefilterKey {
        PrefilterKey::new(
            file_id,
            row_group_idx,
            row_selection.clone(),
            schema_version,
            SmallVec::from_vec(exprs.iter().map(|expr| expr.to_string()).collect()),
        )
    }

    let file_id = FileId::random();
    let selection = PrefilterKey::row_selection_snapshot(Some(&RowSelection::from(vec![
        RowSelector::skip(1),
        RowSelector::select(3),
    ])));
    let other_selection = PrefilterKey::row_selection_snapshot(Some(&RowSelection::from(vec![
        RowSelector::skip(2),
        RowSelector::select(2),
    ])));

    let base = key_of(file_id, 0, &selection, 1, &[EXPR_A]);

    // Each variant changes exactly one dimension relative to `base`.
    let variants = [
        // Different file.
        key_of(FileId::random(), 0, &selection, 1, &[EXPR_A]),
        // Different row group.
        key_of(file_id, 1, &selection, 1, &[EXPR_A]),
        // Different row selection.
        key_of(file_id, 0, &other_selection, 1, &[EXPR_A]),
        // Different filter expression.
        key_of(file_id, 0, &selection, 1, &["tag_0 IN ([b])"]),
        // Different schema version.
        key_of(file_id, 0, &selection, 2, &[EXPR_A]),
        // Additional filter expression in the group.
        key_of(file_id, 0, &selection, 1, &[EXPR_A, "tag_1 IN ([x])"]),
    ];

    for variant in &variants {
        assert_ne!(&base, variant);
    }
}
#[test]
fn test_range_result_cache() {
let cache = Arc::new(

View File

@@ -117,6 +117,8 @@ pub struct MitoConfig {
pub selector_result_cache_size: ReadableSize,
/// Cache size for flat range scan results. Setting it to 0 to disable the cache.
pub range_result_cache_size: ReadableSize,
/// Cache size for prefilter results. Setting it to 0 to disable the cache.
pub prefilter_result_cache_size: ReadableSize,
/// Whether to enable the write cache.
pub enable_write_cache: bool,
/// File system path for write cache dir's root, defaults to `{data_home}`.
@@ -202,6 +204,7 @@ impl Default for MitoConfig {
page_cache_size: ReadableSize::mb(512),
selector_result_cache_size: ReadableSize::mb(512),
range_result_cache_size: ReadableSize::mb(512),
prefilter_result_cache_size: ReadableSize::mb(128),
enable_write_cache: false,
write_cache_path: String::new(),
write_cache_size: ReadableSize::gb(5),
@@ -330,6 +333,8 @@ impl MitoConfig {
self.page_cache_size = page_cache_size;
self.selector_result_cache_size = mem_cache_size;
self.range_result_cache_size = mem_cache_size;
// Use a smaller cache size because prefilter results are usually small.
self.prefilter_result_cache_size = sst_meta_cache_size;
self.index.adjust_buffer_and_cache_size(sys_memory);
}

View File

@@ -26,17 +26,20 @@ use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::arrow::array::{Array, BinaryArray, BooleanArray, BooleanBufferBuilder};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::RowSelection;
use parquet::schema::types::SchemaDescriptor;
use smallvec::{SmallVec, smallvec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use table::predicate::Predicate;
use crate::cache::PrefilterKey;
use crate::error::{
ComputeArrowSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu,
RecordBatchSnafu, Result, UnexpectedSnafu,
@@ -285,7 +288,6 @@ pub(crate) fn build_reader_filter_plan(
expected_metadata: Option<&RegionMetadata>,
pre_filter_mode: PreFilterMode,
read_format: &FlatReadFormat,
parquet_schema: &SchemaDescriptor,
codec: &Arc<dyn PrimaryKeyCodec>,
) -> ReaderFilterPlan {
let Some(predicate) = predicate else {
@@ -372,15 +374,27 @@ pub(crate) fn build_reader_filter_plan(
}
}
let pk_filter_expr_strs = (!pk_filter_contexts.is_empty()).then(|| {
let mut expr_strs = pk_filter_contexts
.iter()
.map(|filter_ctx| filter_ctx.expr_str().to_string())
.collect::<Vec<_>>();
expr_strs.sort();
SmallVec::from_vec(expr_strs)
});
let pk_filter_exprs =
(!primary_key_filters.is_empty()).then_some(Arc::new(primary_key_filters));
let schema_version = expected_metadata
.map(|metadata| metadata.schema_version)
.unwrap_or_else(|| read_format.metadata().schema_version);
let prefilter_builder = PrefilterContextBuilder::new(
read_format,
codec,
pk_filter_exprs,
pk_filter_expr_strs,
prefilter_simple_filters.clone(),
prefilter_physical_filters,
parquet_schema,
schema_version,
);
if prefilter_builder.is_some() {
@@ -402,8 +416,6 @@ pub(crate) fn build_reader_filter_plan(
/// Context for prefiltering a row group.
pub(crate) struct PrefilterContext {
/// Projection mask for reading prefilter columns.
projection: ProjectionMask,
/// Optional PK filter for legacy primary-key-format parquet.
pk_filter: Option<Box<dyn PrimaryKeyFilter>>,
/// Simple filters that can be evaluated directly from the prefilter batch.
@@ -411,6 +423,12 @@ pub(crate) struct PrefilterContext {
/// Physical filters that can be evaluated directly from the prefilter batch.
/// Physical expressions are only applied in the prefilter phase.
physical_filters: Vec<PhysicalFilterContext>,
/// Region schema version used in per-filter cache keys.
schema_version: u64,
/// Sorted expression strings for the encoded-PK filter group.
pk_filter_expr_strs: Option<SmallVec<[String; 1]>>,
/// Arrow schema used to build narrowed prefilter projections.
arrow_schema: SchemaRef,
}
/// Pre-built state for constructing [PrefilterContext] per row group.
@@ -419,12 +437,14 @@ pub(crate) struct PrefilterContext {
/// are computed once. A fresh [PrefilterContext] with its own mutable PK filter
/// is created via [PrefilterContextBuilder::build()] for each row group.
pub(crate) struct PrefilterContextBuilder {
projection: ProjectionMask,
pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
pk_filter_expr_strs: Option<SmallVec<[String; 1]>>,
filters: Vec<SimpleFilterContext>,
physical_filters: Vec<PhysicalFilterContext>,
codec: Arc<dyn PrimaryKeyCodec>,
metadata: RegionMetadataRef,
schema_version: u64,
arrow_schema: SchemaRef,
}
impl PrefilterContextBuilder {
@@ -438,9 +458,10 @@ impl PrefilterContextBuilder {
read_format: &FlatReadFormat,
codec: &Arc<dyn PrimaryKeyCodec>,
primary_key_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
primary_key_filter_expr_strs: Option<SmallVec<[String; 1]>>,
filters: Vec<SimpleFilterContext>,
physical_filters: Vec<PhysicalFilterContext>,
parquet_schema: &SchemaDescriptor,
schema_version: u64,
) -> Option<Self> {
let metadata = read_format.metadata();
let use_raw_tag_columns = read_format.batch_has_raw_pk_columns();
@@ -448,6 +469,10 @@ impl PrefilterContextBuilder {
.then_some(primary_key_filters)
.flatten()
.filter(|filters| !filters.is_empty());
let pk_filter_expr_strs = pk_filters
.is_some()
.then_some(primary_key_filter_expr_strs)
.flatten();
let mut prefilter_column_names = HashSet::new();
for filter_ctx in &filters {
@@ -464,11 +489,8 @@ impl PrefilterContextBuilder {
prefilter_column_names.insert(filter_ctx.column_name().to_string());
}
let (projection, prefilter_count) = compute_projection_mask(
&prefilter_column_names,
read_format.arrow_schema(),
parquet_schema,
);
let prefilter_count =
compute_projection_count(&prefilter_column_names, read_format.arrow_schema());
if prefilter_count == 0 {
return None;
@@ -487,12 +509,14 @@ impl PrefilterContextBuilder {
}
Some(Self {
projection,
pk_filters,
pk_filter_expr_strs,
filters,
physical_filters,
codec: Arc::clone(codec),
metadata: metadata.clone(),
schema_version,
arrow_schema: read_format.arrow_schema().clone(),
})
}
@@ -505,10 +529,12 @@ impl PrefilterContextBuilder {
Box::new(CachedPrimaryKeyFilter::new(pk_filter)) as Box<dyn PrimaryKeyFilter>
});
PrefilterContext {
projection: self.projection.clone(),
pk_filter,
filters: self.filters.clone(),
physical_filters: self.physical_filters.clone(),
schema_version: self.schema_version,
pk_filter_expr_strs: self.pk_filter_expr_strs.clone(),
arrow_schema: self.arrow_schema.clone(),
}
}
}
@@ -532,18 +558,31 @@ fn compute_projection_mask(
column_names: &HashSet<String>,
arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
parquet_schema: &SchemaDescriptor,
) -> (ProjectionMask, usize) {
) -> ProjectionMask {
ProjectionMask::roots(
parquet_schema,
projection_indices(column_names, arrow_schema),
)
}
/// Returns how many distinct columns from `column_names` exist in `arrow_schema`.
fn compute_projection_count(
    column_names: &HashSet<String>,
    arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
) -> usize {
    let indices = projection_indices(column_names, arrow_schema);
    indices.len()
}
fn projection_indices(
column_names: &HashSet<String>,
arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
) -> Vec<usize> {
let mut projection_indices: Vec<usize> = column_names
.iter()
.filter_map(|name| arrow_schema.column_with_name(name).map(|(index, _)| index))
.collect();
projection_indices.sort_unstable();
projection_indices.dedup();
let count = projection_indices.len();
(
ProjectionMask::roots(parquet_schema, projection_indices.iter().copied()),
count,
)
projection_indices
}
fn should_use_prefilter(
@@ -568,18 +607,121 @@ pub(crate) async fn execute_prefilter(
reader_builder: &RowGroupReaderBuilder,
build_ctx: &RowGroupBuildContext<'_>,
) -> Result<PrefilterResult> {
let entries = build_prefilter_cache_entries(prefilter_ctx, reader_builder, build_ctx);
if entries.is_empty() {
return execute_prefilter_by_reading_columns(prefilter_ctx, reader_builder, build_ctx)
.await;
}
execute_prefilter_with_result_cache(prefilter_ctx, reader_builder, build_ctx, entries).await
}
/// Runs the prefilter, using cached per-entry masks where they are available.
///
/// Cached masks are AND-ed together. Entries without a key or without a cached
/// mask, plus physical filters that are not immutable, are evaluated through
/// [build_prefilter_masks] and AND-ed with the cached masks.
async fn execute_prefilter_with_result_cache(
prefilter_ctx: &mut PrefilterContext,
reader_builder: &RowGroupReaderBuilder,
build_ctx: &RowGroupBuildContext<'_>,
entries: Vec<PrefilterEntry>,
) -> Result<PrefilterResult> {
let non_cacheable_physical = non_cacheable_physical_filters(prefilter_ctx);
// AND of all masks found in the cache so far; `None` until the first hit.
let mut hit_mask: Option<BooleanBuffer> = None;
// Entries that still have to be evaluated.
let mut misses = Vec::new();
for entry in entries {
// Entries without a key cannot be looked up and always count as misses.
let Some(key) = &entry.key else {
misses.push(entry);
continue;
};
if let Some(mask) = reader_builder.cache_strategy().get_prefilter_result(key) {
hit_mask = Some(match hit_mask {
Some(hit_mask) => hit_mask.bitand(mask.as_ref()),
None => mask.as_ref().clone(),
});
} else {
misses.push(entry);
}
}
// Everything was answered from the cache: return without evaluating any entry.
if misses.is_empty() && non_cacheable_physical.is_empty() {
let combined_mask = hit_mask.unwrap_or_else(|| BooleanBuffer::new_set(0));
let refined_selection =
refined_selection_from_mask(&combined_mask, &build_ctx.row_selection);
let rows_before_filter = rows_before_filter(reader_builder, build_ctx);
let filtered_rows = rows_before_filter.saturating_sub(refined_selection.row_count());
return Ok(PrefilterResult {
refined_selection,
filtered_rows,
});
}
// Evaluate the misses together with the non-cacheable physical filters.
let mut uncached_entries = misses;
uncached_entries.extend(
non_cacheable_physical
.iter()
.copied()
.map(|idx| PrefilterEntry::without_cache(PrefilterEntryKind::Physical(idx))),
);
let (uncached_mask, read_rows) =
build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &uncached_entries).await?;
// Combine the cached masks with the freshly computed ones.
let final_mask = match (hit_mask, uncached_mask) {
(Some(hit_mask), Some(uncached_mask)) => hit_mask.bitand(&uncached_mask),
(Some(hit_mask), None) => hit_mask,
(None, Some(uncached_mask)) => uncached_mask,
(None, None) => BooleanBuffer::new_set(read_rows),
};
debug_assert_eq!(final_mask.len(), read_rows);
let rows_selected = final_mask.count_set_bits();
let filtered_rows = read_rows.saturating_sub(rows_selected);
let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection);
Ok(PrefilterResult {
refined_selection,
filtered_rows,
})
}
/// Returns the indices of the physical filters that are not immutable, in
/// their original order. Results of such filters are not cached.
fn non_cacheable_physical_filters(prefilter_ctx: &PrefilterContext) -> Vec<usize> {
    prefilter_ctx
        .physical_filters
        .iter()
        .enumerate()
        .filter(|(_, physical)| !physical.is_immutable())
        .map(|(position, _)| position)
        .collect()
}
async fn build_prefilter_masks(
prefilter_ctx: &mut PrefilterContext,
reader_builder: &RowGroupReaderBuilder,
build_ctx: &RowGroupBuildContext<'_>,
entries: &[PrefilterEntry],
) -> Result<(Option<BooleanBuffer>, usize)> {
let prefilter_column_names = prefilter_column_names_for_entries(prefilter_ctx, entries);
let parquet_schema = reader_builder
.parquet_metadata()
.file_metadata()
.schema_descr();
let projection = compute_projection_mask(
&prefilter_column_names,
&prefilter_ctx.arrow_schema,
parquet_schema,
);
let mut stream = reader_builder
.build_with_projection(
build_ctx.row_group_idx,
build_ctx.row_selection.clone(),
prefilter_ctx.projection.clone(),
projection,
build_ctx.fetch_metrics,
)
.await?;
let mut filter_arrays = Vec::new();
let mut cache_builders = entries
.iter()
.map(|entry| entry.key.is_some().then(|| BooleanBufferBuilder::new(0)))
.collect::<Vec<_>>();
let mut combined_builder = (!entries.is_empty()).then(|| BooleanBufferBuilder::new(0));
let mut rows_before_filter = 0usize;
let mut rows_selected = 0usize;
while let Some(batch_result) = stream.next().await {
let batch = batch_result?;
@@ -589,30 +731,78 @@ pub(crate) async fn execute_prefilter(
}
rows_before_filter += num_rows;
let batch_mask = match apply_filters_to_batch(
&batch,
&mut prefilter_ctx.pk_filter,
&prefilter_ctx.filters,
&prefilter_ctx.physical_filters,
reader_builder.file_path(),
)? {
Some(mask) => mask,
None => BooleanBuffer::new_unset(num_rows),
};
rows_selected += batch_mask.count_set_bits();
filter_arrays.push(BooleanArray::from(batch_mask));
let mut batch_mask = BooleanBuffer::new_set(num_rows);
for (idx, entry) in entries.iter().enumerate() {
let mask = eval_entry_mask(
&batch,
prefilter_ctx,
entry.kind,
reader_builder.file_path(),
)?;
batch_mask = batch_mask.bitand(&mask);
if let Some(Some(builder)) = cache_builders.get_mut(idx) {
builder.append_buffer(&mask);
}
}
if let Some(builder) = &mut combined_builder {
builder.append_buffer(&batch_mask);
}
}
let filtered_rows = rows_before_filter.saturating_sub(rows_selected);
let refined_selection = if filter_arrays.is_empty() || rows_selected == 0 {
RowSelection::from(vec![])
} else {
let prefilter_selection = RowSelection::from_filters(&filter_arrays);
match &build_ctx.row_selection {
Some(original) => original.and_then(&prefilter_selection),
None => prefilter_selection,
for (entry, builder) in entries.iter().zip(cache_builders) {
if let (Some(key), Some(mut builder)) = (&entry.key, builder) {
reader_builder
.cache_strategy()
.put_prefilter_result(key.clone(), Arc::new(builder.finish()));
}
};
}
Ok((
combined_builder.map(|mut builder| builder.finish()),
rows_before_filter,
))
}
/// Collects the names of the columns that have to be read to evaluate `entries`.
///
/// - A simple entry contributes its filter's column only when the filter is
///   still a real filter; `Matched` and `Pruned` filters need no column data.
/// - A physical entry always contributes its column.
/// - The primary key group contributes the encoded primary key column.
fn prefilter_column_names_for_entries(
    prefilter_ctx: &PrefilterContext,
    entries: &[PrefilterEntry],
) -> HashSet<String> {
    entries
        .iter()
        .filter_map(|entry| match entry.kind {
            PrefilterEntryKind::Simple(idx) => match prefilter_ctx.filters[idx].filter() {
                MaybeFilter::Filter(filter) => Some(filter.column_name().to_string()),
                MaybeFilter::Matched | MaybeFilter::Pruned => None,
            },
            PrefilterEntryKind::Physical(idx) => Some(
                prefilter_ctx.physical_filters[idx]
                    .column_name()
                    .to_string(),
            ),
            PrefilterEntryKind::PkGroup => Some(PRIMARY_KEY_COLUMN_NAME.to_string()),
        })
        .collect()
}
async fn execute_prefilter_by_reading_columns(
prefilter_ctx: &mut PrefilterContext,
reader_builder: &RowGroupReaderBuilder,
build_ctx: &RowGroupBuildContext<'_>,
) -> Result<PrefilterResult> {
let entries = all_prefilter_entries(prefilter_ctx);
let (mask, rows_before_filter) =
build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &entries).await?;
let final_mask = mask.unwrap_or_else(|| BooleanBuffer::new_set(rows_before_filter));
let rows_selected = final_mask.count_set_bits();
let filtered_rows = rows_before_filter.saturating_sub(rows_selected);
let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection);
Ok(PrefilterResult {
refined_selection,
@@ -620,100 +810,243 @@ pub(crate) async fn execute_prefilter(
})
}
fn apply_filters_to_batch(
/// Builds one uncached entry for every prefilter in `prefilter_ctx`.
///
/// The primary key group (if a primary key filter exists) comes first,
/// followed by every simple filter and then every physical filter, each in
/// index order. None of the returned entries carries a cache key.
fn all_prefilter_entries(prefilter_ctx: &PrefilterContext) -> Vec<PrefilterEntry> {
    let pk_group = prefilter_ctx
        .pk_filter
        .is_some()
        .then_some(PrefilterEntryKind::PkGroup);
    let simple = (0..prefilter_ctx.filters.len()).map(PrefilterEntryKind::Simple);
    let physical = (0..prefilter_ctx.physical_filters.len()).map(PrefilterEntryKind::Physical);

    pk_group
        .into_iter()
        .chain(simple)
        .chain(physical)
        .map(PrefilterEntry::without_cache)
        .collect()
}
/// Identifies which prefilter of a `PrefilterContext` an entry evaluates.
#[derive(Clone, Copy)]
enum PrefilterEntryKind {
/// Index of a simple filter in `PrefilterContext::filters`.
Simple(usize),
/// Index of a physical filter in `PrefilterContext::physical_filters`.
Physical(usize),
/// The primary key filter held in `PrefilterContext::pk_filter`.
PkGroup,
}
/// A single prefilter to evaluate, optionally paired with a cache key.
struct PrefilterEntry {
/// Which filter of the prefilter context this entry evaluates.
kind: PrefilterEntryKind,
/// Cache key for the entry's mask; `None` means the entry has no cache key.
key: Option<PrefilterKey>,
}
impl PrefilterEntry {
fn without_cache(kind: PrefilterEntryKind) -> Self {
Self { kind, key: None }
}
}
/// Builds the prefilter entries that carry a cache key.
///
/// Every key is made of the file id, the row group index, a snapshot of the
/// current row selection, the schema version and the expression string(s) of
/// the filter. Entries are produced in this order:
///
/// 1. one entry per simple filter;
/// 2. one entry per physical filter whose expression is immutable (the others
///    are left out);
/// 3. one entry for the primary key group, only when both the primary key
///    filter and its expression strings are available.
fn build_prefilter_cache_entries(
    prefilter_ctx: &PrefilterContext,
    reader_builder: &RowGroupReaderBuilder,
    build_ctx: &RowGroupBuildContext<'_>,
) -> Vec<PrefilterEntry> {
    let row_selection = PrefilterKey::row_selection_snapshot(build_ctx.row_selection.as_ref());
    let file_id = reader_builder.file_handle().file_id().file_id();
    let row_group_idx = build_ctx.row_group_idx as u32;
    let schema_version = prefilter_ctx.schema_version;

    let simple_entries = prefilter_ctx
        .filters
        .iter()
        .enumerate()
        .map(|(idx, filter_ctx)| PrefilterEntry {
            kind: PrefilterEntryKind::Simple(idx),
            key: Some(PrefilterKey::new(
                file_id,
                row_group_idx,
                row_selection.clone(),
                schema_version,
                smallvec![filter_ctx.expr_str().to_string()],
            )),
        });

    let physical_entries = prefilter_ctx
        .physical_filters
        .iter()
        .enumerate()
        // Only immutable expressions get a cache entry.
        .filter(|(_, filter_ctx)| filter_ctx.is_immutable())
        .map(|(idx, filter_ctx)| PrefilterEntry {
            kind: PrefilterEntryKind::Physical(idx),
            key: Some(PrefilterKey::new(
                file_id,
                row_group_idx,
                row_selection.clone(),
                schema_version,
                smallvec![filter_ctx.expr_str().to_string()],
            )),
        });

    let mut entries: Vec<PrefilterEntry> = simple_entries.chain(physical_entries).collect();

    if let (Some(_), Some(exprs)) = (
        &prefilter_ctx.pk_filter,
        &prefilter_ctx.pk_filter_expr_strs,
    ) {
        // Last user of the snapshot, so it can be moved instead of cloned.
        entries.push(PrefilterEntry {
            kind: PrefilterEntryKind::PkGroup,
            key: Some(PrefilterKey::new(
                file_id,
                row_group_idx,
                row_selection,
                schema_version,
                exprs.clone(),
            )),
        });
    }

    entries
}
fn rows_before_filter(
reader_builder: &RowGroupReaderBuilder,
build_ctx: &RowGroupBuildContext<'_>,
) -> usize {
build_ctx.row_selection.as_ref().map_or_else(
|| {
reader_builder
.parquet_metadata()
.row_group(build_ctx.row_group_idx)
.num_rows() as usize
},
RowSelection::row_count,
)
}
/// Converts a prefilter mask into a row selection.
///
/// Returns an empty selection when the mask is empty or has no set bit.
/// Otherwise the mask is turned into a selection and, when an original
/// selection exists, applied on top of it with `and_then`.
fn refined_selection_from_mask(
    mask: &BooleanBuffer,
    original_selection: &Option<RowSelection>,
) -> RowSelection {
    let nothing_selected = mask.is_empty() || mask.count_set_bits() == 0;
    if nothing_selected {
        return RowSelection::from(vec![]);
    }

    let mask_array = BooleanArray::from(mask.clone());
    let mask_selection = RowSelection::from_filters(&[mask_array]);
    if let Some(original) = original_selection {
        original.and_then(&mask_selection)
    } else {
        mask_selection
    }
}
fn eval_entry_mask(
batch: &RecordBatch,
pk_filter: &mut Option<Box<dyn PrimaryKeyFilter>>,
filters: &[SimpleFilterContext],
physical_filters: &[PhysicalFilterContext],
prefilter_ctx: &mut PrefilterContext,
kind: PrefilterEntryKind,
file_path: &str,
) -> Result<Option<BooleanBuffer>> {
let mut mask = BooleanBuffer::new_set(batch.num_rows());
if let Some(pk_filter) = pk_filter.as_mut() {
// Prefilter reads a reduced projection. For PK prefilter, the encoded
// primary key column is always appended as the last projected column,
// while `__sequence` and `__op_type` are not read.
let pk_column_index = batch.num_columns() - 1;
let matched_row_ranges =
matching_row_ranges_by_primary_key(batch, pk_column_index, pk_filter.as_mut())?;
let mut builder = BooleanBufferBuilder::new(batch.num_rows());
builder.append_n(batch.num_rows(), false);
for range in matched_row_ranges {
for row in range {
builder.set_bit(row, true);
}
) -> Result<BooleanBuffer> {
match kind {
PrefilterEntryKind::Simple(idx) => {
eval_simple_filter_mask(batch, &prefilter_ctx.filters[idx], file_path)
}
mask = mask.bitand(&builder.finish());
}
for filter_ctx in filters {
let filter = match filter_ctx.filter() {
MaybeFilter::Filter(filter) => filter,
MaybeFilter::Matched => continue,
MaybeFilter::Pruned => return Ok(None),
};
let (idx, _) = batch
.schema()
.column_with_name(filter.column_name())
.with_context(|| UnexpectedSnafu {
reason: format!(
"Prefilter column '{}' (id {}) not found in batch for file {}",
filter.column_name(),
filter_ctx.column_id(),
file_path
),
})?;
let column = batch.column(idx).clone();
let result = filter.evaluate_array(&column).context(RecordBatchSnafu)?;
mask = mask.bitand(&result);
}
for filter_ctx in physical_filters {
let filter = filter_ctx.filter();
let (idx, _) = batch
.schema()
.column_with_name(filter_ctx.column_name())
.with_context(|| UnexpectedSnafu {
reason: format!(
"Prefilter physical column '{}' (id {}) not found in batch for file {}",
filter_ctx.column_name(),
filter_ctx.column_id(),
file_path
),
})?;
let column = batch.column(idx).clone();
let record_batch = RecordBatch::try_new(filter_ctx.schema().clone(), vec![column])
.context(NewRecordBatchSnafu)?;
let evaluated = filter
.evaluate(&record_batch)
.context(EvalPartitionFilterSnafu)?;
let array = evaluated
.into_array(record_batch.num_rows())
.context(EvalPartitionFilterSnafu)?;
let boolean_array =
array
.as_any()
.downcast_ref::<BooleanArray>()
.context(UnexpectedSnafu {
reason: "Failed to downcast physical filter result to BooleanArray",
})?;
// Treat null results as false (filtered out); value bits are not guaranteed
// to be false for invalid entries.
let mut result = boolean_array.values().clone();
if let Some(nulls) = boolean_array.nulls() {
result = result.bitand(nulls.inner());
PrefilterEntryKind::Physical(idx) => {
eval_physical_filter_mask(batch, &prefilter_ctx.physical_filters[idx], file_path)
}
PrefilterEntryKind::PkGroup => {
let pk_filter = prefilter_ctx.pk_filter.as_mut().context(UnexpectedSnafu {
reason: "Missing primary key filter for prefilter cache entry",
})?;
eval_pk_group_mask(batch, pk_filter.as_mut())
}
mask = mask.bitand(&result);
}
}
if mask.count_set_bits() == 0 {
Ok(None)
} else {
Ok(Some(mask))
/// Evaluates the primary key filter against `batch` and returns a row mask.
///
/// The encoded primary key column is looked up by name. Rows inside the
/// ranges reported by `matching_row_ranges_by_primary_key` are set in the
/// mask; all other rows stay unset.
///
/// # Errors
///
/// Fails when the batch has no primary key column or when matching the
/// primary keys fails.
fn eval_pk_group_mask(
    batch: &RecordBatch,
    pk_filter: &mut dyn PrimaryKeyFilter,
) -> Result<BooleanBuffer> {
    let schema = batch.schema();
    let (pk_column_index, _) =
        schema
            .column_with_name(PRIMARY_KEY_COLUMN_NAME)
            .context(UnexpectedSnafu {
                reason: "Primary key column not found in prefilter batch",
            })?;
    let matched_row_ranges = matching_row_ranges_by_primary_key(batch, pk_column_index, pk_filter)?;

    // Start from an all-false mask and switch on the matched rows.
    let num_rows = batch.num_rows();
    let mut mask_builder = BooleanBufferBuilder::new(num_rows);
    mask_builder.append_n(num_rows, false);
    for row in matched_row_ranges.into_iter().flatten() {
        mask_builder.set_bit(row, true);
    }
    Ok(mask_builder.finish())
}
/// Evaluates one simple filter against `batch` and returns a row mask.
///
/// A `Matched` filter yields an all-set mask and a `Pruned` filter an
/// all-unset mask, both without touching column data. A real filter is
/// evaluated on the column it names.
///
/// # Errors
///
/// Fails when the filter's column is missing from the batch or when the
/// evaluation itself fails.
fn eval_simple_filter_mask(
    batch: &RecordBatch,
    filter_ctx: &SimpleFilterContext,
    file_path: &str,
) -> Result<BooleanBuffer> {
    let num_rows = batch.num_rows();
    let filter = match filter_ctx.filter() {
        MaybeFilter::Matched => return Ok(BooleanBuffer::new_set(num_rows)),
        MaybeFilter::Pruned => return Ok(BooleanBuffer::new_unset(num_rows)),
        MaybeFilter::Filter(filter) => filter,
    };

    let schema = batch.schema();
    let (column_index, _) = schema
        .column_with_name(filter.column_name())
        .with_context(|| UnexpectedSnafu {
            reason: format!(
                "Prefilter column '{}' (id {}) not found in batch for file {}",
                filter.column_name(),
                filter_ctx.column_id(),
                file_path
            ),
        })?;

    filter
        .evaluate_array(batch.column(column_index))
        .context(RecordBatchSnafu)
}
/// Evaluates one physical filter against `batch` and returns a row mask.
///
/// The referenced column is wrapped in a single-column record batch that uses
/// the filter context's schema, the physical expression is evaluated on it,
/// and the boolean result is converted to a mask in which null results count
/// as "filtered out".
///
/// # Errors
///
/// Fails when the column is missing from the batch, when the single-column
/// batch cannot be built, when evaluation fails, or when the result is not a
/// `BooleanArray`.
fn eval_physical_filter_mask(
    batch: &RecordBatch,
    filter_ctx: &PhysicalFilterContext,
    file_path: &str,
) -> Result<BooleanBuffer> {
    let schema = batch.schema();
    let (column_index, _) = schema
        .column_with_name(filter_ctx.column_name())
        .with_context(|| UnexpectedSnafu {
            reason: format!(
                "Prefilter physical column '{}' (id {}) not found in batch for file {}",
                filter_ctx.column_name(),
                filter_ctx.column_id(),
                file_path
            ),
        })?;

    let input = RecordBatch::try_new(
        filter_ctx.schema().clone(),
        vec![batch.column(column_index).clone()],
    )
    .context(NewRecordBatchSnafu)?;

    let array = filter_ctx
        .filter()
        .evaluate(&input)
        .context(EvalPartitionFilterSnafu)?
        .into_array(input.num_rows())
        .context(EvalPartitionFilterSnafu)?;
    let boolean_array = array
        .as_any()
        .downcast_ref::<BooleanArray>()
        .context(UnexpectedSnafu {
            reason: "Failed to downcast physical filter result to BooleanArray",
        })?;

    // Null results are treated as false. The value bits of null slots are not
    // guaranteed to be false, so they are masked with the validity bitmap.
    let mask = match boolean_array.nulls() {
        Some(nulls) => boolean_array.values().bitand(nulls.inner()),
        None => boolean_array.values().clone(),
    };
    Ok(mask)
}
#[cfg(test)]
@@ -728,7 +1061,6 @@ mod tests {
};
use datatypes::arrow::datatypes::{Schema, UInt32Type};
use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec};
use parquet::arrow::ArrowSchemaConverter;
use store_api::codec::PrimaryKeyEncoding;
use super::*;
@@ -800,12 +1132,6 @@ mod tests {
.collect()
}
fn parquet_schema(read_format: &FlatReadFormat) -> SchemaDescriptor {
ArrowSchemaConverter::new()
.convert(read_format.arrow_schema())
.unwrap()
}
fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
assert_eq!(primary_keys.len(), field_values.len());
@@ -989,15 +1315,15 @@ mod tests {
)
.unwrap();
let codec = build_primary_key_codec(metadata.as_ref());
let parquet_schema = parquet_schema(&read_format);
let builder = PrefilterContextBuilder::new(
&read_format,
&codec,
None,
None,
Vec::new(),
Vec::new(),
&parquet_schema,
metadata.schema_version,
);
assert!(builder.is_none());
}
@@ -1091,7 +1417,6 @@ mod tests {
true,
)
.unwrap();
let full_parquet_schema = parquet_schema(&full_read_format);
let codec = build_primary_key_codec(metadata.as_ref());
let skip_fields_plan = build_reader_filter_plan(
@@ -1102,7 +1427,6 @@ mod tests {
None,
PreFilterMode::SkipFields,
&full_read_format,
&full_parquet_schema,
&codec,
);
assert!(skip_fields_plan.prefilter_builder.is_some());
@@ -1121,13 +1445,11 @@ mod tests {
true,
)
.unwrap();
let projected_parquet_schema = parquet_schema(&projected_read_format);
let pk_prefilter_plan = build_reader_filter_plan(
Some(&Predicate::new(vec![col("tag_0").eq(lit("a"))])),
None,
PreFilterMode::All,
&projected_read_format,
&projected_parquet_schema,
&codec,
);
assert!(pk_prefilter_plan.prefilter_builder.is_some());
@@ -1135,35 +1457,94 @@ mod tests {
}
#[test]
fn test_apply_filters_to_batch_uses_flat_tag_columns_directly() {
fn test_pk_filter_expr_strings_are_stable_under_expr_order() {
    // Swapping the order of the predicate's expressions must not change the
    // expression strings recorded for the primary key filter.
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding(
        PrimaryKeyEncoding::Sparse,
    ));
    let column_ids = metadata.column_metadatas.iter().map(|c| c.column_id);
    let read_format = FlatReadFormat::new(
        metadata.clone(),
        ReadColumns::from_deduped_column_ids(column_ids),
        None,
        "test",
        false,
    )
    .unwrap();
    let codec = build_primary_key_codec(metadata.as_ref());

    // Builds a filter plan for `exprs` and extracts the PK expression strings.
    let pk_expr_strs = |exprs| {
        build_reader_filter_plan(
            Some(&Predicate::new(exprs)),
            None,
            PreFilterMode::All,
            &read_format,
            &codec,
        )
        .prefilter_builder
        .unwrap()
        .pk_filter_expr_strs
    };

    let tag_0_eq_a = col("tag_0").eq(lit("a"));
    let tag_1_eq_x = col("tag_1").eq(lit("x"));
    let exprs_ab = pk_expr_strs(vec![tag_0_eq_a.clone(), tag_1_eq_x.clone()]);
    let exprs_b_a = pk_expr_strs(vec![tag_1_eq_x, tag_0_eq_a]);

    assert!(exprs_ab.is_some());
    assert_eq!(exprs_ab, exprs_b_a);
}
#[test]
fn test_simple_and_physical_contexts_preserve_expr_strings() {
    // Both filter contexts must expose the `Debug` rendering of the logical
    // expression they were built from.
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
    let column_ids = metadata.column_metadatas.iter().map(|c| c.column_id);
    let read_format = FlatReadFormat::new(
        metadata.clone(),
        ReadColumns::from_deduped_column_ids(column_ids),
        None,
        "test",
        true,
    )
    .unwrap();

    let simple_expr = col("tag_0").eq(lit("a"));
    let simple_ctx = SimpleFilterContext::new_opt(&metadata, None, &simple_expr).unwrap();
    let expected_simple = format!("{simple_expr:?}");
    assert_eq!(simple_ctx.expr_str(), expected_simple);

    let physical_expr = col("field_0").in_list(vec![lit(1_u64), lit(2_u64)], false);
    let physical_ctx =
        PhysicalFilterContext::new_opt(&metadata, None, &read_format, &physical_expr).unwrap();
    let expected_physical = format!("{physical_expr:?}");
    assert_eq!(physical_ctx.expr_str(), expected_physical);
}
#[test]
fn test_eval_simple_filter_mask_uses_flat_tag_columns_directly() {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]);
let batch = new_record_batch_with_custom_sequence(&["a", "x"], 0, 4, 1);
let mut no_pk_filter = None;
let mask = apply_filters_to_batch(&batch, &mut no_pk_filter, &filters, &[], "test")
.unwrap()
.unwrap();
let mask = eval_simple_filter_mask(&batch, &filters[0], "test").unwrap();
assert_eq!(mask.count_set_bits(), 4);
}
#[test]
fn test_apply_filters_to_batch_errors_on_missing_selected_column() {
fn test_eval_simple_filter_mask_errors_on_missing_selected_column() {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]);
let pk = new_primary_key(&["a", "x"]);
let batch = new_raw_batch(&[pk.as_slice()], &[10]);
let mut no_pk_filter = None;
let err =
apply_filters_to_batch(&batch, &mut no_pk_filter, &filters, &[], "test").unwrap_err();
let err = eval_simple_filter_mask(&batch, &filters[0], "test").unwrap_err();
let err = err.to_string();
assert!(err.contains("Prefilter column"));
assert!(err.contains("tag_0"));
}
#[test]
fn test_apply_filters_to_batch_evaluates_physical_filters() {
fn test_eval_physical_filter_mask_evaluates_physical_filters() {
let metadata: RegionMetadataRef =
Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
let read_format = FlatReadFormat::new(
@@ -1181,16 +1562,12 @@ mod tests {
let pk = new_primary_key(&["a", "x"]);
let batch = new_raw_batch(&[pk.as_slice(), pk.as_slice(), pk.as_slice()], &[9, 10, 11]);
let mut no_pk_filter = None;
let mask =
apply_filters_to_batch(&batch, &mut no_pk_filter, &[], &physical_filters, "test")
.unwrap()
.unwrap();
let mask = eval_physical_filter_mask(&batch, &physical_filters[0], "test").unwrap();
assert_eq!(mask.count_set_bits(), 1);
}
#[test]
fn test_apply_filters_to_batch_uses_last_projected_column_for_pk_prefilter() {
fn test_eval_pk_group_mask_finds_pk_column_by_name() {
let metadata = Arc::new(sst_region_metadata());
let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("a"))]));
let mut pk_filter = Some(Box::new(CachedPrimaryKeyFilter::new(
@@ -1208,9 +1585,7 @@ mod tests {
&[10, 11, 12, 13],
);
let mask = apply_filters_to_batch(&batch, &mut pk_filter, &[], &[], "test")
.unwrap()
.unwrap();
let mask = eval_pk_group_mask(&batch, pk_filter.as_mut().unwrap().as_mut()).unwrap();
assert_eq!(mask.count_set_bits(), 2);
}

View File

@@ -26,8 +26,9 @@ use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{error, tracing, warn};
use datafusion::physical_plan::PhysicalExpr;
use datafusion_expr::Expr;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{Expr, Volatility};
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
@@ -458,7 +459,6 @@ impl ParquetReaderBuilder {
self.expected_metadata.as_deref(),
self.pre_filter_mode,
&read_format,
parquet_meta.file_metadata().schema_descr(),
&codec,
);
@@ -1862,6 +1862,8 @@ impl MaybeFilter {
pub(crate) struct SimpleFilterContext {
/// Filter to evaluate.
filter: MaybeFilter,
/// Debug string of the original logical expression.
expr_str: String,
/// Id of the column to evaluate.
column_id: ColumnId,
/// Semantic type of the column.
@@ -1879,6 +1881,7 @@ impl SimpleFilterContext {
expr: &Expr,
) -> Option<Self> {
let filter = SimpleFilterEvaluator::try_new(expr)?;
let expr_str = format!("{expr:?}");
let (column_metadata, maybe_filter) = match expected_meta {
Some(meta) => {
// Gets the column metadata from the expected metadata.
@@ -1924,6 +1927,7 @@ impl SimpleFilterContext {
Some(Self {
filter: maybe_filter,
expr_str,
column_id: column_metadata.column_id,
semantic_type: column_metadata.semantic_type,
})
@@ -1934,6 +1938,11 @@ impl SimpleFilterContext {
&self.filter
}
/// Returns the `Debug` rendering of the logical expression this filter was built from.
pub(crate) fn expr_str(&self) -> &str {
    self.expr_str.as_str()
}
/// Returns the column id.
pub(crate) fn column_id(&self) -> ColumnId {
self.column_id
@@ -1950,6 +1959,8 @@ impl SimpleFilterContext {
pub(crate) struct PhysicalFilterContext {
/// Filter to evaluate.
filter: Arc<dyn PhysicalExpr>,
/// Debug string of the original logical expression.
expr_str: String,
/// Id of the column to evaluate.
column_id: ColumnId,
/// Name of the column to evaluate.
@@ -1958,6 +1969,8 @@ pub(crate) struct PhysicalFilterContext {
semantic_type: SemanticType,
/// Schema containing only the referenced column.
schema: SchemaRef,
/// Whether the original logical expression is immutable across queries.
immutable: bool,
}
impl PhysicalFilterContext {
@@ -1974,6 +1987,7 @@ impl PhysicalFilterContext {
if !Self::is_prefilter_candidate(expr) {
return None;
}
let expr_str = format!("{expr:?}");
let column_name = Self::single_column_name(expr)?;
let column_metadata = match expected_meta {
Some(meta) => {
@@ -1998,13 +2012,16 @@ impl PhysicalFilterContext {
error!(e; "Unable to build physical filter for {expr}, schema: {schema:?}");
})
.ok()?;
let immutable = expr_is_immutable(expr);
Some(Self {
filter: physical_expr,
expr_str,
column_id: column_metadata.column_id,
column_name,
semantic_type: column_metadata.semantic_type,
schema,
immutable,
})
}
@@ -2035,6 +2052,11 @@ impl PhysicalFilterContext {
&self.filter
}
/// Returns the `Debug` rendering of the logical expression this filter was built from.
pub(crate) fn expr_str(&self) -> &str {
    self.expr_str.as_str()
}
/// Returns the column id.
pub(crate) fn column_id(&self) -> ColumnId {
self.column_id
@@ -2054,6 +2076,29 @@ impl PhysicalFilterContext {
pub(crate) fn schema(&self) -> &SchemaRef {
&self.schema
}
/// Returns true if the original logical expression is immutable across queries.
///
/// The flag is computed once, when the context is built, by calling
/// `expr_is_immutable` on the logical expression.
pub(crate) fn is_immutable(&self) -> bool {
self.immutable
}
}
/// Returns whether `expr` evaluates to the same result regardless of the query
/// it appears in.
///
/// The expression tree is walked and the expression is reported as not
/// immutable as soon as one of the following is found:
///
/// - a scalar function whose signature volatility is not
///   `Volatility::Immutable`;
/// - a scalar variable.
fn expr_is_immutable(expr: &Expr) -> bool {
    let mut found_mutable = false;
    // The closure never returns an error, so the traversal result carries no
    // information and is ignored.
    let _ = expr.apply(|node| {
        let mutable = match node {
            Expr::ScalarFunction(function) => {
                function.func.signature().volatility != Volatility::Immutable
            }
            Expr::ScalarVariable(_, _) => true,
            _ => false,
        };
        if mutable {
            found_mutable = true;
            Ok(TreeNodeRecursion::Stop)
        } else {
            Ok(TreeNodeRecursion::Continue)
        }
    });
    !found_mutable
}
/// Prune a column by its default value.
@@ -2335,6 +2380,74 @@ mod tests {
assert!(!selection.is_empty());
}
#[test]
fn test_expr_is_immutable_checks_scalar_function_volatility() {
    /// Minimal UDF whose only interesting property is its declared volatility.
    #[derive(Debug, PartialEq, Eq, Hash)]
    struct TestVolatilityUdf {
        name: String,
        signature: Signature,
    }

    impl TestVolatilityUdf {
        fn new(name: &str, volatility: Volatility) -> Self {
            let signature = Signature::variadic_any(volatility);
            Self {
                name: name.to_string(),
                signature,
            }
        }
    }

    impl ScalarUDFImpl for TestVolatilityUdf {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn name(&self) -> &str {
            &self.name
        }

        fn signature(&self) -> &Signature {
            &self.signature
        }

        fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
            Ok(DataType::Int64)
        }

        fn invoke_with_args(
            &self,
            _args: ScalarFunctionArgs,
        ) -> datafusion_common::Result<ColumnarValue> {
            Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
        }
    }

    // Builds a zero-argument call to a UDF with the given volatility.
    let udf_call = |name: &str, volatility| {
        let udf = ScalarUDF::new_from_impl(TestVolatilityUdf::new(name, volatility));
        Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(udf), vec![]))
    };

    // Only functions declared `Immutable` count as immutable.
    let cases = [
        ("immutable_udf", Volatility::Immutable, true),
        ("stable_udf", Volatility::Stable, false),
        ("volatile_udf", Volatility::Volatile, false),
    ];
    for (name, volatility, expected) in cases {
        assert_eq!(expr_is_immutable(&udf_call(name, volatility)), expected);
    }

    // Scalar variables are never immutable.
    let scalar_variable = Expr::ScalarVariable(
        Arc::new(Field::new("@@version", DataType::Utf8, false)),
        vec!["@@version".to_string()],
    );
    assert!(!expr_is_immutable(&scalar_variable));
}
#[tokio::test(flavor = "current_thread")]
async fn test_has_row_level_selection() {
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();

View File

@@ -208,6 +208,7 @@ impl WorkerGroup {
.page_cache_size(config.page_cache_size.as_bytes())
.selector_result_cache_size(config.selector_result_cache_size.as_bytes())
.range_result_cache_size(config.range_result_cache_size.as_bytes())
.prefilter_result_cache_size(config.prefilter_result_cache_size.as_bytes())
.index_metadata_size(config.index.metadata_cache_size.as_bytes())
.index_content_size(config.index.content_cache_size.as_bytes())
.index_content_page_size(config.index.content_cache_page_size.as_bytes())
@@ -423,6 +424,7 @@ impl WorkerGroup {
.page_cache_size(config.page_cache_size.as_bytes())
.selector_result_cache_size(config.selector_result_cache_size.as_bytes())
.range_result_cache_size(config.range_result_cache_size.as_bytes())
.prefilter_result_cache_size(config.prefilter_result_cache_size.as_bytes())
.write_cache(write_cache)
.build(),
);

View File

@@ -1719,10 +1719,11 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
"vector_cache_size =",
"page_cache_size =",
"selector_result_cache_size =",
"range_result_cache_size =",
"prefilter_result_cache_size =",
"metadata_cache_size =",
"content_cache_size =",
"result_cache_size =",
"range_result_cache_size =",
"name =",
"recovery_parallelism =",
"max_background_index_builds =",