fix: failed to run subquery wrapped in two parentheses (#1157)

Author: LFC
Date: 2023-03-14 10:59:43 +08:00
Committed by: evenyag
Parent: e8e11072f8
Commit: 8658d428e0
20 changed files with 235 additions and 212 deletions

Cargo.lock (generated)

@@ -190,9 +190,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "arrow"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3724c874f1517cf898cd1c3ad18ab5071edf893c48e73139ab1e16cf0f2affe"
checksum = "f410d3907b6b3647b9e7bca4551274b2e3d716aa940afb67b7287257401da921"
dependencies = [
"ahash 0.8.3",
"arrow-arith",
@@ -214,9 +214,9 @@ dependencies = [
[[package]]
name = "arrow-arith"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e958823b8383ca14d0a2e973de478dd7674cd9f72837f8c41c132a0fda6a4e5e"
checksum = "f87391cf46473c9bc53dab68cb8872c3a81d4dfd1703f1c8aa397dba9880a043"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -229,9 +229,9 @@ dependencies = [
[[package]]
name = "arrow-array"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db670eab50e76654065b5aed930f4367101fcddcb2223802007d1e0b4d5a2579"
checksum = "d35d5475e65c57cffba06d0022e3006b677515f99b54af33a7cd54f6cdd4a5b5"
dependencies = [
"ahash 0.8.3",
"arrow-buffer",
@@ -245,9 +245,9 @@ dependencies = [
[[package]]
name = "arrow-buffer"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f0e01c931882448c0407bd32311a624b9f099739e94e786af68adc97016b5f2"
checksum = "68b4ec72eda7c0207727df96cf200f539749d736b21f3e782ece113e18c1a0a7"
dependencies = [
"half 2.2.1",
"num",
@@ -255,9 +255,9 @@ dependencies = [
[[package]]
name = "arrow-cast"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bf35d78836c93f80d9362f3ccb47ff5e2c5ecfc270ff42cdf1ef80334961d44"
checksum = "0a7285272c9897321dfdba59de29f5b05aeafd3cdedf104a941256d155f6d304"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -271,9 +271,9 @@ dependencies = [
[[package]]
name = "arrow-csv"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a6aa7c2531d89d01fed8c469a9b1bf97132a0bdf70b4724fe4bbb4537a50880"
checksum = "981ee4e7f6a120da04e00d0b39182e1eeacccb59c8da74511de753c56b7fddf7"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -290,9 +290,9 @@ dependencies = [
[[package]]
name = "arrow-data"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea50db4d1e1e4c2da2bfdea7b6d2722eef64267d5ab680d815f7ae42428057f5"
checksum = "27cc673ee6989ea6e4b4e8c7d461f7e06026a096c8f0b1a7288885ff71ae1e56"
dependencies = [
"arrow-buffer",
"arrow-schema",
@@ -302,9 +302,9 @@ dependencies = [
[[package]]
name = "arrow-flight"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ad4c883d509d89f05b2891ad889729f17ab2191b5fd22b0cf3660a28cc40af5"
checksum = "bd16945f8f3be0f6170b8ced60d414e56239d91a16a3f8800bc1504bc58b2592"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -325,9 +325,9 @@ dependencies = [
[[package]]
name = "arrow-ipc"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4042fe6585155d1ec28a8e4937ec901a3ca7a19a22b9f6cd3f551b935cd84f5"
checksum = "e37b8b69d9e59116b6b538e8514e0ec63a30f08b617ce800d31cb44e3ef64c1a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -339,9 +339,9 @@ dependencies = [
[[package]]
name = "arrow-json"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c907c4ab4f26970a3719dc06e78e8054a01d0c96da3664d23b941e201b33d2b"
checksum = "80c3fa0bed7cfebf6d18e46b733f9cb8a1cb43ce8e6539055ca3e1e48a426266"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -358,9 +358,9 @@ dependencies = [
[[package]]
name = "arrow-ord"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e131b447242a32129efc7932f58ed8931b42f35d8701c1a08f9f524da13b1d3c"
checksum = "d247dce7bed6a8d6a3c6debfa707a3a2f694383f0c692a39d736a593eae5ef94"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -372,9 +372,9 @@ dependencies = [
[[package]]
name = "arrow-row"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b591ef70d76f4ac28dd7666093295fece0e5f9298f49af51ea49c001e1635bb6"
checksum = "8d609c0181f963cea5c70fddf9a388595b5be441f3aa1d1cdbf728ca834bbd3a"
dependencies = [
"ahash 0.8.3",
"arrow-array",
@@ -387,9 +387,9 @@ dependencies = [
[[package]]
name = "arrow-schema"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb327717d87eb94be5eff3b0cb8987f54059d343ee5235abf7f143c85f54cfc8"
checksum = "64951898473bfb8e22293e83a44f02874d2257514d49cd95f9aa4afcff183fbc"
dependencies = [
"bitflags",
"serde",
@@ -397,9 +397,9 @@ dependencies = [
[[package]]
name = "arrow-select"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79d3c389d1cea86793934f31594f914c8547d82e91e3411d4833ad0aac3266a7"
checksum = "2a513d89c2e1ac22b28380900036cf1f3992c6443efc5e079de631dcf83c6888"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -410,9 +410,9 @@ dependencies = [
[[package]]
name = "arrow-string"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ee67790496dd310ddbf5096870324431e89aa76453e010020ac29b1184d356"
checksum = "5288979b2705dae1114c864d73150629add9153b9b8f1d7ee3963db94c372ba5"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -477,6 +477,8 @@ dependencies = [
"pin-project-lite",
"tokio",
"xz2",
"zstd 0.11.2+zstd.1.5.2",
"zstd-safe 5.0.2+zstd.1.5.2",
]
[[package]]
@@ -2106,7 +2108,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -2117,6 +2119,7 @@ dependencies = [
"chrono",
"dashmap",
"datafusion-common",
"datafusion-execution",
"datafusion-expr",
"datafusion-optimizer",
"datafusion-physical-expr",
@@ -2147,12 +2150,13 @@ dependencies = [
"url",
"uuid",
"xz2",
"zstd 0.12.3+zstd.1.5.2",
]
[[package]]
name = "datafusion-common"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"arrow",
"chrono",
@@ -2162,10 +2166,27 @@ dependencies = [
"sqlparser",
]
+[[package]]
+name = "datafusion-execution"
+version = "19.0.0"
+source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
+dependencies = [
+"dashmap",
+"datafusion-common",
+"datafusion-expr",
+"hashbrown 0.13.2",
+"log",
+"object_store",
+"parking_lot",
+"rand",
+"tempfile",
+"url",
+]
[[package]]
name = "datafusion-expr"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -2177,7 +2198,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"arrow",
"async-trait",
@@ -2194,7 +2215,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -2214,6 +2235,7 @@ dependencies = [
"md-5",
"num-traits",
"paste",
"petgraph",
"rand",
"regex",
"sha2",
@@ -2224,7 +2246,7 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"arrow",
"datafusion-common",
@@ -2235,7 +2257,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fad360df0132a2fcb264a7c07b2b02f0b1dfc644#fad360df0132a2fcb264a7c07b2b02f0b1dfc644"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"arrow-schema",
"datafusion-common",
@@ -4785,9 +4807,9 @@ dependencies = [
[[package]]
name = "parquet"
version = "33.0.0"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1b076829801167d889795cd1957989055543430fa1469cb1f6e32b789bfc764"
checksum = "7ac135ecf63ebb5f53dda0921b0b76d6048b3ef631a5f4760b9e8f863ff00cfa"
dependencies = [
"ahash 0.8.3",
"arrow-array",
@@ -4813,7 +4835,7 @@ dependencies = [
"thrift 0.17.0",
"tokio",
"twox-hash",
"zstd",
"zstd 0.12.3+zstd.1.5.2",
]
[[package]]
@@ -5406,9 +5428,9 @@ dependencies = [
[[package]]
name = "prost-build"
version = "0.11.6"
version = "0.11.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e"
checksum = "a24be1d23b4552a012093e1b93697b73d644ae9590e3253d878d0e77d411b614"
dependencies = [
"bytes",
"heck 0.4.1",
@@ -7201,9 +7223,9 @@ dependencies = [
[[package]]
name = "sqlparser"
version = "0.30.0"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db67dc6ef36edb658196c3fef0464a80b53dbbc194a904e81f9bd4190f9ecc5b"
checksum = "0366f270dbabb5cc2e4c88427dc4c08bba144f81e32fbd459a013f26a4d16aa0"
dependencies = [
"log",
"sqlparser_derive",
@@ -7553,6 +7575,7 @@ dependencies = [
"datafusion",
"datafusion-common",
"datafusion-expr",
"datafusion-physical-expr",
"datatypes",
"derive_builder 0.11.2",
"futures",
@@ -9097,13 +9120,32 @@ version = "1.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f"
+[[package]]
+name = "zstd"
+version = "0.11.2+zstd.1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4"
+dependencies = [
+"zstd-safe 5.0.2+zstd.1.5.2",
+]
[[package]]
name = "zstd"
version = "0.12.3+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806"
dependencies = [
"zstd-safe",
"zstd-safe 6.0.4+zstd.1.5.4",
]
+[[package]]
+name = "zstd-safe"
+version = "5.0.2+zstd.1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db"
+dependencies = [
+"libc",
+"zstd-sys",
+]
[[package]]


@@ -50,28 +50,29 @@ edition = "2021"
license = "Apache-2.0"
[workspace.dependencies]
arrow = { version = "33.0" }
arrow-array = "33.0"
arrow-flight = "33.0"
arrow-schema = { version = "33.0", features = ["serde"] }
arrow = { version = "34.0" }
arrow-array = "34.0"
arrow-flight = "34.0"
arrow-schema = { version = "34.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "fad360df0132a2fcb264a7c07b2b02f0b1dfc644" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "fad360df0132a2fcb264a7c07b2b02f0b1dfc644" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "fad360df0132a2fcb264a7c07b2b02f0b1dfc644" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "fad360df0132a2fcb264a7c07b2b02f0b1dfc644" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "fad360df0132a2fcb264a7c07b2b02f0b1dfc644" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "fad360df0132a2fcb264a7c07b2b02f0b1dfc644" }
# TODO(LFC): Use official DataFusion, when https://github.com/apache/arrow-datafusion/pull/5542 got merged
datafusion = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" }
datafusion-common = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" }
datafusion-expr = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" }
datafusion-optimizer = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" }
datafusion-physical-expr = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" }
datafusion-sql = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" }
futures = "0.3"
futures-util = "0.3"
parquet = "33.0"
parquet = "34.0"
paste = "1.0"
prost = "0.11"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
sqlparser = "0.30"
sqlparser = "0.32"
tempfile = "3"
tokio = { version = "1.24.2", features = ["full"] }
tokio-util = "0.7"


@@ -140,8 +140,11 @@ impl Table for DistTable {
Ok(Arc::new(dist_scan))
}
fn supports_filter_pushdown(&self, _filter: &Expr) -> table::Result<FilterPushDownType> {
Ok(FilterPushDownType::Inexact)
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> table::Result<Vec<FilterPushDownType>> {
Ok(vec![FilterPushDownType::Inexact; filters.len()])
}
async fn alter(&self, context: AlterContext, request: &AlterTableRequest) -> table::Result<()> {


@@ -208,8 +208,8 @@ impl<R: Region> Table for MitoTable<R> {
Ok(Arc::new(SimpleTableScan::new(stream)))
}
fn supports_filter_pushdown(&self, _filter: &Expr) -> table::error::Result<FilterPushDownType> {
Ok(FilterPushDownType::Inexact)
fn supports_filters_pushdown(&self, filters: &[&Expr]) -> TableResult<Vec<FilterPushDownType>> {
Ok(vec![FilterPushDownType::Inexact; filters.len()])
}
/// Alter table changes the schemas of the table.


@@ -23,7 +23,7 @@ use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::common::{DFField, DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
-use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
+use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -37,7 +37,7 @@ use futures::Stream;
use crate::extension_plan::Millisecond;
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyMetric {
start: Millisecond,
end: Millisecond,
@@ -86,9 +86,9 @@ impl EmptyMetric {
}
}
-impl UserDefinedLogicalNode for EmptyMetric {
-fn as_any(&self) -> &dyn Any {
-self as _
+impl UserDefinedLogicalNodeCore for EmptyMetric {
+fn name(&self) -> &str {
+"EmptyMetric"
}
fn inputs(&self) -> Vec<&LogicalPlan> {
@@ -111,12 +111,8 @@ impl UserDefinedLogicalNode for EmptyMetric {
)
}
-fn from_template(
-&self,
-_exprs: &[datafusion::prelude::Expr],
-_inputs: &[LogicalPlan],
-) -> Arc<dyn UserDefinedLogicalNode> {
-Arc::new(self.clone())
+fn from_template(&self, _expr: &[Expr], _inputs: &[LogicalPlan]) -> Self {
+self.clone()
}
}
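
Note: this hunk, and the four Prom* plan files below, apply the same migration. `UserDefinedLogicalNode` is replaced by `UserDefinedLogicalNodeCore`, which supplies `as_any`, equality, and hashing through a blanket impl (hence the added `PartialEq, Eq, Hash` derives) and lets `from_template` return `Self` instead of `Arc<dyn UserDefinedLogicalNode>`. A minimal sketch of the new shape, using a hypothetical single-input `NoopPlan` node rather than any type from this commit:

use std::fmt;

use datafusion::common::DFSchemaRef;
use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};

// `NoopPlan` is a hypothetical node, not part of this commit. The trait's
// Eq + Hash bounds are why the plans above gained the extra derives.
#[derive(Debug, PartialEq, Eq, Hash)]
struct NoopPlan {
    input: LogicalPlan,
}

impl UserDefinedLogicalNodeCore for NoopPlan {
    fn name(&self) -> &str {
        "NoopPlan"
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "NoopPlan")
    }

    // Returns `Self` now; the old trait returned Arc<dyn UserDefinedLogicalNode>.
    fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
        assert!(!inputs.is_empty());
        Self {
            input: inputs[0].clone(),
        }
    }
}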


@@ -24,7 +24,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DFSchemaRef;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::TaskContext;
-use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
+use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -42,7 +42,7 @@ use crate::extension_plan::Millisecond;
/// This plan will try to align the input time series, for every timestamp between
/// `start` and `end` with step `interval`. Find in the `lookback` range if data
/// is missing at the given timestamp.
-#[derive(Debug)]
+#[derive(Debug, PartialEq, Eq, Hash)]
pub struct InstantManipulate {
start: Millisecond,
end: Millisecond,
@@ -52,9 +52,9 @@ pub struct InstantManipulate {
input: LogicalPlan,
}
-impl UserDefinedLogicalNode for InstantManipulate {
-fn as_any(&self) -> &dyn Any {
-self as _
+impl UserDefinedLogicalNodeCore for InstantManipulate {
+fn name(&self) -> &str {
+"InstantManipulate"
}
fn inputs(&self) -> Vec<&LogicalPlan> {
@@ -77,21 +77,17 @@ impl UserDefinedLogicalNode for InstantManipulate {
)
}
-fn from_template(
-&self,
-_exprs: &[Expr],
-inputs: &[LogicalPlan],
-) -> Arc<dyn UserDefinedLogicalNode> {
+fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
assert!(!inputs.is_empty());
-Arc::new(Self {
+Self {
start: self.start,
end: self.end,
lookback_delta: self.lookback_delta,
interval: self.interval,
time_index_column: self.time_index_column.clone(),
input: inputs[0].clone(),
-})
+}
}
}


@@ -22,7 +22,7 @@ use datafusion::arrow::compute;
use datafusion::common::{DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
-use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
+use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -43,7 +43,7 @@ use crate::extension_plan::Millisecond;
/// - bias sample's timestamp by offset
/// - sort the record batch based on timestamp column
/// - remove NaN values
-#[derive(Debug)]
+#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SeriesNormalize {
offset: Millisecond,
time_index_column_name: String,
@@ -51,9 +51,9 @@ pub struct SeriesNormalize {
input: LogicalPlan,
}
-impl UserDefinedLogicalNode for SeriesNormalize {
-fn as_any(&self) -> &dyn Any {
-self as _
+impl UserDefinedLogicalNodeCore for SeriesNormalize {
+fn name(&self) -> &str {
+"SeriesNormalize"
}
fn inputs(&self) -> Vec<&LogicalPlan> {
@@ -76,18 +76,14 @@ impl UserDefinedLogicalNode for SeriesNormalize {
)
}
-fn from_template(
-&self,
-_exprs: &[datafusion::logical_expr::Expr],
-inputs: &[LogicalPlan],
-) -> Arc<dyn UserDefinedLogicalNode> {
+fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
assert!(!inputs.is_empty());
-Arc::new(Self {
+Self {
offset: self.offset,
time_index_column_name: self.time_index_column_name.clone(),
input: inputs[0].clone(),
-})
+}
}
}


@@ -26,7 +26,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{DFField, DFSchema, DFSchemaRef};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::TaskContext;
-use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
+use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -42,7 +42,7 @@ use crate::range_array::RangeArray;
///
/// This plan will "fold" time index and value columns into [RangeArray]s, and truncate
/// other columns to the same length with the "folded" [RangeArray] column.
-#[derive(Debug)]
+#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeManipulate {
start: Millisecond,
end: Millisecond,
@@ -137,9 +137,9 @@ impl RangeManipulate {
}
}
-impl UserDefinedLogicalNode for RangeManipulate {
-fn as_any(&self) -> &dyn Any {
-self as _
+impl UserDefinedLogicalNodeCore for RangeManipulate {
+fn name(&self) -> &str {
+"RangeManipulate"
}
fn inputs(&self) -> Vec<&LogicalPlan> {
@@ -162,14 +162,10 @@ impl UserDefinedLogicalNode for RangeManipulate {
)
}
-fn from_template(
-&self,
-_exprs: &[Expr],
-inputs: &[LogicalPlan],
-) -> Arc<dyn UserDefinedLogicalNode> {
+fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
assert!(!inputs.is_empty());
-Arc::new(Self {
+Self {
start: self.start,
end: self.end,
interval: self.interval,
@@ -178,7 +174,7 @@ impl UserDefinedLogicalNode for RangeManipulate {
value_columns: self.value_columns.clone(),
input: inputs[0].clone(),
output_schema: self.output_schema.clone(),
-})
+}
}
}


@@ -23,7 +23,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DFSchemaRef;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::TaskContext;
-use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
+use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -33,15 +33,15 @@ use datafusion::physical_plan::{
use datatypes::arrow::compute;
use futures::{ready, Stream, StreamExt};
-#[derive(Debug)]
+#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SeriesDivide {
tag_columns: Vec<String>,
input: LogicalPlan,
}
-impl UserDefinedLogicalNode for SeriesDivide {
-fn as_any(&self) -> &dyn Any {
-self as _
+impl UserDefinedLogicalNodeCore for SeriesDivide {
+fn name(&self) -> &str {
+"SeriesDivide"
}
fn inputs(&self) -> Vec<&LogicalPlan> {
@@ -60,17 +60,13 @@ impl UserDefinedLogicalNode for SeriesDivide {
write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns)
}
-fn from_template(
-&self,
-_exprs: &[Expr],
-inputs: &[LogicalPlan],
-) -> Arc<dyn UserDefinedLogicalNode> {
+fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
assert!(!inputs.is_empty());
-Arc::new(Self {
+Self {
tag_columns: self.tag_columns.clone(),
input: inputs[0].clone(),
-})
+}
}
}


@@ -157,7 +157,7 @@ mod test {
distinct: false, \
top: None, \
projection: \
-[Wildcard(WildcardAdditionalOptions { opt_exclude: None, opt_except: None, opt_rename: None })], \
+[Wildcard(WildcardAdditionalOptions { opt_exclude: None, opt_except: None, opt_rename: None, opt_replace: None })], \
into: None, \
from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: \"t1\", quote_style: None }]\
), \


@@ -70,8 +70,11 @@ impl Table for MemTableWrapper {
self.inner.scan(projection, filters, limit).await
}
fn supports_filter_pushdown(&self, _filter: &Expr) -> table::Result<FilterPushDownType> {
Ok(FilterPushDownType::Exact)
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> table::Result<Vec<FilterPushDownType>> {
Ok(vec![FilterPushDownType::Exact; filters.len()])
}
}


@@ -19,6 +19,7 @@ common-time = { path = "../common/time" }
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
+datafusion-physical-expr.workspace = true
datatypes = { path = "../datatypes" }
derive_builder = "0.11"
futures.workspace = true


@@ -17,6 +17,7 @@ use std::sync::Arc;
use chrono::{DateTime, Utc};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use datafusion_expr::TableProviderFilterPushDown;
pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef};
use derive_builder::Builder;
@@ -47,6 +48,26 @@ pub enum FilterPushDownType {
Exact,
}
+impl From<TableProviderFilterPushDown> for FilterPushDownType {
+fn from(value: TableProviderFilterPushDown) -> Self {
+match value {
+TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
+TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
+TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
+}
+}
+}
+impl From<FilterPushDownType> for TableProviderFilterPushDown {
+fn from(value: FilterPushDownType) -> Self {
+match value {
+FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
+FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
+FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
+}
+}
+}
/// Indicates the type of this table for metadata/catalog purposes.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableType {
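
Note: the two `From` impls added above exist so that whole vectors of pushdown decisions convert element-wise, as the adapters later in this diff do. An illustration only (not from the commit), assuming the `FilterPushDownType` defined above is in scope:

use datafusion_expr::TableProviderFilterPushDown;

// Convert a table-crate decision list into DataFusion's enum in one pass.
fn to_df(decisions: Vec<FilterPushDownType>) -> Vec<TableProviderFilterPushDown> {
    decisions.into_iter().map(Into::into).collect()
}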


@@ -18,7 +18,10 @@ use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion::parquet::file::metadata::RowGroupMetaData;
use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion_common::ToDFSchema;
use datafusion_expr::{Between, BinaryExpr, Operator};
+use datafusion_physical_expr::create_physical_expr;
+use datafusion_physical_expr::execution_props::ExecutionProps;
use datatypes::schema::SchemaRef;
use datatypes::value::scalar_value_to_timestamp;
@@ -46,8 +49,26 @@ impl Predicate {
row_groups: &[RowGroupMetaData],
) -> Vec<bool> {
let mut res = vec![true; row_groups.len()];
+let arrow_schema = (*schema.arrow_schema()).clone();
+let df_schema = arrow_schema.clone().to_dfschema_ref();
+let df_schema = match df_schema {
+Ok(x) => x,
+Err(e) => {
+warn!("Failed to create Datafusion schema when trying to prune row groups, error: {e}");
+return res;
+}
+};
+let execution_props = &ExecutionProps::new();
for expr in &self.exprs {
match PruningPredicate::try_new(expr.df_expr().clone(), schema.arrow_schema().clone()) {
match create_physical_expr(
expr.df_expr(),
df_schema.as_ref(),
arrow_schema.as_ref(),
execution_props,
)
.and_then(|expr| PruningPredicate::try_new(expr, arrow_schema.clone()))
{
Ok(p) => {
let stat = RowGroupPruningStatistics::new(row_groups, &schema);
match p.prune(&stat) {

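Note: with this DataFusion revision, `PruningPredicate::try_new` takes a physical expression instead of a logical `Expr`, which is why the hunk above first builds a `DFSchema` and plans each expression with `create_physical_expr`. A condensed sketch of that flow (free-standing inputs assumed, error handling collapsed to `Option`):

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion_common::ToDFSchema;
use datafusion_expr::Expr;
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::execution_props::ExecutionProps;

fn build_pruning_predicate(expr: &Expr, schema: SchemaRef) -> Option<PruningPredicate> {
    // Logical expressions are resolved against a DFSchema...
    let df_schema = schema.clone().to_dfschema_ref().ok()?;
    // ...then lowered to a physical expression...
    let physical = create_physical_expr(
        expr,
        df_schema.as_ref(),
        schema.as_ref(),
        &ExecutionProps::new(),
    )
    .ok()?;
    // ...which is what PruningPredicate::try_new now accepts.
    PruningPredicate::try_new(physical, schema).ok()
}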

@@ -70,10 +70,10 @@ pub trait Table: Send + Sync {
limit: Option<usize>,
) -> Result<PhysicalPlanRef>;
-/// Tests whether the table provider can make use of a filter expression
+/// Tests whether the table provider can make use of any or all filter expressions
/// to optimise data retrieval.
fn supports_filter_pushdown(&self, _filter: &Expr) -> Result<FilterPushDownType> {
Ok(FilterPushDownType::Unsupported)
fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
Ok(vec![FilterPushDownType::Unsupported; filters.len()])
}
/// Alter table.
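
Note: the trait change above is mechanical for implementors — return one pushdown decision per input filter, index-aligned with the `filters` slice (the `vec![...; filters.len()]` defaults in the hunks earlier in this diff keep that invariant trivially). A hypothetical implementor that inspects each filter, not taken from this commit:

fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
    Ok(filters
        .iter()
        .map(|filter| {
            // `is_time_range_filter` is an assumed helper, not a real API.
            if is_time_range_filter(filter) {
                FilterPushDownType::Exact
            } else {
                FilterPushDownType::Inexact
            }
        })
        .collect())
}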


@@ -78,15 +78,18 @@ impl TableProvider for DfTableProviderAdapter {
Ok(Arc::new(DfPhysicalPlanAdapter(inner)))
}
fn supports_filter_pushdown(&self, filter: &DfExpr) -> DfResult<DfTableProviderFilterPushDown> {
let p = self
fn supports_filters_pushdown(
&self,
filters: &[&DfExpr],
) -> DfResult<Vec<DfTableProviderFilterPushDown>> {
let filters = filters
.iter()
.map(|&x| x.clone().into())
.collect::<Vec<_>>();
Ok(self
.table
.supports_filter_pushdown(&filter.clone().into())?;
match p {
FilterPushDownType::Unsupported => Ok(DfTableProviderFilterPushDown::Unsupported),
FilterPushDownType::Inexact => Ok(DfTableProviderFilterPushDown::Inexact),
FilterPushDownType::Exact => Ok(DfTableProviderFilterPushDown::Exact),
}
.supports_filters_pushdown(&filters.iter().collect::<Vec<_>>())
.map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())?)
}
}
@@ -155,16 +158,11 @@ impl Table for TableAdapter {
Ok(Arc::new(PhysicalPlanAdapter::new(schema, execution_plan)))
}
fn supports_filter_pushdown(&self, filter: &Expr) -> Result<FilterPushDownType> {
match self
.table_provider
.supports_filter_pushdown(filter.df_expr())
.context(error::DatafusionSnafu)?
{
DfTableProviderFilterPushDown::Unsupported => Ok(FilterPushDownType::Unsupported),
DfTableProviderFilterPushDown::Inexact => Ok(FilterPushDownType::Inexact),
DfTableProviderFilterPushDown::Exact => Ok(FilterPushDownType::Exact),
}
fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
self.table_provider
.supports_filters_pushdown(&filters.iter().map(|x| x.df_expr()).collect::<Vec<_>>())
.context(error::DatafusionSnafu)
.map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())
}
}


@@ -1,49 +0,0 @@
-CREATE TABLE integers(i INTEGER, j BIGINT TIME INDEX);
-Affected Rows: 0
-INSERT INTO integers VALUES (1, 1), (2, 2), (3, 3), (NULL, 4);
-Affected Rows: 4
-SELECT i1.i, i2.i FROM integers i1, integers i2 WHERE i1.i=i2.i ORDER BY 1;
-+---+---+
-| i | i |
-+---+---+
-| 1 | 1 |
-| 2 | 2 |
-| 3 | 3 |
-+---+---+
-SELECT i1.i,i2.i FROM integers i1, integers i2 WHERE i1.i=i2.i AND i1.i>1 ORDER BY 1;
-+---+---+
-| i | i |
-+---+---+
-| 2 | 2 |
-| 3 | 3 |
-+---+---+
-SELECT i1.i,i2.i,i3.i FROM integers i1, integers i2, integers i3 WHERE i1.i=i2.i AND i1.i=i3.i AND i1.i>1 ORDER BY 1;
-+---+---+---+
-| i | i | i |
-+---+---+---+
-| 2 | 2 | 2 |
-| 3 | 3 | 3 |
-+---+---+---+
-SELECT i1.i,i2.i FROM integers i1 JOIN integers i2 ON i1.i=i2.i WHERE i1.i>1 ORDER BY 1;
-+---+---+
-| i | i |
-+---+---+
-| 2 | 2 |
-| 3 | 3 |
-+---+---+
-DROP TABLE integers;
-Affected Rows: 1


@@ -1,15 +0,0 @@
-CREATE TABLE integers(i INTEGER, j BIGINT TIME INDEX);
-INSERT INTO integers VALUES (1, 1), (2, 2), (3, 3), (NULL, 4);
-SELECT i1.i, i2.i FROM integers i1, integers i2 WHERE i1.i=i2.i ORDER BY 1;
-SELECT i1.i,i2.i FROM integers i1, integers i2 WHERE i1.i=i2.i AND i1.i>1 ORDER BY 1;
-SELECT i1.i,i2.i,i3.i FROM integers i1, integers i2, integers i3 WHERE i1.i=i2.i AND i1.i=i3.i AND i1.i>1 ORDER BY 1;
-SELECT i1.i,i2.i FROM integers i1 JOIN integers i2 ON i1.i=i2.i WHERE i1.i>1 ORDER BY 1;
--- TODO(LFC): Resolve #790, then port remaining test case from standalone.
-DROP TABLE integers;


@@ -92,15 +92,32 @@ SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=1 WHERE i1.i=
SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) ORDER BY i;
-Error: 3001(EngineExecuteQuery), This feature is not implemented: Physical plan does not support logical expression (<subquery>)
++---+---+
+| i | j |
++---+---+
+| 1 | 1 |
+| 2 | 2 |
+| 3 | 3 |
++---+---+
SELECT * FROM integers WHERE i NOT IN ((SELECT i FROM integers WHERE i=1)) ORDER BY i;
-Error: 3001(EngineExecuteQuery), This feature is not implemented: Physical plan does not support logical expression (<subquery>)
++---+---+
+| i | j |
++---+---+
+| 2 | 2 |
+| 3 | 3 |
+| | 4 |
++---+---+
SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) AND i<3 ORDER BY i;
-Error: 3001(EngineExecuteQuery), This feature is not implemented: Physical plan does not support logical expression (<subquery>)
++---+---+
+| i | j |
++---+---+
+| 1 | 1 |
+| 2 | 2 |
++---+---+
SELECT i1.i,i2.i FROM integers i1, integers i2 WHERE i IN ((SELECT i FROM integers)) AND i1.i=i2.i ORDER BY 1;
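
Note: these restored results exercise the doubly parenthesized subquery the commit title refers to, e.g. `SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) ORDER BY i`. As a standalone sketch (not part of the commit), the statement can be parsed on its own with sqlparser 0.32, the version this commit upgrades to:

use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;

fn main() {
    // The query form from the tests above, subquery wrapped in two parentheses.
    let sql = "SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) ORDER BY i";
    let statements = Parser::parse_sql(&GenericDialect {}, sql).expect("query should parse");
    println!("{:#?}", statements[0]);
}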