perf: fine–tuned plan steps (#4258)

* perf: fine–tuned plan steps

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* handle explain plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* handle explain plan again

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-07-12 14:56:13 +08:00
committed by GitHub
parent 285ffc5850
commit 63acc30ce7
4 changed files with 73 additions and 44 deletions

47
Cargo.lock generated
View File

@@ -739,12 +739,9 @@ dependencies = [
[[package]]
name = "atomic"
version = "0.6.0"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994"
dependencies = [
"bytemuck",
]
checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
[[package]]
name = "atomic-waker"
@@ -2766,7 +2763,7 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2"
[[package]]
name = "datafusion"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"ahash 0.8.11",
"arrow",
@@ -2818,7 +2815,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"ahash 0.8.11",
"arrow",
@@ -2839,7 +2836,7 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"tokio",
]
@@ -2847,7 +2844,7 @@ dependencies = [
[[package]]
name = "datafusion-execution"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"arrow",
"chrono",
@@ -2867,7 +2864,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"ahash 0.8.11",
"arrow",
@@ -2884,7 +2881,7 @@ dependencies = [
[[package]]
name = "datafusion-functions"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"arrow",
"base64 0.22.1",
@@ -2910,7 +2907,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"ahash 0.8.11",
"arrow",
@@ -2927,7 +2924,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-array"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"arrow",
"arrow-array",
@@ -2946,7 +2943,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"arrow",
"async-trait",
@@ -2964,7 +2961,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"ahash 0.8.11",
"arrow",
@@ -2994,7 +2991,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-common"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"arrow",
"datafusion-common",
@@ -3005,7 +3002,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"ahash 0.8.11",
"arrow",
@@ -3038,7 +3035,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"arrow",
"arrow-array",
@@ -3054,7 +3051,7 @@ dependencies = [
[[package]]
name = "datafusion-substrait"
version = "38.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=729b356ef543ffcda6813c7b5373507a04ae0109#729b356ef543ffcda6813c7b5373507a04ae0109"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=d7bda5c9b762426e81f144296deadc87e5f4a0b8#d7bda5c9b762426e81f144296deadc87e5f4a0b8"
dependencies = [
"async-recursion",
"chrono",
@@ -6410,9 +6407,9 @@ dependencies = [
[[package]]
name = "multimap"
version = "0.8.3"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
[[package]]
name = "mur3"
@@ -12900,9 +12897,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.9.1"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439"
checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
dependencies = [
"atomic",
"getrandom",
@@ -12913,9 +12910,9 @@ dependencies = [
[[package]]
name = "uuid-macro-internal"
version = "1.9.1"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3ff64d5cde1e2cb5268bdb497235b6bd255ba8244f910dbc3574e59593de68c"
checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2"
dependencies = [
"proc-macro2",
"quote",

View File

@@ -104,15 +104,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "d7bda5c9b762426e81f144296deadc87e5f4a0b8" }
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = { version = "0.13" }

View File

@@ -48,6 +48,7 @@ use table::TableRef;
use crate::analyze::DistAnalyzeExec;
use crate::dataframe::DataFrame;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::dist_plan::MergeScanLogicalPlan;
use crate::error::{
CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, MissingTableMutationHandlerSnafu,
MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
@@ -373,8 +374,41 @@ impl PhysicalPlanner for DatafusionQueryEngine {
match logical_plan {
LogicalPlan::DfPlan(df_plan) => {
let state = ctx.state();
// special handle EXPLAIN plan
if matches!(df_plan, DfLogicalPlan::Explain(_)) {
return state
.create_physical_plan(df_plan)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu);
}
// analyze first
let analyzed_plan = state
.analyzer()
.execute_and_check(df_plan.clone(), state.config_options(), |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
// skip optimize for MergeScan
let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
&& ext.node.name() == MergeScanLogicalPlan::name()
{
analyzed_plan.clone()
} else {
state
.optimizer()
.optimize(analyzed_plan, state, |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?
};
let physical_plan = state
.create_physical_plan(df_plan)
.query_planner()
.create_physical_plan(&optimized_plan, state)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)

View File

@@ -27,7 +27,6 @@ use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::TableReference;
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion_optimizer::analyzer::Analyzer;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -72,8 +71,9 @@ impl ExtensionPlanner for DistExtensionPlanner {
let input_plan = merge_scan.input();
let fallback = |logical_plan| async move {
let optimized_plan = self.optimize_input_logical_plan(session_state, logical_plan)?;
planner
.create_physical_plan(logical_plan, session_state)
.create_physical_plan(&optimized_plan, session_state)
.await
.map(Some)
};
@@ -83,15 +83,15 @@ impl ExtensionPlanner for DistExtensionPlanner {
return fallback(input_plan).await;
}
let optimized_plan = self.optimize_input_logical_plan(session_state, input_plan)?;
let optimized_plan = input_plan;
let Some(table_name) = Self::extract_full_table_name(input_plan)? else {
// no relation found in input plan, going to execute them locally
return fallback(&optimized_plan).await;
return fallback(optimized_plan).await;
};
let Ok(regions) = self.get_regions(&table_name).await else {
// no peers found, going to execute them locally
return fallback(&optimized_plan).await;
return fallback(optimized_plan).await;
};
// TODO(ruihang): generate different execution plans for different variant merge operation
@@ -137,16 +137,14 @@ impl DistExtensionPlanner {
Ok(table.table_info().region_ids())
}
// TODO(ruihang): find a more elegant way to optimize input logical plan
/// Input logical plan is analyzed. Thus only call logical optimizer to optimize it.
fn optimize_input_logical_plan(
&self,
session_state: &SessionState,
plan: &LogicalPlan,
) -> Result<LogicalPlan> {
let state = session_state.clone();
let analyzer = Analyzer::default();
let state = state.with_analyzer_rules(analyzer.rules);
state.optimize(plan)
state.optimizer().optimize(plan.clone(), &state, |_, _| {})
}
}