feat: use dyn filter (#7545)

* parent b2074e3863
author discord9 <discord9@163.com> 1767869295 +0800
committer discord9 <discord9@163.com> 1772529023 +0800

feat: use dyn filter

Signed-off-by: discord9 <discord9@163.com>

not supported

Signed-off-by: discord9 <discord9@163.com>

refactor: use make_mut instead

Signed-off-by: discord9 <discord9@163.com>

refactor: rm need to clone stream ctx

Signed-off-by: discord9 <discord9@163.com>

r

Signed-off-by: discord9 <discord9@163.com>

pcr

Signed-off-by: discord9 <discord9@163.com>

test: wait for datafusion update

Signed-off-by: discord9 <discord9@163.com>

refactor: use arc swap for dyn filters

Signed-off-by: discord9 <discord9@163.com>

* test: update sqlness

Signed-off-by: discord9 <discord9@163.com>

* chore: comment out sqlness

Signed-off-by: discord9 <discord9@163.com>

* test: update sqlness

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness fix

Signed-off-by: discord9 <discord9@163.com>

* refactor: predicate without option

Signed-off-by: discord9 <discord9@163.com>

* feat: print dyn filters& more tests

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness vector result update

Signed-off-by: discord9 <discord9@163.com>

* chore: log

Signed-off-by: discord9 <discord9@163.com>

* test: properly redact

Signed-off-by: discord9 <discord9@163.com>

* test: better data dist for non empty dyn filter

Signed-off-by: discord9 <discord9@163.com>

* test: properly redacted

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* properly redact

Signed-off-by: discord9 <discord9@163.com>

* docs:  explain why not  do it

Signed-off-by: discord9 <discord9@163.com>

* chore: rename update to add as its more proper

Signed-off-by: discord9 <discord9@163.com>

* chore: rm no need clone

Signed-off-by: discord9 <discord9@163.com>

* docs: per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-11 11:06:24 +08:00
committed by GitHub
parent 9e95214fc8
commit 922f9cb3d6
23 changed files with 1875 additions and 49 deletions

1
Cargo.lock generated
View File

@@ -13194,6 +13194,7 @@ name = "table"
version = "1.0.0-rc.1"
dependencies = [
"api",
"arc-swap",
"async-trait",
"chrono",
"common-base",

View File

@@ -84,7 +84,7 @@ impl BulkIterContext {
let dyn_filters = predicate
.as_ref()
.map(|pred| pred.dyn_filters().clone())
.map(|pred| pred.dyn_filters().as_ref().clone())
.unwrap_or_default();
Ok(Self {

View File

@@ -53,7 +53,7 @@ pub struct RowGroupIndex {
/// Meta data of a partition range.
/// If the scanner is [UnorderedScan], each meta only has one row group or memtable.
/// If the scanner is [SeqScan], each meta may have multiple row groups and memtables.
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub(crate) struct RangeMeta {
/// The time range of the range.
pub(crate) time_range: FileTimeRange,
@@ -450,6 +450,7 @@ impl FileRangeBuilder {
}
/// Builder to create mem ranges.
#[derive(Clone)]
pub(crate) struct MemRangeBuilder {
/// Ranges of a memtable.
range: MemtableRange,

View File

@@ -27,6 +27,7 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::tracing::Instrument;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
@@ -1575,11 +1576,20 @@ impl StreamContext {
.collect();
write!(f, ", \"projection\": {:?}", names)?;
}
if let Some(predicate) = &self.input.predicate.predicate()
&& !predicate.exprs().is_empty()
{
let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
write!(f, ", \"filters\": {:?}", exprs)?;
if let Some(predicate) = &self.input.predicate.predicate() {
if !predicate.exprs().is_empty() {
let exprs: Vec<_> =
predicate.exprs().iter().map(|e| e.to_string()).collect();
write!(f, ", \"filters\": {:?}", exprs)?;
}
if !predicate.dyn_filters().is_empty() {
let dyn_filters: Vec<_> = predicate
.dyn_filters()
.iter()
.map(|f| format!("{}", f))
.collect();
write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
}
}
#[cfg(feature = "vector_index")]
if let Some(vector_index_k) = self.input.vector_index_k {
@@ -1602,6 +1612,31 @@ impl StreamContext {
write!(f, "{:?}", InputWrapper { input: &self.input })
}
/// Registers new dynamic filters on this context's predicates.
///
/// Every expression in `filter_exprs` is downcast to a
/// `DynamicFilterPhysicalExpr`. Expressions of that type are collected and
/// handed to the predicate group in one call; any other expression is skipped.
///
/// Returns one flag per input expression, in input order: `true` when the
/// expression was a dynamic filter and was added, `false` otherwise.
///
/// Safe after stream creation; in-flight reads may still observe an older snapshot.
pub(crate) fn add_dyn_filter_to_predicate(
    self: &Arc<Self>,
    filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
) -> Vec<bool> {
    let mut supported = Vec::with_capacity(filter_exprs.len());
    let mut accepted = Vec::new();
    for expr in filter_exprs {
        // Upcast to `Any` so the concrete expression type can be recovered.
        let as_any = expr as Arc<dyn std::any::Any + Send + Sync + 'static>;
        match as_any
            .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
        {
            Ok(dyn_filter) => {
                supported.push(true);
                accepted.push(dyn_filter);
            }
            Err(_) => supported.push(false),
        }
    }
    self.input.predicate.add_dyn_filters(accepted);
    supported
}
}
/// Predicates to evaluate.
@@ -1610,9 +1645,9 @@ impl StreamContext {
pub struct PredicateGroup {
time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
/// Predicate that includes request filters and region partition expr (if any).
predicate_all: Option<Predicate>,
predicate_all: Predicate,
/// Predicate that only includes request filters.
predicate_without_region: Option<Predicate>,
predicate_without_region: Predicate,
/// Region partition expression restored from metadata.
region_partition_expr: Option<PartitionExpr>,
}
@@ -1654,16 +1689,8 @@ impl PredicateGroup {
Some(Arc::new(time_filters))
};
let predicate_all = if combined_exprs.is_empty() {
None
} else {
Some(Predicate::new(combined_exprs))
};
let predicate_without_region = if exprs.is_empty() {
None
} else {
Some(Predicate::new(exprs.to_vec()))
};
let predicate_all = Predicate::new(combined_exprs);
let predicate_without_region = Predicate::new(exprs.to_vec());
Ok(Self {
time_filters,
@@ -1680,12 +1707,26 @@ impl PredicateGroup {
/// Returns predicate of all exprs (including region partition expr if present).
pub(crate) fn predicate(&self) -> Option<&Predicate> {
self.predicate_all.as_ref()
if self.predicate_all.is_empty() {
None
} else {
Some(&self.predicate_all)
}
}
/// Returns predicate that excludes region partition expr.
pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
self.predicate_without_region.as_ref()
if self.predicate_without_region.is_empty() {
None
} else {
Some(&self.predicate_without_region)
}
}
/// Appends the given dynamic filters to both predicates of this group:
/// the one covering all exprs and the one without the region partition expr.
pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
    // Each predicate takes ownership of its own list, so one copy is needed.
    let filters_for_all = dyn_filters.clone();
    self.predicate_all.add_dyn_filters(filters_for_all);
    self.predicate_without_region.add_dyn_filters(dyn_filters);
}
/// Returns the region partition expr from metadata, if any.
@@ -1720,6 +1761,7 @@ impl PredicateGroup {
mod tests {
use std::sync::Arc;
use datafusion::physical_plan::expressions::lit as physical_lit;
use datafusion_expr::{col, lit};
use store_api::storage::ScanRequest;
@@ -1880,4 +1922,23 @@ mod tests {
);
assert!(scan_region.use_flat_format());
}
#[test]
fn test_update_dyn_filters_with_empty_base_predicates() {
    let region_metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
    let group = PredicateGroup::new(region_metadata.as_ref(), &[]).unwrap();

    // Built from an empty filter list: neither accessor exposes a predicate yet.
    assert!(group.predicate().is_none());
    assert!(group.predicate_without_region().is_none());

    let always_false = physical_lit(false);
    let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], always_false));
    group.add_dyn_filters(vec![dyn_filter]);

    // After adding one dynamic filter, both predicates become visible and
    // carry that filter while still having no logical exprs.
    let with_region = group.predicate().unwrap();
    assert!(with_region.exprs().is_empty());
    assert_eq!(1, with_region.dyn_filters().len());

    let without_region = group.predicate_without_region().unwrap();
    assert!(without_region.exprs().is_empty());
    assert_eq!(1, without_region.dyn_filters().len());
}
}

View File

@@ -715,7 +715,14 @@ impl RegionScanner for SeqScan {
.input
.predicate_group()
.predicate_without_region();
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
predicate.is_some()
}
/// Forwards the dynamic filter expressions to this scanner's stream context
/// and returns the per-filter flags it reports, unchanged.
fn add_dyn_filter_to_predicate(
&mut self,
filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
) -> Vec<bool> {
self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
}
fn set_logical_region(&mut self, logical_region: bool) {

View File

@@ -23,6 +23,7 @@ use common_error::ext::BoxedError;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing::{self, Instrument};
use common_telemetry::warn;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::arrow::array::BinaryArray;
@@ -360,7 +361,14 @@ impl RegionScanner for SeriesScan {
.input
.predicate_group()
.predicate_without_region();
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
predicate.is_some()
}
/// Forwards the dynamic filter expressions to this scanner's stream context
/// and returns the per-filter flags it reports, unchanged.
fn add_dyn_filter_to_predicate(
&mut self,
filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
) -> Vec<bool> {
self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
}
fn set_logical_region(&mut self, logical_region: bool) {

View File

@@ -21,7 +21,7 @@ use std::time::Instant;
use async_stream::{stream, try_stream};
use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing;
use common_telemetry::{tracing, warn};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::arrow::record_batch::RecordBatch;
@@ -492,8 +492,14 @@ impl RegionScanner for UnorderedScan {
.input
.predicate_group()
.predicate_without_region();
predicate.is_some()
}
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
/// Forwards the dynamic filter expressions to this scanner's stream context
/// and returns the per-filter flags it reports, unchanged.
fn add_dyn_filter_to_predicate(
&mut self,
filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
) -> Vec<bool> {
self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
}
fn set_logical_region(&mut self, logical_region: bool) {

View File

@@ -148,7 +148,7 @@ impl FileRange {
);
// not costly to create a predicate here since dynamic filters are wrapped in Arc
let pred = Predicate::new(vec![]).with_dyn_filters(self.context.base.dyn_filters.clone());
let pred = Predicate::with_dyn_filters(vec![], self.context.base.dyn_filters.clone());
pred.prune_with_stats(&stats, prune_schema.arrow_schema())
.first()
@@ -440,7 +440,7 @@ pub(crate) struct RangeBase {
/// Filters pushed down.
pub(crate) filters: Vec<SimpleFilterContext>,
/// Dynamic filter physical exprs.
pub(crate) dyn_filters: Arc<Vec<DynamicFilterPhysicalExpr>>,
pub(crate) dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>,
/// Helper to read the SST.
pub(crate) read_format: ReadFormat,
pub(crate) expected_metadata: Option<RegionMetadataRef>,

View File

@@ -486,9 +486,9 @@ impl ParquetReaderBuilder {
};
let dyn_filters = if let Some(predicate) = &self.predicate {
predicate.dyn_filters().clone()
predicate.dyn_filters().as_ref().clone()
} else {
Arc::new(vec![])
vec![]
};
let codec = build_primary_key_codec(read_format.metadata());

View File

@@ -658,24 +658,125 @@ impl QueryExecutor for DatafusionQueryEngine {
#[cfg(test)]
mod tests {
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use api::v1::SemanticType;
use arrow::array::{ArrayRef, UInt64Array};
use arrow_schema::SortOptions;
use catalog::RegisterTableRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_recordbatch::util;
use common_error::ext::BoxedError;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream, util};
use datafusion::physical_plan::display::{DisplayAs, DisplayFormatType};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::joins::{HashJoinExec, JoinOn, PartitionMode};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr};
use datafusion::prelude::{col, lit};
use datafusion_common::{JoinType, NullEquality};
use datafusion_physical_expr::expressions::Column;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::schema::{ColumnSchema, SchemaRef};
use datatypes::vectors::{Helper, UInt32Vector, VectorRef};
use session::context::{QueryContext, QueryContextBuilder};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::region_engine::{
PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};
use store_api::storage::{RegionId, ScanRequest};
use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable};
use table::table::scan::RegionScanExec;
use super::*;
use crate::options::QueryOptions;
use crate::parser::QueryLanguageParser;
use crate::part_sort::PartSortExec;
use crate::query_engine::{QueryEngineFactory, QueryEngineRef};
/// Test-only scanner that records how dynamic filters are pushed into it.
#[derive(Debug)]
struct RecordingScanner {
// Schema handed back by `schema()` and used for the empty output stream.
schema: SchemaRef,
// Region metadata handed back by `metadata()`.
metadata: RegionMetadataRef,
// Scanner properties updated by `prepare` and `set_logical_region`.
properties: ScannerProperties,
// Incremented once per `add_dyn_filter_to_predicate` call.
update_calls: Arc<AtomicUsize>,
// Number of filter expressions received by the most recent call.
last_filter_len: Arc<AtomicUsize>,
}
impl RecordingScanner {
    /// Creates a scanner with default properties that reports pushdown
    /// activity through the two shared counters.
    fn new(
        schema: SchemaRef,
        metadata: RegionMetadataRef,
        update_calls: Arc<AtomicUsize>,
        last_filter_len: Arc<AtomicUsize>,
    ) -> Self {
        let properties = ScannerProperties::default();
        Self {
            schema,
            metadata,
            properties,
            update_calls,
            last_filter_len,
        }
    }
}
impl RegionScanner for RecordingScanner {
    fn name(&self) -> &str {
        "RecordingScanner"
    }

    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.metadata.clone()
    }

    fn prepare(&mut self, request: PrepareRequest) -> std::result::Result<(), BoxedError> {
        self.properties.prepare(request);
        Ok(())
    }

    /// Always yields an empty stream with this scanner's schema.
    fn scan_partition(
        &self,
        _ctx: &QueryScanContext,
        _metrics_set: &ExecutionPlanMetricsSet,
        _partition: usize,
    ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
        let empty: SendableRecordBatchStream =
            Box::pin(EmptyRecordBatchStream::new(self.schema.clone()));
        Ok(empty)
    }

    fn has_predicate_without_region(&self) -> bool {
        true
    }

    /// Records the call and the number of filters received, then reports
    /// every filter as accepted.
    fn add_dyn_filter_to_predicate(
        &mut self,
        filter_exprs: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Vec<bool> {
        let received = filter_exprs.len();
        self.update_calls.fetch_add(1, Ordering::Relaxed);
        self.last_filter_len.store(received, Ordering::Relaxed);
        std::iter::repeat(true).take(received).collect()
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}
impl DisplayAs for RecordingScanner {
    /// Writes the fixed scanner name regardless of the requested format type.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("RecordingScanner")
    }
}
async fn create_test_engine() -> QueryEngineRef {
let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
let req = RegisterTableRequest {
@@ -827,4 +928,179 @@ mod tests {
format!("{}", logical_plan.display_indent())
);
}
#[tokio::test]
async fn test_topk_dynamic_filter_pushdown_reaches_region_scan() {
    let query_engine = create_test_engine().await;
    let df_engine = query_engine
        .as_any()
        .downcast_ref::<DatafusionQueryEngine>()
        .unwrap();
    let engine_ctx = df_engine.engine_context(QueryContext::arc());
    let state = engine_ctx.state();

    // The table schema and the region metadata share one `ts` column definition.
    let ts_column = ColumnSchema::new(
        "ts",
        ConcreteDataType::timestamp_millisecond_datatype(),
        false,
    );
    let schema = Arc::new(datatypes::schema::Schema::new(vec![ts_column.clone()]));

    let mut metadata_builder = RegionMetadataBuilder::new(RegionId::new(1024, 1));
    metadata_builder
        .push_column_metadata(ColumnMetadata {
            column_schema: ts_column.with_time_index(true),
            semantic_type: SemanticType::Timestamp,
            column_id: 1,
        })
        .primary_key(vec![]);
    let metadata = Arc::new(metadata_builder.build().unwrap());

    // Counters shared with the scanner so pushdown activity can be observed.
    let update_calls = Arc::new(AtomicUsize::new(0));
    let last_filter_len = Arc::new(AtomicUsize::new(0));
    let scanner = Box::new(RecordingScanner::new(
        schema,
        metadata,
        update_calls.clone(),
        last_filter_len.clone(),
    ));
    let scan = Arc::new(RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap());

    // Sort by `ts` descending with a limit of 3 on top of the recording scan.
    let sort_options = SortOptions {
        descending: true,
        ..Default::default()
    };
    let sort_expr = PhysicalSortExpr {
        expr: Arc::new(Column::new("ts", 0)),
        options: sort_options,
    };
    let partition_ranges: Vec<Vec<PartitionRange>> = vec![vec![]];
    let part_sort = PartSortExec::try_new(sort_expr, Some(3), partition_ranges, scan).unwrap();
    let mut plan: Arc<dyn ExecutionPlan> = Arc::new(part_sort);

    // Run every physical optimizer rule configured in the session state.
    for rule in state.physical_optimizers() {
        plan = rule.optimize(plan, state.config_options()).unwrap();
    }

    // The scanner must have been offered at least one filter.
    assert!(update_calls.load(Ordering::Relaxed) > 0);
    assert!(last_filter_len.load(Ordering::Relaxed) > 0);
}
#[tokio::test]
async fn test_join_dynamic_filter_pushdown_reaches_region_scan() {
let engine = create_test_engine().await;
let engine = engine
.as_any()
.downcast_ref::<DatafusionQueryEngine>()
.unwrap();
let engine_ctx = engine.engine_context(QueryContext::arc());
let state = engine_ctx.state();
assert!(
state
.config_options()
.optimizer
.enable_join_dynamic_filter_pushdown
);
let schema = Arc::new(datatypes::schema::Schema::new(vec![ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)]));
let mut left_metadata_builder = RegionMetadataBuilder::new(RegionId::new(2048, 1));
left_metadata_builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.primary_key(vec![]);
let left_metadata = Arc::new(left_metadata_builder.build().unwrap());
let mut right_metadata_builder = RegionMetadataBuilder::new(RegionId::new(2048, 2));
right_metadata_builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.primary_key(vec![]);
let right_metadata = Arc::new(right_metadata_builder.build().unwrap());
let left_update_calls = Arc::new(AtomicUsize::new(0));
let left_last_filter_len = Arc::new(AtomicUsize::new(0));
let right_update_calls = Arc::new(AtomicUsize::new(0));
let right_last_filter_len = Arc::new(AtomicUsize::new(0));
let left_scan = Arc::new(
RegionScanExec::new(
Box::new(RecordingScanner::new(
schema.clone(),
left_metadata,
left_update_calls.clone(),
left_last_filter_len.clone(),
)),
ScanRequest::default(),
None,
)
.unwrap(),
);
let right_scan = Arc::new(
RegionScanExec::new(
Box::new(RecordingScanner::new(
schema,
right_metadata,
right_update_calls.clone(),
right_last_filter_len.clone(),
)),
ScanRequest::default(),
None,
)
.unwrap(),
);
let on: JoinOn = vec![(
Arc::new(Column::new("ts", 0)) as Arc<dyn PhysicalExpr>,
Arc::new(Column::new("ts", 0)) as Arc<dyn PhysicalExpr>,
)];
let mut plan: Arc<dyn ExecutionPlan> = Arc::new(
HashJoinExec::try_new(
left_scan,
right_scan,
on,
None,
&JoinType::Inner,
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNull,
false,
)
.unwrap(),
);
for optimizer in state.physical_optimizers() {
plan = optimizer.optimize(plan, state.config_options()).unwrap();
}
assert!(left_update_calls.load(Ordering::Relaxed) > 0);
assert_eq!(0, left_last_filter_len.load(Ordering::Relaxed));
assert!(right_update_calls.load(Ordering::Relaxed) > 0);
assert!(right_last_filter_len.load(Ordering::Relaxed) > 0);
}
}

View File

@@ -237,6 +237,7 @@ impl ExecutionPlan for PartSortExec {
} else {
internal_err!("No children found")?
};
// `with_new_children` creates a new dynamic filter, because the old filter is bound to the old input and cannot be reused.
let new = Self::try_new(
self.expression.clone(),
self.limit,

View File

@@ -192,6 +192,10 @@ impl QueryEngineState {
physical_optimizer
.rules
.push(Arc::new(WindowedSortPhysicalRule));
// Explicitly skip filter pushdown for windowed sort and part sort.
// (Note that `PartSortExec` creates another new dyn filter, which needs to be pushed down for the dyn filter optimization to take effect.)
// Benchmarks show that it can cause a performance regression due to useless filtering and extra shuffling.
// We can add a rule to do filter pushdown for windowed sort in the future if we find a way to avoid the performance regression.
physical_optimizer
.rules
.push(Arc::new(MatchesConstantTermOptimizer));

View File

@@ -26,7 +26,7 @@ use common_error::ext::BoxedError;
use common_recordbatch::{EmptyRecordBatchStream, MemoryPermit, SendableRecordBatchStream};
use common_time::Timestamp;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PhysicalExpr};
use datatypes::schema::SchemaRef;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
@@ -451,6 +451,15 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
/// Check if there is any predicate exclude region partition exprs that may be executed in this scanner.
fn has_predicate_without_region(&self) -> bool;
/// Adds the given dynamic filter expressions to the predicate of the scanner.
///
/// Returns a vector of booleans indicating which filter expressions were applied:
/// `true` means the filter expression was applied (it will be used by the scanner to
/// prune row groups by statistics), `false` means it was not.
fn add_dyn_filter_to_predicate(
&mut self,
filter_exprs: Vec<Arc<dyn PhysicalExpr>>,
) -> Vec<bool>;
/// Sets whether the scanner is reading a logical region.
fn set_logical_region(&mut self, logical_region: bool);
}
@@ -996,6 +1005,13 @@ impl RegionScanner for SinglePartitionScanner {
false
}
/// Reports every given filter expression as not applied; nothing is stored.
fn add_dyn_filter_to_predicate(
    &mut self,
    filter_exprs: Vec<Arc<dyn datafusion_physical_plan::PhysicalExpr>>,
) -> Vec<bool> {
    filter_exprs.iter().map(|_| false).collect()
}
fn metadata(&self) -> RegionMetadataRef {
self.metadata.clone()
}

View File

@@ -12,6 +12,7 @@ workspace = true
[dependencies]
api.workspace = true
arc-swap = "1.6"
async-trait.workspace = true
chrono.workspace = true
common-base.workspace = true

View File

@@ -14,7 +14,8 @@
use std::sync::Arc;
use common_telemetry::{error, warn};
use arc_swap::ArcSwap;
use common_telemetry::{debug, warn};
use common_time::Timestamp;
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
@@ -53,14 +54,14 @@ macro_rules! return_none_if_utf8 {
}
/// Reference-counted pointer to a list of logical exprs and a list of dynamic filter physical exprs.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct Predicate {
/// logical exprs
exprs: Arc<Vec<Expr>>,
/// dynamic filter physical exprs, only useful if dynamic filtering is enabled
///
/// They are usually from `TopK` or `Join` operators, and can dynamically filter data during query execution by using current runtime information to further reduce data scanning
dyn_filters: Arc<Vec<DynamicFilterPhysicalExpr>>,
dyn_filters: Arc<ArcSwap<Vec<Arc<DynamicFilterPhysicalExpr>>>>,
}
impl Predicate {
@@ -70,18 +71,33 @@ impl Predicate {
pub fn new(exprs: Vec<Expr>) -> Self {
Self {
exprs: Arc::new(exprs),
dyn_filters: Arc::new(vec![]),
dyn_filters: Arc::new(ArcSwap::new(Arc::new(vec![]))),
}
}
/// Sets the dynamic filter physical exprs.
pub fn with_dyn_filters(self, dyn_filters: Arc<Vec<DynamicFilterPhysicalExpr>>) -> Self {
pub fn with_dyn_filters(
exprs: Vec<Expr>,
dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>,
) -> Self {
Self {
exprs: self.exprs,
dyn_filters,
exprs: Arc::new(exprs),
dyn_filters: Arc::new(ArcSwap::new(Arc::new(dyn_filters))),
}
}
/// Returns `true` when the predicate has neither logical exprs nor
/// dynamic filters at the time of the call.
pub fn is_empty(&self) -> bool {
    if !self.exprs.is_empty() {
        return false;
    }
    self.dyn_filters.load().is_empty()
}
/// Appends dynamic filter physical exprs after the ones already stored.
///
/// The stored list is replaced through `rcu` with a new list holding the
/// existing filters followed by `dyn_filters`.
pub fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
    self.dyn_filters.rcu(|current| {
        // Build the replacement from scratch inside the closure, borrowing
        // `dyn_filters` so the closure can be invoked more than once.
        let merged: Vec<_> = current
            .iter()
            .chain(dyn_filters.iter())
            .cloned()
            .collect();
        Arc::new(merged)
    });
}
/// Returns the logical exprs.
pub fn exprs(&self) -> &[Expr] {
&self.exprs
@@ -89,14 +105,15 @@ impl Predicate {
/// Returns the dynamic filter physical exprs. Note that these are live dynamic filters, which
/// can change during query execution.
pub fn dyn_filters(&self) -> &Arc<Vec<DynamicFilterPhysicalExpr>> {
&self.dyn_filters
pub fn dyn_filters(&self) -> Arc<Vec<Arc<DynamicFilterPhysicalExpr>>> {
self.dyn_filters.load_full()
}
/// Returns the dynamic filter as physical exprs. Notice this return a "snapshot" of
/// dynamic filters at the time of calling this method.
pub fn dyn_filter_phy_exprs(&self) -> error::Result<Vec<Arc<dyn PhysicalExpr>>> {
self.dyn_filters
.load()
.iter()
.map(|e| e.current())
.collect::<Result<Vec<_>, _>>()
@@ -157,7 +174,8 @@ impl Predicate {
}
},
Err(e) => {
error!(e; "Failed to create predicate for expr");
// Dynamic filter exprs can be complex, so `PruningPredicate::try_new` may fail to prove anything from them. In that case, we just log it and skip pruning with this expr.
debug!("Failed to create pruning predicate for expr: {e:?}");
}
}
}

View File

@@ -30,7 +30,7 @@ use datafusion::error::Result as DfResult;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::filter_pushdown::{
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation,
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
};
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -53,6 +53,7 @@ use store_api::storage::{ScanRequest, TimeSeriesDistribution};
use crate::table::metrics::StreamMetrics;
/// A plan to read multiple partitions from a region of a table.
#[derive(Clone)]
pub struct RegionScanExec {
scanner: Arc<Mutex<RegionScannerRef>>,
arrow_schema: ArrowSchemaRef,
@@ -444,8 +445,27 @@ impl ExecutionPlan for RegionScanExec {
child_pushdown_result: ChildPushdownResult,
_config: &datafusion::config::ConfigOptions,
) -> DfResult<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
// TODO(discord9): use the pushdown result to update the scanner's predicate
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
let parent_filters = child_pushdown_result
.parent_filters
.into_iter()
.map(|f| f.filter)
.collect::<Vec<_>>();
let supported = self
.scanner
.lock()
.unwrap()
.add_dyn_filter_to_predicate(parent_filters);
// The DataFusion API requires returning a clone of `self` after the mutation, even though we only mutate state inside the mutex.
let new_self = Arc::new(self.clone());
Ok(FilterPushdownPropagation {
filters: supported
.into_iter()
.map(|s| if s { PushedDown::Yes } else { PushedDown::No })
.collect(),
updated_node: Some(new_self),
})
}
}

View File

@@ -0,0 +1,253 @@
-- Dynamic Filter Test: Hash Join
-- Tests dynamic filters generated by hash join
-- Test 1.1: Basic Hash Join
CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
-- Insert sufficient test data
INSERT INTO orders VALUES
(1, 1, 100.0, '2024-01-01 00:00:00'),
(2, 2, 200.0, '2024-01-02 00:00:00'),
(3, 1, 150.0, '2024-01-03 00:00:00');
Affected Rows: 3
-- Insert more rows to trigger dynamic filter
INSERT INTO customers VALUES
(1, 'Alice', 'gold', '2024-01-01 00:00:00'),
(2, 'Bob', 'silver', '2024-01-02 00:00:00');
Affected Rows: 2
-- Test: Basic hash join should produce dynamic filter
-- The hash join will build hash table from customers (small table)
-- and probe with orders (larger table), potentially using dynamic filter
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN SELECT o."id", o.amount, c."name", c.tier
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE c.tier = 'gold';
+---------------+-----------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: o.id, o.amount, c.name, c.tier |
| | Inner Join: o.customer_id = c.customer_id |
| | Projection: o.id, o.customer_id, o.amount |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: o |
| | TableScan: orders |
| | ]] |
| | Filter: c.tier = Utf8("gold") |
| | Projection: c.customer_id, c.name, c.tier |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: c |
| | TableScan: customers |
| | ]] |
| physical_plan | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, amount@2, name@4, tier@5] |
| | RepartitionExec: REDACTED
| | ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, amount@2 as amount] |
| | MergeScanExec: REDACTED
| | RepartitionExec: REDACTED
| | FilterExec: tier@2 = gold |
| | ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics=
-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED
-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED"
-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED
-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED
-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics=
-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE c.tier = 'gold';
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, amount@2, name@4, tier@5] metrics=REDACTED_|
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, amount@2 as amount] metrics=REDACTED_|
|_|_|_MergeScanExec: REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_FilterExec: tier@2 = gold metrics=REDACTED_|
|_|_|_ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] metrics=REDACTED_|
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+
-- Verify correctness
-- SQLNESS SORT_RESULT
SELECT o."id", o.amount, c."name", c.tier
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE c.tier = 'gold';
+----+--------+-------+------+
+----+--------+-------+------+
+----+--------+-------+------+
| 1 | 100.0 | Alice | gold |
| 3 | 150.0 | Alice | gold |
| id | amount | name | tier |
DROP TABLE orders;
Affected Rows: 0
DROP TABLE customers;
Affected Rows: 0
-- Test 1.2: Hash Join with Projection Alias
CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
-- Insert test data
INSERT INTO orders VALUES
(1, 1, 100.0, '2024-01-01 00:00:00'),
(2, 2, 200.0, '2024-01-02 00:00:00'),
(3, 1, 150.0, '2024-01-03 00:00:00'),
(4, 3, 300.0, '2024-01-04 00:00:00');
Affected Rows: 4
INSERT INTO customers VALUES
(1, 'Alice', 'gold', '2024-01-01 00:00:00'),
(2, 'Bob', 'silver', '2024-01-02 00:00:00'),
(3, 'Charlie', 'bronze', '2024-01-03 00:00:00');
Affected Rows: 3
-- Test: Hash join with projection alias
-- The alias on customer_id tests that dynamic filter correctly handles column references
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN SELECT o."id", o.amount, c."name", c.tier
FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o
JOIN customers c ON o.cid = c.customer_id
WHERE c.tier IN ('gold', 'silver');
+---------------+---------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: o.id, o.amount, c.name, c.tier |
| | Inner Join: o.cid = c.customer_id |
| | Projection: o.id, o.cid, o.amount |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: o |
| | Projection: orders.id, orders.customer_id AS cid, orders.amount, orders.ts |
| | TableScan: orders |
| | ]] |
| | Filter: c.tier = Utf8("gold") OR c.tier = Utf8("silver") |
| | Projection: c.customer_id, c.name, c.tier |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: c |
| | TableScan: customers |
| | ]] |
| physical_plan | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(cid@1, customer_id@0)], projection=[id@0, amount@2, name@4, tier@5] |
| | RepartitionExec: REDACTED
| | ProjectionExec: expr=[id@0 as id, cid@1 as cid, amount@2 as amount] |
| | MergeScanExec: REDACTED
| | RepartitionExec: REDACTED
| | FilterExec: tier@2 = gold OR tier@2 = silver |
| | ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics=
-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED
-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED"
-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED
-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED
-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics=
-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier
FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o
JOIN customers c ON o.cid = c.customer_id
WHERE c.tier IN ('gold', 'silver');
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(cid@1, customer_id@0)], projection=[id@0, amount@2, name@4, tier@5] metrics=REDACTED_|
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_ProjectionExec: expr=[id@0 as id, cid@1 as cid, amount@2 as amount] metrics=REDACTED_|
|_|_|_MergeScanExec: REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_FilterExec: tier@2 = gold OR tier@2 = silver metrics=REDACTED_|
|_|_|_ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] metrics=REDACTED_|
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[id@0 as id, customer_id@1 as cid, amount@2 as amount, ts@3 as ts] metrics=REDACTED_|
|_|_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+
-- Verify correctness
-- SQLNESS SORT_RESULT
SELECT o."id", o.amount, c."name", c.tier
FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o
JOIN customers c ON o.cid = c.customer_id
WHERE c.tier IN ('gold', 'silver');
+----+--------+-------+--------+
+----+--------+-------+--------+
+----+--------+-------+--------+
| 1 | 100.0 | Alice | gold |
| 2 | 200.0 | Bob | silver |
| 3 | 150.0 | Alice | gold |
| id | amount | name | tier |
DROP TABLE orders;
Affected Rows: 0
DROP TABLE customers;
Affected Rows: 0

View File

@@ -0,0 +1,112 @@
-- Dynamic Filter Test: Hash Join
-- Tests dynamic filters generated by hash join
-- Test 1.1: Basic Hash Join
CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX);
-- Insert sufficient test data
INSERT INTO orders VALUES
(1, 1, 100.0, '2024-01-01 00:00:00'),
(2, 2, 200.0, '2024-01-02 00:00:00'),
(3, 1, 150.0, '2024-01-03 00:00:00');
-- Insert more rows to trigger dynamic filter
INSERT INTO customers VALUES
(1, 'Alice', 'gold', '2024-01-01 00:00:00'),
(2, 'Bob', 'silver', '2024-01-02 00:00:00');
-- Test: Basic hash join should produce dynamic filter
-- The hash join will build hash table from customers (small table)
-- and probe with orders (larger table), potentially using dynamic filter
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN SELECT o."id", o.amount, c."name", c.tier
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE c.tier = 'gold';
-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics=
-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED
-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED"
-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED
-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED
-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics=
-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE c.tier = 'gold';
-- Verify correctness
-- SQLNESS SORT_RESULT
SELECT o."id", o.amount, c."name", c.tier
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE c.tier = 'gold';
DROP TABLE orders;
DROP TABLE customers;
-- Test 1.2: Hash Join with Projection Alias
CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX);
-- Insert test data
INSERT INTO orders VALUES
(1, 1, 100.0, '2024-01-01 00:00:00'),
(2, 2, 200.0, '2024-01-02 00:00:00'),
(3, 1, 150.0, '2024-01-03 00:00:00'),
(4, 3, 300.0, '2024-01-04 00:00:00');
INSERT INTO customers VALUES
(1, 'Alice', 'gold', '2024-01-01 00:00:00'),
(2, 'Bob', 'silver', '2024-01-02 00:00:00'),
(3, 'Charlie', 'bronze', '2024-01-03 00:00:00');
-- Test: Hash join with projection alias
-- The alias on customer_id tests that dynamic filter correctly handles column references
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN SELECT o."id", o.amount, c."name", c.tier
FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o
JOIN customers c ON o.cid = c.customer_id
WHERE c.tier IN ('gold', 'silver');
-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics=
-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED
-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED"
-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED
-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED
-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics=
-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier
FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o
JOIN customers c ON o.cid = c.customer_id
WHERE c.tier IN ('gold', 'silver');
-- Verify correctness
-- SQLNESS SORT_RESULT
SELECT o."id", o.amount, c."name", c.tier
FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o
JOIN customers c ON o.cid = c.customer_id
WHERE c.tier IN ('gold', 'silver');
DROP TABLE orders;
DROP TABLE customers;

View File

@@ -0,0 +1,493 @@
-- Dynamic Filter Test: Combined Hash Join + TopK
-- Tests dynamic filters with both hash join and TopK operators
-- Test 3.1: Hash Join with TopK Subquery
CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
-- Insert test data
INSERT INTO orders VALUES
(1, 1, 100.0, '2024-01-01 00:00:00'),
(2, 2, 200.0, '2024-01-02 00:00:00'),
(3, 3, 300.0, '2024-01-03 00:00:00'),
(4, 1, 150.0, '2024-01-04 00:00:00'),
(5, 2, 250.0, '2024-01-05 00:00:00'),
(6, 1, 175.0, '2024-01-06 00:00:00'),
(7, 3, 325.0, '2024-01-07 00:00:00'),
(8, 2, 225.0, '2024-01-08 00:00:00'),
(9, 1, 400.0, '2024-01-09 00:00:00'),
(10, 3, 500.0, '2024-01-10 00:00:00');
Affected Rows: 10
INSERT INTO customers VALUES
(1, 'Alice', 'gold', '2024-01-01 00:00:00'),
(2, 'Bob', 'silver', '2024-01-02 00:00:00'),
(3, 'Charlie', 'bronze', '2024-01-03 00:00:00');
Affected Rows: 3
-- Test: Hash join where probe side uses TopK
-- This combines both dynamic filter sources: TopK generates filter on amount,
-- and hash join generates filter on customer_id
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
EXPLAIN SELECT top_orders."id", top_orders.amount, c."name", c.tier
FROM (
SELECT "id", customer_id, amount, ts
FROM orders
ORDER BY amount DESC
LIMIT 5
) top_orders
JOIN customers c ON top_orders.customer_id = c.customer_id
WHERE c.tier IN ('gold', 'bronze');
+---------------+-----------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: top_orders.id, top_orders.amount, c.name, c.tier |
| | Inner Join: top_orders.customer_id = c.customer_id |
| | Projection: top_orders.id, top_orders.customer_id, top_orders.amount |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: top_orders |
| | Limit: skip=0, fetch=5 |
| | Sort: orders.amount DESC NULLS FIRST |
| | Projection: orders.id, orders.customer_id, orders.amount, orders.ts |
| | TableScan: orders |
| | ]] |
| | Filter: c.tier = Utf8("gold") OR c.tier = Utf8("bronze") |
| | Projection: c.customer_id, c.name, c.tier |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: c |
| | TableScan: customers |
| | ]] |
| physical_plan | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, amount@2, name@4, tier@5] |
| | RepartitionExec: partitioning=REDACTED
| | ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, amount@2 as amount] |
| | MergeScanExec: REDACTED
| | RepartitionExec: partitioning=REDACTED
| | FilterExec: tier@2 = gold OR tier@2 = bronze |
| | ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics=
-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED
-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED"
-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED
-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED
-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics=
-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
EXPLAIN ANALYZE VERBOSE SELECT top_orders."id", top_orders.amount, c."name", c.tier
FROM (
SELECT "id", customer_id, amount, ts
FROM orders
ORDER BY amount DESC
LIMIT 5
) top_orders
JOIN customers c ON top_orders.customer_id = c.customer_id
WHERE c.tier IN ('gold', 'bronze');
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, amount@2, name@4, tier@5] metrics=REDACTED_|
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, amount@2 as amount] metrics=REDACTED_|
|_|_|_MergeScanExec: REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_FilterExec: tier@2 = gold OR tier@2 = bronze metrics=REDACTED_|
|_|_|_ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] metrics=REDACTED_|
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [amount@2 DESC], fetch=5 metrics=REDACTED_|
|_|_|_SortExec: TopK(fetch=5), expr=[amount@2 DESC], preserve_partitioning=[true] metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "dyn_filters": ["DynamicFilter [ empty ]"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+
-- Verify correctness
-- SQLNESS SORT_RESULT
SELECT top_orders."id", top_orders.amount, c."name", c.tier
FROM (
SELECT "id", customer_id, amount, ts
FROM orders
ORDER BY amount DESC
LIMIT 5
) top_orders
JOIN customers c ON top_orders.customer_id = c.customer_id
WHERE c.tier IN ('gold', 'bronze');
+----+--------+---------+--------+
+----+--------+---------+--------+
+----+--------+---------+--------+
| 10 | 500.0 | Charlie | bronze |
| 3 | 300.0 | Charlie | bronze |
| 7 | 325.0 | Charlie | bronze |
| 9 | 400.0 | Alice | gold |
| id | amount | name | tier |
DROP TABLE orders;
Affected Rows: 0
DROP TABLE customers;
Affected Rows: 0
-- Test 3.2: TopK with Hash Join Subquery
CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
-- Insert test data
INSERT INTO orders VALUES
(1, 1, 100.0, '2024-01-01 00:00:00'),
(2, 2, 200.0, '2024-01-02 00:00:00'),
(3, 3, 300.0, '2024-01-03 00:00:00'),
(4, 1, 150.0, '2024-01-04 00:00:00'),
(5, 2, 250.0, '2024-01-05 00:00:00'),
(6, 1, 175.0, '2024-01-06 00:00:00'),
(7, 3, 325.0, '2024-01-07 00:00:00'),
(8, 2, 225.0, '2024-01-08 00:00:00'),
(9, 1, 400.0, '2024-01-09 00:00:00'),
(10, 3, 500.0, '2024-01-10 00:00:00');
Affected Rows: 10
INSERT INTO customers VALUES
(1, 'Alice', 'gold', '2024-01-01 00:00:00'),
(2, 'Bob', 'silver', '2024-01-02 00:00:00'),
(3, 'Charlie', 'bronze', '2024-01-03 00:00:00');
Affected Rows: 3
-- Test: TopK where sorting side uses Hash Join
-- Hash join produces customer info, then TopK sorts the joined results
-- This tests that TopK dynamic filter works on hash join output
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
EXPLAIN SELECT "id", customer_id, "name", tier, amount
FROM (
SELECT o."id", o.customer_id, c."name", c.tier, o.amount
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE c.tier IN ('gold', 'silver')
)
ORDER BY amount DESC
LIMIT 4;
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: o.amount DESC NULLS FIRST, fetch=4 |
| | Projection: o.id, o.customer_id, c.name, c.tier, o.amount |
| | Inner Join: o.customer_id = c.customer_id |
| | Projection: o.id, o.customer_id, o.amount |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: o |
| | TableScan: orders |
| | ]] |
| | Filter: c.tier = Utf8("gold") OR c.tier = Utf8("silver") |
| | Projection: c.customer_id, c.name, c.tier |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: c |
| | TableScan: customers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [amount@4 DESC], fetch=4 |
| | SortExec: TopK(fetch=4), expr=[amount@4 DESC], preserve_partitioning=[true] |
| | ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, name@3 as name, tier@4 as tier, amount@2 as amount] |
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, customer_id@1, amount@2, name@4, tier@5] |
| | RepartitionExec: partitioning=REDACTED
| | ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, amount@2 as amount] |
| | MergeScanExec: REDACTED
| | RepartitionExec: partitioning=REDACTED
| | FilterExec: tier@2 = gold OR tier@2 = silver |
| | ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] |
| | MergeScanExec: REDACTED
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics=
-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED
-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED"
-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED
-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED
-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics=
-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
EXPLAIN ANALYZE VERBOSE SELECT "id", customer_id, "name", tier, amount
FROM (
SELECT o."id", o.customer_id, c."name", c.tier, o.amount
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE c.tier IN ('gold', 'silver')
)
ORDER BY amount DESC
LIMIT 4;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [amount@4 DESC], fetch=4 metrics=REDACTED_|
|_|_|_SortExec: TopK(fetch=4), expr=[amount@4 DESC], preserve_partitioning=[true] metrics=REDACTED_|
|_|_|_ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, name@3 as name, tier@4 as tier, amount@2 as amount] metrics=REDACTED_|
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, customer_id@1, amount@2, name@4, tier@5] metrics=REDACTED_|
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, amount@2 as amount] metrics=REDACTED_|
|_|_|_MergeScanExec: REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_FilterExec: tier@2 = gold OR tier@2 = silver metrics=REDACTED_|
|_|_|_ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] metrics=REDACTED_|
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+
-- Verify correctness
SELECT "id", customer_id, "name", tier, amount
FROM (
SELECT o."id", o.customer_id, c."name", c.tier, o.amount
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE c.tier IN ('gold', 'silver')
)
ORDER BY amount DESC
LIMIT 4;
+----+-------------+-------+--------+--------+
| id | customer_id | name | tier | amount |
+----+-------------+-------+--------+--------+
| 9 | 1 | Alice | gold | 400.0 |
| 5 | 2 | Bob | silver | 250.0 |
| 8 | 2 | Bob | silver | 225.0 |
| 2 | 2 | Bob | silver | 200.0 |
+----+-------------+-------+--------+--------+
DROP TABLE orders;
Affected Rows: 0
DROP TABLE customers;
Affected Rows: 0
-- Test 3.3: Multiple Dynamic Filters
CREATE TABLE orders ("id" INT, customer_id INT, product_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
CREATE TABLE products (product_id INT, "name" STRING, "category" STRING, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
-- Insert test data
INSERT INTO orders VALUES
(1, 1, 101, 100.0, '2024-01-01 00:00:00'),
(2, 2, 102, 200.0, '2024-01-02 00:00:00'),
(3, 3, 103, 300.0, '2024-01-03 00:00:00'),
(4, 1, 101, 150.0, '2024-01-04 00:00:00'),
(5, 2, 102, 250.0, '2024-01-05 00:00:00'),
(6, 1, 103, 175.0, '2024-01-06 00:00:00'),
(7, 3, 101, 325.0, '2024-01-07 00:00:00'),
(8, 2, 102, 225.0, '2024-01-08 00:00:00'),
(9, 1, 103, 400.0, '2024-01-09 00:00:00'),
(10, 3, 101, 500.0, '2024-01-10 00:00:00');
Affected Rows: 10
INSERT INTO customers VALUES
(1, 'Alice', 'gold', '2024-01-01 00:00:00'),
(2, 'Bob', 'silver', '2024-01-02 00:00:00'),
(3, 'Charlie', 'bronze', '2024-01-03 00:00:00');
Affected Rows: 3
INSERT INTO products VALUES
(101, 'Laptop', 'electronics', '2024-01-01 00:00:00'),
(102, 'Mouse', 'electronics', '2024-01-02 00:00:00'),
(103, 'Keyboard', 'electronics', '2024-01-03 00:00:00');
Affected Rows: 3
-- Test: Multiple hash joins producing dynamic filters
-- This creates a chain: orders JOIN customers JOIN products
-- Both hash joins can produce dynamic filters
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
EXPLAIN SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category"
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id
WHERE c.tier IN ('gold', 'silver')
AND p."category" = 'electronics';
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: o.id, o.amount, c.name, c.tier, p.name AS product_name, p.category |
| | Inner Join: o.product_id = p.product_id |
| | Projection: o.id, o.product_id, o.amount, c.name, c.tier |
| | Inner Join: o.customer_id = c.customer_id |
| | Projection: o.id, o.customer_id, o.product_id, o.amount |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: o |
| | TableScan: orders |
| | ]] |
| | Filter: c.tier = Utf8("gold") OR c.tier = Utf8("silver") |
| | Projection: c.customer_id, c.name, c.tier |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: c |
| | TableScan: customers |
| | ]] |
| | Filter: p.category = Utf8("electronics") |
| | Projection: p.product_id, p.name, p.category |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: p |
| | TableScan: products |
| | ]] |
| physical_plan | ProjectionExec: expr=[id@0 as id, amount@1 as amount, name@2 as name, tier@3 as tier, name@4 as product_name, category@5 as category] |
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(product_id@1, product_id@0)], projection=[id@0, amount@2, name@3, tier@4, name@6, category@7] |
| | RepartitionExec: partitioning=REDACTED
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, product_id@2, amount@3, name@5, tier@6] |
| | RepartitionExec: partitioning=REDACTED
| | ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, product_id@2 as product_id, amount@3 as amount] |
| | MergeScanExec: REDACTED
| | RepartitionExec: partitioning=REDACTED
| | FilterExec: tier@2 = gold OR tier@2 = silver |
| | ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] |
| | MergeScanExec: REDACTED
| | RepartitionExec: partitioning=REDACTED
| | FilterExec: category@2 = electronics |
| | ProjectionExec: expr=[product_id@0 as product_id, name@1 as name, category@2 as category] |
| | MergeScanExec: REDACTED
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics=
-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED
-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED"
-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED
-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED
-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics=
-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category"
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id
WHERE c.tier IN ('gold', 'silver')
AND p."category" = 'electronics';
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[id@0 as id, amount@1 as amount, name@2 as name, tier@3 as tier, name@4 as product_name, category@5 as category] metrics=REDACTED_|
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(product_id@1, product_id@0)], projection=[id@0, amount@2, name@3, tier@4, name@6, category@7] metrics=REDACTED_|
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, customer_id@0)], projection=[id@0, product_id@2, amount@3, name@5, tier@6] metrics=REDACTED_|
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_ProjectionExec: expr=[id@0 as id, customer_id@1 as customer_id, product_id@2 as product_id, amount@3 as amount] metrics=REDACTED_|
|_|_|_MergeScanExec: REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_FilterExec: tier@2 = gold OR tier@2 = silver metrics=REDACTED_|
|_|_|_ProjectionExec: expr=[customer_id@0 as customer_id, name@1 as name, tier@2 as tier] metrics=REDACTED_|
|_|_|_MergeScanExec: REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_FilterExec: category@2 = electronics metrics=REDACTED_|
|_|_|_ProjectionExec: expr=[product_id@0 as product_id, name@1 as name, category@2 as category] metrics=REDACTED_|
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "product_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["product_id", "name", "category", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+
-- Verify correctness
-- SQLNESS SORT_RESULT
SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category"
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id
WHERE c.tier IN ('gold', 'silver')
AND p."category" = 'electronics';
+----+--------+-------+--------+--------------+-------------+
+----+--------+-------+--------+--------------+-------------+
+----+--------+-------+--------+--------------+-------------+
| 1 | 100.0 | Alice | gold | Laptop | electronics |
| 2 | 200.0 | Bob | silver | Mouse | electronics |
| 4 | 150.0 | Alice | gold | Laptop | electronics |
| 5 | 250.0 | Bob | silver | Mouse | electronics |
| 6 | 175.0 | Alice | gold | Keyboard | electronics |
| 8 | 225.0 | Bob | silver | Mouse | electronics |
| 9 | 400.0 | Alice | gold | Keyboard | electronics |
| id | amount | name | tier | product_name | category |
DROP TABLE orders;
Affected Rows: 0
DROP TABLE customers;
Affected Rows: 0
DROP TABLE products;
Affected Rows: 0

View File

@@ -0,0 +1,233 @@
-- Dynamic Filter Test: Combined Hash Join + TopK
-- Tests dynamic filters with both hash join and TopK operators
-- Test 3.1: Hash Join with TopK Subquery
CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX);
-- Insert test data
INSERT INTO orders VALUES
(1, 1, 100.0, '2024-01-01 00:00:00'),
(2, 2, 200.0, '2024-01-02 00:00:00'),
(3, 3, 300.0, '2024-01-03 00:00:00'),
(4, 1, 150.0, '2024-01-04 00:00:00'),
(5, 2, 250.0, '2024-01-05 00:00:00'),
(6, 1, 175.0, '2024-01-06 00:00:00'),
(7, 3, 325.0, '2024-01-07 00:00:00'),
(8, 2, 225.0, '2024-01-08 00:00:00'),
(9, 1, 400.0, '2024-01-09 00:00:00'),
(10, 3, 500.0, '2024-01-10 00:00:00');
INSERT INTO customers VALUES
(1, 'Alice', 'gold', '2024-01-01 00:00:00'),
(2, 'Bob', 'silver', '2024-01-02 00:00:00'),
(3, 'Charlie', 'bronze', '2024-01-03 00:00:00');
-- Test: Hash join where probe side uses TopK
-- This combines both dynamic filter sources: TopK generates filter on amount,
-- and hash join generates filter on customer_id
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
EXPLAIN SELECT top_orders."id", top_orders.amount, c."name", c.tier
FROM (
SELECT "id", customer_id, amount, ts
FROM orders
ORDER BY amount DESC
LIMIT 5
) top_orders
JOIN customers c ON top_orders.customer_id = c.customer_id
WHERE c.tier IN ('gold', 'bronze');
-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics=
-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED
-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED"
-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED
-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED
-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics=
-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
EXPLAIN ANALYZE VERBOSE SELECT top_orders."id", top_orders.amount, c."name", c.tier
FROM (
SELECT "id", customer_id, amount, ts
FROM orders
ORDER BY amount DESC
LIMIT 5
) top_orders
JOIN customers c ON top_orders.customer_id = c.customer_id
WHERE c.tier IN ('gold', 'bronze');
-- Verify correctness
-- SQLNESS SORT_RESULT
SELECT top_orders."id", top_orders.amount, c."name", c.tier
FROM (
SELECT "id", customer_id, amount, ts
FROM orders
ORDER BY amount DESC
LIMIT 5
) top_orders
JOIN customers c ON top_orders.customer_id = c.customer_id
WHERE c.tier IN ('gold', 'bronze');
DROP TABLE orders;
DROP TABLE customers;
-- Test 3.2: TopK with Hash Join Subquery
CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX);
-- Insert test data
INSERT INTO orders VALUES
(1, 1, 100.0, '2024-01-01 00:00:00'),
(2, 2, 200.0, '2024-01-02 00:00:00'),
(3, 3, 300.0, '2024-01-03 00:00:00'),
(4, 1, 150.0, '2024-01-04 00:00:00'),
(5, 2, 250.0, '2024-01-05 00:00:00'),
(6, 1, 175.0, '2024-01-06 00:00:00'),
(7, 3, 325.0, '2024-01-07 00:00:00'),
(8, 2, 225.0, '2024-01-08 00:00:00'),
(9, 1, 400.0, '2024-01-09 00:00:00'),
(10, 3, 500.0, '2024-01-10 00:00:00');
INSERT INTO customers VALUES
(1, 'Alice', 'gold', '2024-01-01 00:00:00'),
(2, 'Bob', 'silver', '2024-01-02 00:00:00'),
(3, 'Charlie', 'bronze', '2024-01-03 00:00:00');
-- Test: TopK where sorting side uses Hash Join
-- Hash join produces customer info, then TopK sorts the joined results
-- This tests that TopK dynamic filter works on hash join output
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
EXPLAIN SELECT "id", customer_id, "name", tier, amount
FROM (
SELECT o."id", o.customer_id, c."name", c.tier, o.amount
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE c.tier IN ('gold', 'silver')
)
ORDER BY amount DESC
LIMIT 4;
-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics=
-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED
-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED"
-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED
-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED
-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics=
-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
EXPLAIN ANALYZE VERBOSE SELECT "id", customer_id, "name", tier, amount
FROM (
SELECT o."id", o.customer_id, c."name", c.tier, o.amount
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE c.tier IN ('gold', 'silver')
)
ORDER BY amount DESC
LIMIT 4;
-- Verify correctness
SELECT "id", customer_id, "name", tier, amount
FROM (
SELECT o."id", o.customer_id, c."name", c.tier, o.amount
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE c.tier IN ('gold', 'silver')
)
ORDER BY amount DESC
LIMIT 4;
DROP TABLE orders;
DROP TABLE customers;
-- Test 3.3: Multiple Dynamic Filters
CREATE TABLE orders ("id" INT, customer_id INT, product_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
CREATE TABLE customers (customer_id INT, "name" STRING, tier STRING, ts TIMESTAMP TIME INDEX);
CREATE TABLE products (product_id INT, "name" STRING, "category" STRING, ts TIMESTAMP TIME INDEX);
-- Insert test data
INSERT INTO orders VALUES
(1, 1, 101, 100.0, '2024-01-01 00:00:00'),
(2, 2, 102, 200.0, '2024-01-02 00:00:00'),
(3, 3, 103, 300.0, '2024-01-03 00:00:00'),
(4, 1, 101, 150.0, '2024-01-04 00:00:00'),
(5, 2, 102, 250.0, '2024-01-05 00:00:00'),
(6, 1, 103, 175.0, '2024-01-06 00:00:00'),
(7, 3, 101, 325.0, '2024-01-07 00:00:00'),
(8, 2, 102, 225.0, '2024-01-08 00:00:00'),
(9, 1, 103, 400.0, '2024-01-09 00:00:00'),
(10, 3, 101, 500.0, '2024-01-10 00:00:00');
INSERT INTO customers VALUES
(1, 'Alice', 'gold', '2024-01-01 00:00:00'),
(2, 'Bob', 'silver', '2024-01-02 00:00:00'),
(3, 'Charlie', 'bronze', '2024-01-03 00:00:00');
INSERT INTO products VALUES
(101, 'Laptop', 'electronics', '2024-01-01 00:00:00'),
(102, 'Mouse', 'electronics', '2024-01-02 00:00:00'),
(103, 'Keyboard', 'electronics', '2024-01-03 00:00:00');
-- Test: Multiple hash joins producing dynamic filters
-- This creates a chain: orders JOIN customers JOIN products
-- Both hash joins can produce dynamic filters
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
EXPLAIN SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category"
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id
WHERE c.tier IN ('gold', 'silver')
AND p."category" = 'electronics';
-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics=
-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED
-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED"
-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED
-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED
-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics=
-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category"
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id
WHERE c.tier IN ('gold', 'silver')
AND p."category" = 'electronics';
-- Verify correctness
-- SQLNESS SORT_RESULT
SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category"
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id
WHERE c.tier IN ('gold', 'silver')
AND p."category" = 'electronics';
DROP TABLE orders;
DROP TABLE customers;
DROP TABLE products;

View File

@@ -0,0 +1,200 @@
-- Dynamic Filter Test: TopK (Sort + Limit)
-- Tests dynamic filters generated by TopK operator
-- Test 2.1: Basic TopK
CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
-- Insert sufficient test data
INSERT INTO orders VALUES
(1, 1, 1100.0, '2024-01-01 00:00:00'),
(2, 2, 1000.0, '2024-01-02 00:00:00'),
(3, 3, 900.0, '2024-01-03 00:00:00');
Affected Rows: 3
admin flush_table('orders');
+-----------------------------+
| ADMIN flush_table('orders') |
+-----------------------------+
| 0 |
+-----------------------------+
INSERT INTO orders VALUES
(4, 1, 325.0, '2024-01-04 00:00:00'),
(5, 2, 300.0, '2024-01-05 00:00:00'),
(6, 1, 250.0, '2024-01-06 00:00:00'),
(7, 3, 225.0, '2024-01-07 00:00:00');
Affected Rows: 4
admin flush_table('orders');
+-----------------------------+
| ADMIN flush_table('orders') |
+-----------------------------+
| 0 |
+-----------------------------+
INSERT INTO orders VALUES
(8, 2, 200.0, '2024-01-08 00:00:00'),
(9, 1, 175.0, '2024-01-09 00:00:00'),
(10, 3, 150.0, '2024-01-10 00:00:00'),
(11, 2, 100.0, '2024-01-11 00:00:00'),
(12, 1, 50.0, '2024-01-12 00:00:00');
Affected Rows: 5
-- Test: Basic TopK query - get top 3 orders by amount
-- TopK (Sort + Limit) should generate dynamic filter on amount
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN SELECT "id", customer_id, amount
FROM orders
ORDER BY amount DESC
LIMIT 3;
+---------------+--------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Limit: skip=0, fetch=3 |
| | Sort: orders.amount DESC NULLS FIRST |
| | Projection: orders.id, orders.customer_id, orders.amount |
| | TableScan: orders |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+--------------------------------------------------------------+
-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics=
-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED
-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED"
-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED
-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED
-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics=
-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE DynamicFilter\s\[[^\]]*\] DynamicFilter [ REDACTED ]
-- SQLNESS REPLACE "files":\s\[[^\]]*\] "files": REDACTED
EXPLAIN ANALYZE VERBOSE SELECT "id", customer_id, amount
FROM orders
ORDER BY amount DESC
LIMIT 3;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [amount@2 DESC], fetch=3 metrics=REDACTED_|
|_|_|_SortExec: TopK(fetch=3), expr=[amount@2 DESC], preserve_partitioning=[true] metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount"], "dyn_filters": ["DynamicFilter [ REDACTED ]"], "files": REDACTED, "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+
-- Verify correctness
SELECT "id", customer_id, amount
FROM orders
ORDER BY amount DESC
LIMIT 3;
+----+-------------+--------+
| id | customer_id | amount |
+----+-------------+--------+
| 1 | 1 | 1100.0 |
| 2 | 2 | 1000.0 |
| 3 | 3 | 900.0 |
+----+-------------+--------+
DROP TABLE orders;
Affected Rows: 0
-- Test 2.2: TopK with Projection Alias
-- USER NOTE: must make alias for topk's sort column to be able to test dynamic filter pushdown
CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
-- Insert test data
INSERT INTO orders VALUES
(1, 1, 100.0, '2024-01-01 00:00:00'),
(2, 2, 200.0, '2024-01-02 00:00:00'),
(3, 3, 300.0, '2024-01-03 00:00:00'),
(4, 1, 150.0, '2024-01-04 00:00:00'),
(5, 2, 250.0, '2024-01-05 00:00:00'),
(6, 1, 175.0, '2024-01-06 00:00:00'),
(7, 3, 325.0, '2024-01-07 00:00:00'),
(8, 2, 225.0, '2024-01-08 00:00:00');
Affected Rows: 8
-- Test: TopK with projection alias
-- The alias on amount tests that dynamic filter correctly handles column references after projection
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN SELECT "id", customer_id, total_amount
FROM (SELECT "id", customer_id, amount as total_amount, ts FROM orders)
ORDER BY total_amount DESC
LIMIT 3;
+---------------+-------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Limit: skip=0, fetch=3 |
| | Sort: total_amount DESC NULLS FIRST |
| | Projection: orders.id, orders.customer_id, total_amount |
| | Projection: orders.id, orders.customer_id, orders.amount AS total_amount, orders.ts |
| | TableScan: orders |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+-------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
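-- The EXPLAIN ANALYZE output order differs between standalone and distributed modes, so the query is commented out for now.
-- NOTE: because the query is commented out, the REPLACE directives above apply to the next statement (the verification SELECT).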
-- EXPLAIN ANALYZE SELECT "id", customer_id, total_amount
-- FROM (SELECT "id", customer_id, amount as total_amount, ts FROM orders)
-- ORDER BY total_amount DESC
-- LIMIT 3;
-- Verify correctness
SELECT "id", customer_id, total_amount
FROM (SELECT "id", customer_id, amount as total_amount, ts FROM orders)
ORDER BY total_amount DESC
LIMIT 3;
+-+-+-+
| id | customer_id | total_amount |
+-+-+-+
| 7_| 3_| 325.0_|
| 3_| 3_| 300.0_|
| 5_| 2_| 250.0_|
+-+-+-+
DROP TABLE orders;
Affected Rows: 0

View File

@@ -0,0 +1,115 @@
-- Dynamic Filter Test: TopK (Sort + Limit)
-- Tests dynamic filters generated by TopK operator
-- Test 2.1: Basic TopK
CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
-- Insert sufficient test data
INSERT INTO orders VALUES
(1, 1, 1100.0, '2024-01-01 00:00:00'),
(2, 2, 1000.0, '2024-01-02 00:00:00'),
(3, 3, 900.0, '2024-01-03 00:00:00');
admin flush_table('orders');
INSERT INTO orders VALUES
(4, 1, 325.0, '2024-01-04 00:00:00'),
(5, 2, 300.0, '2024-01-05 00:00:00'),
(6, 1, 250.0, '2024-01-06 00:00:00'),
(7, 3, 225.0, '2024-01-07 00:00:00');
admin flush_table('orders');
INSERT INTO orders VALUES
(8, 2, 200.0, '2024-01-08 00:00:00'),
(9, 1, 175.0, '2024-01-09 00:00:00'),
(10, 3, 150.0, '2024-01-10 00:00:00'),
(11, 2, 100.0, '2024-01-11 00:00:00'),
(12, 1, 50.0, '2024-01-12 00:00:00');
-- Test: Basic TopK query - get top 3 orders by amount
-- TopK (Sort + Limit) should generate dynamic filter on amount
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN SELECT "id", customer_id, amount
FROM orders
ORDER BY amount DESC
LIMIT 3;
-- SQLNESS REPLACE ("metrics_per_partition":\s*.*metrics=) "metrics_per_partition": REDACTED metrics=
-- SQLNESS REPLACE (metrics=\{.*\}) metrics=REDACTED
-- SQLNESS REPLACE (metrics=\[[^\]]*\]) metrics=REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE "(file_id|time_range_start|time_range_end)":"[^"]+" "$1":"REDACTED"
-- SQLNESS REPLACE ("[a-z_]+":"[0-9\.]+(ns|us|µs|ms|s)") "DURATION": REDACTED
-- SQLNESS REPLACE "(size|flat_format)":\s*(\d+|true|false) "$1":REDACTED
-- SQLNESS REPLACE ,\s*filter=.*?metrics= metrics=
-- SQLNESS REPLACE Total\s+rows:\s+\d+ Total rows: REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE DynamicFilter\s\[[^\]]*\] DynamicFilter [ REDACTED ]
-- SQLNESS REPLACE "files":\s\[[^\]]*\] "files": REDACTED
EXPLAIN ANALYZE VERBOSE SELECT "id", customer_id, amount
FROM orders
ORDER BY amount DESC
LIMIT 3;
-- Verify correctness
SELECT "id", customer_id, amount
FROM orders
ORDER BY amount DESC
LIMIT 3;
DROP TABLE orders;
-- Test 2.2: TopK with Projection Alias
-- USER NOTE: must make alias for topk's sort column to be able to test dynamic filter pushdown
CREATE TABLE orders ("id" INT, customer_id INT, amount DOUBLE, ts TIMESTAMP TIME INDEX);
-- Insert test data
INSERT INTO orders VALUES
(1, 1, 100.0, '2024-01-01 00:00:00'),
(2, 2, 200.0, '2024-01-02 00:00:00'),
(3, 3, 300.0, '2024-01-03 00:00:00'),
(4, 1, 150.0, '2024-01-04 00:00:00'),
(5, 2, 250.0, '2024-01-05 00:00:00'),
(6, 1, 175.0, '2024-01-06 00:00:00'),
(7, 3, 325.0, '2024-01-07 00:00:00'),
(8, 2, 225.0, '2024-01-08 00:00:00');
-- Test: TopK with projection alias
-- The alias on amount tests that dynamic filter correctly handles column references after projection
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN SELECT "id", customer_id, total_amount
FROM (SELECT "id", customer_id, amount as total_amount, ts FROM orders)
ORDER BY total_amount DESC
LIMIT 3;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (=Hash.*) =REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
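-- The EXPLAIN ANALYZE output order differs between standalone and distributed modes, so the query is commented out for now.
-- NOTE: because the query is commented out, the REPLACE directives above apply to the next statement (the verification SELECT).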
-- EXPLAIN ANALYZE SELECT "id", customer_id, total_amount
-- FROM (SELECT "id", customer_id, amount as total_amount, ts FROM orders)
-- ORDER BY total_amount DESC
-- LIMIT 3;
-- Verify correctness
SELECT "id", customer_id, total_amount
FROM (SELECT "id", customer_id, amount as total_amount, ts FROM orders)
ORDER BY total_amount DESC
LIMIT 3;
DROP TABLE orders;

View File

@@ -56,7 +56,7 @@ LIMIT 2;
| 1_| 0_|_ProjectionExec: expr=[vec_id@0 as vec_id] metrics=REDACTED_|
|_|_|_SortPreservingMergeExec: [vec_l2sq_distance(embedding@1, [1.0, 0.0]) ASC NULLS LAST, vec_id@0 ASC NULLS LAST], fetch=2 metrics=REDACTED_|
|_|_|_SortExec: TopK(fetch=2), expr=[vec_l2sq_distance(embedding@1, [1.0, 0.0]) ASC NULLS LAST, vec_id@0 ASC NULLS LAST], preserve_partitioning=[true] metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":0, "files":1, "file_ranges":1}, "projection": ["vec_id", "embedding"], "vector_index_k": 2, "files": [{"file_id":"REDACTED","time_range_start":"REDACTED","time_range_end":"REDACTED","rows":4,"size":REDACTED,"index_size":893}], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":0, "files":1, "file_ranges":1}, "projection": ["vec_id", "embedding"], "dyn_filters": ["DynamicFilter [ vec_l2sq_distance(embedding@1, [1.0, 0.0]) < 0.010000004 OR vec_l2sq_distance(embedding@1, [1.0, 0.0]) = 0.010000004 AND vec_id@0 < 2 ]"], "vector_index_k": 2, "files": [{"file_id":"REDACTED","time_range_start":"REDACTED","time_range_end":"REDACTED","rows":4,"size":REDACTED,"index_size":893}], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+
@@ -217,7 +217,7 @@ LIMIT 2;
| 1_| 0_|_ProjectionExec: expr=[vec_id@0 as vec_id] metrics=REDACTED_|
|_|_|_SortPreservingMergeExec: [vec_cos_distance(embedding@1, [1.0, 0.0]) ASC NULLS LAST, vec_id@0 ASC NULLS LAST], fetch=2 metrics=REDACTED_|
|_|_|_SortExec: TopK(fetch=2), expr=[vec_cos_distance(embedding@1, [1.0, 0.0]) ASC NULLS LAST, vec_id@0 ASC NULLS LAST], preserve_partitioning=[true] metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":0, "files":1, "file_ranges":1}, "projection": ["vec_id", "embedding"], "vector_index_k": 2, "files": [{"file_id":"REDACTED","time_range_start":"REDACTED","time_range_end":"REDACTED","rows":4,"size":REDACTED,"index_size":895}], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":0, "files":1, "file_ranges":1}, "projection": ["vec_id", "embedding"], "dyn_filters": ["DynamicFilter [ vec_cos_distance(embedding@1, [1.0, 0.0]) < 1 OR vec_cos_distance(embedding@1, [1.0, 0.0]) = 1 AND vec_id@0 < 4 ]"], "vector_index_k": 2, "files": [{"file_id":"REDACTED","time_range_start":"REDACTED","time_range_end":"REDACTED","rows":4,"size":REDACTED,"index_size":895}], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+
@@ -251,7 +251,7 @@ LIMIT 2;
| 1_| 0_|_ProjectionExec: expr=[vec_id@0 as vec_id] metrics=REDACTED_|
|_|_|_SortPreservingMergeExec: [vec_dot_product(embedding@1, [1.0, 0.0]) DESC, vec_id@0 ASC NULLS LAST], fetch=2 metrics=REDACTED_|
|_|_|_SortExec: TopK(fetch=2), expr=[vec_dot_product(embedding@1, [1.0, 0.0]) DESC, vec_id@0 ASC NULLS LAST], preserve_partitioning=[true] metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":0, "files":1, "file_ranges":1}, "projection": ["vec_id", "embedding"], "vector_index_k": 2, "files": [{"file_id":"REDACTED","time_range_start":"REDACTED","time_range_end":"REDACTED","rows":4,"size":REDACTED,"index_size":895}], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":0, "files":1, "file_ranges":1}, "projection": ["vec_id", "embedding"], "dyn_filters": ["DynamicFilter [ vec_dot_product(embedding@1, [1.0, 0.0]) IS NULL OR vec_dot_product(embedding@1, [1.0, 0.0]) > 0 OR vec_dot_product(embedding@1, [1.0, 0.0]) = 0 AND vec_id@0 < 2 ]"], "vector_index_k": 2, "files": [{"file_id":"REDACTED","time_range_start":"REDACTED","time_range_end":"REDACTED","rows":4,"size":REDACTED,"index_size":895}], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+