From 8658d428e0ef084cfe5e071c38b91076fc326343 Mon Sep 17 00:00:00 2001 From: LFC Date: Tue, 14 Mar 2023 10:59:43 +0800 Subject: [PATCH] fix: failed to run subquery wrapped in two parentheses (#1157) --- Cargo.lock | 132 ++++++++++++------ Cargo.toml | 25 ++-- src/frontend/src/table.rs | 7 +- src/mito/src/table.rs | 4 +- src/promql/src/extension_plan/empty_metric.rs | 18 +-- .../src/extension_plan/instant_manipulate.rs | 20 ++- src/promql/src/extension_plan/normalize.rs | 20 ++- .../src/extension_plan/range_manipulate.rs | 20 ++- .../src/extension_plan/series_divide.rs | 20 ++- src/query/src/parser.rs | 2 +- src/query/src/tests/time_range_filter_test.rs | 7 +- src/table/Cargo.toml | 1 + src/table/src/metadata.rs | 21 +++ src/table/src/predicate.rs | 23 ++- src/table/src/table.rs | 6 +- src/table/src/table/adapter.rs | 34 +++-- .../optimizer/filter_push_down.result | 49 ------- .../optimizer/filter_push_down.sql | 15 -- .../optimizer/filter_push_down.result | 23 ++- .../optimizer/filter_push_down.sql | 0 20 files changed, 235 insertions(+), 212 deletions(-) delete mode 100644 tests/cases/distributed/optimizer/filter_push_down.result delete mode 100644 tests/cases/distributed/optimizer/filter_push_down.sql rename tests/cases/standalone/{ => common}/optimizer/filter_push_down.result (90%) rename tests/cases/standalone/{ => common}/optimizer/filter_push_down.sql (100%) diff --git a/Cargo.lock b/Cargo.lock index 827db7ade6..09b1856d3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -190,9 +190,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3724c874f1517cf898cd1c3ad18ab5071edf893c48e73139ab1e16cf0f2affe" +checksum = "f410d3907b6b3647b9e7bca4551274b2e3d716aa940afb67b7287257401da921" dependencies = [ "ahash 0.8.3", "arrow-arith", @@ -214,9 +214,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e958823b8383ca14d0a2e973de478dd7674cd9f72837f8c41c132a0fda6a4e5e" +checksum = "f87391cf46473c9bc53dab68cb8872c3a81d4dfd1703f1c8aa397dba9880a043" dependencies = [ "arrow-array", "arrow-buffer", @@ -229,9 +229,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db670eab50e76654065b5aed930f4367101fcddcb2223802007d1e0b4d5a2579" +checksum = "d35d5475e65c57cffba06d0022e3006b677515f99b54af33a7cd54f6cdd4a5b5" dependencies = [ "ahash 0.8.3", "arrow-buffer", @@ -245,9 +245,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f0e01c931882448c0407bd32311a624b9f099739e94e786af68adc97016b5f2" +checksum = "68b4ec72eda7c0207727df96cf200f539749d736b21f3e782ece113e18c1a0a7" dependencies = [ "half 2.2.1", "num", @@ -255,9 +255,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bf35d78836c93f80d9362f3ccb47ff5e2c5ecfc270ff42cdf1ef80334961d44" +checksum = "0a7285272c9897321dfdba59de29f5b05aeafd3cdedf104a941256d155f6d304" dependencies = [ "arrow-array", "arrow-buffer", @@ -271,9 +271,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a6aa7c2531d89d01fed8c469a9b1bf97132a0bdf70b4724fe4bbb4537a50880" +checksum = "981ee4e7f6a120da04e00d0b39182e1eeacccb59c8da74511de753c56b7fddf7" dependencies = [ "arrow-array", "arrow-buffer", @@ -290,9 +290,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea50db4d1e1e4c2da2bfdea7b6d2722eef64267d5ab680d815f7ae42428057f5" +checksum = "27cc673ee6989ea6e4b4e8c7d461f7e06026a096c8f0b1a7288885ff71ae1e56" dependencies = [ "arrow-buffer", "arrow-schema", @@ -302,9 +302,9 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ad4c883d509d89f05b2891ad889729f17ab2191b5fd22b0cf3660a28cc40af5" +checksum = "bd16945f8f3be0f6170b8ced60d414e56239d91a16a3f8800bc1504bc58b2592" dependencies = [ "arrow-array", "arrow-buffer", @@ -325,9 +325,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4042fe6585155d1ec28a8e4937ec901a3ca7a19a22b9f6cd3f551b935cd84f5" +checksum = "e37b8b69d9e59116b6b538e8514e0ec63a30f08b617ce800d31cb44e3ef64c1a" dependencies = [ "arrow-array", "arrow-buffer", @@ -339,9 +339,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c907c4ab4f26970a3719dc06e78e8054a01d0c96da3664d23b941e201b33d2b" +checksum = "80c3fa0bed7cfebf6d18e46b733f9cb8a1cb43ce8e6539055ca3e1e48a426266" dependencies = [ "arrow-array", "arrow-buffer", @@ -358,9 +358,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e131b447242a32129efc7932f58ed8931b42f35d8701c1a08f9f524da13b1d3c" +checksum = "d247dce7bed6a8d6a3c6debfa707a3a2f694383f0c692a39d736a593eae5ef94" dependencies = [ "arrow-array", "arrow-buffer", @@ -372,9 +372,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b591ef70d76f4ac28dd7666093295fece0e5f9298f49af51ea49c001e1635bb6" +checksum = "8d609c0181f963cea5c70fddf9a388595b5be441f3aa1d1cdbf728ca834bbd3a" dependencies = [ "ahash 0.8.3", "arrow-array", @@ -387,9 +387,9 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb327717d87eb94be5eff3b0cb8987f54059d343ee5235abf7f143c85f54cfc8" +checksum = "64951898473bfb8e22293e83a44f02874d2257514d49cd95f9aa4afcff183fbc" dependencies = [ "bitflags", "serde", @@ -397,9 +397,9 @@ dependencies = [ [[package]] name = "arrow-select" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79d3c389d1cea86793934f31594f914c8547d82e91e3411d4833ad0aac3266a7" +checksum = "2a513d89c2e1ac22b28380900036cf1f3992c6443efc5e079de631dcf83c6888" dependencies = [ "arrow-array", "arrow-buffer", @@ -410,9 +410,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ee67790496dd310ddbf5096870324431e89aa76453e010020ac29b1184d356" +checksum = "5288979b2705dae1114c864d73150629add9153b9b8f1d7ee3963db94c372ba5" dependencies = [ "arrow-array", "arrow-buffer", @@ -477,6 +477,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", + "zstd 0.11.2+zstd.1.5.2", + "zstd-safe 5.0.2+zstd.1.5.2", ] [[package]] @@ -2106,7 +2108,7 @@ dependencies = [ [[package]] name = "datafusion" version = "19.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644" +source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff" dependencies = [ "ahash 0.8.3", "arrow", @@ -2117,6 +2119,7 @@ dependencies = [ "chrono", "dashmap", "datafusion-common", + "datafusion-execution", "datafusion-expr", "datafusion-optimizer", "datafusion-physical-expr", @@ -2147,12 +2150,13 @@ dependencies = [ "url", "uuid", "xz2", + "zstd 0.12.3+zstd.1.5.2", ] [[package]] name = "datafusion-common" version = "19.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644" +source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff" dependencies = [ "arrow", "chrono", @@ -2162,10 +2166,27 @@ dependencies = [ "sqlparser", ] +[[package]] +name = "datafusion-execution" +version = "19.0.0" +source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff" +dependencies = [ + "dashmap", + "datafusion-common", + "datafusion-expr", + "hashbrown 0.13.2", + "log", + "object_store", + "parking_lot", + "rand", + "tempfile", + "url", +] + [[package]] name = "datafusion-expr" version = "19.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644" +source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff" dependencies = [ "ahash 0.8.3", "arrow", @@ -2177,7 +2198,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "19.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644" +source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff" dependencies = [ "arrow", "async-trait", @@ -2194,7 +2215,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "19.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644" +source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff" dependencies = [ "ahash 0.8.3", "arrow", @@ -2214,6 +2235,7 @@ dependencies = [ "md-5", "num-traits", "paste", + "petgraph", "rand", "regex", "sha2", @@ -2224,7 +2246,7 @@ dependencies = [ [[package]] name = "datafusion-row" version = "19.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644" +source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff" dependencies = [ "arrow", "datafusion-common", @@ -2235,7 +2257,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "19.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644" +source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff" dependencies = [ "arrow-schema", "datafusion-common", @@ -4785,9 +4807,9 @@ dependencies = [ [[package]] name = "parquet" -version = "33.0.0" +version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1b076829801167d889795cd1957989055543430fa1469cb1f6e32b789bfc764" +checksum = "7ac135ecf63ebb5f53dda0921b0b76d6048b3ef631a5f4760b9e8f863ff00cfa" dependencies = [ "ahash 0.8.3", "arrow-array", @@ -4813,7 +4835,7 @@ dependencies = [ "thrift 0.17.0", "tokio", "twox-hash", - "zstd", + "zstd 0.12.3+zstd.1.5.2", ] [[package]] @@ -5406,9 +5428,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.11.6" +version = "0.11.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e" +checksum = "a24be1d23b4552a012093e1b93697b73d644ae9590e3253d878d0e77d411b614" dependencies = [ "bytes", "heck 0.4.1", @@ -7201,9 +7223,9 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.30.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db67dc6ef36edb658196c3fef0464a80b53dbbc194a904e81f9bd4190f9ecc5b" +checksum = "0366f270dbabb5cc2e4c88427dc4c08bba144f81e32fbd459a013f26a4d16aa0" dependencies = [ "log", "sqlparser_derive", @@ -7553,6 +7575,7 @@ dependencies = [ "datafusion", "datafusion-common", "datafusion-expr", + "datafusion-physical-expr", "datatypes", "derive_builder 0.11.2", "futures", @@ -9097,13 +9120,32 @@ version = "1.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f" +[[package]] +name = "zstd" +version = "0.11.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" +dependencies = [ + "zstd-safe 5.0.2+zstd.1.5.2", +] + [[package]] name = "zstd" version = "0.12.3+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806" dependencies = [ - "zstd-safe", + "zstd-safe 6.0.4+zstd.1.5.4", +] + +[[package]] +name = "zstd-safe" +version = "5.0.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +dependencies = [ + "libc", + "zstd-sys", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b1ee1d0e62..fdb5366910 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,28 +50,29 @@ edition = "2021" license = "Apache-2.0" [workspace.dependencies] -arrow = { version = "33.0" } -arrow-array = "33.0" -arrow-flight = "33.0" -arrow-schema = { version = "33.0", features = ["serde"] } +arrow = { version = "34.0" } +arrow-array = "34.0" +arrow-flight = "34.0" +arrow-schema = { version = "34.0", features = ["serde"] } async-stream = "0.3" async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "fad360df0132a2fcb264a7c07b2b02f0b1dfc644" } -datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "fad360df0132a2fcb264a7c07b2b02f0b1dfc644" } -datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "fad360df0132a2fcb264a7c07b2b02f0b1dfc644" } -datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "fad360df0132a2fcb264a7c07b2b02f0b1dfc644" } -datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "fad360df0132a2fcb264a7c07b2b02f0b1dfc644" } -datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "fad360df0132a2fcb264a7c07b2b02f0b1dfc644" } +# TODO(LFC): Use official DataFusion, when https://github.com/apache/arrow-datafusion/pull/5542 got merged +datafusion = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" } +datafusion-common = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" } +datafusion-expr = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" } +datafusion-optimizer = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" } +datafusion-physical-expr = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" } +datafusion-sql = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" } futures = "0.3" futures-util = "0.3" -parquet = "33.0" +parquet = "34.0" paste = "1.0" prost = "0.11" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } -sqlparser = "0.30" +sqlparser = "0.32" tempfile = "3" tokio = { version = "1.24.2", features = ["full"] } tokio-util = "0.7" diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index de13919ebc..b6b76c324b 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -140,8 +140,11 @@ impl Table for DistTable { Ok(Arc::new(dist_scan)) } - fn supports_filter_pushdown(&self, _filter: &Expr) -> table::Result { - Ok(FilterPushDownType::Inexact) + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> table::Result> { + Ok(vec![FilterPushDownType::Inexact; filters.len()]) } async fn alter(&self, context: AlterContext, request: &AlterTableRequest) -> table::Result<()> { diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index a940e49f2e..8bf9a568ec 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -208,8 +208,8 @@ impl Table for MitoTable { Ok(Arc::new(SimpleTableScan::new(stream))) } - fn supports_filter_pushdown(&self, _filter: &Expr) -> table::error::Result { - Ok(FilterPushDownType::Inexact) + fn supports_filters_pushdown(&self, filters: &[&Expr]) -> TableResult> { + Ok(vec![FilterPushDownType::Inexact; filters.len()]) } /// Alter table changes the schemas of the table. diff --git a/src/promql/src/extension_plan/empty_metric.rs b/src/promql/src/extension_plan/empty_metric.rs index 26ed6d508c..af646e585d 100644 --- a/src/promql/src/extension_plan/empty_metric.rs +++ b/src/promql/src/extension_plan/empty_metric.rs @@ -23,7 +23,7 @@ use datafusion::arrow::datatypes::{DataType, TimeUnit}; use datafusion::common::{DFField, DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics}; use datafusion::error::DataFusionError; use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode}; +use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ @@ -37,7 +37,7 @@ use futures::Stream; use crate::extension_plan::Millisecond; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct EmptyMetric { start: Millisecond, end: Millisecond, @@ -86,9 +86,9 @@ impl EmptyMetric { } } -impl UserDefinedLogicalNode for EmptyMetric { - fn as_any(&self) -> &dyn Any { - self as _ +impl UserDefinedLogicalNodeCore for EmptyMetric { + fn name(&self) -> &str { + "EmptyMetric" } fn inputs(&self) -> Vec<&LogicalPlan> { @@ -111,12 +111,8 @@ impl UserDefinedLogicalNode for EmptyMetric { ) } - fn from_template( - &self, - _exprs: &[datafusion::prelude::Expr], - _inputs: &[LogicalPlan], - ) -> Arc { - Arc::new(self.clone()) + fn from_template(&self, _expr: &[Expr], _inputs: &[LogicalPlan]) -> Self { + self.clone() } } diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index 2f362e93c9..1b6e980394 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -24,7 +24,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::DFSchemaRef; use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}; +use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ @@ -42,7 +42,7 @@ use crate::extension_plan::Millisecond; /// This plan will try to align the input time series, for every timestamp between /// `start` and `end` with step `interval`. Find in the `lookback` range if data /// is missing at the given timestamp. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq, Hash)] pub struct InstantManipulate { start: Millisecond, end: Millisecond, @@ -52,9 +52,9 @@ pub struct InstantManipulate { input: LogicalPlan, } -impl UserDefinedLogicalNode for InstantManipulate { - fn as_any(&self) -> &dyn Any { - self as _ +impl UserDefinedLogicalNodeCore for InstantManipulate { + fn name(&self) -> &str { + "InstantManipulate" } fn inputs(&self) -> Vec<&LogicalPlan> { @@ -77,21 +77,17 @@ impl UserDefinedLogicalNode for InstantManipulate { ) } - fn from_template( - &self, - _exprs: &[Expr], - inputs: &[LogicalPlan], - ) -> Arc { + fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self { assert!(!inputs.is_empty()); - Arc::new(Self { + Self { start: self.start, end: self.end, lookback_delta: self.lookback_delta, interval: self.interval, time_index_column: self.time_index_column.clone(), input: inputs[0].clone(), - }) + } } } diff --git a/src/promql/src/extension_plan/normalize.rs b/src/promql/src/extension_plan/normalize.rs index ea10bd4ee3..b4e0168819 100644 --- a/src/promql/src/extension_plan/normalize.rs +++ b/src/promql/src/extension_plan/normalize.rs @@ -22,7 +22,7 @@ use datafusion::arrow::compute; use datafusion::common::{DFSchemaRef, Result as DataFusionResult, Statistics}; use datafusion::error::DataFusionError; use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode}; +use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ @@ -43,7 +43,7 @@ use crate::extension_plan::Millisecond; /// - bias sample's timestamp by offset /// - sort the record batch based on timestamp column /// - remove NaN values -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq, Hash)] pub struct SeriesNormalize { offset: Millisecond, time_index_column_name: String, @@ -51,9 +51,9 @@ pub struct SeriesNormalize { input: LogicalPlan, } -impl UserDefinedLogicalNode for SeriesNormalize { - fn as_any(&self) -> &dyn Any { - self as _ +impl UserDefinedLogicalNodeCore for SeriesNormalize { + fn name(&self) -> &str { + "SeriesNormalize" } fn inputs(&self) -> Vec<&LogicalPlan> { @@ -76,18 +76,14 @@ impl UserDefinedLogicalNode for SeriesNormalize { ) } - fn from_template( - &self, - _exprs: &[datafusion::logical_expr::Expr], - inputs: &[LogicalPlan], - ) -> Arc { + fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self { assert!(!inputs.is_empty()); - Arc::new(Self { + Self { offset: self.offset, time_index_column_name: self.time_index_column_name.clone(), input: inputs[0].clone(), - }) + } } } diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs index b668632310..f39fbcd4b4 100644 --- a/src/promql/src/extension_plan/range_manipulate.rs +++ b/src/promql/src/extension_plan/range_manipulate.rs @@ -26,7 +26,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::{DFField, DFSchema, DFSchemaRef}; use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}; +use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ @@ -42,7 +42,7 @@ use crate::range_array::RangeArray; /// /// This plan will "fold" time index and value columns into [RangeArray]s, and truncate /// other columns to the same length with the "folded" [RangeArray] column. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq, Hash)] pub struct RangeManipulate { start: Millisecond, end: Millisecond, @@ -137,9 +137,9 @@ impl RangeManipulate { } } -impl UserDefinedLogicalNode for RangeManipulate { - fn as_any(&self) -> &dyn Any { - self as _ +impl UserDefinedLogicalNodeCore for RangeManipulate { + fn name(&self) -> &str { + "RangeManipulate" } fn inputs(&self) -> Vec<&LogicalPlan> { @@ -162,14 +162,10 @@ impl UserDefinedLogicalNode for RangeManipulate { ) } - fn from_template( - &self, - _exprs: &[Expr], - inputs: &[LogicalPlan], - ) -> Arc { + fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self { assert!(!inputs.is_empty()); - Arc::new(Self { + Self { start: self.start, end: self.end, interval: self.interval, @@ -178,7 +174,7 @@ impl UserDefinedLogicalNode for RangeManipulate { value_columns: self.value_columns.clone(), input: inputs[0].clone(), output_schema: self.output_schema.clone(), - }) + } } } diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs index e1261a415f..55c2916b15 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -23,7 +23,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::DFSchemaRef; use datafusion::error::Result as DataFusionResult; use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}; +use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ @@ -33,15 +33,15 @@ use datafusion::physical_plan::{ use datatypes::arrow::compute; use futures::{ready, Stream, StreamExt}; -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq, Hash)] pub struct SeriesDivide { tag_columns: Vec, input: LogicalPlan, } -impl UserDefinedLogicalNode for SeriesDivide { - fn as_any(&self) -> &dyn Any { - self as _ +impl UserDefinedLogicalNodeCore for SeriesDivide { + fn name(&self) -> &str { + "SeriesDivide" } fn inputs(&self) -> Vec<&LogicalPlan> { @@ -60,17 +60,13 @@ impl UserDefinedLogicalNode for SeriesDivide { write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns) } - fn from_template( - &self, - _exprs: &[Expr], - inputs: &[LogicalPlan], - ) -> Arc { + fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self { assert!(!inputs.is_empty()); - Arc::new(Self { + Self { tag_columns: self.tag_columns.clone(), input: inputs[0].clone(), - }) + } } } diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index d2d1b73bbf..e7559f03e7 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -157,7 +157,7 @@ mod test { distinct: false, \ top: None, \ projection: \ - [Wildcard(WildcardAdditionalOptions { opt_exclude: None, opt_except: None, opt_rename: None })], \ + [Wildcard(WildcardAdditionalOptions { opt_exclude: None, opt_except: None, opt_rename: None, opt_replace: None })], \ into: None, \ from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: \"t1\", quote_style: None }]\ ), \ diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index 3a06e0f525..e0f5a458fc 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -70,8 +70,11 @@ impl Table for MemTableWrapper { self.inner.scan(projection, filters, limit).await } - fn supports_filter_pushdown(&self, _filter: &Expr) -> table::Result { - Ok(FilterPushDownType::Exact) + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> table::Result> { + Ok(vec![FilterPushDownType::Exact; filters.len()]) } } diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 41c44cb6e3..b0c5b1ee98 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -19,6 +19,7 @@ common-time = { path = "../common/time" } datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true +datafusion-physical-expr.workspace = true datatypes = { path = "../datatypes" } derive_builder = "0.11" futures.workspace = true diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index bd2e5c6fb7..55f660ca76 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use datafusion_expr::TableProviderFilterPushDown; pub use datatypes::error::{Error as ConvertError, Result as ConvertResult}; use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef}; use derive_builder::Builder; @@ -47,6 +48,26 @@ pub enum FilterPushDownType { Exact, } +impl From for FilterPushDownType { + fn from(value: TableProviderFilterPushDown) -> Self { + match value { + TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported, + TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact, + TableProviderFilterPushDown::Exact => FilterPushDownType::Exact, + } + } +} + +impl From for TableProviderFilterPushDown { + fn from(value: FilterPushDownType) -> Self { + match value { + FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported, + FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact, + FilterPushDownType::Exact => TableProviderFilterPushDown::Exact, + } + } +} + /// Indicates the type of this table for metadata/catalog purposes. #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] pub enum TableType { diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index b6ede3491b..e5153bee11 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -18,7 +18,10 @@ use common_time::range::TimestampRange; use common_time::Timestamp; use datafusion::parquet::file::metadata::RowGroupMetaData; use datafusion::physical_optimizer::pruning::PruningPredicate; +use datafusion_common::ToDFSchema; use datafusion_expr::{Between, BinaryExpr, Operator}; +use datafusion_physical_expr::create_physical_expr; +use datafusion_physical_expr::execution_props::ExecutionProps; use datatypes::schema::SchemaRef; use datatypes::value::scalar_value_to_timestamp; @@ -46,8 +49,26 @@ impl Predicate { row_groups: &[RowGroupMetaData], ) -> Vec { let mut res = vec![true; row_groups.len()]; + let arrow_schema = (*schema.arrow_schema()).clone(); + let df_schema = arrow_schema.clone().to_dfschema_ref(); + let df_schema = match df_schema { + Ok(x) => x, + Err(e) => { + warn!("Failed to create Datafusion schema when trying to prune row groups, error: {e}"); + return res; + } + }; + + let execution_props = &ExecutionProps::new(); for expr in &self.exprs { - match PruningPredicate::try_new(expr.df_expr().clone(), schema.arrow_schema().clone()) { + match create_physical_expr( + expr.df_expr(), + df_schema.as_ref(), + arrow_schema.as_ref(), + execution_props, + ) + .and_then(|expr| PruningPredicate::try_new(expr, arrow_schema.clone())) + { Ok(p) => { let stat = RowGroupPruningStatistics::new(row_groups, &schema); match p.prune(&stat) { diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 8684da9aff..223f132dda 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -70,10 +70,10 @@ pub trait Table: Send + Sync { limit: Option, ) -> Result; - /// Tests whether the table provider can make use of a filter expression + /// Tests whether the table provider can make use of any or all filter expressions /// to optimise data retrieval. - fn supports_filter_pushdown(&self, _filter: &Expr) -> Result { - Ok(FilterPushDownType::Unsupported) + fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { + Ok(vec![FilterPushDownType::Unsupported; filters.len()]) } /// Alter table. diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index 5ed7ffd36a..15c915fe9c 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -78,15 +78,18 @@ impl TableProvider for DfTableProviderAdapter { Ok(Arc::new(DfPhysicalPlanAdapter(inner))) } - fn supports_filter_pushdown(&self, filter: &DfExpr) -> DfResult { - let p = self + fn supports_filters_pushdown( + &self, + filters: &[&DfExpr], + ) -> DfResult> { + let filters = filters + .iter() + .map(|&x| x.clone().into()) + .collect::>(); + Ok(self .table - .supports_filter_pushdown(&filter.clone().into())?; - match p { - FilterPushDownType::Unsupported => Ok(DfTableProviderFilterPushDown::Unsupported), - FilterPushDownType::Inexact => Ok(DfTableProviderFilterPushDown::Inexact), - FilterPushDownType::Exact => Ok(DfTableProviderFilterPushDown::Exact), - } + .supports_filters_pushdown(&filters.iter().collect::>()) + .map(|v| v.into_iter().map(Into::into).collect::>())?) } } @@ -155,16 +158,11 @@ impl Table for TableAdapter { Ok(Arc::new(PhysicalPlanAdapter::new(schema, execution_plan))) } - fn supports_filter_pushdown(&self, filter: &Expr) -> Result { - match self - .table_provider - .supports_filter_pushdown(filter.df_expr()) - .context(error::DatafusionSnafu)? - { - DfTableProviderFilterPushDown::Unsupported => Ok(FilterPushDownType::Unsupported), - DfTableProviderFilterPushDown::Inexact => Ok(FilterPushDownType::Inexact), - DfTableProviderFilterPushDown::Exact => Ok(FilterPushDownType::Exact), - } + fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { + self.table_provider + .supports_filters_pushdown(&filters.iter().map(|x| x.df_expr()).collect::>()) + .context(error::DatafusionSnafu) + .map(|v| v.into_iter().map(Into::into).collect::>()) } } diff --git a/tests/cases/distributed/optimizer/filter_push_down.result b/tests/cases/distributed/optimizer/filter_push_down.result deleted file mode 100644 index 9f89961d50..0000000000 --- a/tests/cases/distributed/optimizer/filter_push_down.result +++ /dev/null @@ -1,49 +0,0 @@ -CREATE TABLE integers(i INTEGER, j BIGINT TIME INDEX); - -Affected Rows: 0 - -INSERT INTO integers VALUES (1, 1), (2, 2), (3, 3), (NULL, 4); - -Affected Rows: 4 - -SELECT i1.i, i2.i FROM integers i1, integers i2 WHERE i1.i=i2.i ORDER BY 1; - -+---+---+ -| i | i | -+---+---+ -| 1 | 1 | -| 2 | 2 | -| 3 | 3 | -+---+---+ - -SELECT i1.i,i2.i FROM integers i1, integers i2 WHERE i1.i=i2.i AND i1.i>1 ORDER BY 1; - -+---+---+ -| i | i | -+---+---+ -| 2 | 2 | -| 3 | 3 | -+---+---+ - -SELECT i1.i,i2.i,i3.i FROM integers i1, integers i2, integers i3 WHERE i1.i=i2.i AND i1.i=i3.i AND i1.i>1 ORDER BY 1; - -+---+---+---+ -| i | i | i | -+---+---+---+ -| 2 | 2 | 2 | -| 3 | 3 | 3 | -+---+---+---+ - -SELECT i1.i,i2.i FROM integers i1 JOIN integers i2 ON i1.i=i2.i WHERE i1.i>1 ORDER BY 1; - -+---+---+ -| i | i | -+---+---+ -| 2 | 2 | -| 3 | 3 | -+---+---+ - -DROP TABLE integers; - -Affected Rows: 1 - diff --git a/tests/cases/distributed/optimizer/filter_push_down.sql b/tests/cases/distributed/optimizer/filter_push_down.sql deleted file mode 100644 index dab2825454..0000000000 --- a/tests/cases/distributed/optimizer/filter_push_down.sql +++ /dev/null @@ -1,15 +0,0 @@ -CREATE TABLE integers(i INTEGER, j BIGINT TIME INDEX); - -INSERT INTO integers VALUES (1, 1), (2, 2), (3, 3), (NULL, 4); - -SELECT i1.i, i2.i FROM integers i1, integers i2 WHERE i1.i=i2.i ORDER BY 1; - -SELECT i1.i,i2.i FROM integers i1, integers i2 WHERE i1.i=i2.i AND i1.i>1 ORDER BY 1; - -SELECT i1.i,i2.i,i3.i FROM integers i1, integers i2, integers i3 WHERE i1.i=i2.i AND i1.i=i3.i AND i1.i>1 ORDER BY 1; - -SELECT i1.i,i2.i FROM integers i1 JOIN integers i2 ON i1.i=i2.i WHERE i1.i>1 ORDER BY 1; - --- TODO(LFC): Resolve #790, then port remaining test case from standalone. - -DROP TABLE integers; diff --git a/tests/cases/standalone/optimizer/filter_push_down.result b/tests/cases/standalone/common/optimizer/filter_push_down.result similarity index 90% rename from tests/cases/standalone/optimizer/filter_push_down.result rename to tests/cases/standalone/common/optimizer/filter_push_down.result index ca7976abfc..369da7da84 100644 --- a/tests/cases/standalone/optimizer/filter_push_down.result +++ b/tests/cases/standalone/common/optimizer/filter_push_down.result @@ -92,15 +92,32 @@ SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=1 WHERE i1.i= SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) ORDER BY i; -Error: 3001(EngineExecuteQuery), This feature is not implemented: Physical plan does not support logical expression () ++---+---+ +| i | j | ++---+---+ +| 1 | 1 | +| 2 | 2 | +| 3 | 3 | ++---+---+ SELECT * FROM integers WHERE i NOT IN ((SELECT i FROM integers WHERE i=1)) ORDER BY i; -Error: 3001(EngineExecuteQuery), This feature is not implemented: Physical plan does not support logical expression () ++---+---+ +| i | j | ++---+---+ +| 2 | 2 | +| 3 | 3 | +| | 4 | ++---+---+ SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) AND i<3 ORDER BY i; -Error: 3001(EngineExecuteQuery), This feature is not implemented: Physical plan does not support logical expression () ++---+---+ +| i | j | ++---+---+ +| 1 | 1 | +| 2 | 2 | ++---+---+ SELECT i1.i,i2.i FROM integers i1, integers i2 WHERE i IN ((SELECT i FROM integers)) AND i1.i=i2.i ORDER BY 1; diff --git a/tests/cases/standalone/optimizer/filter_push_down.sql b/tests/cases/standalone/common/optimizer/filter_push_down.sql similarity index 100% rename from tests/cases/standalone/optimizer/filter_push_down.sql rename to tests/cases/standalone/common/optimizer/filter_push_down.sql