feat: implement commutativity rule for prom-related plans

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-04-10 16:26:31 +08:00
parent 4b82ec7409
commit 8638075cdd
4 changed files with 60 additions and 39 deletions

48
Cargo.lock generated
View File

@@ -2895,7 +2895,7 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2"
[[package]]
name = "datafusion"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"arrow",
"arrow-array",
@@ -2946,7 +2946,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"arrow",
"async-trait",
@@ -2966,7 +2966,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog-listing"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"arrow",
"arrow-schema",
@@ -2989,7 +2989,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"ahash 0.8.11",
"arrow",
@@ -3014,7 +3014,7 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"log",
"tokio",
@@ -3023,12 +3023,12 @@ dependencies = [
[[package]]
name = "datafusion-doc"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
[[package]]
name = "datafusion-execution"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"arrow",
"dashmap",
@@ -3046,7 +3046,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"arrow",
"chrono",
@@ -3066,7 +3066,7 @@ dependencies = [
[[package]]
name = "datafusion-expr-common"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"arrow",
"datafusion-common",
@@ -3077,7 +3077,7 @@ dependencies = [
[[package]]
name = "datafusion-functions"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"arrow",
"arrow-buffer",
@@ -3106,7 +3106,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"ahash 0.8.11",
"arrow",
@@ -3127,7 +3127,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate-common"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"ahash 0.8.11",
"arrow",
@@ -3139,7 +3139,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-nested"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"arrow",
"arrow-array",
@@ -3161,7 +3161,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-table"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"arrow",
"async-trait",
@@ -3176,7 +3176,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"datafusion-common",
"datafusion-doc",
@@ -3192,7 +3192,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window-common"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"datafusion-common",
"datafusion-physical-expr-common",
@@ -3201,7 +3201,7 @@ dependencies = [
[[package]]
name = "datafusion-macros"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"datafusion-expr",
"quote",
@@ -3211,7 +3211,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"arrow",
"chrono",
@@ -3229,7 +3229,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"ahash 0.8.11",
"arrow",
@@ -3252,7 +3252,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-common"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"ahash 0.8.11",
"arrow",
@@ -3265,7 +3265,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-optimizer"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"arrow",
"arrow-schema",
@@ -3286,7 +3286,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"ahash 0.8.11",
"arrow",
@@ -3316,7 +3316,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"arrow",
"arrow-array",
@@ -3334,7 +3334,7 @@ dependencies = [
[[package]]
name = "datafusion-substrait"
version = "45.0.0"
source = "git+https://github.com/apache/datafusion.git?rev=8ebed674dd71f8a466f658626877944cd16a4375#8ebed674dd71f8a466f658626877944cd16a4375"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=90dba8f3ff278df3a6b1de64e145c275e7e84663#90dba8f3ff278df3a6b1de64e145c275e7e84663"
dependencies = [
"async-recursion",
"async-trait",

View File

@@ -113,15 +113,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" }
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" }
datafusion-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" }
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" }
datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "8ebed674dd71f8a466f658626877944cd16a4375" }
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "90dba8f3ff278df3a6b1de64e145c275e7e84663" }
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"

View File

@@ -106,6 +106,10 @@ impl SeriesDivide {
})
}
pub fn tags(&self) -> &[String] {
&self.tag_columns
}
pub fn serialize(&self) -> Vec<u8> {
pb::SeriesDivide {
tag_columns: self.tag_columns.clone(),

View File

@@ -94,7 +94,7 @@ impl Categorizer {
}
}
LogicalPlan::Extension(extension) => {
Self::check_extension_plan(extension.node.as_ref() as _)
Self::check_extension_plan(extension.node.as_ref() as _, &partition_cols)
}
LogicalPlan::Distinct(_) => {
if partition_cols.is_empty() {
@@ -116,13 +116,30 @@ impl Categorizer {
}
}
pub fn check_extension_plan(plan: &dyn UserDefinedLogicalNode) -> Commutativity {
pub fn check_extension_plan(
plan: &dyn UserDefinedLogicalNode,
partition_cols: &[String],
) -> Commutativity {
match plan.name() {
name if name == EmptyMetric::name()
name if name == SeriesDivide::name() => {
let series_divide = plan.as_any().downcast_ref::<SeriesDivide>().unwrap();
let tags = series_divide.tags().into_iter().collect::<HashSet<_>>();
for partition_col in partition_cols {
if !tags.contains(partition_col) {
return Commutativity::NonCommutative;
}
}
Commutativity::Commutative
}
name if name == SeriesNormalize::name()
|| name == InstantManipulate::name()
|| name == SeriesNormalize::name()
|| name == RangeManipulate::name()
|| name == SeriesDivide::name()
|| name == RangeManipulate::name() =>
{
// They should always follows Series Divide.
// Either commutative or non-commutative (which will be blocked by SeriesDivide).
Commutativity::Commutative
}
name if name == EmptyMetric::name()
|| name == MergeScanLogicalPlan::name()
|| name == MergeSortLogicalPlan::name() =>
{