From d76ecf67bd013df7b9bed7bb0dd08f44841a537f Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Thu, 11 Jun 2026 17:48:12 +0800 Subject: [PATCH] refactor: skip prefilter for expr with functions (#8280) Signed-off-by: luofucong --- src/mito2/src/sst/parquet/reader.rs | 60 ++++++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index a865372ae3..ff7479c876 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -2074,10 +2074,18 @@ impl PhysicalFilterContext { /// [`SimpleFilterEvaluator`] already handles them. // TODO(yingwen): extend more expressions if necessary. For example, allow some cheap scalar functions (e.g. `lower`, `length`, date truncations) fn is_prefilter_candidate(expr: &Expr) -> bool { - matches!( + if !matches!( expr, Expr::InList(_) | Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Between(_) - ) + ) { + return false; + } + + // If the expr contains any scalar function, it is not considered worth evaluating in the + // prefilter: after all, the prefilter has to read the Parquet files one more time. 
+ !expr + .exists(|e| Ok(matches!(e, Expr::ScalarFunction(_)))) + .unwrap_or(false) } fn single_column_name(expr: &Expr) -> Option { @@ -2306,6 +2314,8 @@ mod tests { use std::sync::{Arc, LazyLock}; use common_error::ext::WhateverResult; + use common_function::scalars::json::json_get::JsonGetWithType; + use common_function::scalars::udf::create_udf; use common_recordbatch::ext::RecordBatchExt; use datafusion::arrow::datatypes::DataType; use datafusion_common::ScalarValue; @@ -2333,6 +2343,52 @@ mod tests { use crate::sst::parquet::read_columns::{ParquetReadColumn, ParquetReadColumns}; use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata}; + #[test] + fn test_skip_prefilter_for_json_get() -> WhateverResult<()> { + fn json_get_expr(base: Expr, path: &str) -> Expr { + let json_get = Arc::new(create_udf(Arc::new(JsonGetWithType::default()))); + Expr::ScalarFunction(ScalarFunction::new_udf(json_get, vec![base, lit(path)])) + } + + let metadata = Arc::new(sst_region_metadata()); + let format = FlatReadFormat::new( + metadata.clone(), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), + None, + "test", + true, + )?; + let new_filter = + |expr: Expr| PhysicalFilterContext::new_opt(&metadata, None, &format, &expr); + + let json_get = || json_get_expr(col("field_0"), "a.b"); + + let regular_expr = col("field_0").is_null(); + assert!(new_filter(regular_expr).is_some()); + + let is_null = json_get().is_null(); + assert!(new_filter(is_null).is_none()); + + let is_not_null = json_get().is_not_null(); + assert!(new_filter(is_not_null).is_none()); + + let in_list = json_get().in_list(vec![lit("value")], false); + assert!(new_filter(in_list).is_none()); + + let in_list_nested = col("field_0").in_list(vec![json_get()], false); + assert!(new_filter(in_list_nested).is_none()); + + let between = json_get().between(lit(1_u64), lit(10_u64)); + assert!(new_filter(between).is_none()); + + let between_nested = 
col("field_0").between(json_get(), lit(10_u64)); + assert!(new_filter(between_nested).is_none()); + + Ok(()) + } + #[tokio::test] async fn test_nested_projection_reads_partial_json2_physical_fields() -> WhateverResult<()> { // Write a full JSON2-like Arrow struct: