From 8638075cdde7064c069d5b349f8f024d10cba51c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 10 Apr 2025 16:26:31 +0800 Subject: [PATCH] feat: implement commutativity rule for prom-related plans Signed-off-by: Ruihang Xia --- Cargo.lock | 48 +++++++++---------- Cargo.toml | 18 +++---- .../src/extension_plan/series_divide.rs | 4 ++ src/query/src/dist_plan/commutativity.rs | 29 ++++++++--- 4 files changed, 60 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 188f9ce144..a9ec65de9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2895,7 +2895,7 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "datafusion" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "arrow", "arrow-array", @@ -2946,7 +2946,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "arrow", "async-trait", @@ -2966,7 +2966,7 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "arrow", "arrow-schema", @@ -2989,7 +2989,7 @@ dependencies = [ [[package]] name = "datafusion-common" 
version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "ahash 0.8.11", "arrow", @@ -3014,7 +3014,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "log", "tokio", @@ -3023,12 +3023,12 @@ dependencies = [ [[package]] name = "datafusion-doc" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" [[package]] name = "datafusion-execution" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "arrow", "dashmap", @@ -3046,7 +3046,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "arrow", "chrono", @@ 
-3066,7 +3066,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "arrow", "datafusion-common", @@ -3077,7 +3077,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "arrow", "arrow-buffer", @@ -3106,7 +3106,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "ahash 0.8.11", "arrow", @@ -3127,7 +3127,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "ahash 0.8.11", "arrow", @@ -3139,7 +3139,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "45.0.0" -source = 
"git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "arrow", "arrow-array", @@ -3161,7 +3161,7 @@ dependencies = [ [[package]] name = "datafusion-functions-table" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "arrow", "async-trait", @@ -3176,7 +3176,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "datafusion-common", "datafusion-doc", @@ -3192,7 +3192,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -3201,7 +3201,7 @@ dependencies = [ [[package]] name = "datafusion-macros" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = 
"git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "datafusion-expr", "quote", @@ -3211,7 +3211,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "arrow", "chrono", @@ -3229,7 +3229,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "ahash 0.8.11", "arrow", @@ -3252,7 +3252,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "ahash 0.8.11", "arrow", @@ -3265,7 +3265,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "arrow", "arrow-schema", @@ -3286,7 +3286,7 @@ dependencies = [ [[package]] 
name = "datafusion-physical-plan" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "ahash 0.8.11", "arrow", @@ -3316,7 +3316,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "arrow", "arrow-array", @@ -3334,7 +3334,7 @@ dependencies = [ [[package]] name = "datafusion-substrait" version = "45.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663" dependencies = [ "async-recursion", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 9c36a76805..ba0fb9a889 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,15 +113,15 @@ clap = { version = "4.4", features = ["derive"] } config = "0.13.0" crossbeam-utils = "0.8" dashmap = "6.1" -datafusion = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" } -datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" } -datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" } -datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" } 
-datafusion-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" } -datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" } -datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" } -datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" } -datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" } +datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" } +datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" } +datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" } +datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" } +datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" } +datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" } +datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" } +datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" } +datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" } deadpool = "0.12" deadpool-postgres = "0.14" derive_builder = "0.20" diff --git a/src/promql/src/extension_plan/series_divide.rs 
b/src/promql/src/extension_plan/series_divide.rs index 06ef942762..de0ebf1e66 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -106,6 +106,10 @@ impl SeriesDivide { }) } + pub fn tags(&self) -> &[String] { + &self.tag_columns + } + pub fn serialize(&self) -> Vec<u8> { pb::SeriesDivide { tag_columns: self.tag_columns.clone(), diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs index 5b3cb0f2db..19ef70c3d5 100644 --- a/src/query/src/dist_plan/commutativity.rs +++ b/src/query/src/dist_plan/commutativity.rs @@ -94,7 +94,7 @@ impl Categorizer { } } LogicalPlan::Extension(extension) => { - Self::check_extension_plan(extension.node.as_ref() as _) + Self::check_extension_plan(extension.node.as_ref() as _, &partition_cols) } LogicalPlan::Distinct(_) => { if partition_cols.is_empty() { @@ -116,13 +116,30 @@ impl Categorizer { } } - pub fn check_extension_plan(plan: &dyn UserDefinedLogicalNode) -> Commutativity { + pub fn check_extension_plan( + plan: &dyn UserDefinedLogicalNode, + partition_cols: &[String], + ) -> Commutativity { match plan.name() { - name if name == EmptyMetric::name() + name if name == SeriesDivide::name() => { + let series_divide = plan.as_any().downcast_ref::<SeriesDivide>().unwrap(); + let tags = series_divide.tags().into_iter().collect::<HashSet<_>>(); + for partition_col in partition_cols { + if !tags.contains(partition_col) { + return Commutativity::NonCommutative; + } + } + Commutativity::Commutative + } + name if name == SeriesNormalize::name() || name == InstantManipulate::name() - || name == SeriesNormalize::name() - || name == RangeManipulate::name() - || name == SeriesDivide::name() + || name == RangeManipulate::name() => + { + // They should always follow SeriesDivide. + // Either commutative or non-commutative (which will be blocked by SeriesDivide). 
+ Commutativity::Commutative + } + name if name == EmptyMetric::name() || name == MergeScanLogicalPlan::name() || name == MergeSortLogicalPlan::name() => {