fix: prevent filter pushdown in distributed planner (#1806)

* fix: prevent filter pushdown in distributed planner

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix metadata

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-06-21 16:25:50 +08:00
committed by GitHub
parent d1b5ce0d35
commit b1ccc7ef5d
4 changed files with 105 additions and 4 deletions

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
use promql::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
};
@@ -37,7 +37,8 @@ impl Categorizer {
pub fn check_plan(plan: &LogicalPlan) -> Commutativity {
match plan {
LogicalPlan::Projection(_) => Commutativity::Unimplemented,
LogicalPlan::Filter(_) => Commutativity::Commutative,
// TODO(ruihang): Change this to Commutative once Like is supported in substrait
LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
LogicalPlan::Window(_) => Commutativity::Unimplemented,
LogicalPlan::Aggregate(_) => {
// check all children exprs and use the strictest level
@@ -85,6 +86,50 @@ impl Categorizer {
_ => Commutativity::Unsupported,
}
}
/// Classifies a predicate by the variant of its outermost [`Expr`] node.
///
/// Every variant's payload is ignored, so operands nested inside a node
/// (for example the two sides of a `BinaryExpr`, or the operand of `Not`)
/// are not examined by this function.
/// NOTE(review): a `Like` nested under such a node is therefore reported as
/// `Commutative` — confirm that this is intended.
///
/// Returns [`Commutativity::Unimplemented`] for the variants listed in the
/// first arm and [`Commutativity::Commutative`] for those in the second.
/// There is deliberately no `_` arm: each variant is named explicitly.
pub fn check_expr(expr: &Expr) -> Commutativity {
    match expr {
        // String pattern matching.
        Expr::Like(_)
        | Expr::ILike(_)
        | Expr::SimilarTo(_)
        // Unknown-ness tests, field access, conditionals and casts.
        | Expr::IsUnknown(_)
        | Expr::IsNotUnknown(_)
        | Expr::GetIndexedField(_)
        | Expr::Case(_)
        | Expr::Cast(_)
        | Expr::TryCast(_)
        // Function calls of every flavour.
        | Expr::ScalarFunction(_)
        | Expr::ScalarUDF(_)
        | Expr::AggregateFunction(_)
        | Expr::WindowFunction(_)
        | Expr::AggregateUDF(_)
        // Membership tests and subqueries.
        | Expr::InList(_)
        | Expr::InSubquery(_)
        | Expr::ScalarSubquery(_)
        // Wildcards, grouping sets, placeholders and outer references.
        | Expr::Wildcard
        | Expr::QualifiedWildcard { .. }
        | Expr::GroupingSet(_)
        | Expr::Placeholder(_)
        | Expr::OuterReferenceColumn(_, _) => Commutativity::Unimplemented,

        // Leaf values and references.
        Expr::Column(_)
        | Expr::ScalarVariable(_, _)
        | Expr::Literal(_)
        // Wrappers and operators.
        | Expr::Alias(_, _)
        | Expr::BinaryExpr(_)
        | Expr::Not(_)
        | Expr::Negative(_)
        | Expr::Between(_)
        | Expr::Sort(_)
        | Expr::Exists(_)
        // Null and boolean tests.
        | Expr::IsNull(_)
        | Expr::IsNotNull(_)
        | Expr::IsTrue(_)
        | Expr::IsFalse(_)
        | Expr::IsNotTrue(_)
        | Expr::IsNotFalse(_) => Commutativity::Commutative,
    }
}
}
/// A reference-counted closure that takes a [`LogicalPlan`] by reference and
/// returns an optional [`LogicalPlan`].
// NOTE(review): presumably `None` means "no transformation applies" — confirm against callers.
pub type Transformer = Arc<dyn Fn(&LogicalPlan) -> Option<LogicalPlan>>;

View File

@@ -90,11 +90,23 @@ SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=1 WHERE i1.i=
SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) ORDER BY i;
Error: 3001(EngineExecuteQuery), No field named __correlated_sq_1.i. Valid fields are integers.i, integers.j.
+---+---+
| i | j |
+---+---+
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
+---+---+
SELECT * FROM integers WHERE i NOT IN ((SELECT i FROM integers WHERE i=1)) ORDER BY i;
Error: 3001(EngineExecuteQuery), No field named __correlated_sq_2.i. Valid fields are integers.i, integers.j.
+---+---+
| i | j |
+---+---+
| 2 | 2 |
| 3 | 3 |
| | 4 |
+---+---+
SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) AND i<3 ORDER BY i;

View File

@@ -0,0 +1,29 @@
CREATE TABLE host (
ts TIMESTAMP(3) TIME INDEX,
host STRING PRIMARY KEY,
val DOUBLE,
);
Affected Rows: 0
INSERT INTO TABLE host VALUES
(0, 'a+b', 1.0),
(1, 'b+c', 2.0),
(2, 'a', 3.0),
(3, 'c', 4.0);
Affected Rows: 4
SELECT * FROM host WHERE host LIKE '%+%';
+-------------------------+------+-----+
| ts | host | val |
+-------------------------+------+-----+
| 1970-01-01T00:00:00 | a+b | 1.0 |
| 1970-01-01T00:00:00.001 | b+c | 2.0 |
+-------------------------+------+-----+
DROP TABLE host;
Affected Rows: 1

View File

@@ -0,0 +1,15 @@
CREATE TABLE host (
ts TIMESTAMP(3) TIME INDEX,
host STRING PRIMARY KEY,
val DOUBLE,
);
INSERT INTO TABLE host VALUES
(0, 'a+b', 1.0),
(1, 'b+c', 2.0),
(2, 'a', 3.0),
(3, 'c', 4.0);
SELECT * FROM host WHERE host LIKE '%+%';
DROP TABLE host;