diff --git a/Cargo.lock b/Cargo.lock index 96b5cbf5b3..cea3840b64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13194,6 +13194,7 @@ name = "table" version = "1.0.0-rc.1" dependencies = [ "api", + "arc-swap", "async-trait", "chrono", "common-base", diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index 70ffa827cf..c3274d30e9 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -84,7 +84,7 @@ impl BulkIterContext { let dyn_filters = predicate .as_ref() - .map(|pred| pred.dyn_filters().clone()) + .map(|pred| pred.dyn_filters().as_ref().clone()) .unwrap_or_default(); Ok(Self { diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index 894b0bb774..d667be9cb8 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -53,7 +53,7 @@ pub struct RowGroupIndex { /// Meta data of a partition range. /// If the scanner is [UnorderedScan], each meta only has one row group or memtable. /// If the scanner is [SeqScan], each meta may have multiple row groups and memtables. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub(crate) struct RangeMeta { /// The time range of the range. pub(crate) time_range: FileTimeRange, @@ -450,6 +450,7 @@ impl FileRangeBuilder { } /// Builder to create mem ranges. +#[derive(Clone)] pub(crate) struct MemRangeBuilder { /// Ranges of a memtable. range: MemtableRange, diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 3b119de055..5d934afd2d 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -27,6 +27,7 @@ use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::tracing::Instrument; use common_telemetry::{debug, error, tracing, warn}; use common_time::range::TimestampRange; +use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr; use datafusion_common::Column; use datafusion_expr::Expr; use datafusion_expr::utils::expr_to_columns; @@ -1575,11 +1576,20 @@ impl StreamContext { .collect(); write!(f, ", \"projection\": {:?}", names)?; } - if let Some(predicate) = &self.input.predicate.predicate() - && !predicate.exprs().is_empty() - { - let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect(); - write!(f, ", \"filters\": {:?}", exprs)?; + if let Some(predicate) = &self.input.predicate.predicate() { + if !predicate.exprs().is_empty() { + let exprs: Vec<_> = + predicate.exprs().iter().map(|e| e.to_string()).collect(); + write!(f, ", \"filters\": {:?}", exprs)?; + } + if !predicate.dyn_filters().is_empty() { + let dyn_filters: Vec<_> = predicate + .dyn_filters() + .iter() + .map(|f| format!("{}", f)) + .collect(); + write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?; + } } #[cfg(feature = "vector_index")] if let Some(vector_index_k) = self.input.vector_index_k { @@ -1602,6 +1612,31 @@ impl StreamContext { write!(f, "{:?}", InputWrapper { input: &self.input }) } + + /// Add new dynamic filters to the predicates. + /// Safe after stream creation; in-flight reads may still observe an older snapshot. + pub(crate) fn add_dyn_filter_to_predicate( + self: &Arc, + filter_exprs: Vec>, + ) -> Vec { + let mut supported = Vec::with_capacity(filter_exprs.len()); + let filter_expr = filter_exprs + .into_iter() + .filter_map(|expr| { + if let Ok(dyn_filter) = (expr as Arc) + .downcast::() + { + supported.push(true); + Some(dyn_filter) + } else { + supported.push(false); + None + } + }) + .collect(); + self.input.predicate.add_dyn_filters(filter_expr); + supported + } } /// Predicates to evaluate. @@ -1610,9 +1645,9 @@ impl StreamContext { pub struct PredicateGroup { time_filters: Option>>, /// Predicate that includes request filters and region partition expr (if any). - predicate_all: Option, + predicate_all: Predicate, /// Predicate that only includes request filters. - predicate_without_region: Option, + predicate_without_region: Predicate, /// Region partition expression restored from metadata. region_partition_expr: Option, } @@ -1654,16 +1689,8 @@ impl PredicateGroup { Some(Arc::new(time_filters)) }; - let predicate_all = if combined_exprs.is_empty() { - None - } else { - Some(Predicate::new(combined_exprs)) - }; - let predicate_without_region = if exprs.is_empty() { - None - } else { - Some(Predicate::new(exprs.to_vec())) - }; + let predicate_all = Predicate::new(combined_exprs); + let predicate_without_region = Predicate::new(exprs.to_vec()); Ok(Self { time_filters, @@ -1680,12 +1707,26 @@ impl PredicateGroup { /// Returns predicate of all exprs (including region partition expr if present). pub(crate) fn predicate(&self) -> Option<&Predicate> { - self.predicate_all.as_ref() + if self.predicate_all.is_empty() { + None + } else { + Some(&self.predicate_all) + } } /// Returns predicate that excludes region partition expr. pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> { - self.predicate_without_region.as_ref() + if self.predicate_without_region.is_empty() { + None + } else { + Some(&self.predicate_without_region) + } + } + + /// Add dynamic filters in the predicates. + pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec>) { + self.predicate_all.add_dyn_filters(dyn_filters.clone()); + self.predicate_without_region.add_dyn_filters(dyn_filters); } /// Returns the region partition expr from metadata, if any. @@ -1720,6 +1761,7 @@ impl PredicateGroup { mod tests { use std::sync::Arc; + use datafusion::physical_plan::expressions::lit as physical_lit; use datafusion_expr::{col, lit}; use store_api::storage::ScanRequest; @@ -1880,4 +1922,23 @@ mod tests { ); assert!(scan_region.use_flat_format()); } + + #[test] + fn test_update_dyn_filters_with_empty_base_predicates() { + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap(); + assert!(predicate_group.predicate().is_none()); + assert!(predicate_group.predicate_without_region().is_none()); + + let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false))); + predicate_group.add_dyn_filters(vec![dyn_filter]); + + let predicate_all = predicate_group.predicate().unwrap(); + assert!(predicate_all.exprs().is_empty()); + assert_eq!(1, predicate_all.dyn_filters().len()); + + let predicate_without_region = predicate_group.predicate_without_region().unwrap(); + assert!(predicate_without_region.exprs().is_empty()); + assert_eq!(1, predicate_without_region.dyn_filters().len()); + } } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 121d1b6c2d..c13b40d111 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -715,7 +715,14 @@ impl RegionScanner for SeqScan { .input .predicate_group() .predicate_without_region(); - predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false) + predicate.is_some() + } + + fn add_dyn_filter_to_predicate( + &mut self, + filter_exprs: Vec>, + ) -> Vec { + self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs) } fn set_logical_region(&mut self, logical_region: bool) { diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index c391072fe8..2d6994d0af 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -23,6 +23,7 @@ use common_error::ext::BoxedError; use common_recordbatch::util::ChainedRecordBatchStream; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_telemetry::tracing::{self, Instrument}; +use common_telemetry::warn; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::arrow::array::BinaryArray; @@ -360,7 +361,14 @@ impl RegionScanner for SeriesScan { .input .predicate_group() .predicate_without_region(); - predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false) + predicate.is_some() + } + + fn add_dyn_filter_to_predicate( + &mut self, + filter_exprs: Vec>, + ) -> Vec { + self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs) } fn set_logical_region(&mut self, logical_region: bool) { diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 73ab4eb73c..2d557e8871 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -21,7 +21,7 @@ use std::time::Instant; use async_stream::{stream, try_stream}; use common_error::ext::BoxedError; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; -use common_telemetry::tracing; +use common_telemetry::{tracing, warn}; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::arrow::record_batch::RecordBatch; @@ -492,8 +492,14 @@ impl RegionScanner for UnorderedScan { .input .predicate_group() .predicate_without_region(); + predicate.is_some() + } - predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false) + fn add_dyn_filter_to_predicate( + &mut self, + filter_exprs: Vec>, + ) -> Vec { + self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs) } fn set_logical_region(&mut self, logical_region: bool) { diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 6445517bc8..3a5251cb1a 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -148,7 +148,7 @@ impl FileRange { ); // not costly to create a predicate here since dynamic filters are wrapped in Arc - let pred = Predicate::new(vec![]).with_dyn_filters(self.context.base.dyn_filters.clone()); + let pred = Predicate::with_dyn_filters(vec![], self.context.base.dyn_filters.clone()); pred.prune_with_stats(&stats, prune_schema.arrow_schema()) .first() @@ -440,7 +440,7 @@ pub(crate) struct RangeBase { /// Filters pushed down. pub(crate) filters: Vec, /// Dynamic filter physical exprs. - pub(crate) dyn_filters: Arc>, + pub(crate) dyn_filters: Vec>, /// Helper to read the SST. pub(crate) read_format: ReadFormat, pub(crate) expected_metadata: Option, diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 3451f24d4b..500f32ae91 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -486,9 +486,9 @@ impl ParquetReaderBuilder { }; let dyn_filters = if let Some(predicate) = &self.predicate { - predicate.dyn_filters().clone() + predicate.dyn_filters().as_ref().clone() } else { - Arc::new(vec![]) + vec![] }; let codec = build_primary_key_codec(read_format.metadata()); diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 863dd9c0f9..dc84c4afac 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -658,24 +658,125 @@ impl QueryExecutor for DatafusionQueryEngine { #[cfg(test)] mod tests { + use std::fmt; use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + use api::v1::SemanticType; use arrow::array::{ArrayRef, UInt64Array}; + use arrow_schema::SortOptions; use catalog::RegisterTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID}; - use common_recordbatch::util; + use common_error::ext::BoxedError; + use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream, util}; + use datafusion::physical_plan::display::{DisplayAs, DisplayFormatType}; + use datafusion::physical_plan::expressions::PhysicalSortExpr; + use datafusion::physical_plan::joins::{HashJoinExec, JoinOn, PartitionMode}; + use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; + use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr}; use datafusion::prelude::{col, lit}; + use datafusion_common::{JoinType, NullEquality}; + use datafusion_physical_expr::expressions::Column; use datatypes::prelude::ConcreteDataType; - use datatypes::schema::ColumnSchema; + use datatypes::schema::{ColumnSchema, SchemaRef}; use datatypes::vectors::{Helper, UInt32Vector, VectorRef}; use session::context::{QueryContext, QueryContextBuilder}; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; + use store_api::region_engine::{ + PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties, + }; + use store_api::storage::{RegionId, ScanRequest}; use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable}; + use table::table::scan::RegionScanExec; use super::*; use crate::options::QueryOptions; use crate::parser::QueryLanguageParser; + use crate::part_sort::PartSortExec; use crate::query_engine::{QueryEngineFactory, QueryEngineRef}; + #[derive(Debug)] + struct RecordingScanner { + schema: SchemaRef, + metadata: RegionMetadataRef, + properties: ScannerProperties, + update_calls: Arc, + last_filter_len: Arc, + } + + impl RecordingScanner { + fn new( + schema: SchemaRef, + metadata: RegionMetadataRef, + update_calls: Arc, + last_filter_len: Arc, + ) -> Self { + Self { + schema, + metadata, + properties: ScannerProperties::default(), + update_calls, + last_filter_len, + } + } + } + + impl RegionScanner for RecordingScanner { + fn name(&self) -> &str { + "RecordingScanner" + } + + fn properties(&self) -> &ScannerProperties { + &self.properties + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn metadata(&self) -> RegionMetadataRef { + self.metadata.clone() + } + + fn prepare(&mut self, request: PrepareRequest) -> std::result::Result<(), BoxedError> { + self.properties.prepare(request); + Ok(()) + } + + fn scan_partition( + &self, + _ctx: &QueryScanContext, + _metrics_set: &ExecutionPlanMetricsSet, + _partition: usize, + ) -> std::result::Result { + Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))) + } + + fn has_predicate_without_region(&self) -> bool { + true + } + + fn add_dyn_filter_to_predicate( + &mut self, + filter_exprs: Vec>, + ) -> Vec { + self.update_calls.fetch_add(1, Ordering::Relaxed); + self.last_filter_len + .store(filter_exprs.len(), Ordering::Relaxed); + vec![true; filter_exprs.len()] + } + + fn set_logical_region(&mut self, logical_region: bool) { + self.properties.set_logical_region(logical_region); + } + } + + impl DisplayAs for RecordingScanner { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "RecordingScanner") + } + } + async fn create_test_engine() -> QueryEngineRef { let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap(); let req = RegisterTableRequest { @@ -827,4 +928,179 @@ mod tests { format!("{}", logical_plan.display_indent()) ); } + + #[tokio::test] + async fn test_topk_dynamic_filter_pushdown_reaches_region_scan() { + let engine = create_test_engine().await; + let engine = engine + .as_any() + .downcast_ref::() + .unwrap(); + let engine_ctx = engine.engine_context(QueryContext::arc()); + let state = engine_ctx.state(); + + let schema = Arc::new(datatypes::schema::Schema::new(vec![ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + )])); + + let mut metadata_builder = RegionMetadataBuilder::new(RegionId::new(1024, 1)); + metadata_builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }) + .primary_key(vec![]); + let metadata = Arc::new(metadata_builder.build().unwrap()); + + let update_calls = Arc::new(AtomicUsize::new(0)); + let last_filter_len = Arc::new(AtomicUsize::new(0)); + let scanner = Box::new(RecordingScanner::new( + schema, + metadata, + update_calls.clone(), + last_filter_len.clone(), + )); + let scan = Arc::new(RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap()); + + let sort_expr = PhysicalSortExpr { + expr: Arc::new(Column::new("ts", 0)), + options: SortOptions { + descending: true, + ..Default::default() + }, + }; + let partition_ranges: Vec> = vec![vec![]]; + let mut plan: Arc = + Arc::new(PartSortExec::try_new(sort_expr, Some(3), partition_ranges, scan).unwrap()); + + for optimizer in state.physical_optimizers() { + plan = optimizer.optimize(plan, state.config_options()).unwrap(); + } + + assert!(update_calls.load(Ordering::Relaxed) > 0); + assert!(last_filter_len.load(Ordering::Relaxed) > 0); + } + + #[tokio::test] + async fn test_join_dynamic_filter_pushdown_reaches_region_scan() { + let engine = create_test_engine().await; + let engine = engine + .as_any() + .downcast_ref::() + .unwrap(); + let engine_ctx = engine.engine_context(QueryContext::arc()); + let state = engine_ctx.state(); + + assert!( + state + .config_options() + .optimizer + .enable_join_dynamic_filter_pushdown + ); + + let schema = Arc::new(datatypes::schema::Schema::new(vec![ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + )])); + + let mut left_metadata_builder = RegionMetadataBuilder::new(RegionId::new(2048, 1)); + left_metadata_builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }) + .primary_key(vec![]); + let left_metadata = Arc::new(left_metadata_builder.build().unwrap()); + + let mut right_metadata_builder = RegionMetadataBuilder::new(RegionId::new(2048, 2)); + right_metadata_builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }) + .primary_key(vec![]); + let right_metadata = Arc::new(right_metadata_builder.build().unwrap()); + + let left_update_calls = Arc::new(AtomicUsize::new(0)); + let left_last_filter_len = Arc::new(AtomicUsize::new(0)); + let right_update_calls = Arc::new(AtomicUsize::new(0)); + let right_last_filter_len = Arc::new(AtomicUsize::new(0)); + + let left_scan = Arc::new( + RegionScanExec::new( + Box::new(RecordingScanner::new( + schema.clone(), + left_metadata, + left_update_calls.clone(), + left_last_filter_len.clone(), + )), + ScanRequest::default(), + None, + ) + .unwrap(), + ); + let right_scan = Arc::new( + RegionScanExec::new( + Box::new(RecordingScanner::new( + schema, + right_metadata, + right_update_calls.clone(), + right_last_filter_len.clone(), + )), + ScanRequest::default(), + None, + ) + .unwrap(), + ); + + let on: JoinOn = vec![( + Arc::new(Column::new("ts", 0)) as Arc, + Arc::new(Column::new("ts", 0)) as Arc, + )]; + + let mut plan: Arc = Arc::new( + HashJoinExec::try_new( + left_scan, + right_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNull, + false, + ) + .unwrap(), + ); + + for optimizer in state.physical_optimizers() { + plan = optimizer.optimize(plan, state.config_options()).unwrap(); + } + + assert!(left_update_calls.load(Ordering::Relaxed) > 0); + assert_eq!(0, left_last_filter_len.load(Ordering::Relaxed)); + assert!(right_update_calls.load(Ordering::Relaxed) > 0); + assert!(right_last_filter_len.load(Ordering::Relaxed) > 0); + } } diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index b0a27f3b8c..e12479cc5a 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -237,6 +237,7 @@ impl ExecutionPlan for PartSortExec { } else { internal_err!("No children found")? }; + // create a new dynamic filter when with_new_children, as the old filter is bound to the old input and cannot be reused let new = Self::try_new( self.expression.clone(), self.limit, diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 4ce8684c77..a45fc4c896 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -192,6 +192,10 @@ impl QueryEngineState { physical_optimizer .rules .push(Arc::new(WindowedSortPhysicalRule)); + // explicitly not do filter pushdown for windowed sort&part sort + // (notice that `PartSortExec` create another new dyn filter that need to be pushdown if want to use dyn filter optimization) + // benchmark shows it can cause performance regression due to useless filtering and extra shuffle. + // We can add a rule to do filter pushdown for windowed sort in the future if we find a way to avoid the performance regression. physical_optimizer .rules .push(Arc::new(MatchesConstantTermOptimizer)); diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 2a078f0a84..b3f460d01d 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -26,7 +26,7 @@ use common_error::ext::BoxedError; use common_recordbatch::{EmptyRecordBatchStream, MemoryPermit, SendableRecordBatchStream}; use common_time::Timestamp; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; -use datafusion_physical_plan::{DisplayAs, DisplayFormatType}; +use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PhysicalExpr}; use datatypes::schema::SchemaRef; use futures::future::join_all; use serde::{Deserialize, Serialize}; @@ -451,6 +451,15 @@ pub trait RegionScanner: Debug + DisplayAs + Send { /// Check if there is any predicate exclude region partition exprs that may be executed in this scanner. fn has_predicate_without_region(&self) -> bool; + /// Add the given dynamic filter expressions to the predicate of the scanner. + /// Returns a vector of booleans indicating which filter expressions were applied. + /// true indicates the filter expression was applied(will be use by scanner to prune by stat for row group), + /// false otherwise. + fn add_dyn_filter_to_predicate( + &mut self, + filter_exprs: Vec>, + ) -> Vec; + /// Sets whether the scanner is reading a logical region. fn set_logical_region(&mut self, logical_region: bool); } @@ -996,6 +1005,13 @@ impl RegionScanner for SinglePartitionScanner { false } + fn add_dyn_filter_to_predicate( + &mut self, + filter_exprs: Vec>, + ) -> Vec { + vec![false; filter_exprs.len()] + } + fn metadata(&self) -> RegionMetadataRef { self.metadata.clone() } diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 0b330dc5f3..8ecc51a320 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -12,6 +12,7 @@ workspace = true [dependencies] api.workspace = true +arc-swap = "1.6" async-trait.workspace = true chrono.workspace = true common-base.workspace = true diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index 57d94fa361..f9be7be16e 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -14,7 +14,8 @@ use std::sync::Arc; -use common_telemetry::{error, warn}; +use arc_swap::ArcSwap; +use common_telemetry::{debug, warn}; use common_time::Timestamp; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; @@ -53,14 +54,14 @@ macro_rules! return_none_if_utf8 { } /// Reference-counted pointer to a list of logical exprs and a list of dynamic filter physical exprs. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct Predicate { /// logical exprs exprs: Arc>, /// dynamic filter physical exprs, only useful if dynamic filtering is enabled /// /// They are usually from `TopK` or `Join` operators, and can dynamically filter data during query execution by using current runtime information to further reduce data scanning - dyn_filters: Arc>, + dyn_filters: Arc>>>, } impl Predicate { @@ -70,18 +71,33 @@ impl Predicate { pub fn new(exprs: Vec) -> Self { Self { exprs: Arc::new(exprs), - dyn_filters: Arc::new(vec![]), + dyn_filters: Arc::new(ArcSwap::new(Arc::new(vec![]))), } } - /// Sets the dynamic filter physical exprs. - pub fn with_dyn_filters(self, dyn_filters: Arc>) -> Self { + pub fn with_dyn_filters( + exprs: Vec, + dyn_filters: Vec>, + ) -> Self { Self { - exprs: self.exprs, - dyn_filters, + exprs: Arc::new(exprs), + dyn_filters: Arc::new(ArcSwap::new(Arc::new(dyn_filters))), } } + pub fn is_empty(&self) -> bool { + self.exprs.is_empty() && self.dyn_filters.load().is_empty() + } + + /// Adds dynamic filter physical exprs to the existing list. + pub fn add_dyn_filters(&self, dyn_filters: Vec>) { + self.dyn_filters.rcu(|existing| { + let mut new_filters = existing.as_ref().clone(); + new_filters.extend(dyn_filters.clone()); + Arc::new(new_filters) + }); + } + /// Returns the logical exprs. pub fn exprs(&self) -> &[Expr] { &self.exprs @@ -89,14 +105,15 @@ impl Predicate { /// Returns the dynamic filter physical exprs. Notice this return a live dynamic filters which /// can change during query execution. - pub fn dyn_filters(&self) -> &Arc> { - &self.dyn_filters + pub fn dyn_filters(&self) -> Arc>> { + self.dyn_filters.load_full() } /// Returns the dynamic filter as physical exprs. Notice this return a "snapshot" of /// dynamic filters at the time of calling this method. pub fn dyn_filter_phy_exprs(&self) -> error::Result>> { self.dyn_filters + .load() .iter() .map(|e| e.current()) .collect::, _>>() @@ -157,7 +174,8 @@ impl Predicate { } }, Err(e) => { - error!(e; "Failed to create predicate for expr"); + // since dynamic filter exprs could be complex, it's possible that `PruningPredicate::try_new` fails to prove anything from it. In that case, we just log it and skip pruning with this expr. + debug!("Failed to create pruning predicate for expr: {e:?}"); } } } diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index ae9c734d33..e2d8f794da 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -30,7 +30,7 @@ use datafusion::error::Result as DfResult; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown, }; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ @@ -53,6 +53,7 @@ use store_api::storage::{ScanRequest, TimeSeriesDistribution}; use crate::table::metrics::StreamMetrics; /// A plan to read multiple partitions from a region of a table. +#[derive(Clone)] pub struct RegionScanExec { scanner: Arc>, arrow_schema: ArrowSchemaRef, @@ -444,8 +445,27 @@ impl ExecutionPlan for RegionScanExec { child_pushdown_result: ChildPushdownResult, _config: &datafusion::config::ConfigOptions, ) -> DfResult>> { - // TODO(discord9): use the pushdown result to update the scanner's predicate - Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) + let parent_filters = child_pushdown_result + .parent_filters + .into_iter() + .map(|f| f.filter) + .collect::>(); + + let supported = self + .scanner + .lock() + .unwrap() + .add_dyn_filter_to_predicate(parent_filters); + // datafusion api require to clone self after mutate, even though we are only mutate inside mutex + let new_self = Arc::new(self.clone()); + + Ok(FilterPushdownPropagation { + filters: supported + .into_iter() + .map(|s| if s { PushedDown::Yes } else { PushedDown::No }) + .collect(), + updated_node: Some(new_self), + }) } } diff --git a/tests/cases/standalone/common/filter/hash_join_dyn_filter.result b/tests/cases/standalone/common/filter/hash_join_dyn_filter.result new file mode 100644 index 0000000000..3ec77d457e --- /dev/null +++ b/tests/cases/standalone/common/filter/hash_join_dyn_filter.result @@ -0,0 +1,253 @@ +-- Dynamic Filter Test: Hash Join +-- Tests dynamic filters generated by hash join +-- Test 1.1: Basic Hash Join +CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +-- Insert sufficient test data +INSERT INTO orders VALUES + (1, 1, 100.0, '2024-01-01 00:00:00'), + (2, 2, 200.0, '2024-01-02 00:00:00'), + (3, 1, 150.0, '2024-01-03 00:00:00'); + +Affected Rows: 3 + +-- Insert more rows to trigger dynamic filter +INSERT INTO customers VALUES + (1, 'Alice', 'gold', '2024-01-01 00:00:00'), + (2, 'Bob', 'silver', '2024-01-02 00:00:00'); + +Affected Rows: 2 + +-- Test: Basic hash join should produce dynamic filter +-- The hash join will build hash table from customers (small table) +-- and probe with orders (larger table), potentially using dynamic filter +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN SELECT o."id", o.amount, c."name", c.tier +FROM orders o +JOIN customers c ON o.customer_id = c.customer_id +WHERE c.tier = 'gold'; + ++---------------+-----------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: o.id, o.amount, c.name, c.tier | +| | Inner Join: o.customer_id = c.customer_id | +| | Projection: o.id, o.customer_id, o.amount | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: o | +| | TableScan: orders | +| | ]] | +| | Filter: c.tier = Utf8("gold") | +| | Projection: c.customer_id, c.name, c.tier | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: c | +| | TableScan: customers | +| | ]] | +| physical_plan | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, amount@2, name@4, tier@5] | +| | RepartitionExec: REDACTED +| | ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, amount@2 as amount] | +| | MergeScanExec: REDACTED +| | RepartitionExec: REDACTED +| | FilterExec: tier@2 = gold | +| | ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] | +| | MergeScanExec: REDACTED +| | | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------+ + +-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics= +-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED +-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED" +-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED +-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED +-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics= +-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier +FROM orders o +JOIN customers c ON o.customer_id = c.customer_id +WHERE c.tier = 'gold'; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, amount@2, name@4, tier@5] metrics=REDACTED_| +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, amount@2 as amount] metrics=REDACTED_| +|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_FilterExec: tier@2 = gold metrics=REDACTED_| +|_|_|_ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] metrics=REDACTED_| +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_CooperativeExec metrics=REDACTED_| +|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED | +|_|_|_| +| 1_| 0_|_CooperativeExec metrics=REDACTED_| +|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_| +|_|_|_| +|_|_| Total rows: REDACTED_| ++-+-+-+ + +-- Verify correctness +-- SQLNESS SORT_RESULT +SELECT o."id", o.amount, c."name", c.tier +FROM orders o +JOIN customers c ON o.customer_id = c.customer_id +WHERE c.tier = 'gold'; + ++----+--------+-------+------+ ++----+--------+-------+------+ ++----+--------+-------+------+ +| 1 | 100.0 | Alice | gold | +| 3 | 150.0 | Alice | gold | +| id | amount | name | tier | + +DROP TABLE orders; + +Affected Rows: 0 + +DROP TABLE customers; + +Affected Rows: 0 + +-- Test 1.2: Hash Join with Projection Alias +CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +-- Insert test data +INSERT INTO orders VALUES + (1, 1, 100.0, '2024-01-01 00:00:00'), + (2, 2, 200.0, '2024-01-02 00:00:00'), + (3, 1, 150.0, '2024-01-03 00:00:00'), + (4, 3, 300.0, '2024-01-04 00:00:00'); + +Affected Rows: 4 + +INSERT INTO customers VALUES + (1, 'Alice', 'gold', '2024-01-01 00:00:00'), + (2, 'Bob', 'silver', '2024-01-02 00:00:00'), + (3, 'Charlie', 'bronze', '2024-01-03 00:00:00'); + +Affected Rows: 3 + +-- Test: Hash join with projection alias +-- The alias on customer_id tests that dynamic filter correctly handles column references +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN SELECT o."id", o.amount, c."name", c.tier +FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o +JOIN customers c ON o.cid = c.customer_id +WHERE c.tier IN ('gold', 'silver'); + ++---------------+---------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: o.id, o.amount, c.name, c.tier | +| | Inner Join: o.cid = c.customer_id | +| | Projection: o.id, o.cid, o.amount | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: o | +| | Projection: orders.id, orders.customer_id AS cid, orders.amount, orders.ts | +| | TableScan: orders | +| | ]] | +| | Filter: c.tier = Utf8("gold") OR c.tier = Utf8("silver") | +| | Projection: c.customer_id, c.name, c.tier | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: c | +| | TableScan: customers | +| | ]] | +| physical_plan | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(cid@1, customer_id@0)], projection=[id@0, amount@2, name@4, tier@5] | +| | RepartitionExec: REDACTED +| | ProjectionExec: expr=[id@0 as id, cid@1 as cid, amount@2 as amount] | +| | MergeScanExec: REDACTED +| | RepartitionExec: REDACTED +| | FilterExec: tier@2 = gold OR tier@2 = silver | +| | ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] | +| | MergeScanExec: REDACTED +| | | ++---------------+---------------------------------------------------------------------------------------------------------------------------+ + +-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics= +-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED +-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED" +-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED +-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED +-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics= +-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier +FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o +JOIN customers c ON o.cid = c.customer_id +WHERE c.tier IN ('gold', 'silver'); + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(cid@1, customer_id@0)], projection=[id@0, amount@2, name@4, tier@5] metrics=REDACTED_| +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_ProjectionExec: expr=[id@0 as id, cid@1 as cid, amount@2 as amount] metrics=REDACTED_| +|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_FilterExec: tier@2 = gold OR tier@2 = silver metrics=REDACTED_| +|_|_|_ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] metrics=REDACTED_| +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_ProjectionExec: expr=[id@0 as id, customer_id@1 as cid, amount@2 as amount, ts@3 as ts] metrics=REDACTED_| +|_|_|_CooperativeExec metrics=REDACTED_| +|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED | +|_|_|_| +| 1_| 0_|_CooperativeExec metrics=REDACTED_| +|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_| +|_|_|_| +|_|_| Total rows: REDACTED_| ++-+-+-+ + +-- Verify correctness +-- SQLNESS SORT_RESULT +SELECT o."id", o.amount, c."name", c.tier +FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o +JOIN customers c ON o.cid = c.customer_id +WHERE c.tier IN ('gold', 'silver'); + ++----+--------+-------+--------+ ++----+--------+-------+--------+ ++----+--------+-------+--------+ +| 1 | 100.0 | Alice | gold | +| 2 | 200.0 | Bob | silver | +| 3 | 150.0 | Alice | gold | +| id | amount | name | tier | + +DROP TABLE orders; + +Affected Rows: 0 + +DROP TABLE customers; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/filter/hash_join_dyn_filter.sql b/tests/cases/standalone/common/filter/hash_join_dyn_filter.sql new file mode 100644 index 0000000000..29bc86b315 --- /dev/null +++ b/tests/cases/standalone/common/filter/hash_join_dyn_filter.sql @@ -0,0 +1,112 @@ +-- Dynamic Filter Test: Hash Join +-- Tests dynamic filters generated by hash join + +-- Test 1.1: Basic Hash Join +CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); +CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX); + +-- Insert sufficient test data +INSERT INTO orders VALUES + (1, 1, 100.0, '2024-01-01 00:00:00'), + (2, 2, 200.0, '2024-01-02 00:00:00'), + (3, 1, 150.0, '2024-01-03 00:00:00'); +-- Insert more rows to trigger dynamic filter + +INSERT INTO customers VALUES + (1, 'Alice', 'gold', '2024-01-01 00:00:00'), + (2, 'Bob', 'silver', '2024-01-02 00:00:00'); + +-- Test: Basic hash join should produce dynamic filter +-- The hash join will build hash table from customers (small table) +-- and probe with orders (larger table), potentially using dynamic filter +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN SELECT o."id", o.amount, c."name", c.tier +FROM orders o +JOIN customers c ON o.customer_id = c.customer_id +WHERE c.tier = 'gold'; + +-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics= +-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED +-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED" +-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED +-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED +-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics= +-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier +FROM orders o +JOIN customers c ON o.customer_id = c.customer_id +WHERE c.tier = 'gold'; + +-- Verify correctness +-- SQLNESS SORT_RESULT +SELECT o."id", o.amount, c."name", c.tier +FROM orders o +JOIN customers c ON o.customer_id = c.customer_id +WHERE c.tier = 'gold'; + +DROP TABLE orders; +DROP TABLE customers; + +-- Test 1.2: Hash Join with Projection Alias +CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); +CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX); + +-- Insert test data +INSERT INTO orders VALUES + (1, 1, 100.0, '2024-01-01 00:00:00'), + (2, 2, 200.0, '2024-01-02 00:00:00'), + (3, 1, 150.0, '2024-01-03 00:00:00'), + (4, 3, 300.0, '2024-01-04 00:00:00'); + +INSERT INTO customers VALUES + (1, 'Alice', 'gold', '2024-01-01 00:00:00'), + (2, 'Bob', 'silver', '2024-01-02 00:00:00'), + (3, 'Charlie', 'bronze', '2024-01-03 00:00:00'); + +-- Test: Hash join with projection alias +-- The alias on customer_id tests that dynamic filter correctly handles column references +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN SELECT o."id", o.amount, c."name", c.tier +FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o +JOIN customers c ON o.cid = c.customer_id +WHERE c.tier IN ('gold', 'silver'); + +-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics= +-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED +-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED" +-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED +-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED +-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics= +-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier +FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o +JOIN customers c ON o.cid = c.customer_id +WHERE c.tier IN ('gold', 'silver'); + +-- Verify correctness +-- SQLNESS SORT_RESULT +SELECT o."id", o.amount, c."name", c.tier +FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o +JOIN customers c ON o.cid = c.customer_id +WHERE c.tier IN ('gold', 'silver'); + +DROP TABLE orders; +DROP TABLE customers; diff --git a/tests/cases/standalone/common/filter/hash_join_topk_dyn_filter.result b/tests/cases/standalone/common/filter/hash_join_topk_dyn_filter.result new file mode 100644 index 0000000000..c3455f84d8 --- /dev/null +++ b/tests/cases/standalone/common/filter/hash_join_topk_dyn_filter.result @@ -0,0 +1,493 @@ +-- Dynamic Filter Test: Combined Hash Join + TopK +-- Tests dynamic filters with both hash join and TopK operators +-- Test 3.1: Hash Join with TopK Subquery +CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +-- Insert test data +INSERT INTO orders VALUES + (1, 1, 100.0, '2024-01-01 00:00:00'), + (2, 2, 200.0, '2024-01-02 00:00:00'), + (3, 3, 300.0, '2024-01-03 00:00:00'), + (4, 1, 150.0, '2024-01-04 00:00:00'), + (5, 2, 250.0, '2024-01-05 00:00:00'), + (6, 1, 175.0, '2024-01-06 00:00:00'), + (7, 3, 325.0, '2024-01-07 00:00:00'), + (8, 2, 225.0, '2024-01-08 00:00:00'), + (9, 1, 400.0, '2024-01-09 00:00:00'), + (10, 3, 500.0, '2024-01-10 00:00:00'); + +Affected Rows: 10 + +INSERT INTO customers VALUES + (1, 'Alice', 'gold', '2024-01-01 00:00:00'), + (2, 'Bob', 'silver', '2024-01-02 00:00:00'), + (3, 'Charlie', 'bronze', '2024-01-03 00:00:00'); + +Affected Rows: 3 + +-- Test: Hash join where probe side uses TopK +-- This combines both dynamic filter sources: TopK generates filter on amount, +-- and hash join generates filter on customer_id +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +EXPLAIN SELECT top_orders."id", top_orders.amount, c."name", c.tier +FROM ( + SELECT "id", customer_id, amount, ts + FROM orders + ORDER BY amount DESC + LIMIT 5 +) top_orders +JOIN customers c ON top_orders.customer_id = c.customer_id +WHERE c.tier IN ('gold', 'bronze'); + ++---------------+-----------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: top_orders.id, top_orders.amount, c.name, c.tier | +| | Inner Join: top_orders.customer_id = c.customer_id | +| | Projection: top_orders.id, top_orders.customer_id, top_orders.amount | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: top_orders | +| | Limit: skip=0, fetch=5 | +| | Sort: orders.amount DESC NULLS FIRST | +| | Projection: orders.id, orders.customer_id, orders.amount, orders.ts | +| | TableScan: orders | +| | ]] | +| | Filter: c.tier = Utf8("gold") OR c.tier = Utf8("bronze") | +| | Projection: c.customer_id, c.name, c.tier | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: c | +| | TableScan: customers | +| | ]] | +| physical_plan | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, amount@2, name@4, tier@5] | +| | RepartitionExec: partitioning=REDACTED +| | ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, amount@2 as amount] | +| | MergeScanExec: REDACTED +| | RepartitionExec: partitioning=REDACTED +| | FilterExec: tier@2 = gold OR tier@2 = bronze | +| | ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] | +| | MergeScanExec: REDACTED +| | | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------+ + +-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics= +-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED +-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED" +-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED +-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED +-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics= +-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +EXPLAIN ANALYZE VERBOSE SELECT top_orders."id", top_orders.amount, c."name", c.tier +FROM ( + SELECT "id", customer_id, amount, ts + FROM orders + ORDER BY amount DESC + LIMIT 5 +) top_orders +JOIN customers c ON top_orders.customer_id = c.customer_id +WHERE c.tier IN ('gold', 'bronze'); + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, amount@2, name@4, tier@5] metrics=REDACTED_| +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, amount@2 as amount] metrics=REDACTED_| +|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_FilterExec: tier@2 = gold OR tier@2 = bronze metrics=REDACTED_| +|_|_|_ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] metrics=REDACTED_| +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [amount@2 DESC], fetch=5 metrics=REDACTED_| +|_|_|_SortExec: TopK(fetch=5), expr=[amount@2 DESC], preserve_partitioning=[true] metrics=REDACTED_| +|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "dyn_filters": ["DynamicFilter [ empty ]"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED | +|_|_|_| +| 1_| 0_|_CooperativeExec metrics=REDACTED_| +|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_| +|_|_|_| +|_|_| Total rows: REDACTED_| ++-+-+-+ + +-- Verify correctness +-- SQLNESS SORT_RESULT +SELECT top_orders."id", top_orders.amount, c."name", c.tier +FROM ( + SELECT "id", customer_id, amount, ts + FROM orders + ORDER BY amount DESC + LIMIT 5 +) top_orders +JOIN customers c ON top_orders.customer_id = c.customer_id +WHERE c.tier IN ('gold', 'bronze'); + ++----+--------+---------+--------+ ++----+--------+---------+--------+ ++----+--------+---------+--------+ +| 10 | 500.0 | Charlie | bronze | +| 3 | 300.0 | Charlie | bronze | +| 7 | 325.0 | Charlie | bronze | +| 9 | 400.0 | Alice | gold | +| id | amount | name | tier | + +DROP TABLE orders; + +Affected Rows: 0 + +DROP TABLE customers; + +Affected Rows: 0 + +-- Test 3.2: TopK with Hash Join Subquery +CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +-- Insert test data +INSERT INTO orders VALUES + (1, 1, 100.0, '2024-01-01 00:00:00'), + (2, 2, 200.0, '2024-01-02 00:00:00'), + (3, 3, 300.0, '2024-01-03 00:00:00'), + (4, 1, 150.0, '2024-01-04 00:00:00'), + (5, 2, 250.0, '2024-01-05 00:00:00'), + (6, 1, 175.0, '2024-01-06 00:00:00'), + (7, 3, 325.0, '2024-01-07 00:00:00'), + (8, 2, 225.0, '2024-01-08 00:00:00'), + (9, 1, 400.0, '2024-01-09 00:00:00'), + (10, 3, 500.0, '2024-01-10 00:00:00'); + +Affected Rows: 10 + +INSERT INTO customers VALUES + (1, 'Alice', 'gold', '2024-01-01 00:00:00'), + (2, 'Bob', 'silver', '2024-01-02 00:00:00'), + (3, 'Charlie', 'bronze', '2024-01-03 00:00:00'); + +Affected Rows: 3 + +-- Test: TopK where sorting side uses Hash Join +-- Hash join produces customer info, then TopK sorts the joined results +-- This tests that TopK dynamic filter works on hash join output +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +EXPLAIN SELECT "id", customer_id, "name", tier, amount +FROM ( + SELECT o."id", o.customer_id, c."name", c.tier, o.amount + FROM orders o + JOIN customers c ON o.customer_id = c.customer_id + WHERE c.tier IN ('gold', 'silver') +) +ORDER BY amount DESC +LIMIT 4; + ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Sort: o.amount DESC NULLS FIRST, fetch=4 | +| | Projection: o.id, o.customer_id, c.name, c.tier, o.amount | +| | Inner Join: o.customer_id = c.customer_id | +| | Projection: o.id, o.customer_id, o.amount | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: o | +| | TableScan: orders | +| | ]] | +| | Filter: c.tier = Utf8("gold") OR c.tier = Utf8("silver") | +| | Projection: c.customer_id, c.name, c.tier | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: c | +| | TableScan: customers | +| | ]] | +| physical_plan | SortPreservingMergeExec: [amount@4 DESC], fetch=4 | +| | SortExec: TopK(fetch=4), expr=[amount@4 DESC], preserve_partitioning=[true] | +| | ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, name@3 as name, tier@4 as tier, amount@2 as amount] | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, customer_id@1, amount@2, name@4, tier@5] | +| | RepartitionExec: partitioning=REDACTED +| | ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, amount@2 as amount] | +| | MergeScanExec: REDACTED +| | RepartitionExec: partitioning=REDACTED +| | FilterExec: tier@2 = gold OR tier@2 = silver | +| | ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] | +| | MergeScanExec: REDACTED +| | | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+ + +-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics= +-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED +-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED" +-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED +-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED +-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics= +-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +EXPLAIN ANALYZE VERBOSE SELECT "id", customer_id, "name", tier, amount +FROM ( + SELECT o."id", o.customer_id, c."name", c.tier, o.amount + FROM orders o + JOIN customers c ON o.customer_id = c.customer_id + WHERE c.tier IN ('gold', 'silver') +) +ORDER BY amount DESC +LIMIT 4; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_SortPreservingMergeExec: [amount@4 DESC], fetch=4 metrics=REDACTED_| +|_|_|_SortExec: TopK(fetch=4), expr=[amount@4 DESC], preserve_partitioning=[true] metrics=REDACTED_| +|_|_|_ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, name@3 as name, tier@4 as tier, amount@2 as amount] metrics=REDACTED_| +|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, customer_id@1, amount@2, name@4, tier@5] metrics=REDACTED_| +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, amount@2 as amount] metrics=REDACTED_| +|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_FilterExec: tier@2 = gold OR tier@2 = silver metrics=REDACTED_| +|_|_|_ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] metrics=REDACTED_| +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_CooperativeExec metrics=REDACTED_| +|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED | +|_|_|_| +| 1_| 0_|_CooperativeExec metrics=REDACTED_| +|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_| +|_|_|_| +|_|_| Total rows: REDACTED_| ++-+-+-+ + +-- Verify correctness +SELECT "id", customer_id, "name", tier, amount +FROM ( + SELECT o."id", o.customer_id, c."name", c.tier, o.amount + FROM orders o + JOIN customers c ON o.customer_id = c.customer_id + WHERE c.tier IN ('gold', 'silver') +) +ORDER BY amount DESC +LIMIT 4; + ++----+-------------+-------+--------+--------+ +| id | customer_id | name | tier | amount | ++----+-------------+-------+--------+--------+ +| 9 | 1 | Alice | gold | 400.0 | +| 5 | 2 | Bob | silver | 250.0 | +| 8 | 2 | Bob | silver | 225.0 | +| 2 | 2 | Bob | silver | 200.0 | ++----+-------------+-------+--------+--------+ + +DROP TABLE orders; + +Affected Rows: 0 + +DROP TABLE customers; + +Affected Rows: 0 + +-- Test 3.3: Multiple Dynamic Filters +CREATE TABLE orders ("id" INT, customer_id INT, product_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +CREATE TABLE products (product_id INT, "name" STRING, "category" STRING, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +-- Insert test data +INSERT INTO orders VALUES + (1, 1, 101, 100.0, '2024-01-01 00:00:00'), + (2, 2, 102, 200.0, '2024-01-02 00:00:00'), + (3, 3, 103, 300.0, '2024-01-03 00:00:00'), + (4, 1, 101, 150.0, '2024-01-04 00:00:00'), + (5, 2, 102, 250.0, '2024-01-05 00:00:00'), + (6, 1, 103, 175.0, '2024-01-06 00:00:00'), + (7, 3, 101, 325.0, '2024-01-07 00:00:00'), + (8, 2, 102, 225.0, '2024-01-08 00:00:00'), + (9, 1, 103, 400.0, '2024-01-09 00:00:00'), + (10, 3, 101, 500.0, '2024-01-10 00:00:00'); + +Affected Rows: 10 + +INSERT INTO customers VALUES + (1, 'Alice', 'gold', '2024-01-01 00:00:00'), + (2, 'Bob', 'silver', '2024-01-02 00:00:00'), + (3, 'Charlie', 'bronze', '2024-01-03 00:00:00'); + +Affected Rows: 3 + +INSERT INTO products VALUES + (101, 'Laptop', 'electronics', '2024-01-01 00:00:00'), + (102, 'Mouse', 'electronics', '2024-01-02 00:00:00'), + (103, 'Keyboard', 'electronics', '2024-01-03 00:00:00'); + +Affected Rows: 3 + +-- Test: Multiple hash joins producing dynamic filters +-- This creates a chain: orders JOIN customers JOIN products +-- Both hash joins can produce dynamic filters +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +EXPLAIN SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category" +FROM orders o +JOIN customers c ON o.customer_id = c.customer_id +JOIN products p ON o.product_id = p.product_id +WHERE c.tier IN ('gold', 'silver') + AND p."category" = 'electronics'; + ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: o.id, o.amount, c.name, c.tier, p.name AS product_name, p.category | +| | Inner Join: o.product_id = p.product_id | +| | Projection: o.id, o.product_id, o.amount, c.name, c.tier | +| | Inner Join: o.customer_id = c.customer_id | +| | Projection: o.id, o.customer_id, o.product_id, o.amount | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: o | +| | TableScan: orders | +| | ]] | +| | Filter: c.tier = Utf8("gold") OR c.tier = Utf8("silver") | +| | Projection: c.customer_id, c.name, c.tier | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: c | +| | TableScan: customers | +| | ]] | +| | Filter: p.category = Utf8("electronics") | +| | Projection: p.product_id, p.name, p.category | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | SubqueryAlias: p | +| | TableScan: products | +| | ]] | +| physical_plan | ProjectionExec: expr=[id@0 as id, amount@1 as amount, name@2 as name, tier@3 as tier, name@4 as product_name, category@5 as category] | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(product_id@1, product_id@0)], projection=[id@0, amount@2, name@3, tier@4, name@6, category@7] | +| | RepartitionExec: partitioning=REDACTED +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, product_id@2, amount@3, name@5, tier@6] | +| | RepartitionExec: partitioning=REDACTED +| | ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, product_id@2 as product_id, amount@3 as amount] | +| | MergeScanExec: REDACTED +| | RepartitionExec: partitioning=REDACTED +| | FilterExec: tier@2 = gold OR tier@2 = silver | +| | ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] | +| | MergeScanExec: REDACTED +| | RepartitionExec: partitioning=REDACTED +| | FilterExec: category@2 = electronics | +| | ProjectionExec: expr=[product_id@0 as product_id, name@1 as name, category@2 as category] | +| | MergeScanExec: REDACTED +| | | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+ + +-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics= +-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED +-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED" +-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED +-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED +-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics= +-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category" +FROM orders o +JOIN customers c ON o.customer_id = c.customer_id +JOIN products p ON o.product_id = p.product_id +WHERE c.tier IN ('gold', 'silver') + AND p."category" = 'electronics'; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[id@0 as id, amount@1 as amount, name@2 as name, tier@3 as tier, name@4 as product_name, category@5 as category] metrics=REDACTED_| +|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(product_id@1, product_id@0)], projection=[id@0, amount@2, name@3, tier@4, name@6, category@7] metrics=REDACTED_| +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, product_id@2, amount@3, name@5, tier@6] metrics=REDACTED_| +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, product_id@2 as product_id, amount@3 as amount] metrics=REDACTED_| +|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_FilterExec: tier@2 = gold OR tier@2 = silver metrics=REDACTED_| +|_|_|_ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] metrics=REDACTED_| +|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_FilterExec: category@2 = electronics metrics=REDACTED_| +|_|_|_ProjectionExec: expr=[product_id@0 as product_id, name@1 as name, category@2 as category] metrics=REDACTED_| +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_CooperativeExec metrics=REDACTED_| +|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "product_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED | +|_|_|_| +| 1_| 0_|_CooperativeExec metrics=REDACTED_| +|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_| +|_|_|_| +| 1_| 0_|_CooperativeExec metrics=REDACTED_| +|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["product_id", "name", "category", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_| +|_|_|_| +|_|_| Total rows: REDACTED_| ++-+-+-+ + +-- Verify correctness +-- SQLNESS SORT_RESULT +SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category" +FROM orders o +JOIN customers c ON o.customer_id = c.customer_id +JOIN products p ON o.product_id = p.product_id +WHERE c.tier IN ('gold', 'silver') + AND p."category" = 'electronics'; + ++----+--------+-------+--------+--------------+-------------+ ++----+--------+-------+--------+--------------+-------------+ ++----+--------+-------+--------+--------------+-------------+ +| 1 | 100.0 | Alice | gold | Laptop | electronics | +| 2 | 200.0 | Bob | silver | Mouse | electronics | +| 4 | 150.0 | Alice | gold | Laptop | electronics | +| 5 | 250.0 | Bob | silver | Mouse | electronics | +| 6 | 175.0 | Alice | gold | Keyboard | electronics | +| 8 | 225.0 | Bob | silver | Mouse | electronics | +| 9 | 400.0 | Alice | gold | Keyboard | electronics | +| id | amount | name | tier | product_name | category | + +DROP TABLE orders; + +Affected Rows: 0 + +DROP TABLE customers; + +Affected Rows: 0 + +DROP TABLE products; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/filter/hash_join_topk_dyn_filter.sql b/tests/cases/standalone/common/filter/hash_join_topk_dyn_filter.sql new file mode 100644 index 0000000000..287d1ca064 --- /dev/null +++ b/tests/cases/standalone/common/filter/hash_join_topk_dyn_filter.sql @@ -0,0 +1,233 @@ +-- Dynamic Filter Test: Combined Hash Join + TopK +-- Tests dynamic filters with both hash join and TopK operators + +-- Test 3.1: Hash Join with TopK Subquery +CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); +CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX); + +-- Insert test data +INSERT INTO orders VALUES + (1, 1, 100.0, '2024-01-01 00:00:00'), + (2, 2, 200.0, '2024-01-02 00:00:00'), + (3, 3, 300.0, '2024-01-03 00:00:00'), + (4, 1, 150.0, '2024-01-04 00:00:00'), + (5, 2, 250.0, '2024-01-05 00:00:00'), + (6, 1, 175.0, '2024-01-06 00:00:00'), + (7, 3, 325.0, '2024-01-07 00:00:00'), + (8, 2, 225.0, '2024-01-08 00:00:00'), + (9, 1, 400.0, '2024-01-09 00:00:00'), + (10, 3, 500.0, '2024-01-10 00:00:00'); + +INSERT INTO customers VALUES + (1, 'Alice', 'gold', '2024-01-01 00:00:00'), + (2, 'Bob', 'silver', '2024-01-02 00:00:00'), + (3, 'Charlie', 'bronze', '2024-01-03 00:00:00'); + +-- Test: Hash join where probe side uses TopK +-- This combines both dynamic filter sources: TopK generates filter on amount, +-- and hash join generates filter on customer_id +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +EXPLAIN SELECT top_orders."id", top_orders.amount, c."name", c.tier +FROM ( + SELECT "id", customer_id, amount, ts + FROM orders + ORDER BY amount DESC + LIMIT 5 +) top_orders +JOIN customers c ON top_orders.customer_id = c.customer_id +WHERE c.tier IN ('gold', 'bronze'); + +-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics= +-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED +-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED" +-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED +-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED +-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics= +-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +EXPLAIN ANALYZE VERBOSE SELECT top_orders."id", top_orders.amount, c."name", c.tier +FROM ( + SELECT "id", customer_id, amount, ts + FROM orders + ORDER BY amount DESC + LIMIT 5 +) top_orders +JOIN customers c ON top_orders.customer_id = c.customer_id +WHERE c.tier IN ('gold', 'bronze'); + +-- Verify correctness +-- SQLNESS SORT_RESULT +SELECT top_orders."id", top_orders.amount, c."name", c.tier +FROM ( + SELECT "id", customer_id, amount, ts + FROM orders + ORDER BY amount DESC + LIMIT 5 +) top_orders +JOIN customers c ON top_orders.customer_id = c.customer_id +WHERE c.tier IN ('gold', 'bronze'); + +DROP TABLE orders; +DROP TABLE customers; + +-- Test 3.2: TopK with Hash Join Subquery +CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); +CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX); + +-- Insert test data +INSERT INTO orders VALUES + (1, 1, 100.0, '2024-01-01 00:00:00'), + (2, 2, 200.0, '2024-01-02 00:00:00'), + (3, 3, 300.0, '2024-01-03 00:00:00'), + (4, 1, 150.0, '2024-01-04 00:00:00'), + (5, 2, 250.0, '2024-01-05 00:00:00'), + (6, 1, 175.0, '2024-01-06 00:00:00'), + (7, 3, 325.0, '2024-01-07 00:00:00'), + (8, 2, 225.0, '2024-01-08 00:00:00'), + (9, 1, 400.0, '2024-01-09 00:00:00'), + (10, 3, 500.0, '2024-01-10 00:00:00'); + +INSERT INTO customers VALUES + (1, 'Alice', 'gold', '2024-01-01 00:00:00'), + (2, 'Bob', 'silver', '2024-01-02 00:00:00'), + (3, 'Charlie', 'bronze', '2024-01-03 00:00:00'); + +-- Test: TopK where sorting side uses Hash Join +-- Hash join produces customer info, then TopK sorts the joined results +-- This tests that TopK dynamic filter works on hash join output +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +EXPLAIN SELECT "id", customer_id, "name", tier, amount +FROM ( + SELECT o."id", o.customer_id, c."name", c.tier, o.amount + FROM orders o + JOIN customers c ON o.customer_id = c.customer_id + WHERE c.tier IN ('gold', 'silver') +) +ORDER BY amount DESC +LIMIT 4; + +-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics= +-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED +-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED" +-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED +-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED +-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics= +-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +EXPLAIN ANALYZE VERBOSE SELECT "id", customer_id, "name", tier, amount +FROM ( + SELECT o."id", o.customer_id, c."name", c.tier, o.amount + FROM orders o + JOIN customers c ON o.customer_id = c.customer_id + WHERE c.tier IN ('gold', 'silver') +) +ORDER BY amount DESC +LIMIT 4; + +-- Verify correctness +SELECT "id", customer_id, "name", tier, amount +FROM ( + SELECT o."id", o.customer_id, c."name", c.tier, o.amount + FROM orders o + JOIN customers c ON o.customer_id = c.customer_id + WHERE c.tier IN ('gold', 'silver') +) +ORDER BY amount DESC +LIMIT 4; + +DROP TABLE orders; +DROP TABLE customers; + +-- Test 3.3: Multiple Dynamic Filters +CREATE TABLE orders ("id" INT, customer_id INT, product_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); +CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX); +CREATE TABLE products (product_id INT, "name" STRING, "category" STRING, ts TIMESTAMP TIME INDEX); + +-- Insert test data +INSERT INTO orders VALUES + (1, 1, 101, 100.0, '2024-01-01 00:00:00'), + (2, 2, 102, 200.0, '2024-01-02 00:00:00'), + (3, 3, 103, 300.0, '2024-01-03 00:00:00'), + (4, 1, 101, 150.0, '2024-01-04 00:00:00'), + (5, 2, 102, 250.0, '2024-01-05 00:00:00'), + (6, 1, 103, 175.0, '2024-01-06 00:00:00'), + (7, 3, 101, 325.0, '2024-01-07 00:00:00'), + (8, 2, 102, 225.0, '2024-01-08 00:00:00'), + (9, 1, 103, 400.0, '2024-01-09 00:00:00'), + (10, 3, 101, 500.0, '2024-01-10 00:00:00'); + +INSERT INTO customers VALUES + (1, 'Alice', 'gold', '2024-01-01 00:00:00'), + (2, 'Bob', 'silver', '2024-01-02 00:00:00'), + (3, 'Charlie', 'bronze', '2024-01-03 00:00:00'); + +INSERT INTO products VALUES + (101, 'Laptop', 'electronics', '2024-01-01 00:00:00'), + (102, 'Mouse', 'electronics', '2024-01-02 00:00:00'), + (103, 'Keyboard', 'electronics', '2024-01-03 00:00:00'); + +-- Test: Multiple hash joins producing dynamic filters +-- This creates a chain: orders JOIN customers JOIN products +-- Both hash joins can produce dynamic filters +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +EXPLAIN SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category" +FROM orders o +JOIN customers c ON o.customer_id = c.customer_id +JOIN products p ON o.product_id = p.product_id +WHERE c.tier IN ('gold', 'silver') + AND p."category" = 'electronics'; + +-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics= +-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED +-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED" +-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED +-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED +-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics= +-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category" +FROM orders o +JOIN customers c ON o.customer_id = c.customer_id +JOIN products p ON o.product_id = p.product_id +WHERE c.tier IN ('gold', 'silver') + AND p."category" = 'electronics'; + +-- Verify correctness +-- SQLNESS SORT_RESULT +SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category" +FROM orders o +JOIN customers c ON o.customer_id = c.customer_id +JOIN products p ON o.product_id = p.product_id +WHERE c.tier IN ('gold', 'silver') + AND p."category" = 'electronics'; + +DROP TABLE orders; +DROP TABLE customers; +DROP TABLE products; diff --git a/tests/cases/standalone/common/filter/topk_dyn_filter.result b/tests/cases/standalone/common/filter/topk_dyn_filter.result new file mode 100644 index 0000000000..ac905994ea --- /dev/null +++ b/tests/cases/standalone/common/filter/topk_dyn_filter.result @@ -0,0 +1,200 @@ +-- Dynamic Filter Test: TopK (Sort + Limit) +-- Tests dynamic filters generated by TopK operator +-- Test 2.1: Basic TopK +CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +-- Insert sufficient test data +INSERT INTO orders VALUES + (1, 1, 1100.0, '2024-01-01 00:00:00'), + (2, 2, 1000.0, '2024-01-02 00:00:00'), + (3, 3, 900.0, '2024-01-03 00:00:00'); + +Affected Rows: 3 + +admin flush_table('orders'); + ++-----------------------------+ +| ADMIN flush_table('orders') | ++-----------------------------+ +| 0 | ++-----------------------------+ + +INSERT INTO orders VALUES + (4, 1, 325.0, '2024-01-04 00:00:00'), + (5, 2, 300.0, '2024-01-05 00:00:00'), + (6, 1, 250.0, '2024-01-06 00:00:00'), + (7, 3, 225.0, '2024-01-07 00:00:00'); + +Affected Rows: 4 + +admin flush_table('orders'); + ++-----------------------------+ +| ADMIN flush_table('orders') | ++-----------------------------+ +| 0 | ++-----------------------------+ + +INSERT INTO orders VALUES + (8, 2, 200.0, '2024-01-08 00:00:00'), + (9, 1, 175.0, '2024-01-09 00:00:00'), + (10, 3, 150.0, '2024-01-10 00:00:00'), + (11, 2, 100.0, '2024-01-11 00:00:00'), + (12, 1, 50.0, '2024-01-12 00:00:00'); + +Affected Rows: 5 + +-- Test: Basic TopK query - get top 3 orders by amount +-- TopK (Sort + Limit) should generate dynamic filter on amount +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN SELECT "id", customer_id, amount +FROM orders +ORDER BY amount DESC +LIMIT 3; + ++---------------+--------------------------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Limit: skip=0, fetch=3 | +| | Sort: orders.amount DESC NULLS FIRST | +| | Projection: orders.id, orders.customer_id, orders.amount | +| | TableScan: orders | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+--------------------------------------------------------------+ + +-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics= +-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED +-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED" +-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED +-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED +-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics= +-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- SQLNESS REPLACE DynamicFilter\s\[[^\]]*\] DynamicFilter [ REDACTED ] +-- SQLNESS REPLACE "files":\s\[[^\]]*\] "files": REDACTED +EXPLAIN ANALYZE VERBOSE SELECT "id", customer_id, amount +FROM orders +ORDER BY amount DESC +LIMIT 3; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec metrics=REDACTED_| +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [amount@2 DESC], fetch=3 metrics=REDACTED_| +|_|_|_SortExec: TopK(fetch=3), expr=[amount@2 DESC], preserve_partitioning=[true] metrics=REDACTED_| +|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount"], "dyn_filters": ["DynamicFilter [ REDACTED ]"], "files": REDACTED, "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED | +|_|_|_| +|_|_| Total rows: REDACTED_| ++-+-+-+ + +-- Verify correctness +SELECT "id", customer_id, amount +FROM orders +ORDER BY amount DESC +LIMIT 3; + ++----+-------------+--------+ +| id | customer_id | amount | ++----+-------------+--------+ +| 1 | 1 | 1100.0 | +| 2 | 2 | 1000.0 | +| 3 | 3 | 900.0 | ++----+-------------+--------+ + +DROP TABLE orders; + +Affected Rows: 0 + +-- Test 2.2: TopK with Projection Alias +-- USER NOTE: must make alias for topk's sort column to be able to test dynamic filter pushdown +CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +-- Insert test data +INSERT INTO orders VALUES + (1, 1, 100.0, '2024-01-01 00:00:00'), + (2, 2, 200.0, '2024-01-02 00:00:00'), + (3, 3, 300.0, '2024-01-03 00:00:00'), + (4, 1, 150.0, '2024-01-04 00:00:00'), + (5, 2, 250.0, '2024-01-05 00:00:00'), + (6, 1, 175.0, '2024-01-06 00:00:00'), + (7, 3, 325.0, '2024-01-07 00:00:00'), + (8, 2, 225.0, '2024-01-08 00:00:00'); + +Affected Rows: 8 + +-- Test: TopK with projection alias +-- The alias on amount tests that dynamic filter correctly handles column references after projection +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN SELECT "id", customer_id, total_amount +FROM (SELECT "id", customer_id, amount as total_amount, ts FROM orders) +ORDER BY total_amount DESC +LIMIT 3; + ++---------------+-------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false, remote_input=[ | +| | Limit: skip=0, fetch=3 | +| | Sort: total_amount DESC NULLS FIRST | +| | Projection: orders.id, orders.customer_id, total_amount | +| | Projection: orders.id, orders.customer_id, orders.amount AS total_amount, orders.ts | +| | TableScan: orders | +| | ]] | +| physical_plan | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+-------------------------------------------------------------------------------------------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- some order problem between standalone&distributed, comment out for now +-- EXPLAIN ANALYZE SELECT "id", customer_id, total_amount +-- FROM (SELECT "id", customer_id, amount as total_amount, ts FROM orders) +-- ORDER BY total_amount DESC +-- LIMIT 3; +-- Verify correctness +SELECT "id", customer_id, total_amount +FROM (SELECT "id", customer_id, amount as total_amount, ts FROM orders) +ORDER BY total_amount DESC +LIMIT 3; + ++-+-+-+ +| id | customer_id | total_amount | ++-+-+-+ +| 7_| 3_| 325.0_| +| 3_| 3_| 300.0_| +| 5_| 2_| 250.0_| ++-+-+-+ + +DROP TABLE orders; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/filter/topk_dyn_filter.sql b/tests/cases/standalone/common/filter/topk_dyn_filter.sql new file mode 100644 index 0000000000..ffce7d258e --- /dev/null +++ b/tests/cases/standalone/common/filter/topk_dyn_filter.sql @@ -0,0 +1,115 @@ +-- Dynamic Filter Test: TopK (Sort + Limit) +-- Tests dynamic filters generated by TopK operator + +-- Test 2.1: Basic TopK +CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); + +-- Insert sufficient test data +INSERT INTO orders VALUES + (1, 1, 1100.0, '2024-01-01 00:00:00'), + (2, 2, 1000.0, '2024-01-02 00:00:00'), + (3, 3, 900.0, '2024-01-03 00:00:00'); + +admin flush_table('orders'); + +INSERT INTO orders VALUES + (4, 1, 325.0, '2024-01-04 00:00:00'), + (5, 2, 300.0, '2024-01-05 00:00:00'), + (6, 1, 250.0, '2024-01-06 00:00:00'), + (7, 3, 225.0, '2024-01-07 00:00:00'); + +admin flush_table('orders'); + +INSERT INTO orders VALUES + (8, 2, 200.0, '2024-01-08 00:00:00'), + (9, 1, 175.0, '2024-01-09 00:00:00'), + (10, 3, 150.0, '2024-01-10 00:00:00'), + (11, 2, 100.0, '2024-01-11 00:00:00'), + (12, 1, 50.0, '2024-01-12 00:00:00'); + +-- Test: Basic TopK query - get top 3 orders by amount +-- TopK (Sort + Limit) should generate dynamic filter on amount +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN SELECT "id", customer_id, amount +FROM orders +ORDER BY amount DESC +LIMIT 3; + +-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics= +-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED +-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED" +-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED +-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED +-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics= +-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- SQLNESS REPLACE DynamicFilter\s\[[^\]]*\] DynamicFilter [ REDACTED ] +-- SQLNESS REPLACE "files":\s\[[^\]]*\] "files": REDACTED +EXPLAIN ANALYZE VERBOSE SELECT "id", customer_id, amount +FROM orders +ORDER BY amount DESC +LIMIT 3; + +-- Verify correctness +SELECT "id", customer_id, amount +FROM orders +ORDER BY amount DESC +LIMIT 3; + +DROP TABLE orders; + +-- Test 2.2: TopK with Projection Alias +-- USER NOTE: must make alias for topk's sort column to be able to test dynamic filter pushdown +CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX); + +-- Insert test data +INSERT INTO orders VALUES + (1, 1, 100.0, '2024-01-01 00:00:00'), + (2, 2, 200.0, '2024-01-02 00:00:00'), + (3, 3, 300.0, '2024-01-03 00:00:00'), + (4, 1, 150.0, '2024-01-04 00:00:00'), + (5, 2, 250.0, '2024-01-05 00:00:00'), + (6, 1, 175.0, '2024-01-06 00:00:00'), + (7, 3, 325.0, '2024-01-07 00:00:00'), + (8, 2, 225.0, '2024-01-08 00:00:00'); + +-- Test: TopK with projection alias +-- The alias on amount tests that dynamic filter correctly handles column references after projection +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE (partitioning.*) REDACTED +EXPLAIN SELECT "id", customer_id, total_amount +FROM (SELECT "id", customer_id, amount as total_amount, ts FROM orders) +ORDER BY total_amount DESC +LIMIT 3; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (=Hash.*) =REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- some order problem between standalone&distributed, comment out for now +-- EXPLAIN ANALYZE SELECT "id", customer_id, total_amount +-- FROM (SELECT "id", customer_id, amount as total_amount, ts FROM orders) +-- ORDER BY total_amount DESC +-- LIMIT 3; + +-- Verify correctness +SELECT "id", customer_id, total_amount +FROM (SELECT "id", customer_id, amount as total_amount, ts FROM orders) +ORDER BY total_amount DESC +LIMIT 3; + +DROP TABLE orders; diff --git a/tests/cases/standalone/common/function/vector/vector_index_explain.result b/tests/cases/standalone/common/function/vector/vector_index_explain.result index 7a53041bf6..246a49f405 100644 --- a/tests/cases/standalone/common/function/vector/vector_index_explain.result +++ b/tests/cases/standalone/common/function/vector/vector_index_explain.result @@ -56,7 +56,7 @@ LIMIT 2; | 1_| 0_|_ProjectionExec: expr=[vec_id@0 as vec_id] metrics=REDACTED_| |_|_|_SortPreservingMergeExec: [vec_l2sq_distance(embedding@1, [1.0, 0.0]) ASC NULLS LAST, vec_id@0 ASC NULLS LAST], fetch=2 metrics=REDACTED_| |_|_|_SortExec: TopK(fetch=2), expr=[vec_l2sq_distance(embedding@1, [1.0, 0.0]) ASC NULLS LAST, vec_id@0 ASC NULLS LAST], preserve_partitioning=[true] metrics=REDACTED_| -|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":0, "files":1, "file_ranges":1}, "projection": ["vec_id", "embedding"], "vector_index_k": 2, "files": [{"file_id":"REDACTED","time_range_start":"REDACTED","time_range_end":"REDACTED","rows":4,"size":REDACTED,"index_size":893}], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED | +|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":0, "files":1, "file_ranges":1}, "projection": ["vec_id", "embedding"], "dyn_filters": ["DynamicFilter [ vec_l2sq_distance(embedding@1, [1.0, 0.0]) < 0.010000004 OR vec_l2sq_distance(embedding@1, [1.0, 0.0]) = 0.010000004 AND vec_id@0 < 2 ]"], "vector_index_k": 2, "files": [{"file_id":"REDACTED","time_range_start":"REDACTED","time_range_end":"REDACTED","rows":4,"size":REDACTED,"index_size":893}], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED | |_|_|_| |_|_| Total rows: REDACTED_| +-+-+-+ @@ -217,7 +217,7 @@ LIMIT 2; | 1_| 0_|_ProjectionExec: expr=[vec_id@0 as vec_id] metrics=REDACTED_| |_|_|_SortPreservingMergeExec: [vec_cos_distance(embedding@1, [1.0, 0.0]) ASC NULLS LAST, vec_id@0 ASC NULLS LAST], fetch=2 metrics=REDACTED_| |_|_|_SortExec: TopK(fetch=2), expr=[vec_cos_distance(embedding@1, [1.0, 0.0]) ASC NULLS LAST, vec_id@0 ASC NULLS LAST], preserve_partitioning=[true] metrics=REDACTED_| -|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":0, "files":1, "file_ranges":1}, "projection": ["vec_id", "embedding"], "vector_index_k": 2, "files": [{"file_id":"REDACTED","time_range_start":"REDACTED","time_range_end":"REDACTED","rows":4,"size":REDACTED,"index_size":895}], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED | +|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":0, "files":1, "file_ranges":1}, "projection": ["vec_id", "embedding"], "dyn_filters": ["DynamicFilter [ vec_cos_distance(embedding@1, [1.0, 0.0]) < 1 OR vec_cos_distance(embedding@1, [1.0, 0.0]) = 1 AND vec_id@0 < 4 ]"], "vector_index_k": 2, "files": [{"file_id":"REDACTED","time_range_start":"REDACTED","time_range_end":"REDACTED","rows":4,"size":REDACTED,"index_size":895}], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED | |_|_|_| |_|_| Total rows: REDACTED_| +-+-+-+ @@ -251,7 +251,7 @@ LIMIT 2; | 1_| 0_|_ProjectionExec: expr=[vec_id@0 as vec_id] metrics=REDACTED_| |_|_|_SortPreservingMergeExec: [vec_dot_product(embedding@1, [1.0, 0.0]) DESC, vec_id@0 ASC NULLS LAST], fetch=2 metrics=REDACTED_| |_|_|_SortExec: TopK(fetch=2), expr=[vec_dot_product(embedding@1, [1.0, 0.0]) DESC, vec_id@0 ASC NULLS LAST], preserve_partitioning=[true] metrics=REDACTED_| -|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":0, "files":1, "file_ranges":1}, "projection": ["vec_id", "embedding"], "vector_index_k": 2, "files": [{"file_id":"REDACTED","time_range_start":"REDACTED","time_range_end":"REDACTED","rows":4,"size":REDACTED,"index_size":895}], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED | +|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":0, "files":1, "file_ranges":1}, "projection": ["vec_id", "embedding"], "dyn_filters": ["DynamicFilter [ vec_dot_product(embedding@1, [1.0, 0.0]) IS NULL OR vec_dot_product(embedding@1, [1.0, 0.0]) > 0 OR vec_dot_product(embedding@1, [1.0, 0.0]) = 0 AND vec_id@0 < 2 ]"], "vector_index_k": 2, "files": [{"file_id":"REDACTED","time_range_start":"REDACTED","time_range_end":"REDACTED","rows":4,"size":REDACTED,"index_size":895}], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED | |_|_|_| |_|_| Total rows: REDACTED_| +-+-+-+