From 63acc30ce744283b105db2274d2e110a5d71a63e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 12 Jul 2024 14:56:13 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20fine=E2=80=93tuned=20plan=20steps=20(#4?= =?UTF-8?q?258)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * perf: fine–tuned plan steps Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * handle explain plan Signed-off-by: Ruihang Xia * handle explain plan again Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 47 ++++++++++++++---------------- Cargo.toml | 18 ++++++------ src/query/src/datafusion.rs | 36 ++++++++++++++++++++++- src/query/src/dist_plan/planner.rs | 16 +++++----- 4 files changed, 73 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0acae30c2c..fdf1952380 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -739,12 +739,9 @@ dependencies = [ [[package]] name = "atomic" -version = "0.6.0" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994" -dependencies = [ - "bytemuck", -] +checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" [[package]] name = "atomic-waker" @@ -2766,7 +2763,7 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "datafusion" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "ahash 0.8.11", "arrow", @@ -2818,7 +2815,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "ahash 0.8.11", "arrow", @@ -2839,7 +2836,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "tokio", ] @@ -2847,7 +2844,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "arrow", "chrono", @@ -2867,7 +2864,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "ahash 0.8.11", "arrow", @@ -2884,7 +2881,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "arrow", "base64 0.22.1", @@ -2910,7 +2907,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "ahash 0.8.11", "arrow", @@ -2927,7 +2924,7 @@ dependencies = [ [[package]] name = "datafusion-functions-array" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "arrow", "arrow-array", @@ -2946,7 +2943,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "arrow", "async-trait", @@ -2964,7 +2961,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "ahash 0.8.11", "arrow", @@ -2994,7 +2991,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "arrow", "datafusion-common", @@ -3005,7 +3002,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "ahash 0.8.11", "arrow", @@ -3038,7 +3035,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "arrow", "arrow-array", @@ -3054,7 +3051,7 @@ dependencies = [ [[package]] name = "datafusion-substrait" version = "38.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8" dependencies = [ "async-recursion", "chrono", @@ -6410,9 +6407,9 @@ dependencies = [ [[package]] name = "multimap" -version = "0.8.3" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" [[package]] name = "mur3" @@ -12900,9 +12897,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.9.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439" +checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" dependencies = [ "atomic", "getrandom", @@ -12913,9 +12910,9 @@ dependencies = [ [[package]] name = "uuid-macro-internal" -version = "1.9.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3ff64d5cde1e2cb5268bdb497235b6bd255ba8244f910dbc3574e59593de68c" +checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 26d2baf2b7..3985bb5255 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,15 +104,15 @@ clap = { version = "4.4", features = ["derive"] } config = "0.13.0" crossbeam-utils = "0.8" dashmap = "5.4" -datafusion = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } -datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" } +datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } +datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" } derive_builder = "0.12" dotenv = "0.15" etcd-client = { version = "0.13" } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 14c01b05d3..fe57987a5c 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -48,6 +48,7 @@ use table::TableRef; use crate::analyze::DistAnalyzeExec; use crate::dataframe::DataFrame; pub use crate::datafusion::planner::DfContextProviderAdapter; +use crate::dist_plan::MergeScanLogicalPlan; use crate::error::{ CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, MissingTableMutationHandlerSnafu, MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu, @@ -373,8 +374,41 @@ impl PhysicalPlanner for DatafusionQueryEngine { match logical_plan { LogicalPlan::DfPlan(df_plan) => { let state = ctx.state(); + + // special handle EXPLAIN plan + if matches!(df_plan, DfLogicalPlan::Explain(_)) { + return state + .create_physical_plan(df_plan) + .await + .context(error::DatafusionSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu); + } + + // analyze first + let analyzed_plan = state + .analyzer() + .execute_and_check(df_plan.clone(), state.config_options(), |_, _| {}) + .context(error::DatafusionSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; + // skip optimize for MergeScan + let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan + && ext.node.name() == MergeScanLogicalPlan::name() + { + analyzed_plan.clone() + } else { + state + .optimizer() + .optimize(analyzed_plan, state, |_, _| {}) + .context(error::DatafusionSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)? + }; + let physical_plan = state - .create_physical_plan(df_plan) + .query_planner() + .create_physical_plan(&optimized_plan, state) .await .context(error::DatafusionSnafu) .map_err(BoxedError::new) diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index 41227e8687..73168ff1bd 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -27,7 +27,6 @@ use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::TableReference; use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode}; -use datafusion_optimizer::analyzer::Analyzer; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; @@ -72,8 +71,9 @@ impl ExtensionPlanner for DistExtensionPlanner { let input_plan = merge_scan.input(); let fallback = |logical_plan| async move { + let optimized_plan = self.optimize_input_logical_plan(session_state, logical_plan)?; planner - .create_physical_plan(logical_plan, session_state) + .create_physical_plan(&optimized_plan, session_state) .await .map(Some) }; @@ -83,15 +83,15 @@ impl ExtensionPlanner for DistExtensionPlanner { return fallback(input_plan).await; } - let optimized_plan = self.optimize_input_logical_plan(session_state, input_plan)?; + let optimized_plan = input_plan; let Some(table_name) = Self::extract_full_table_name(input_plan)? else { // no relation found in input plan, going to execute them locally - return fallback(&optimized_plan).await; + return fallback(optimized_plan).await; }; let Ok(regions) = self.get_regions(&table_name).await else { // no peers found, going to execute them locally - return fallback(&optimized_plan).await; + return fallback(optimized_plan).await; }; // TODO(ruihang): generate different execution plans for different variant merge operation @@ -137,16 +137,14 @@ impl DistExtensionPlanner { Ok(table.table_info().region_ids()) } - // TODO(ruihang): find a more elegant way to optimize input logical plan + /// Input logical plan is analyzed. Thus only call logical optimizer to optimize it. fn optimize_input_logical_plan( &self, session_state: &SessionState, plan: &LogicalPlan, ) -> Result { let state = session_state.clone(); - let analyzer = Analyzer::default(); - let state = state.with_analyzer_rules(analyzer.rules); - state.optimize(plan) + state.optimizer().optimize(plan.clone(), &state, |_, _| {}) } }