diff --git a/Cargo.lock b/Cargo.lock index a9c2179ede..1e1349709a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3117,7 +3117,7 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "datafusion" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "arrow 54.2.1", "arrow-array 54.2.1", @@ -3168,7 +3168,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "arrow 54.2.1", "async-trait", @@ -3188,7 +3188,7 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "arrow 54.2.1", "arrow-schema 54.3.1", @@ -3211,7 +3211,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "ahash 0.8.11", "arrow 54.2.1", @@ -3236,7 +3236,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "log", "tokio", @@ -3245,12 +3245,12 @@ dependencies = [ [[package]] name = "datafusion-doc" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" [[package]] name = "datafusion-execution" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "arrow 54.2.1", "dashmap", @@ -3268,7 +3268,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "arrow 54.2.1", "chrono", @@ -3288,7 +3288,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "arrow 54.2.1", "datafusion-common", @@ -3299,7 +3299,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "arrow 54.2.1", "arrow-buffer 54.3.1", @@ -3328,7 +3328,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "ahash 0.8.11", "arrow 54.2.1", @@ -3349,7 +3349,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "ahash 0.8.11", "arrow 54.2.1", @@ -3361,7 +3361,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "arrow 54.2.1", "arrow-array 54.2.1", @@ -3383,7 +3383,7 @@ dependencies = [ [[package]] name = "datafusion-functions-table" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "arrow 54.2.1", "async-trait", @@ -3398,7 +3398,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "datafusion-common", "datafusion-doc", @@ -3414,7 +3414,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -3423,7 +3423,7 @@ dependencies = [ [[package]] name = "datafusion-macros" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "datafusion-expr", "quote", @@ -3433,7 +3433,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "arrow 54.2.1", "chrono", @@ -3451,7 +3451,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "ahash 0.8.11", "arrow 54.2.1", @@ -3474,7 +3474,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "ahash 0.8.11", "arrow 54.2.1", @@ -3487,7 +3487,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "arrow 54.2.1", "arrow-schema 54.3.1", @@ -3508,7 +3508,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "ahash 0.8.11", "arrow 54.2.1", @@ -3538,7 +3538,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "arrow 54.2.1", "arrow-array 54.2.1", @@ -3556,7 +3556,7 @@ dependencies = [ [[package]] name = "datafusion-substrait" version = "45.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4" dependencies = [ "async-recursion", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index aa8793abb6..e51d50ee2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,15 +112,15 @@ clap = { version = "4.4", features = ["derive"] } config = "0.13.0" crossbeam-utils = "0.8" dashmap = "6.1" -datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" } -datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" } -datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" } -datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" } -datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" } -datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" } -datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" } -datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" } -datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" } +datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" } +datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" } +datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" } +datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" } +datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" } +datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" } +datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" } +datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" } +datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" } deadpool = "0.12" deadpool-postgres = "0.14" derive_builder = "0.20" diff --git a/src/common/function/src/scalars/uddsketch_calc.rs b/src/common/function/src/scalars/uddsketch_calc.rs index 5c0beb4fec..f429766eb7 100644 --- a/src/common/function/src/scalars/uddsketch_calc.rs +++ b/src/common/function/src/scalars/uddsketch_calc.rs @@ -115,6 +115,13 @@ impl Function for UddSketchCalcFunction { } }; + // Check if the sketch is empty, if so, return null + // This is important to avoid panics when calling estimate_quantile on an empty sketch + // In practice, this will happen if input is all null + if sketch.bucket_iter().count() == 0 { + builder.push_null(); + continue; + } // Compute the estimated quantile from the sketch let result = sketch.estimate_quantile(perc); builder.push(Some(result)); diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs index 152ad5781c..031c7aad4b 100644 --- a/src/flow/src/batching_mode.rs +++ b/src/flow/src/batching_mode.rs @@ -32,3 +32,9 @@ pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60); /// The minimum duration between two queries execution by batching mode task const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0); + +/// Grpc connection timeout +const GRPC_CONN_TIMEOUT: Duration = Duration::from_secs(5); + +/// Grpc max retry number +const GRPC_MAX_RETRIES: u32 = 3; diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 2454e86251..9f16ea07fa 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -25,12 +25,15 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role}; use common_meta::peer::Peer; use common_meta::rpc::store::RangeRequest; use common_query::Output; +use common_telemetry::warn; use meta_client::client::MetaClient; use servers::query_handler::grpc::GrpcQueryHandler; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; -use crate::batching_mode::DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT; +use crate::batching_mode::{ + DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, GRPC_CONN_TIMEOUT, GRPC_MAX_RETRIES, +}; use crate::error::{ExternalSnafu, InvalidRequestSnafu, UnexpectedSnafu}; use crate::Error; @@ -99,7 +102,9 @@ impl FrontendClient { Self::Distributed { meta_client, chnl_mgr: { - let cfg = ChannelConfig::new().timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT); + let cfg = ChannelConfig::new() + .connect_timeout(GRPC_CONN_TIMEOUT) + .timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT); ChannelManager::with_config(cfg) }, } @@ -223,12 +228,32 @@ impl FrontendClient { peer: db.peer.clone(), }); - db.database - .handle(req.clone()) - .await - .with_context(|_| InvalidRequestSnafu { - context: format!("Failed to handle request: {:?}", req), - }) + let mut retry = 0; + + loop { + let ret = db.database.handle(req.clone()).await.with_context(|_| { + InvalidRequestSnafu { + context: format!("Failed to handle request: {:?}", req), + } + }); + if let Err(err) = ret { + if retry < GRPC_MAX_RETRIES { + retry += 1; + warn!( + "Failed to send request to grpc handle at Peer={:?}, retry = {}, error = {:?}", + db.peer, retry, err + ); + continue; + } else { + common_telemetry::error!( + "Failed to send request to grpc handle at Peer={:?} after {} retries, error = {:?}", + db.peer, retry, err + ); + return Err(err); + } + } + return ret; + } } FrontendClient::Standalone { database_client } => { let ctx = QueryContextBuilder::default() diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 1547faae11..bb1f296c90 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -53,6 +53,7 @@ use crate::batching_mode::utils::{ use crate::batching_mode::{ DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD, }; +use crate::df_optimizer::apply_df_optimizer; use crate::error::{ ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu, @@ -541,7 +542,10 @@ impl BatchingTask { .clone() .rewrite(&mut add_auto_column) .with_context(|_| DatafusionSnafu { - context: format!("Failed to rewrite plan {:?}", self.config.plan), + context: format!( + "Failed to rewrite plan:\n {}\n", + self.config.plan + ), })? .data; let schema_len = plan.schema().fields().len(); @@ -573,16 +577,19 @@ impl BatchingTask { let mut add_filter = AddFilterRewriter::new(expr); let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone()); - // make a not optimized plan for clearer unparse + let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false) .await?; - plan.clone() + let rewrite = plan + .clone() .rewrite(&mut add_filter) .and_then(|p| p.data.rewrite(&mut add_auto_column)) .with_context(|_| DatafusionSnafu { - context: format!("Failed to rewrite plan {plan:?}"), + context: format!("Failed to rewrite plan:\n {}\n", plan), })? - .data + .data; + // only apply optimize after complex rewrite is done + apply_df_optimizer(rewrite).await? }; Ok(Some((new_plan, schema_len))) diff --git a/src/flow/src/batching_mode/time_window.rs b/src/flow/src/batching_mode/time_window.rs index e6a0d6ad8c..398250fc8b 100644 --- a/src/flow/src/batching_mode/time_window.rs +++ b/src/flow/src/batching_mode/time_window.rs @@ -704,6 +704,28 @@ mod test { ), "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')" ), + // complex time window index with where + ( + "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number in (2, 3, 4) GROUP BY time_window;", + Timestamp::new(1740394109, TimeUnit::Second), + ( + "ts".to_string(), + Some(Timestamp::new(1740394080, TimeUnit::Second)), + Some(Timestamp::new(1740394140, TimeUnit::Second)), + ), + "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE numbers_with_ts.number IN (2, 3, 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')" + ), + // complex time window index with between and + ( + "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number BETWEEN 2 AND 4 GROUP BY time_window;", + Timestamp::new(1740394109, TimeUnit::Second), + ( + "ts".to_string(), + Some(Timestamp::new(1740394080, TimeUnit::Second)), + Some(Timestamp::new(1740394140, TimeUnit::Second)), + ), + "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE (numbers_with_ts.number BETWEEN 2 AND 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')" + ), // no time index ( "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;", diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs index 7aa6a8b12f..117db03665 100644 --- a/src/flow/src/batching_mode/utils.rs +++ b/src/flow/src/batching_mode/utils.rs @@ -342,8 +342,8 @@ impl TreeNodeRewriter for AddAutoColumnRewriter { } } else { return Err(DataFusionError::Plan(format!( - "Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?} at node {:?}", - query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas(), node + "Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?}", + query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas() ))); } @@ -406,7 +406,9 @@ mod test { use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; use pretty_assertions::assert_eq; + use query::query_engine::DefaultSerializer; use session::context::QueryContext; + use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use super::*; use crate::test_utils::create_test_query_engine; @@ -701,4 +703,18 @@ mod test { ); } } + + #[tokio::test] + async fn test_null_cast() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let sql = "SELECT NULL::DOUBLE FROM numbers_with_ts"; + let plan = sql_to_df_plan(ctx, query_engine.clone(), sql, false) + .await + .unwrap(); + + let _sub_plan = DFLogicalSubstraitConvertor {} + .encode(&plan, DefaultSerializer) + .unwrap(); + } } diff --git a/src/flow/src/df_optimizer.rs b/src/flow/src/df_optimizer.rs index d83bb77718..bef5b3ed79 100644 --- a/src/flow/src/df_optimizer.rs +++ b/src/flow/src/df_optimizer.rs @@ -25,7 +25,6 @@ use datafusion::config::ConfigOptions; use datafusion::error::DataFusionError; use datafusion::functions_aggregate::count::count_udaf; use datafusion::functions_aggregate::sum::sum_udaf; -use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule; use datafusion::optimizer::analyzer::type_coercion::TypeCoercion; use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate; use datafusion::optimizer::optimize_projections::OptimizeProjections; @@ -42,6 +41,7 @@ use datafusion_expr::{ BinaryExpr, ColumnarValue, Expr, Operator, Projection, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, }; +use query::optimizer::count_wildcard::CountWildcardToTimeIndexRule; use query::parser::QueryLanguageParser; use query::query_engine::DefaultSerializer; use query::QueryEngine; @@ -61,9 +61,9 @@ pub async fn apply_df_optimizer( ) -> Result { let cfg = ConfigOptions::new(); let analyzer = Analyzer::with_rules(vec![ - Arc::new(CountWildcardRule::new()), - Arc::new(AvgExpandRule::new()), - Arc::new(TumbleExpandRule::new()), + Arc::new(CountWildcardToTimeIndexRule), + Arc::new(AvgExpandRule), + Arc::new(TumbleExpandRule), Arc::new(CheckGroupByRule::new()), Arc::new(TypeCoercion::new()), ]); @@ -128,13 +128,7 @@ pub async fn sql_to_flow_plan( } #[derive(Debug)] -struct AvgExpandRule {} - -impl AvgExpandRule { - pub fn new() -> Self { - Self {} - } -} +struct AvgExpandRule; impl AnalyzerRule for AvgExpandRule { fn analyze( @@ -331,13 +325,7 @@ impl TreeNodeRewriter for ExpandAvgRewriter<'_> { /// expand tumble in aggr expr to tumble_start and tumble_end with column name like `window_start` #[derive(Debug)] -struct TumbleExpandRule {} - -impl TumbleExpandRule { - pub fn new() -> Self { - Self {} - } -} +struct TumbleExpandRule; impl AnalyzerRule for TumbleExpandRule { fn analyze( diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 26fbfb27cd..0aed2860e9 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -28,7 +28,7 @@ pub mod error; pub mod executor; pub mod log_query; pub mod metrics; -mod optimizer; +pub mod optimizer; pub mod options; pub mod parser; mod part_sort; diff --git a/tests/cases/standalone/common/flow/flow_step_aggr.result b/tests/cases/standalone/common/flow/flow_step_aggr.result new file mode 100644 index 0000000000..ab76a67617 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_step_aggr.result @@ -0,0 +1,266 @@ +CREATE TABLE access_log ( + "url" STRING, + user_id BIGINT, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY ("url", user_id) +); + +Affected Rows: 0 + +CREATE TABLE access_log_10s ( + "url" STRING, + time_window timestamp time INDEX, + state BINARY, + PRIMARY KEY ("url") +); + +Affected Rows: 0 + +CREATE FLOW calc_access_log_10s SINK TO access_log_10s +AS +SELECT + "url", + date_bin('10s'::INTERVAL, ts) AS time_window, + hll(user_id) AS state +FROM + access_log +GROUP BY + "url", + time_window; + +Affected Rows: 0 + +-- insert 4 rows of data +INSERT INTO access_log VALUES + ("/dashboard", 1, "2025-03-04 00:00:00"), + ("/dashboard", 1, "2025-03-04 00:00:01"), + ("/dashboard", 2, "2025-03-04 00:00:05"), + ("/not_found", 3, "2025-03-04 00:00:11"), + ("/dashboard", 4, "2025-03-04 00:00:15"); + +Affected Rows: 5 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_access_log_10s'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('calc_access_log_10s') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +-- query should return 3 rows +SELECT "url", time_window FROM access_log_10s +ORDER BY + time_window; + ++------------+---------------------+ +| url | time_window | ++------------+---------------------+ +| /dashboard | 2025-03-04T00:00:00 | +| /dashboard | 2025-03-04T00:00:10 | +| /not_found | 2025-03-04T00:00:10 | ++------------+---------------------+ + +-- use hll_count to query the approximate data in access_log_10s +SELECT "url", time_window, hll_count(state) FROM access_log_10s +ORDER BY + time_window; + ++------------+---------------------+---------------------------------+ +| url | time_window | hll_count(access_log_10s.state) | ++------------+---------------------+---------------------------------+ +| /dashboard | 2025-03-04T00:00:00 | 2 | +| /dashboard | 2025-03-04T00:00:10 | 1 | +| /not_found | 2025-03-04T00:00:10 | 1 | ++------------+---------------------+---------------------------------+ + +-- further, we can aggregate 10 seconds of data to every minute, by using hll_merge to merge 10 seconds of hyperloglog state +SELECT + "url", + date_bin('1 minute'::INTERVAL, time_window) AS time_window_1m, + hll_count(hll_merge(state)) as uv_per_min +FROM + access_log_10s +GROUP BY + "url", + time_window_1m +ORDER BY + time_window_1m; + ++------------+---------------------+------------+ +| url | time_window_1m | uv_per_min | ++------------+---------------------+------------+ +| /not_found | 2025-03-04T00:00:00 | 1 | +| /dashboard | 2025-03-04T00:00:00 | 3 | ++------------+---------------------+------------+ + +DROP FLOW calc_access_log_10s; + +Affected Rows: 0 + +DROP TABLE access_log_10s; + +Affected Rows: 0 + +DROP TABLE access_log; + +Affected Rows: 0 + +CREATE TABLE percentile_base ( + "id" INT PRIMARY KEY, + "value" DOUBLE, + ts timestamp(0) time index +); + +Affected Rows: 0 + +CREATE TABLE percentile_5s ( + "percentile_state" BINARY, + time_window timestamp(0) time index +); + +Affected Rows: 0 + +CREATE FLOW calc_percentile_5s SINK TO percentile_5s +AS +SELECT + uddsketch_state(128, 0.01, "value") AS "value", + date_bin('5 seconds'::INTERVAL, ts) AS time_window +FROM + percentile_base +WHERE + "value" > 0 AND "value" < 70 +GROUP BY + time_window; + +Affected Rows: 0 + +INSERT INTO percentile_base ("id", "value", ts) VALUES + (1, 10.0, 1), + (2, 20.0, 2), + (3, 30.0, 3), + (4, 40.0, 4), + (5, 50.0, 5), + (6, 60.0, 6), + (7, 70.0, 7), + (8, 80.0, 8), + (9, 90.0, 9), + (10, 100.0, 10); + +Affected Rows: 10 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_percentile_5s'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('calc_percentile_5s') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +SELECT + time_window, + uddsketch_calc(0.99, `percentile_state`) AS p99 +FROM + percentile_5s +ORDER BY + time_window; + ++---------------------+--------------------+ +| time_window | p99 | ++---------------------+--------------------+ +| 1970-01-01T00:00:00 | 40.04777053326359 | +| 1970-01-01T00:00:05 | 59.745049810145126 | ++---------------------+--------------------+ + +DROP FLOW calc_percentile_5s; + +Affected Rows: 0 + +DROP TABLE percentile_5s; + +Affected Rows: 0 + +DROP TABLE percentile_base; + +Affected Rows: 0 + +CREATE TABLE percentile_base ( + "id" INT PRIMARY KEY, + "value" DOUBLE, + ts timestamp(0) time index +); + +Affected Rows: 0 + +CREATE TABLE percentile_5s ( + "percentile_state" BINARY, + time_window timestamp(0) time index +); + +Affected Rows: 0 + +CREATE FLOW calc_percentile_5s SINK TO percentile_5s +AS +SELECT + uddsketch_state(128, 0.01, CASE WHEN "value" > 0 AND "value" < 70 THEN "value" ELSE NULL END) AS "value", + date_bin('5 seconds'::INTERVAL, ts) AS time_window +FROM + percentile_base +GROUP BY + time_window; + +Affected Rows: 0 + +INSERT INTO percentile_base ("id", "value", ts) VALUES + (1, 10.0, 1), + (2, 20.0, 2), + (3, 30.0, 3), + (4, 40.0, 4), + (5, 50.0, 5), + (6, 60.0, 6), + (7, 70.0, 7), + (8, 80.0, 8), + (9, 90.0, 9), + (10, 100.0, 10); + +Affected Rows: 10 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_percentile_5s'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('calc_percentile_5s') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +SELECT + time_window, + uddsketch_calc(0.99, percentile_state) AS p99 +FROM + percentile_5s +ORDER BY + time_window; + ++---------------------+--------------------+ +| time_window | p99 | ++---------------------+--------------------+ +| 1970-01-01T00:00:00 | 40.04777053326359 | +| 1970-01-01T00:00:05 | 59.745049810145126 | +| 1970-01-01T00:00:10 | | ++---------------------+--------------------+ + +DROP FLOW calc_percentile_5s; + +Affected Rows: 0 + +DROP TABLE percentile_5s; + +Affected Rows: 0 + +DROP TABLE percentile_base; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_step_aggr.sql b/tests/cases/standalone/common/flow/flow_step_aggr.sql new file mode 100644 index 0000000000..44dde88912 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_step_aggr.sql @@ -0,0 +1,161 @@ +CREATE TABLE access_log ( + "url" STRING, + user_id BIGINT, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY ("url", user_id) +); + +CREATE TABLE access_log_10s ( + "url" STRING, + time_window timestamp time INDEX, + state BINARY, + PRIMARY KEY ("url") +); + +CREATE FLOW calc_access_log_10s SINK TO access_log_10s +AS +SELECT + "url", + date_bin('10s'::INTERVAL, ts) AS time_window, + hll(user_id) AS state +FROM + access_log +GROUP BY + "url", + time_window; + +-- insert 4 rows of data +INSERT INTO access_log VALUES + ("/dashboard", 1, "2025-03-04 00:00:00"), + ("/dashboard", 1, "2025-03-04 00:00:01"), + ("/dashboard", 2, "2025-03-04 00:00:05"), + ("/not_found", 3, "2025-03-04 00:00:11"), + ("/dashboard", 4, "2025-03-04 00:00:15"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_access_log_10s'); + +-- query should return 3 rows +SELECT "url", time_window FROM access_log_10s +ORDER BY + time_window; + +-- use hll_count to query the approximate data in access_log_10s +SELECT "url", time_window, hll_count(state) FROM access_log_10s +ORDER BY + time_window; + +-- further, we can aggregate 10 seconds of data to every minute, by using hll_merge to merge 10 seconds of hyperloglog state +SELECT + "url", + date_bin('1 minute'::INTERVAL, time_window) AS time_window_1m, + hll_count(hll_merge(state)) as uv_per_min +FROM + access_log_10s +GROUP BY + "url", + time_window_1m +ORDER BY + time_window_1m; + +DROP FLOW calc_access_log_10s; +DROP TABLE access_log_10s; +DROP TABLE access_log; + +CREATE TABLE percentile_base ( + "id" INT PRIMARY KEY, + "value" DOUBLE, + ts timestamp(0) time index +); + +CREATE TABLE percentile_5s ( + "percentile_state" BINARY, + time_window timestamp(0) time index +); + +CREATE FLOW calc_percentile_5s SINK TO percentile_5s +AS +SELECT + uddsketch_state(128, 0.01, "value") AS "value", + date_bin('5 seconds'::INTERVAL, ts) AS time_window +FROM + percentile_base +WHERE + "value" > 0 AND "value" < 70 +GROUP BY + time_window; + +INSERT INTO percentile_base ("id", "value", ts) VALUES + (1, 10.0, 1), + (2, 20.0, 2), + (3, 30.0, 3), + (4, 40.0, 4), + (5, 50.0, 5), + (6, 60.0, 6), + (7, 70.0, 7), + (8, 80.0, 8), + (9, 90.0, 9), + (10, 100.0, 10); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_percentile_5s'); + +SELECT + time_window, + uddsketch_calc(0.99, `percentile_state`) AS p99 +FROM + percentile_5s +ORDER BY + time_window; + +DROP FLOW calc_percentile_5s; +DROP TABLE percentile_5s; +DROP TABLE percentile_base; + +CREATE TABLE percentile_base ( + "id" INT PRIMARY KEY, + "value" DOUBLE, + ts timestamp(0) time index +); + +CREATE TABLE percentile_5s ( + "percentile_state" BINARY, + time_window timestamp(0) time index +); + +CREATE FLOW calc_percentile_5s SINK TO percentile_5s +AS +SELECT + uddsketch_state(128, 0.01, CASE WHEN "value" > 0 AND "value" < 70 THEN "value" ELSE NULL END) AS "value", + date_bin('5 seconds'::INTERVAL, ts) AS time_window +FROM + percentile_base +GROUP BY + time_window; + +INSERT INTO percentile_base ("id", "value", ts) VALUES + (1, 10.0, 1), + (2, 20.0, 2), + (3, 30.0, 3), + (4, 40.0, 4), + (5, 50.0, 5), + (6, 60.0, 6), + (7, 70.0, 7), + (8, 80.0, 8), + (9, 90.0, 9), + (10, 100.0, 10); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_percentile_5s'); + +SELECT + time_window, + uddsketch_calc(0.99, percentile_state) AS p99 +FROM + percentile_5s +ORDER BY + time_window; + +DROP FLOW calc_percentile_5s; +DROP TABLE percentile_5s; +DROP TABLE percentile_base;