feat: apply remote dynamic filters on datanode scans (#8262)

* feat: apply rdf

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* fix: drop remote dyn filter fallback exec

Signed-off-by: discord9 <discord9@163.com>

* Revert "fix: drop remote dyn filter fallback exec"

This reverts commit bb757a596c.

Signed-off-by: discord9 <discord9@163.com>

* refactor: use rdf receiver logical plan instead

Signed-off-by: discord9 <discord9@163.com>

* test: update sqlness

Signed-off-by: discord9 <discord9@163.com>

* feat: rdf disable option

Signed-off-by: discord9 <discord9@163.com>

* tests: large int tests

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* test: update prec fix

Signed-off-by: discord9 <discord9@163.com>

* fix: make receiver node work

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* fix: tql disable rdf

Signed-off-by: discord9 <discord9@163.com>

* chore: rm useless joins

Signed-off-by: discord9 <discord9@163.com>

* fix: also disable in flow tql

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review rm to promql

Signed-off-by: discord9 <discord9@163.com>

* chore: promql ut

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

* test: rm misleading&add some nested/cleanup

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-06-10 20:57:23 +08:00
committed by GitHub
parent 0a2dc6d3f2
commit 8fc5a3b1c7
24 changed files with 3103 additions and 299 deletions

70
Cargo.lock generated
View File

@@ -3626,7 +3626,7 @@ checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8"
[[package]]
name = "datafusion"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"arrow-schema 58.3.0",
@@ -3680,7 +3680,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"async-trait",
@@ -3704,7 +3704,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog-listing"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"async-trait",
@@ -3726,7 +3726,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"ahash 0.8.12",
"arrow 58.3.0",
@@ -3750,7 +3750,7 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"futures",
"log",
@@ -3760,7 +3760,7 @@ dependencies = [
[[package]]
name = "datafusion-datasource"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"async-compression",
@@ -3794,7 +3794,7 @@ dependencies = [
[[package]]
name = "datafusion-datasource-arrow"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"arrow-ipc 58.3.0",
@@ -3817,7 +3817,7 @@ dependencies = [
[[package]]
name = "datafusion-datasource-csv"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"async-trait",
@@ -3839,7 +3839,7 @@ dependencies = [
[[package]]
name = "datafusion-datasource-json"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"async-trait",
@@ -3862,7 +3862,7 @@ dependencies = [
[[package]]
name = "datafusion-datasource-parquet"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"async-trait",
@@ -3891,12 +3891,12 @@ dependencies = [
[[package]]
name = "datafusion-doc"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
[[package]]
name = "datafusion-execution"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"arrow-buffer 58.3.0",
@@ -3918,7 +3918,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"async-trait",
@@ -3940,7 +3940,7 @@ dependencies = [
[[package]]
name = "datafusion-expr-common"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"datafusion-common",
@@ -3952,7 +3952,7 @@ dependencies = [
[[package]]
name = "datafusion-functions"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"arrow-buffer 58.3.0",
@@ -3983,7 +3983,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"ahash 0.8.12",
"arrow 58.3.0",
@@ -4004,7 +4004,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate-common"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"ahash 0.8.12",
"arrow 58.3.0",
@@ -4016,7 +4016,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-nested"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"arrow-ord 58.3.0",
@@ -4040,7 +4040,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-table"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"async-trait",
@@ -4055,7 +4055,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"datafusion-common",
@@ -4072,7 +4072,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window-common"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"datafusion-common",
"datafusion-physical-expr-common",
@@ -4081,7 +4081,7 @@ dependencies = [
[[package]]
name = "datafusion-macros"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"datafusion-doc",
"quote",
@@ -4091,7 +4091,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"chrono",
@@ -4140,7 +4140,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"ahash 0.8.12",
"arrow 58.3.0",
@@ -4163,7 +4163,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-adapter"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"datafusion-common",
@@ -4177,7 +4177,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-common"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"ahash 0.8.12",
"arrow 58.3.0",
@@ -4193,7 +4193,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-optimizer"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"datafusion-common",
@@ -4211,7 +4211,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"ahash 0.8.12",
"arrow 58.3.0",
@@ -4242,7 +4242,7 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"chrono",
@@ -4269,7 +4269,7 @@ dependencies = [
[[package]]
name = "datafusion-proto-common"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"datafusion-common",
@@ -4279,7 +4279,7 @@ dependencies = [
[[package]]
name = "datafusion-pruning"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"datafusion-common",
@@ -4295,7 +4295,7 @@ dependencies = [
[[package]]
name = "datafusion-session"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"async-trait",
"datafusion-common",
@@ -4308,7 +4308,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"arrow 58.3.0",
"bigdecimal 0.4.8",
@@ -4326,7 +4326,7 @@ dependencies = [
[[package]]
name = "datafusion-substrait"
version = "53.1.0"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4"
source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=e8a127c28e8839964bb6aefdd909810dc11cd2c9#e8a127c28e8839964bb6aefdd909810dc11cd2c9"
dependencies = [
"async-recursion",
"async-trait",
@@ -6457,7 +6457,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2 0.5.10",
"socket2 0.6.4",
"tokio",
"tower-service",
"tracing",

View File

@@ -338,21 +338,21 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
[patch.crates-io]
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-window-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-proto = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-functions-window-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-proto = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "e8a127c28e8839964bb6aefdd909810dc11cd2c9" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "2aefa08a8d69c96eec2d6d6703598a009bba6e4c" } # on branch v0.61.x
[profile.release]

View File

@@ -459,6 +459,7 @@ impl DatanodeBuilder {
opts.concurrent_query_limiter_timeout,
opts.grpc.flight_compression,
);
region_server.install_remote_dyn_filter_receiver_injector(&self.plugins);
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
let engines = self

View File

@@ -13,6 +13,8 @@
// limitations under the License.
mod catalog;
mod registrations;
mod remote_dyn_filter;
use std::collections::HashMap;
use std::fmt::Debug;
@@ -25,11 +27,9 @@ use std::time::Duration;
use api::region::RegionResponse;
use api::v1::meta::TopicStat;
use api::v1::region::remote_dyn_filter_request::Action;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{
ListMetadataRequest, RegionResponse as RegionResponseV1, RemoteDynFilterRequest, SyncRequest,
region_request,
ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
};
use api::v1::{ResponseHeader, Status};
use arrow_flight::{FlightData, Ticket};
@@ -92,13 +92,16 @@ use crate::error::{
ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
NewPlanDecoderSnafu, NotYetImplementedSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
RegionNotReadySnafu, Result, RuntimeJoinSnafu, SerializeJsonSnafu, StopRegionEngineSnafu,
UnexpectedSnafu, UnsupportedOutputSnafu,
NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
Result, RuntimeJoinSnafu, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu,
UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;
use crate::query_stream::QueryRuntimeStream;
use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
use crate::region_server::registrations::RemoteDynFilterRegistry;
use crate::region_server::remote_dyn_filter::wrap_remote_dyn_filter_guarded_stream;
const QUERY_RUNTIME_STREAM_BUFFER_SIZE: usize = 8;
#[derive(Clone)]
@@ -257,11 +260,14 @@ impl RegionServer {
RegionNotReadySnafu { region_id }
);
self.inner
let provider = self
.inner
.table_provider_factory
.create(region_id, status.into_engine(), ctx)
.await
.context(ExecuteLogicalPlanSnafu)
.context(ExecuteLogicalPlanSnafu)?;
Ok(provider)
}
/// Handle reads from remote. They're often query requests received by our Arrow Flight service.
@@ -307,6 +313,8 @@ impl RegionServer {
.await
.context(DecodeLogicalPlanSnafu)?;
let cleanup = self.register_initial_remote_dyn_filter_cleanup(&query_ctx, region_id);
let stream = self
.inner
.handle_read(
@@ -318,7 +326,13 @@ impl RegionServer {
query_ctx.clone(),
)
.await?;
let stream = wrap_flow_region_watermark_stream(stream, region_id, &query_ctx);
let stream = if let Some(cleanup) = cleanup {
wrap_remote_dyn_filter_guarded_stream(stream, cleanup)
} else {
stream
};
Ok(maybe_guard_stream(stream, permit))
}
@@ -350,12 +364,19 @@ impl RegionServer {
.context(DataFusionSnafu)?
.data;
let cleanup = self.register_initial_remote_dyn_filter_cleanup(&query_ctx, region_id);
let stream = self
.inner
.handle_read(QueryRequest { plan, ..request }, query_ctx.clone())
.await?;
let stream = wrap_flow_region_watermark_stream(stream, region_id, &query_ctx);
let stream = if let Some(cleanup) = cleanup {
wrap_remote_dyn_filter_guarded_stream(stream, cleanup)
} else {
stream
};
Ok(maybe_guard_stream(stream, permit))
}
@@ -742,70 +763,6 @@ impl RegionServer {
Ok(response)
}
async fn handle_remote_dyn_filter_request(
&self,
request: &RemoteDynFilterRequest,
) -> Result<RegionResponse> {
if request.query_id.is_empty() {
return error::MissingRequiredFieldSnafu { name: "query_id" }.fail();
}
match request
.action
.as_ref()
.context(error::MissingRequiredFieldSnafu { name: "action" })?
{
Action::Update(update) => {
self.handle_remote_dyn_filter_update(&request.query_id, update)
.await
}
Action::Unregister(unregister) => {
self.handle_remote_dyn_filter_unregister(&request.query_id, unregister)
.await
}
}
}
async fn handle_remote_dyn_filter_update(
&self,
query_id: &str,
request: &api::v1::region::RemoteDynFilterUpdate,
) -> Result<RegionResponse> {
if request.filter_id.is_empty() {
return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail();
}
if request.payload.is_empty() {
return error::MissingRequiredFieldSnafu { name: "payload" }.fail();
}
NotYetImplementedSnafu {
what: format!(
"remote dyn filter update unary RPC placeholder for query_id {query_id}, filter_id {}",
request.filter_id
),
}
.fail()
}
async fn handle_remote_dyn_filter_unregister(
&self,
query_id: &str,
request: &api::v1::region::RemoteDynFilterUnregister,
) -> Result<RegionResponse> {
if request.filter_id.is_empty() {
return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail();
}
NotYetImplementedSnafu {
what: format!(
"remote dyn filter unregister unary RPC placeholder for query_id {query_id}, filter_id {}",
request.filter_id
),
}
.fail()
}
/// Sync region manifest and registers new opened logical regions.
pub async fn sync_region(
&self,
@@ -1081,6 +1038,9 @@ struct RegionServerInner {
/// server with a concrete engine; acceptable for now to fetch Mito-specific
/// info (e.g., list SSTs). Consider a diagnostics trait later.
mito_engine: RwLock<Option<MitoEngine>>,
/// TODO(remote-dyn-filter): Reap this query-scoped placeholder registry on query finish/cancel
/// and later fold it into the real remote dyn filter runtime state lifecycle.
initial_remote_dyn_filter_registrations: RemoteDynFilterRegistry,
}
struct RegionServerParallelism {
@@ -1197,6 +1157,7 @@ impl RegionServerInner {
parallelism,
topic_stats_reporter: RwLock::new(None),
mito_engine: RwLock::new(None),
initial_remote_dyn_filter_registrations: RemoteDynFilterRegistry::new(),
}
}
@@ -1964,10 +1925,6 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::region::{
RemoteDynFilterRequest, RemoteDynFilterUnregister, RemoteDynFilterUpdate,
remote_dyn_filter_request,
};
use api::v1::{Rows, SemanticType};
use common_error::ext::ErrorExt;
use common_recordbatch::RecordBatches;
@@ -1987,7 +1944,6 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::error::Result;
use crate::tests::{MockRegionEngine, mock_region_server};
#[test]
@@ -2779,135 +2735,4 @@ mod tests {
.await
.unwrap_err();
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_request_requires_query_id() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: String::new(),
action: Some(remote_dyn_filter_request::Action::Unregister(
RemoteDynFilterUnregister {
filter_id: "filter-1".to_string(),
},
)),
})
.await
.unwrap_err();
assert_matches!(
err,
crate::error::Error::MissingRequiredField { ref name, .. } if name == "query_id"
);
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_request_requires_action() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: None,
})
.await
.unwrap_err();
assert_matches!(
err,
crate::error::Error::MissingRequiredField { ref name, .. } if name == "action"
);
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_update_requires_filter_id() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: Some(remote_dyn_filter_request::Action::Update(
RemoteDynFilterUpdate {
filter_id: String::new(),
payload: vec![1],
generation: 1,
is_complete: false,
},
)),
})
.await
.unwrap_err();
assert_matches!(
err,
crate::error::Error::MissingRequiredField { ref name, .. } if name == "filter_id"
);
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_update_requires_payload() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: Some(remote_dyn_filter_request::Action::Update(
RemoteDynFilterUpdate {
filter_id: "filter-1".to_string(),
payload: Vec::new(),
generation: 1,
is_complete: false,
},
)),
})
.await
.unwrap_err();
assert_matches!(
err,
crate::error::Error::MissingRequiredField { ref name, .. } if name == "payload"
);
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_update_placeholder() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: Some(remote_dyn_filter_request::Action::Update(
RemoteDynFilterUpdate {
filter_id: "filter-1".to_string(),
payload: vec![1],
generation: 1,
is_complete: false,
},
)),
})
.await
.unwrap_err();
assert_matches!(err, crate::error::Error::NotYetImplemented { .. });
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_unregister_placeholder() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: Some(remote_dyn_filter_request::Action::Unregister(
RemoteDynFilterUnregister {
filter_id: "filter-1".to_string(),
},
)),
})
.await
.unwrap_err();
assert_matches!(err, crate::error::Error::NotYetImplemented { .. });
}
}

View File

@@ -0,0 +1,686 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::sync::{Arc, Mutex, OnceLock};
use common_query::request::{
DynFilterPayload, INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterReg,
InitialDynFilterRegs,
};
use common_telemetry::warn;
use dashmap::DashMap;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::execution::{SessionStateBuilder, TaskContext};
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::expressions::{DynamicFilterPhysicalExpr, lit};
use datafusion_common::Result as DataFusionResult;
use session::context::QueryContextRef;
use session::query_id::QueryId;
use store_api::storage::RegionId;
/// Maximum accepted size (64 KiB) of an encoded remote dynamic filter payload.
///
/// Updates larger than this are rejected with `PayloadTooLarge`, and the same
/// limit is handed to the payload/children decoders.
pub(super) const REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES: usize = 64 * 1024;
/// All remote dynamic filter registrations of one query, keyed by filter id.
type QueryRemoteDynFilterRegs = HashMap<RemoteDynFilterId, RegisteredDynFilter>;
/// Registry of remote dynamic filter (RDF) registrations, grouped by query.
#[derive(Debug, Default)]
pub(super) struct RemoteDynFilterRegistry {
    // The outer concurrent map lets unrelated queries proceed in parallel,
    // while the per-query mutex turns each query's RDF state machine into a
    // single critical section. The number of RDFs per query is small.
    queries: DashMap<QueryId, Arc<Mutex<QueryRemoteDynFilterRegs>>>,
}

impl RemoteDynFilterRegistry {
    /// Creates an empty registry.
    pub(super) fn new() -> Self {
        Default::default()
    }

    /// Returns the shared state of `query_id`, creating an empty one when the
    /// query has not been seen yet.
    fn get_or_insert_query(&self, query_id: QueryId) -> Arc<Mutex<QueryRemoteDynFilterRegs>> {
        let slot = self
            .queries
            .entry(query_id)
            .or_insert_with(|| Arc::new(Mutex::new(HashMap::new())));
        slot.value().clone()
    }

    /// Returns the shared state of `query_id` if it is currently registered.
    fn get_query(&self, query_id: &QueryId) -> Option<Arc<Mutex<QueryRemoteDynFilterRegs>>> {
        let slot = self.queries.get(query_id)?;
        Some(slot.value().clone())
    }

    /// Removes the outer entry of `query_id`, but only when it still refers to
    /// `expected` and that per-query map is empty.
    ///
    /// The pointer comparison guards against a stale cleanup tearing down a
    /// newer per-query state that was inserted under the same query id.
    fn remove_query_if_empty(
        &self,
        query_id: &QueryId,
        expected: &Arc<Mutex<QueryRemoteDynFilterRegs>>,
    ) {
        let _ = self.queries.remove_if(query_id, |_, current| {
            if !Arc::ptr_eq(current, expected) {
                return false;
            }
            current.lock().unwrap().is_empty()
        });
    }

    /// Test helper: runs `inspect` on the locked per-query map, or returns
    /// `None` when the query is not registered.
    #[cfg(test)]
    pub(super) fn inspect_query<R>(
        &self,
        query_id: &QueryId,
        inspect: impl FnOnce(&QueryRemoteDynFilterRegs) -> R,
    ) -> Option<R> {
        let query_regs = self.get_query(query_id)?;
        let locked = query_regs.lock().unwrap();
        Some(inspect(&locked))
    }
}
/// Identifier of a remote dynamic filter; a thin wrapper around its string form.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(super) struct RemoteDynFilterId(String);

impl RemoteDynFilterId {
    /// Wraps anything convertible into a `String` as a filter id.
    pub(super) fn new(value: impl Into<String>) -> Self {
        let raw: String = value.into();
        RemoteDynFilterId(raw)
    }
}

impl Display for RemoteDynFilterId {
    /// Formats the id exactly like the wrapped string.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        Display::fmt(raw, f)
    }
}
/// Result of handling a remote dynamic filter update or unregister request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum RemoteDynFilterUpdateOutcome {
/// The query or the filter id has no registration in the registry.
MissingRegistration,
/// No runtime filter exists yet, so the update was stored as the pending update.
Buffered,
/// The update (or its completion flag) was applied to the runtime filter;
/// also returned after a successful unregister.
Applied,
/// Nothing new to apply: same generation as the last one seen, or an
/// initial registration that carried no usable snapshot.
Idempotent,
/// The update's generation is older than the one already seen.
Stale,
/// A newer generation arrived after the filter had been marked complete.
AlreadyComplete,
/// The payload exceeds `REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES`.
PayloadTooLarge,
/// The payload could not be decoded, or the decoded expression could not
/// be installed into the runtime filter.
DecodeFailed,
}
/// An update kept aside until the runtime filter exists.
#[derive(Debug, Clone)]
struct PendingDynFilterUpdate {
    payload: Vec<u8>,
    generation: u64,
    is_complete: bool,
}

impl PendingDynFilterUpdate {
    /// Extracts the initial snapshot carried by a registration, if any.
    ///
    /// Returns `None` when the registration has no snapshot or when the
    /// snapshot payload is not a DataFusion payload.
    fn from_initial_reg(reg: &InitialDynFilterReg) -> Option<Self> {
        let snapshot = reg.initial_snapshot.as_ref()?;
        if let DynFilterPayload::Datafusion(bytes) = &snapshot.payload {
            return Some(Self {
                payload: bytes.clone(),
                generation: snapshot.generation,
                is_complete: snapshot.is_complete,
            });
        }
        None
    }
}
/// Progress of the updates applied to one runtime filter.
#[derive(Debug)]
struct RemoteDynFilterEpochState {
/// Generation of the last update that was applied; `None` until one is applied.
generation: Option<u64>,
/// Set once an update flagged as complete has been applied.
is_complete: bool,
}
/// Runtime side of a registered filter: the live dynamic filter expression,
/// the schema used to decode updates for it, and the update progress.
#[derive(Debug)]
struct RemoteDynFilterState {
    filter: Arc<DynamicFilterPhysicalExpr>,
    input_schema: SchemaRef,
    epoch: Mutex<RemoteDynFilterEpochState>,
}

impl RemoteDynFilterState {
    /// Creates a state with no update applied yet.
    fn new(filter: Arc<DynamicFilterPhysicalExpr>, input_schema: SchemaRef) -> Self {
        let epoch = Mutex::new(RemoteDynFilterEpochState {
            generation: None,
            is_complete: false,
        });
        Self {
            filter,
            input_schema,
            epoch,
        }
    }

    /// Returns a new handle to the live dynamic filter expression.
    fn filter(&self) -> Arc<DynamicFilterPhysicalExpr> {
        Arc::clone(&self.filter)
    }

    /// Applies one update to the live filter.
    ///
    /// Checks run in this order: payload size, staleness, same-generation
    /// handling (which may still flip the completion flag), already-complete,
    /// and finally decode + install. The generation is only advanced after the
    /// decoded expression has been installed successfully.
    fn apply_update(
        &self,
        payload: &[u8],
        generation: u64,
        is_complete: bool,
    ) -> RemoteDynFilterUpdateOutcome {
        if !validate_update_payload_size(payload) {
            return RemoteDynFilterUpdateOutcome::PayloadTooLarge;
        }

        let mut epoch = self.epoch.lock().unwrap();
        let last_applied = epoch.generation;
        match last_applied {
            Some(current) if generation < current => {
                return RemoteDynFilterUpdateOutcome::Stale;
            }
            Some(current) if generation == current => {
                // Same generation: the only thing that can still change is
                // the completion flag.
                let completes_now = is_complete && !epoch.is_complete;
                if !completes_now {
                    return RemoteDynFilterUpdateOutcome::Idempotent;
                }
                self.filter.mark_complete();
                epoch.is_complete = true;
                return RemoteDynFilterUpdateOutcome::Applied;
            }
            _ => {}
        }

        if epoch.is_complete {
            return RemoteDynFilterUpdateOutcome::AlreadyComplete;
        }

        let decoded = decode_update_payload(payload, self.input_schema.as_ref());
        let expr = match decoded {
            Ok(expr) => expr,
            Err(error) => {
                warn!(error; "Failed to decode remote dynamic filter update payload");
                return RemoteDynFilterUpdateOutcome::DecodeFailed;
            }
        };
        if let Err(error) = self.filter.update(expr) {
            warn!(error; "Failed to apply remote dynamic filter update");
            return RemoteDynFilterUpdateOutcome::DecodeFailed;
        }

        epoch.generation = Some(generation);
        if is_complete {
            self.filter.mark_complete();
            epoch.is_complete = true;
        }
        RemoteDynFilterUpdateOutcome::Applied
    }
}
/// One registered remote dynamic filter of a query.
#[derive(Debug)]
pub(super) struct RegisteredDynFilter {
/// Id this registration is stored under.
pub(super) filter_id: RemoteDynFilterId,
/// Encoded child expressions received with the registration; decoded
/// against the scan's input schema when the filter is materialized.
pub(super) child_exprs_datafusion_proto: Vec<Vec<u8>>,
/// Regions currently subscribed to this filter.
pub(super) subscriber_regions: HashSet<RegionId>,
/// Live filter state; `None` until `dyn_filter` creates it.
runtime: Option<Arc<RemoteDynFilterState>>,
/// Latest update received before the live filter state existed.
pending_update: Option<PendingDynFilterUpdate>,
}
impl RegisteredDynFilter {
/// Creates a registration with `region_id` as its first subscriber and no
/// runtime filter yet; `pending_update` is kept until the runtime exists.
fn new(
    filter_id: RemoteDynFilterId,
    child_exprs_datafusion_proto: Vec<Vec<u8>>,
    pending_update: Option<PendingDynFilterUpdate>,
    region_id: RegionId,
) -> Self {
    Self {
        filter_id,
        child_exprs_datafusion_proto,
        subscriber_regions: HashSet::from([region_id]),
        runtime: None,
        pending_update,
    }
}
fn apply_initial_snapshot(
&mut self,
reg: &InitialDynFilterReg,
) -> RemoteDynFilterUpdateOutcome {
let Some(snapshot) = PendingDynFilterUpdate::from_initial_reg(reg) else {
return RemoteDynFilterUpdateOutcome::Idempotent;
};
self.apply_or_buffer_update(&snapshot.payload, snapshot.generation, snapshot.is_complete)
}
/// Adds `region_id` to the subscribers. Returns `false` (and logs a warning)
/// when the region was already subscribed.
fn register_subscriber(&mut self, region_id: RegionId) -> bool {
    let newly_added = self.subscriber_regions.insert(region_id);
    if !newly_added {
        warn!(
            "Duplicate remote dynamic filter subscriber region, filter_id: {}, region_id: {}",
            self.filter_id, region_id
        );
    }
    newly_added
}

/// Whether at least one region is still subscribed.
fn has_subscribers(&self) -> bool {
    !self.subscriber_regions.is_empty()
}

/// Unsubscribes `region_id` and reports whether the registration is now
/// without subscribers and can be dropped.
fn should_drop_after_remove(&mut self, region_id: RegionId) -> bool {
    self.subscriber_regions.remove(&region_id);
    !self.has_subscribers()
}
/// Stores an update while no runtime filter exists.
///
/// Only the newest generation is kept. An update with the same generation as
/// the buffered one merges its completion flag into it; a newer generation is
/// refused once the buffered update is already complete.
fn buffer_update(
    &mut self,
    payload: &[u8],
    generation: u64,
    is_complete: bool,
) -> RemoteDynFilterUpdateOutcome {
    if !validate_update_payload_size(payload) {
        return RemoteDynFilterUpdateOutcome::PayloadTooLarge;
    }

    match self.pending_update.as_mut() {
        Some(pending) if generation < pending.generation => {
            return RemoteDynFilterUpdateOutcome::Stale;
        }
        Some(pending) if generation == pending.generation => {
            pending.is_complete |= is_complete;
            return RemoteDynFilterUpdateOutcome::Idempotent;
        }
        Some(pending) if pending.is_complete => {
            return RemoteDynFilterUpdateOutcome::AlreadyComplete;
        }
        _ => {}
    }

    self.pending_update = Some(PendingDynFilterUpdate {
        payload: payload.to_vec(),
        generation,
        is_complete,
    });
    RemoteDynFilterUpdateOutcome::Buffered
}
/// Applies the update to the runtime filter when it exists, otherwise buffers it.
fn apply_or_buffer_update(
    &mut self,
    payload: &[u8],
    generation: u64,
    is_complete: bool,
) -> RemoteDynFilterUpdateOutcome {
    match self.runtime.as_ref() {
        Some(runtime) => runtime.apply_update(payload, generation, is_complete),
        None => self.buffer_update(payload, generation, is_complete),
    }
}
/// Decodes the stored child expressions against `input_schema`, using the
/// shared RDF task context and the payload size limit.
fn decode_children(
    &self,
    input_schema: &Schema,
) -> DataFusionResult<Vec<Arc<dyn PhysicalExpr>>> {
    let registration = InitialDynFilterReg::new(
        self.filter_id.to_string(),
        self.child_exprs_datafusion_proto.clone(),
    );
    registration.decode_children(
        remote_dyn_filter_task_context(),
        input_schema,
        REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES,
    )
}
fn dyn_filter(&mut self, input_schema: &Schema) -> Option<Arc<dyn PhysicalExpr>> {
let children = match self.decode_children(input_schema) {
Ok(children) => children,
Err(error) => {
warn!(error; "Failed to decode remote dynamic filter initial children");
return None;
}
};
let runtime = match &self.runtime {
Some(runtime) => runtime.clone(),
None => {
let filter = Arc::new(DynamicFilterPhysicalExpr::new(children.clone(), lit(true)));
let runtime = Arc::new(RemoteDynFilterState::new(
filter,
Arc::new(input_schema.clone()),
));
if let Some(pending) = self.pending_update.take() {
let outcome = runtime.apply_update(
&pending.payload,
pending.generation,
pending.is_complete,
);
if matches!(outcome, RemoteDynFilterUpdateOutcome::DecodeFailed) {
warn!(
"Dropped buffered remote dynamic filter update after decode failure, filter_id: {}, generation: {}",
self.filter_id, pending.generation
);
}
}
self.runtime = Some(runtime.clone());
runtime
}
};
match runtime.filter().with_new_children(children) {
Ok(expr) => Some(expr),
Err(error) => {
warn!(error; "Failed to remap remote dynamic filter children for scan");
None
}
}
}
/// Marks the runtime filter (if one was created) as complete.
fn deactivate(&self) {
    let Some(runtime) = self.runtime.as_ref() else {
        return;
    };
    runtime.filter.mark_complete();
}
}
impl Drop for RegisteredDynFilter {
    /// Dropping a registration deactivates it, which marks its runtime filter
    /// (if any) as complete.
    fn drop(&mut self) {
        Self::deactivate(self);
    }
}
pub(super) fn initial_dyn_filter_regs_from_query_ctx(
query_ctx: &QueryContextRef,
) -> Option<InitialDynFilterRegs> {
let registrations =
query_ctx.extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)?;
match InitialDynFilterRegs::from_extension_value(registrations) {
Ok(registrations) => match registrations.validate_default_bounds() {
Ok(()) => Some(registrations),
Err(error) => {
warn!(error; "Initial remote dyn filter registrations exceeded Task 03 bounds");
None
}
},
Err(error) => {
warn!(error; "Failed to decode initial remote dyn filter registrations from query context");
None
}
}
}
/// Registers `region_id` as a subscriber of every filter in `regs` for
/// `query_id`, creating registrations that do not exist yet.
///
/// Returns the ids for which this call added the region as a subscriber; a
/// region that was already subscribed to a filter is not reported again.
/// Empty or out-of-bounds registrations are ignored and yield an empty list.
///
/// NOTE(review): the per-query state is fetched first and locked afterwards.
/// It looks possible for a concurrent `remove_query_if_empty` to detach that
/// state in between while it is still empty, in which case the registrations
/// made here would no longer be reachable through the registry. Confirm
/// whether callers rule this interleaving out.
pub(super) fn register_initial_dyn_filter_regs(
    regs_by_query: &RemoteDynFilterRegistry,
    query_id: &QueryId,
    region_id: RegionId,
    regs: &InitialDynFilterRegs,
) -> Vec<RemoteDynFilterId> {
    if regs.is_empty() {
        return Vec::new();
    }
    if let Err(error) = regs.validate_default_bounds() {
        warn!(error; "Ignored invalid initial dyn filter registrations for query_id {}", query_id);
        return Vec::new();
    }

    let shared_state = regs_by_query.get_or_insert_query(*query_id);
    let mut locked = shared_state.lock().unwrap();
    let mut newly_subscribed = Vec::with_capacity(regs.regs.len());

    for reg in regs.regs.iter() {
        let filter_id = RemoteDynFilterId::new(reg.filter_id.clone());
        match locked.entry(filter_id.clone()) {
            Entry::Vacant(slot) => {
                // First sighting of this filter: keep its snapshot as pending
                // until a scan materializes the runtime filter.
                let pending = PendingDynFilterUpdate::from_initial_reg(reg);
                slot.insert(RegisteredDynFilter::new(
                    filter_id.clone(),
                    reg.child_exprs_datafusion_proto.clone(),
                    pending,
                    region_id,
                ));
                newly_subscribed.push(filter_id);
            }
            Entry::Occupied(mut slot) => {
                let existing = slot.get_mut();
                let same_children =
                    existing.child_exprs_datafusion_proto == reg.child_exprs_datafusion_proto;
                if !same_children {
                    warn!(
                        "Remote dynamic filter registration reused filter_id with different children, query_id: {}, filter_id: {}, region_id: {}",
                        query_id, filter_id, region_id
                    );
                }
                if existing.register_subscriber(region_id) {
                    newly_subscribed.push(filter_id);
                }
                // The outcome is intentionally ignored; the snapshot is
                // applied or buffered on a best-effort basis.
                let _ = existing.apply_initial_snapshot(reg);
            }
        }
    }

    newly_subscribed
}
/// Builds the dynamic filter expressions for the registrations in
/// `initial_regs`, bound to `input_schema`.
///
/// Registrations that are unknown to the registry, or whose filter cannot be
/// built, are skipped. Returns an empty list when the query is not registered.
pub(super) fn remote_dyn_filter_exprs_for_initial_regs(
    regs_by_query: &RemoteDynFilterRegistry,
    query_id: &QueryId,
    initial_regs: &InitialDynFilterRegs,
    input_schema: &Schema,
) -> Vec<Arc<dyn PhysicalExpr>> {
    let Some(query_regs) = regs_by_query.get_query(query_id) else {
        return Vec::new();
    };
    let mut locked = query_regs.lock().unwrap();

    let mut exprs = Vec::new();
    for reg in &initial_regs.regs {
        let filter_id = RemoteDynFilterId::new(reg.filter_id.clone());
        let Some(registered) = locked.get_mut(&filter_id) else {
            continue;
        };
        if let Some(expr) = registered.dyn_filter(input_schema) {
            exprs.push(expr);
        }
    }
    exprs
}
/// Routes one update to the registration of `filter_id` under `query_id`.
///
/// Oversized payloads are rejected before the registry is consulted. Updates
/// for an unknown query or filter are logged and reported as
/// `MissingRegistration`; otherwise the update is applied or buffered by the
/// registration.
pub(super) fn apply_remote_dyn_filter_update(
    regs_by_query: &RemoteDynFilterRegistry,
    query_id: &QueryId,
    filter_id: &RemoteDynFilterId,
    payload: &[u8],
    generation: u64,
    is_complete: bool,
) -> RemoteDynFilterUpdateOutcome {
    if !validate_update_payload_size(payload) {
        let payload_size = payload.len();
        warn!(
            "Ignored oversized remote dynamic filter update, query_id: {}, filter_id: {}, payload_size: {}, max_payload_size: {}",
            query_id, filter_id, payload_size, REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES
        );
        return RemoteDynFilterUpdateOutcome::PayloadTooLarge;
    }

    let Some(query_regs) = regs_by_query.get_query(query_id) else {
        warn!(
            "Ignored remote dynamic filter update without query registration, query_id: {}, filter_id: {}",
            query_id, filter_id
        );
        return RemoteDynFilterUpdateOutcome::MissingRegistration;
    };

    let mut locked = query_regs.lock().unwrap();
    if let Some(registered) = locked.get_mut(filter_id) {
        return registered.apply_or_buffer_update(payload, generation, is_complete);
    }

    warn!(
        "Ignored remote dynamic filter update without filter registration, query_id: {}, filter_id: {}",
        query_id, filter_id
    );
    RemoteDynFilterUpdateOutcome::MissingRegistration
}
/// Removes the registration of `filter_id` under `query_id`, regardless of
/// its remaining subscribers, and cleans up the query entry when it became
/// empty.
///
/// Returns `MissingRegistration` (with a warning) when the query or filter is
/// unknown, `Applied` otherwise.
pub(super) fn unregister_remote_dyn_filter(
    regs_by_query: &RemoteDynFilterRegistry,
    query_id: &QueryId,
    filter_id: &RemoteDynFilterId,
) -> RemoteDynFilterUpdateOutcome {
    let Some(query_regs) = regs_by_query.get_query(query_id) else {
        warn!(
            "Ignored remote dynamic filter unregister without query registration, query_id: {}, filter_id: {}",
            query_id, filter_id
        );
        return RemoteDynFilterUpdateOutcome::MissingRegistration;
    };

    let mut locked = query_regs.lock().unwrap();
    let Some(removed) = locked.remove(filter_id) else {
        warn!(
            "Ignored remote dynamic filter unregister without filter registration, query_id: {}, filter_id: {}",
            query_id, filter_id
        );
        return RemoteDynFilterUpdateOutcome::MissingRegistration;
    };
    let query_is_drained = locked.is_empty();

    // Release the per-query lock before dropping the registration and before
    // touching the outer map.
    drop(locked);
    drop(removed);

    if query_is_drained {
        regs_by_query.remove_query_if_empty(query_id, &query_regs);
    }
    RemoteDynFilterUpdateOutcome::Applied
}
/// Unsubscribes `region_id` from each of `filter_ids` under `query_id`.
///
/// A registration is removed once its last subscriber is gone, and the query
/// entry is cleaned up when no registration remains. Unknown queries or
/// filters are silently ignored.
pub(super) fn remove_initial_dyn_filter_regs(
    regs_by_query: &RemoteDynFilterRegistry,
    query_id: &QueryId,
    region_id: RegionId,
    filter_ids: &[RemoteDynFilterId],
) {
    if filter_ids.is_empty() {
        return;
    }
    let Some(query_regs) = regs_by_query.get_query(query_id) else {
        return;
    };

    let mut locked = query_regs.lock().unwrap();
    let mut detached = Vec::new();
    for filter_id in filter_ids {
        let Some(registered) = locked.get_mut(filter_id) else {
            continue;
        };
        if !registered.should_drop_after_remove(region_id) {
            continue;
        }
        if let Some(registered) = locked.remove(filter_id) {
            detached.push(registered);
        }
    }
    let query_is_drained = locked.is_empty();

    // Release the per-query lock before dropping the removed registrations
    // and before touching the outer map.
    drop(locked);
    drop(detached);

    if query_is_drained {
        regs_by_query.remove_query_if_empty(query_id, &query_regs);
    }
}
/// Decodes an update payload into a physical expression bound to
/// `input_schema`, using the shared RDF task context and the payload size limit.
fn decode_update_payload(
    payload: &[u8],
    input_schema: &Schema,
) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
    let wire_payload = DynFilterPayload::Datafusion(payload.to_vec());
    let task_ctx = remote_dyn_filter_task_context();
    wire_payload.decode_datafusion_expr(task_ctx, input_schema, REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES)
}
/// Returns the process-wide task context used to decode RDF payloads; it is
/// built once on first use.
fn remote_dyn_filter_task_context() -> &'static TaskContext {
    // RDF payloads may reference DataFusion built-in scalar functions, e.g.
    // multi-column join dynamic filters use `struct(...) IN (...)`. A
    // `TaskContext::default()` has an empty function registry and cannot
    // decode those expressions, so the context is derived from a session
    // state built with the default features.
    fn build_task_context() -> TaskContext {
        let session_state = SessionStateBuilder::new().with_default_features().build();
        TaskContext::from(&session_state)
    }

    static TASK_CONTEXT: OnceLock<TaskContext> = OnceLock::new();
    TASK_CONTEXT.get_or_init(build_task_context)
}
/// Returns `true` when the payload fits within
/// `REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES` (inclusive).
fn validate_update_payload_size(payload: &[u8]) -> bool {
    let size = payload.len();
    size <= REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Two lookups of the same query must hand out the same per-query mutex.
    #[test]
    fn remote_dyn_filter_same_query_reuses_one_inner_lock() {
        let registry = RemoteDynFilterRegistry::new();
        let query_id = QueryId::new();

        let first = registry.get_or_insert_query(query_id);
        let second = registry.get_or_insert_query(query_id);

        assert!(Arc::ptr_eq(&first, &second));
    }

    /// A cleanup that still holds an old per-query state must not remove a
    /// newer state registered under the same query id.
    #[test]
    fn remote_dyn_filter_stale_query_remove_does_not_remove_new_query_state() {
        let registry = RemoteDynFilterRegistry::new();
        let query_id = QueryId::new();

        // The first state is empty, so removing it succeeds.
        let stale_state = registry.get_or_insert_query(query_id);
        registry.remove_query_if_empty(&query_id, &stale_state);
        assert!(registry.get_query(&query_id).is_none());

        // A fresh state with one registration replaces it.
        let fresh_state = registry.get_or_insert_query(query_id);
        let filter_id = RemoteDynFilterId::new("filter-1");
        let region_id = RegionId::new(1024, 7);
        fresh_state.lock().unwrap().insert(
            filter_id.clone(),
            RegisteredDynFilter::new(filter_id.clone(), vec![], None, region_id),
        );

        // Cleaning up with the stale handle must leave the fresh state alone.
        registry.remove_query_if_empty(&query_id, &stale_state);

        let current_state = registry.get_query(&query_id).unwrap();
        assert!(Arc::ptr_eq(&current_state, &fresh_state));
        assert_eq!(current_state.lock().unwrap().len(), 1);
    }

    /// Registering the same filter from two regions yields one registration
    /// with both regions as subscribers.
    #[test]
    fn remote_dyn_filter_register_uses_entry_to_merge_same_filter_subscribers() {
        let registry = RemoteDynFilterRegistry::new();
        let query_id = QueryId::new();
        let region_a = RegionId::new(1024, 7);
        let region_b = RegionId::new(1024, 8);
        let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])]);

        register_initial_dyn_filter_regs(&registry, &query_id, region_a, &regs);
        register_initial_dyn_filter_regs(&registry, &query_id, region_b, &regs);

        let query_state = registry.get_query(&query_id).unwrap();
        let locked = query_state.lock().unwrap();
        assert_eq!(locked.len(), 1);

        let registered = locked.get(&RemoteDynFilterId::new("filter-1")).unwrap();
        assert_eq!(registered.subscriber_regions.len(), 2);
        assert!(registered.subscriber_regions.contains(&region_a));
        assert!(registered.subscriber_regions.contains(&region_b));
    }
}

File diff suppressed because it is too large Load Diff

View File

@@ -53,7 +53,9 @@ use tracing::Span;
use crate::analyze::DistAnalyzeExec;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::dist_plan::{DistPlannerOptions, MergeScanLogicalPlan};
use crate::dist_plan::{
DistPlannerOptions, MergeScanLogicalPlan, RemoteDynFilterReceiverInjectorRef,
};
use crate::error::{
CatalogSnafu, CreateRecordBatchSnafu, MissingTableMutationHandlerSnafu,
MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
@@ -93,15 +95,21 @@ impl DatafusionQueryEngine {
query_ctx: QueryContextRef,
) -> Result<Output> {
let mut ctx = self.engine_context(query_ctx.clone());
let plan = if let Some(receiver_injector) =
self.plugins.get::<RemoteDynFilterReceiverInjectorRef>()
{
receiver_injector.maybe_inject(plan, query_ctx.clone())
} else {
plan
};
// `create_physical_plan` will optimize logical plan internally
let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?;
let optimized_physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;
let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;
let physical_plan = if let Some(wrapper) = self.plugins.get::<PhysicalPlanWrapperRef>() {
wrapper.wrap(optimized_physical_plan, query_ctx)
wrapper.wrap(physical_plan, query_ctx)
} else {
optimized_physical_plan
physical_plan
};
let stream = self.execute_stream(&ctx, &physical_plan)?;

View File

@@ -21,6 +21,7 @@ mod merge_sort;
mod planner;
mod predicate_extractor;
mod region_pruner;
mod remote_dyn_filter_receiver;
mod remote_dyn_filter_registry;
pub use analyzer::{DistPlannerAnalyzer, DistPlannerOptions};
@@ -29,6 +30,10 @@ pub use merge_scan::{MergeScanExec, MergeScanLogicalPlan};
pub use planner::{DistExtensionPlanner, MergeSortExtensionPlanner};
pub use predicate_extractor::PredicateExtractor;
pub use region_pruner::ConstraintPruner;
pub use remote_dyn_filter_receiver::{
RemoteDynFilterReceiverExtensionPlanner, RemoteDynFilterReceiverInjector,
RemoteDynFilterReceiverInjectorRef, RemoteDynFilterReceiverLogicalPlan,
};
pub use remote_dyn_filter_registry::{
DynFilterEntry, DynFilterRegistryManager, EntryRegistration, QueryDynFilterRegistry,
RemoteDynFilterRegistryLease, Subscriber, SubscriberRegistration,

View File

@@ -59,7 +59,7 @@ use crate::dist_plan::dyn_filter_bridge::{
};
use crate::dist_plan::{RemoteDynFilterProducerId, RemoteDynFilterRegistryLease};
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
use crate::options::FlowQueryExtensions;
use crate::options::{FlowQueryExtensions, remote_dyn_filter_pushdown_enabled_from_extensions};
use crate::query_engine::QueryEngineState;
use crate::region_query::RegionQueryHandlerRef;
@@ -67,6 +67,11 @@ fn query_engine_state_from_task_context(context: &TaskContext) -> Option<Arc<Que
context.session_config().get_extension()
}
/// Reads the remote dynamic filter pushdown switch from the query context
/// extensions, surfacing a parse failure as an external DataFusion error.
fn remote_dyn_filter_enabled(query_ctx: &QueryContextRef) -> Result<bool> {
    let extensions = query_ctx.extensions();
    let enabled = remote_dyn_filter_pushdown_enabled_from_extensions(&extensions)
        .map_err(|err| DataFusionError::External(Box::new(err)))?;
    Ok(enabled)
}
fn acquire_remote_dyn_filter_registry_lease(
context: &TaskContext,
query_ctx: &QueryContextRef,
@@ -334,7 +339,12 @@ impl MergeScanExec {
let partition_metrics_moved = self.partition_metrics.clone();
let plan = self.plan.clone();
let target_partition = self.target_partition;
let captured_remote_dyn_filters = self.captured_remote_dyn_filters();
let remote_dyn_filter_enabled = remote_dyn_filter_enabled(&self.query_ctx)?;
let captured_remote_dyn_filters = if remote_dyn_filter_enabled {
self.captured_remote_dyn_filters()
} else {
Vec::new()
};
let dbname = context.task_id().unwrap_or_default();
let tracing_context = TracingContext::from_json(context.session_id().as_str());
let current_channel = self.query_ctx.channel();
@@ -738,6 +748,20 @@ impl ExecutionPlan for MergeScanExec {
.into_iter()
.map(|filter| filter.filter)
.collect::<Vec<_>>();
if !remote_dyn_filter_enabled(&self.query_ctx)? {
// Reject remote pushdown instead of pretending success: this keeps
// DataFusion/local dynamic filter semantics intact while disabling
// only FE -> DN remote dynamic filter propagation.
self.captured_remote_dyn_filters.lock().unwrap().clear();
let new_self = Arc::new(self.clone());
return Ok(FilterPushdownPropagation {
filters: parent_filters.into_iter().map(|_| PushedDown::No).collect(),
updated_node: Some(new_self),
});
}
let Some(remote_dyn_filter_producer_id) = self.remote_dyn_filter_producer_id else {
// Missing RDF identity disables only RDF, not normal execution.
common_telemetry::warn!(
@@ -761,10 +785,12 @@ impl ExecutionPlan for MergeScanExec {
filters: remote_dyn_filter_pushdown
.pushed_down
.into_iter()
.map(|_pushdown_ready| {
// TODO(discord9): Return `PushedDown::Yes` after datanodes consume RDF
// registrations and pending updates. Until then, keep the parent-side filter.
PushedDown::No
.map(|pushdown_ready| {
if pushdown_ready {
PushedDown::Yes
} else {
PushedDown::No
}
})
.collect(),
updated_node: Some(new_self),
@@ -1077,7 +1103,7 @@ mod tests {
}
#[test]
fn remote_dyn_filter_preflight_keeps_parent_filter_until_dn_runtime_is_ready() {
fn remote_dyn_filter_preflight_removes_parent_filter_after_dn_runtime_is_ready() {
let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42);
let plan = LogicalPlanBuilder::empty(true)
.project(vec![lit(1i32).alias("col1")])
@@ -1124,6 +1150,6 @@ mod tests {
.unwrap();
assert_eq!(exec.captured_remote_dyn_filters().len(), 1);
assert!(matches!(propagation.filters.as_slice(), [PushedDown::No]));
assert!(matches!(propagation.filters.as_slice(), [PushedDown::Yes]));
}
}

View File

@@ -0,0 +1,319 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::Ordering;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::common::Result;
use datafusion::execution::context::SessionState;
use datafusion::physical_expr::utils::conjunction;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_common::{DFSchemaRef, DataFusionError};
use datafusion_expr::{
Expr, Extension, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::utils::collect_columns;
use session::context::QueryContextRef;
/// Callback type stored by `RemoteDynFilterReceiverInjector`: takes a logical
/// plan and the query context and returns the (possibly rewritten) plan.
type InjectRemoteDynFilterReceiver =
dyn Fn(LogicalPlan, QueryContextRef) -> LogicalPlan + Send + Sync + 'static;
/// Injects a logical remote dynamic filter receiver into a query plan.
pub struct RemoteDynFilterReceiverInjector {
inject: Box<InjectRemoteDynFilterReceiver>,
}
impl RemoteDynFilterReceiverInjector {
pub fn new(
inject: impl Fn(LogicalPlan, QueryContextRef) -> LogicalPlan + Send + Sync + 'static,
) -> Self {
Self {
inject: Box::new(inject),
}
}
pub fn maybe_inject(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> LogicalPlan {
(self.inject)(plan, query_ctx)
}
}
pub type RemoteDynFilterReceiverInjectorRef = Arc<RemoteDynFilterReceiverInjector>;
/// A logical marker that is converted to a [`FilterExec`] carrying remote dynamic filters.
#[derive(Clone)]
pub struct RemoteDynFilterReceiverLogicalPlan {
input: Arc<LogicalPlan>,
dyn_filters: Vec<Arc<dyn PhysicalExpr>>,
}
impl RemoteDynFilterReceiverLogicalPlan {
pub fn new(input: LogicalPlan, dyn_filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
Self {
input: Arc::new(input),
dyn_filters,
}
}
pub fn name() -> &'static str {
"RemoteDynFilterReceiver"
}
pub fn into_logical_plan(self) -> LogicalPlan {
LogicalPlan::Extension(Extension {
node: Arc::new(self),
})
}
fn dyn_filters(&self) -> &[Arc<dyn PhysicalExpr>] {
&self.dyn_filters
}
fn ord_key(&self) -> String {
format!("input={:?}, dyn_filters={:?}", self.input, self.dyn_filters)
}
}
impl fmt::Debug for RemoteDynFilterReceiverLogicalPlan {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
UserDefinedLogicalNodeCore::fmt_for_explain(self, f)
}
}
impl Hash for RemoteDynFilterReceiverLogicalPlan {
    /// Hashes the input plan followed by the dynamic filters.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let Self { input, dyn_filters } = self;
        input.hash(state);
        dyn_filters.hash(state);
    }
}
impl PartialEq for RemoteDynFilterReceiverLogicalPlan {
    /// Two receivers are equal when both their inputs and their dynamic
    /// filters compare equal.
    fn eq(&self, other: &Self) -> bool {
        let same_input = self.input == other.input;
        same_input && self.dyn_filters == other.dyn_filters
    }
}

impl Eq for RemoteDynFilterReceiverLogicalPlan {}
impl PartialOrd for RemoteDynFilterReceiverLogicalPlan {
    /// Orders receivers by their debug-formatted key; always returns `Some`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let lhs = self.ord_key();
        let rhs = other.ord_key();
        Some(lhs.cmp(&rhs))
    }
}
impl UserDefinedLogicalNodeCore for RemoteDynFilterReceiverLogicalPlan {
    fn name(&self) -> &str {
        Self::name()
    }

    /// The receiver has exactly one input.
    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![self.input.as_ref()]
    }

    /// The receiver does not change the schema of its input.
    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    /// The filters are physical expressions, so no logical expressions are
    /// exposed.
    fn expressions(&self) -> Vec<Expr> {
        Vec::new()
    }

    /// Requires from the child every column the parent asks for plus every
    /// column referenced by the dynamic filters, so the filters can still be
    /// evaluated after projection pruning. The result is sorted and
    /// de-duplicated.
    fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
        let mut required = output_columns.to_vec();
        for filter in &self.dyn_filters {
            required.extend(
                collect_columns(filter)
                    .into_iter()
                    .map(|column| column.index()),
            );
        }
        required.sort_unstable();
        required.dedup();
        Some(vec![required])
    }

    fn fmt_for_explain(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}: filters={}", Self::name(), self.dyn_filters.len())
    }

    /// Rebuilds the node on top of a new input, re-binding the columns of the
    /// dynamic filters by name to the new input schema.
    ///
    /// Returns an internal error unless exactly one input is supplied.
    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        mut inputs: Vec<LogicalPlan>,
    ) -> Result<Self> {
        // Enforce what the error message promises: a single input, not merely
        // "at least one".
        let input_count = inputs.len();
        let input = match (inputs.pop(), inputs.is_empty()) {
            (Some(input), true) => input,
            _ => {
                return Err(DataFusionError::Internal(format!(
                    "Expected exactly one input with {}, got {}",
                    Self::name(),
                    input_count
                )));
            }
        };
        let dyn_filters = self
            .dyn_filters
            .iter()
            .map(|filter| remap_physical_expr_columns(filter.clone(), input.schema().as_arrow()))
            .collect::<Result<Vec<_>>>()?;
        Ok(Self::new(input, dyn_filters))
    }
}
/// Recursively re-binds every `Column` in `expr` to the column of the same
/// name in `input_schema`.
///
/// Leaf expressions other than columns are returned unchanged; an error is
/// returned when a column name cannot be resolved in `input_schema`.
fn remap_physical_expr_columns(
    expr: Arc<dyn PhysicalExpr>,
    input_schema: &datafusion::arrow::datatypes::Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
        let rebound = Column::new_with_schema(column.name(), input_schema)?;
        return Ok(Arc::new(rebound));
    }

    let mut remapped_children = Vec::new();
    for child in expr.children() {
        remapped_children.push(remap_physical_expr_columns(Arc::clone(child), input_schema)?);
    }
    if remapped_children.is_empty() {
        return Ok(expr);
    }
    expr.with_new_children(remapped_children)
}
/// Physical planner for [`RemoteDynFilterReceiverLogicalPlan`] nodes.
pub struct RemoteDynFilterReceiverExtensionPlanner;

#[async_trait]
impl ExtensionPlanner for RemoteDynFilterReceiverExtensionPlanner {
    /// Plans a receiver node as a `FilterExec` over its single input, using
    /// the conjunction of the node's dynamic filters as predicate.
    ///
    /// Returns `Ok(None)` for nodes of any other type, the input itself when
    /// the receiver carries no filters, and an internal error unless exactly
    /// one physical input is supplied.
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        _session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let Some(receiver) = node
            .as_any()
            .downcast_ref::<RemoteDynFilterReceiverLogicalPlan>()
        else {
            return Ok(None);
        };

        // Enforce what the error message promises: a single input, not merely
        // "at least one".
        let [input] = physical_inputs else {
            return Err(DataFusionError::Internal(format!(
                "Expected exactly one input with {}, got {}",
                Self::name(),
                physical_inputs.len()
            )));
        };
        let input = Arc::clone(input);

        if receiver.dyn_filters().is_empty() {
            return Ok(Some(input));
        }
        let predicate = conjunction(receiver.dyn_filters().to_vec());
        Ok(Some(Arc::new(FilterExec::try_new(predicate, input)?) as _))
    }
}

impl RemoteDynFilterReceiverExtensionPlanner {
    /// Name of the logical node this planner handles; used in error messages.
    fn name() -> &'static str {
        RemoteDynFilterReceiverLogicalPlan::name()
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::physical_plan::expressions::{Column, DynamicFilterPhysicalExpr, lit};
    use datafusion_common::DFSchema;
    use datafusion_expr::{EmptyRelation, UserDefinedLogicalNodeCore};

    use super::*;

    /// Non-nullable millisecond timestamp column named `ts`.
    fn ts_field() -> Field {
        Field::new(
            "ts",
            DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
            false,
        )
    }

    /// Empty relation with the given fields.
    fn empty_relation(fields: Vec<Field>) -> LogicalPlan {
        let schema = Arc::new(Schema::new(fields));
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::try_from(schema).unwrap()),
        })
    }

    /// Input with columns `ts`, `value`, `instance`, `job`.
    fn empty_input() -> LogicalPlan {
        empty_relation(vec![
            ts_field(),
            Field::new("value", DataType::Float64, true),
            Field::new("instance", DataType::Utf8, true),
            Field::new("job", DataType::Utf8, true),
        ])
    }

    /// Input as it looks after pruning down to `ts` and `instance`.
    fn pruned_input() -> LogicalPlan {
        empty_relation(vec![
            ts_field(),
            Field::new("instance", DataType::Utf8, true),
        ])
    }

    #[test]
    fn necessary_children_exprs_keeps_parent_and_filter_columns() {
        let ts_filter = Arc::new(DynamicFilterPhysicalExpr::new(
            vec![Arc::new(Column::new("ts", 0)) as Arc<_>],
            lit(true) as _,
        ));
        let receiver = RemoteDynFilterReceiverLogicalPlan::new(empty_input(), vec![ts_filter]);

        // The parent only needs `value`, but the receiver must also keep `ts`
        // so its dynamic filter can still be evaluated after logical
        // projection pruning.
        let required =
            UserDefinedLogicalNodeCore::necessary_children_exprs(&receiver, &[1]).unwrap();

        assert_eq!(required, vec![vec![0, 1]]);
    }

    #[test]
    fn necessary_children_exprs_is_transparent_without_filters() {
        let receiver = RemoteDynFilterReceiverLogicalPlan::new(empty_input(), vec![]);

        let required =
            UserDefinedLogicalNodeCore::necessary_children_exprs(&receiver, &[1, 3]).unwrap();

        assert_eq!(required, vec![vec![1, 3]]);
    }

    #[test]
    fn with_exprs_and_inputs_remaps_dyn_filter_columns_to_pruned_input() {
        // `instance` sits at index 2 in the full input and at index 1 in the
        // pruned one.
        let instance_filter = Arc::new(DynamicFilterPhysicalExpr::new(
            vec![Arc::new(Column::new("instance", 2)) as Arc<_>],
            lit(true) as _,
        ));
        let receiver =
            RemoteDynFilterReceiverLogicalPlan::new(empty_input(), vec![instance_filter]);

        let remapped = UserDefinedLogicalNodeCore::with_exprs_and_inputs(
            &receiver,
            vec![],
            vec![pruned_input()],
        )
        .unwrap();

        let columns = collect_columns(&remapped.dyn_filters()[0]);
        assert_eq!(columns.len(), 1);
        let column = columns.iter().next().unwrap();
        assert_eq!(column.name(), "instance");
        assert_eq!(column.index(), 1);
    }
}

View File

@@ -25,6 +25,9 @@ pub const FLOW_INCREMENTAL_AFTER_SEQS: &str = "flow.incremental_after_seqs";
pub const FLOW_INCREMENTAL_MODE: &str = "flow.incremental_mode";
pub const FLOW_RETURN_REGION_SEQ: &str = "flow.return_region_seq";
pub const FLOW_SINK_TABLE_ID: &str = "flow.sink_table_id";
/// Enabled by default; set to `false` to explicitly disable.
pub const QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN: &str =
"query.enable_remote_dynamic_filter_pushdown";
pub const FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY: &str = "memtable_only";
@@ -106,7 +109,7 @@ impl FlowQueryExtensions {
let return_region_seq = extensions
.get(FLOW_RETURN_REGION_SEQ)
.map(|value| parse_bool(value.as_str()))
.map(|value| parse_bool(FLOW_RETURN_REGION_SEQ, value.as_str()))
.transpose()?
.unwrap_or(false);
@@ -184,6 +187,22 @@ impl FlowQueryExtensions {
}
}
/// Resolves the per-query switch for FE->DN remote dynamic filter propagation.
///
/// Looks up `query.enable_remote_dynamic_filter_pushdown` in the query context
/// extensions. When the key is absent the result is `Ok(true)`, so propagation
/// stays enabled unless a query opts out by setting the option to `false`.
/// When the key is present its value is handed to `parse_bool` together with
/// the option name, and that call's result (including any error) is returned
/// as-is.
pub fn remote_dyn_filter_pushdown_enabled_from_extensions(
    extensions: &HashMap<String, String>,
) -> Result<bool> {
    match extensions.get(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN) {
        Some(raw) => parse_bool(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN, raw.as_str()),
        // Not configured for this query: keep the default, which is enabled.
        None => Ok(true),
    }
}
/// Returns whether raw Flow query extensions request terminal region watermark collection.
///
/// This is only an intent/presence check for transport/scan plumbing; callers that need
@@ -249,13 +268,13 @@ fn parse_incremental_after_seqs(value: &str) -> Result<HashMap<u64, u64>> {
.collect()
}
fn parse_bool(value: &str) -> Result<bool> {
fn parse_bool(option_name: &str, value: &str) -> Result<bool> {
match value {
v if v.eq_ignore_ascii_case("true") => Ok(true),
v if v.eq_ignore_ascii_case("false") => Ok(false),
_ => Err(invalid_query_context_extension(format!(
"Invalid value for {}: {}",
FLOW_RETURN_REGION_SEQ, value
option_name, value
))),
}
}
@@ -276,6 +295,37 @@ mod flow_extension_tests {
assert_eq!(parsed, None);
}
/// An empty extension map leaves remote dynamic filter pushdown enabled.
#[test]
fn test_remote_dyn_filter_pushdown_enabled_from_extensions_defaults_true() {
    let no_extensions = HashMap::new();
    let enabled = remote_dyn_filter_pushdown_enabled_from_extensions(&no_extensions).unwrap();
    assert!(enabled);
}
/// Explicit `"false"` and `"true"` values are parsed into the matching bool.
#[test]
fn test_remote_dyn_filter_pushdown_enabled_from_extensions_parses_bool() {
    // Same order as before: the disabled case first, then the enabled case.
    for (raw, expected) in [("false", false), ("true", true)] {
        let exts = HashMap::from([(
            QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN.to_string(),
            raw.to_string(),
        )]);
        let enabled = remote_dyn_filter_pushdown_enabled_from_extensions(&exts).unwrap();
        assert_eq!(enabled, expected);
    }
}
/// A value that is neither `true` nor `false` is rejected, and the error text
/// names the offending option.
#[test]
fn test_remote_dyn_filter_pushdown_enabled_from_extensions_rejects_invalid_bool() {
    let mut exts = HashMap::new();
    exts.insert(
        QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN.to_string(),
        "invalid".to_string(),
    );
    let message = remote_dyn_filter_pushdown_enabled_from_extensions(&exts)
        .unwrap_err()
        .to_string();
    assert!(message.contains(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN));
}
#[test]
fn test_parse_flow_extensions_memtable_only_success() {
let exts = HashMap::from([

View File

@@ -15,3 +15,94 @@
pub mod error;
pub mod label_values;
pub mod planner;
use datafusion_common::tree_node::{TreeNode as _, TreeNodeRecursion};
use datafusion_expr::{Extension, LogicalPlan};
use promql::extension_plan::{
Absent, EmptyMetric, HistogramFold, InstantManipulate, RangeManipulate, ScalarCalculate,
SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
/// Reports whether any node visited by `LogicalPlan::apply` on `plan` is a
/// PromQL-specific extension plan node.
///
/// The walk stops at the first match.
pub fn plan_contains_promql_extension(plan: &LogicalPlan) -> bool {
    let mut found = false;
    // The visitor never returns `Err`, so the traversal result carries no
    // information beyond what `found` already records.
    let _ = plan.apply(|node| {
        found = is_promql_extension_plan(node);
        Ok(if found {
            TreeNodeRecursion::Stop
        } else {
            TreeNodeRecursion::Continue
        })
    });
    found
}
/// Reports whether `plan` is a `LogicalPlan::Extension` whose node is one of
/// the PromQL extension plan types imported from `promql::extension_plan`.
fn is_promql_extension_plan(plan: &LogicalPlan) -> bool {
    match plan {
        LogicalPlan::Extension(Extension { node }) => {
            let any = node.as_any();
            any.is::<Absent>()
                || any.is::<EmptyMetric>()
                || any.is::<HistogramFold>()
                || any.is::<InstantManipulate>()
                || any.is::<RangeManipulate>()
                || any.is::<ScalarCalculate>()
                || any.is::<SeriesDivide>()
                || any.is::<SeriesNormalize>()
                || any.is::<UnionDistinctOn>()
        }
        // Every non-extension node is, by definition, not a PromQL extension.
        _ => false,
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion_common::DFSchema;
    use datafusion_expr::{EmptyRelation, Extension, LogicalPlanBuilder, col};

    use super::*;

    /// Wraps a freshly built `EmptyMetric` node in a `LogicalPlan::Extension`.
    fn empty_metric_plan() -> LogicalPlan {
        let node = EmptyMetric::new(
            0,
            10_000,
            5_000,
            "ts".to_string(),
            "greptime_value".to_string(),
            None,
        )
        .unwrap();
        LogicalPlan::Extension(Extension {
            node: Arc::new(node),
        })
    }

    /// A PromQL extension node at the plan root is detected.
    #[test]
    fn plan_contains_promql_extension_returns_true_for_promql_extension() {
        assert!(plan_contains_promql_extension(&empty_metric_plan()));
    }

    /// A PromQL extension node underneath a projection is still detected.
    #[test]
    fn plan_contains_promql_extension_returns_true_for_nested_promql_extension() {
        let projected = LogicalPlanBuilder::from(empty_metric_plan())
            .project(vec![col("ts")])
            .unwrap()
            .build()
            .unwrap();
        assert!(plan_contains_promql_extension(&projected));
    }

    /// A plan made only of a built-in node is reported as not containing PromQL.
    #[test]
    fn plan_contains_promql_extension_returns_false_for_non_promql_plan() {
        let empty_relation = EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        };
        let plain = LogicalPlan::EmptyRelation(empty_relation);
        assert!(!plan_contains_promql_extension(&plain));
    }
}

View File

@@ -57,7 +57,8 @@ use table::table::adapter::DfTableProviderAdapter;
use crate::QueryEngineContext;
use crate::dist_plan::{
DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, DynFilterRegistryManager,
MergeSortExtensionPlanner, RemoteDynFilterRegistryLease,
MergeSortExtensionPlanner, RemoteDynFilterReceiverExtensionPlanner,
RemoteDynFilterRegistryLease,
};
use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
use crate::optimizer::ExtensionAnalyzerRule;
@@ -511,8 +512,11 @@ impl DfQueryPlanner {
partition_rule_manager: Option<PartitionRuleManagerRef>,
region_query_handler: Option<RegionQueryHandlerRef>,
) -> Self {
let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> = vec![
Arc::new(PromExtensionPlanner),
Arc::new(RangeSelectPlanner),
Arc::new(RemoteDynFilterReceiverExtensionPlanner),
];
if let (Some(region_query_handler), Some(partition_rule_manager)) =
(region_query_handler, partition_rule_manager)
{

View File

@@ -40,7 +40,9 @@ use datafusion::physical_plan::{
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, DataFusionError, Statistics};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
use datafusion_physical_expr::{
EquivalenceProperties, Partitioning, PhysicalExpr, PhysicalSortExpr,
};
use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datatypes::compute::SortOptions;
use futures::{Stream, StreamExt};
@@ -339,6 +341,21 @@ impl RegionScanExec {
pub fn set_explain_verbose(&mut self, explain_verbose: bool) {
self.explain_verbose = explain_verbose;
}
/// Adds dynamic filters directly to the underlying region scanner predicate.
///
/// This is the same mutation path used by DataFusion child-filter pushdown. Remote dynamic
/// filter updates install their shared runtime wrapper through this method at scan build time
/// and later update the wrapper state out-of-band.
///
/// Returns the `Vec<bool>` produced by the scanner's `add_dyn_filter_to_predicate` unchanged
/// (the filter-pushdown call site in this file binds it to a variable named `supported`).
///
/// # Panics
///
/// Panics if the scanner mutex is poisoned, because the lock result is unwrapped.
pub fn add_dyn_filters_to_predicate(
&self,
filter_exprs: Vec<Arc<dyn PhysicalExpr>>,
) -> Vec<bool> {
// The scanner sits behind a mutex, so it can be updated through `&self`.
self.scanner
.lock()
.unwrap()
.add_dyn_filter_to_predicate(filter_exprs)
}
}
impl ExecutionPlan for RegionScanExec {
@@ -451,11 +468,7 @@ impl ExecutionPlan for RegionScanExec {
.map(|f| f.filter)
.collect::<Vec<_>>();
let supported = self
.scanner
.lock()
.unwrap()
.add_dyn_filter_to_predicate(parent_filters);
let supported = self.add_dyn_filters_to_predicate(parent_filters);
// datafusion api require to clone self after mutate, even though we are only mutate inside mutex
let new_self = Arc::new(self.clone());

View File

@@ -18,6 +18,7 @@ mod instance_noop_wal_test;
mod instance_test;
mod promql_test;
mod reconcile_table;
mod remote_dyn_filter_test;
pub mod test_util;
use std::collections::HashMap;

View File

@@ -0,0 +1,509 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_query::Output;
use frontend::instance::Instance;
use query::datafusion::QUERY_PARALLELISM_HINT;
use query::options::QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
use crate::test_util::execute_sql;
use crate::tests;
/// Inner join between the unpartitioned `rdf_build` table and the partitioned
/// `rdf_probe` table: checks the joined rows and that the analyzed plan shows a
/// dynamic filter on at least one region `SeqScan`.
#[tokio::test(flavor = "multi_thread")]
async fn test_remote_dyn_filter_join_e2e() {
    common_telemetry::init_default_ut_logging();
    let cluster = tests::create_distributed_instance("test_remote_dyn_filter_join_e2e").await;
    let frontend = cluster.frontend();
    prepare_remote_dyn_filter_tables(&frontend).await;

    // Query result with remote dynamic filters enabled.
    let join_sql = remote_dyn_filter_join_sql();
    let rows_output = execute_sql_with_query_parallelism_one(&frontend, join_sql, true).await;
    let rows = output_to_pretty_string(rows_output).await;
    let expected_rows = r#"+---+------+
| k | v |
+---+------+
| 2 | 20.0 |
| 4 | 40.0 |
+---+------+"#;
    assert_eq!(rows, expected_rows);

    // Plan shape: hash join on the frontend, dynamic filter visible on the datanode scan.
    let explain_sql = format!("EXPLAIN ANALYZE VERBOSE {join_sql}");
    let explain_output =
        execute_sql_with_query_parallelism_one(&frontend, &explain_sql, true).await;
    let explain = output_to_pretty_string(explain_output).await;
    assert_contains(&explain, "HashJoinExec: mode=CollectLeft");
    assert_contains(&explain, "MergeScanExec");
    assert_seq_scan_has_dyn_filter(&explain);
}
/// Left join with `rdf_build` on the preserved side: a build key without a probe
/// match must still be returned, and the dynamic filter shown on the region
/// `SeqScan` must cover all build keys.
#[tokio::test(flavor = "multi_thread")]
async fn test_remote_dyn_filter_left_join_e2e() {
    common_telemetry::init_default_ut_logging();
    let cluster =
        tests::create_distributed_instance("test_remote_dyn_filter_left_join_e2e").await;
    let frontend = cluster.frontend();
    prepare_remote_dyn_filter_tables(&frontend).await;

    // Extra build-side key with no probe-side row; the query's CASE expression
    // maps its NULL `v` to -1.0.
    let extra_build_row = r#"
INSERT INTO rdf_build(k, ts) VALUES (9, 9000)
"#;
    execute_sql(&frontend, extra_build_row).await;

    let join_sql = remote_dyn_filter_left_join_sql();
    let rows_output = execute_sql_with_query_parallelism_one(&frontend, join_sql, true).await;
    let rows = output_to_pretty_string(rows_output).await;
    let expected_rows = r#"+---+------+
| k | v |
+---+------+
| 2 | 20.0 |
| 4 | 40.0 |
| 9 | -1.0 |
+---+------+"#;
    assert_eq!(rows, expected_rows);

    let explain_sql = format!("EXPLAIN ANALYZE VERBOSE {join_sql}");
    let explain_output =
        execute_sql_with_query_parallelism_one(&frontend, &explain_sql, true).await;
    let explain = output_to_pretty_string(explain_output).await;
    assert_contains(&explain, "HashJoinExec: mode=CollectLeft, join_type=Left");
    assert_contains(&explain, "MergeScanExec");

    // Both fragments must appear on the same SeqScan line.
    let expected_filter_fragments = [
        "DynamicFilter [ k@0 >= 2 AND k@0 <= 9",
        "k@0 IN (SET) ([2, 4, 9])",
    ];
    assert_seq_scan_dyn_filter_contains(&explain, &expected_filter_fragments);
}
/// Inner join on two key columns (`a`, `k`): checks the joined rows and that a
/// dynamic filter reaches at least one region `SeqScan`.
#[tokio::test(flavor = "multi_thread")]
async fn test_remote_dyn_filter_multi_column_join_e2e() {
    common_telemetry::init_default_ut_logging();
    let cluster =
        tests::create_distributed_instance("test_remote_dyn_filter_multi_column_join_e2e").await;
    let frontend = cluster.frontend();
    prepare_remote_dyn_filter_multi_column_tables(&frontend).await;

    let join_sql = remote_dyn_filter_multi_column_join_sql();
    let rows_output = execute_sql_with_query_parallelism_one(&frontend, join_sql, true).await;
    let rows = output_to_pretty_string(rows_output).await;
    let expected_rows = r#"+---+----+-------+
| a | k | v |
+---+----+-------+
| 1 | 10 | 110.0 |
| 2 | 20 | 220.0 |
+---+----+-------+"#;
    assert_eq!(rows, expected_rows);

    let explain_sql = format!("EXPLAIN ANALYZE VERBOSE {join_sql}");
    let explain_output =
        execute_sql_with_query_parallelism_one(&frontend, &explain_sql, true).await;
    let explain = output_to_pretty_string(explain_output).await;
    assert_contains(&explain, "HashJoinExec: mode=CollectLeft");
    assert_contains(&explain, "MergeScanExec");
    assert_seq_scan_has_dyn_filter(&explain);
}
/// Join against a larger, eight-way partitioned probe table. Verifies that the
/// result is the same with remote dynamic filters enabled and disabled, that
/// the enabled plan shows the expected filter on a region `SeqScan`, and that
/// the disabled plan shows none.
#[tokio::test(flavor = "multi_thread")]
async fn test_remote_dyn_filter_large_join_e2e() {
    common_telemetry::init_default_ut_logging();
    let cluster =
        tests::create_distributed_instance("test_remote_dyn_filter_large_join_e2e").await;
    let frontend = cluster.frontend();
    prepare_remote_dyn_filter_large_tables(&frontend).await;

    let join_sql = remote_dyn_filter_large_join_sql();
    let explain_sql = format!("EXPLAIN ANALYZE VERBOSE {join_sql}");

    // Rows with the feature on.
    let rows_output = execute_sql_with_query_parallelism_one(&frontend, join_sql, true).await;
    let rows = output_to_pretty_string(rows_output).await;
    let expected_rows = r#"+------+---------+
| k | v |
+------+---------+
| 3 | 30.0 |
| 129 | 1290.0 |
| 511 | 5110.0 |
| 900 | 9000.0 |
| 8195 | 81950.0 |
+------+---------+"#;
    assert_eq!(rows, expected_rows);

    // Rows with the feature off must be identical.
    let rows_without_rdf_output =
        execute_sql_with_query_parallelism_one(&frontend, join_sql, false).await;
    let rows_without_rdf = output_to_pretty_string(rows_without_rdf_output).await;
    assert_eq!(rows_without_rdf, rows);

    // Plan with the feature on: filter bounds and key set show up on a SeqScan line.
    let explain_output =
        execute_sql_with_query_parallelism_one(&frontend, &explain_sql, true).await;
    let explain = output_to_pretty_string(explain_output).await;
    assert_contains(&explain, "HashJoinExec: mode=CollectLeft");
    assert_contains(&explain, "MergeScanExec");
    let expected_filter_fragments = [
        "DynamicFilter [ k@0 >= 3 AND k@0 <= 8195",
        "k@0 IN (SET) ([3, 129, 511, 900, 8195])",
    ];
    assert_seq_scan_dyn_filter_contains(&explain, &expected_filter_fragments);

    // Plan with the feature off: no SeqScan line may mention dyn_filters.
    let explain_without_rdf_output =
        execute_sql_with_query_parallelism_one(&frontend, &explain_sql, false).await;
    let explain_without_rdf = output_to_pretty_string(explain_without_rdf_output).await;
    assert_no_seq_scan_dyn_filter(&explain_without_rdf);
}
/// Creates and populates the tables used by the single-key join tests:
/// `rdf_probe` (partitioned on `k`) and `rdf_build` (no partition rule).
///
/// Statements run one after another in the order listed.
async fn prepare_remote_dyn_filter_tables(frontend: &Arc<Instance>) {
    let statements = [
        r#"
CREATE TABLE rdf_probe(
k INT,
ts TIMESTAMP,
v DOUBLE,
TIME INDEX (ts),
PRIMARY KEY(k)
)
PARTITION ON COLUMNS (k) (
k < 2,
k >= 2 AND k < 4,
k >= 4 AND k < 6,
k >= 6
)
engine=mito
"#,
        r#"
CREATE TABLE rdf_build(
k INT,
ts TIMESTAMP,
TIME INDEX (ts),
PRIMARY KEY(k)
) engine=mito
"#,
        r#"
INSERT INTO rdf_probe(k, ts, v) VALUES
(1, 1000, 10.0),
(2, 2000, 20.0),
(3, 3000, 30.0),
(4, 4000, 40.0),
(7, 5000, 50.0)
"#,
        r#"
INSERT INTO rdf_build(k, ts) VALUES
(2, 1000),
(4, 2000)
"#,
    ];
    for statement in statements {
        execute_sql(frontend, statement).await;
    }
}
/// Creates and populates the tables used by the two-column join test:
/// `rdf_multi_probe` (partitioned on `a`) and `rdf_multi_build` (no partition rule).
///
/// Statements run one after another in the order listed.
async fn prepare_remote_dyn_filter_multi_column_tables(frontend: &Arc<Instance>) {
    let statements = [
        r#"
CREATE TABLE rdf_multi_probe(
a INT,
k INT,
ts TIMESTAMP,
v DOUBLE,
TIME INDEX (ts),
PRIMARY KEY(a, k)
)
PARTITION ON COLUMNS (a) (
a < 2,
a >= 2 AND a < 3,
a >= 3 AND a < 4,
a >= 4
)
engine=mito
"#,
        r#"
CREATE TABLE rdf_multi_build(
a INT,
k INT,
ts TIMESTAMP,
TIME INDEX (ts),
PRIMARY KEY(a, k)
) engine=mito
"#,
        r#"
INSERT INTO rdf_multi_probe(a, k, ts, v) VALUES
(1, 10, 1000, 110.0),
(1, 11, 1100, 111.0),
(2, 10, 2000, 210.0),
(2, 20, 2200, 220.0),
(3, 30, 3000, 330.0),
(4, 40, 4000, 440.0)
"#,
        r#"
INSERT INTO rdf_multi_build(a, k, ts) VALUES
(1, 10, 1000),
(2, 20, 2000)
"#,
    ];
    for statement in statements {
        execute_sql(frontend, statement).await;
    }
}
/// Creates and populates the tables used by the large join test.
///
/// `rdf_large_probe` is partitioned on `k` into eight ranges and receives keys
/// `0..8192` before a flush plus keys `8192..8200` after it; `rdf_large_build`
/// holds the five keys the join is expected to match.
async fn prepare_remote_dyn_filter_large_tables(frontend: &Arc<Instance>) {
    let create_probe = r#"
CREATE TABLE rdf_large_probe(
k INT,
ts TIMESTAMP,
v DOUBLE,
TIME INDEX (ts),
PRIMARY KEY(k)
)
PARTITION ON COLUMNS (k) (
k < 1024,
k >= 1024 AND k < 2048,
k >= 2048 AND k < 3072,
k >= 3072 AND k < 4096,
k >= 4096 AND k < 5120,
k >= 5120 AND k < 6144,
k >= 6144 AND k < 7168,
k >= 7168
)
engine=mito
"#;
    let create_build = r#"
CREATE TABLE rdf_large_build(
k INT,
ts TIMESTAMP,
TIME INDEX (ts),
PRIMARY KEY(k)
) engine=mito
"#;
    for ddl in [create_probe, create_build] {
        execute_sql(frontend, ddl).await;
    }

    // Eight batches of 1024 consecutive keys: 0..1024, 1024..2048, ..., 7168..8192.
    for batch in 0..8 {
        let start = batch * 1024;
        insert_remote_dyn_filter_large_probe_range(frontend, start, start + 1024).await;
    }
    execute_sql(frontend, "ADMIN FLUSH_TABLE('rdf_large_probe')").await;

    // Keep a few rows in memtable after flush so the same query covers both
    // flushed SST/file data and newly written memtable data.
    insert_remote_dyn_filter_large_probe_range(frontend, 8192, 8200).await;

    let insert_build = r#"
INSERT INTO rdf_large_build(k, ts) VALUES
(3, 3000),
(129, 129000),
(511, 511000),
(900, 900000),
(8195, 8195000)
"#;
    execute_sql(frontend, insert_build).await;
}
/// Inserts one row per key in `start..end` into `rdf_large_probe` with a single
/// `INSERT` statement. For key `k` the row is `(k, k, k * 10 as a `.0` literal)`.
async fn insert_remote_dyn_filter_large_probe_range(
    frontend: &Arc<Instance>,
    start: usize,
    end: usize,
) {
    let mut tuples = Vec::new();
    for k in start..end {
        tuples.push(format!("({k}, {k}, {}.0)", k * 10));
    }
    let values = tuples.join(",");
    let insert_probe_sql = format!("INSERT INTO rdf_large_probe(k, ts, v) VALUES {values}");
    execute_sql(frontend, &insert_probe_sql).await;
}
/// SQL for the basic e2e test: inner-joins `rdf_build` with `rdf_probe` on `k`
/// and returns the probe side's `k` and `v`, ordered by `k`.
fn remote_dyn_filter_join_sql() -> &'static str {
r#"
SELECT p.k, p.v
FROM rdf_build b
JOIN rdf_probe p ON p.k = b.k
ORDER BY p.k
"#
}
/// SQL for the left-join e2e test: keeps every `rdf_build` row and substitutes
/// `-1.0` for `v` when `rdf_probe` has no matching `k`. Ordered by the build key.
fn remote_dyn_filter_left_join_sql() -> &'static str {
r#"
SELECT b.k,
CASE WHEN p.v IS NULL THEN -1.0 ELSE p.v END AS v
FROM rdf_build b
LEFT JOIN rdf_probe p ON p.k = b.k
ORDER BY b.k
"#
}
/// SQL for the multi-column e2e test: inner-joins `rdf_multi_build` with
/// `rdf_multi_probe` on both `a` and `k`, ordered by `a` then `k`.
fn remote_dyn_filter_multi_column_join_sql() -> &'static str {
r#"
SELECT p.a, p.k, p.v
FROM rdf_multi_build b
JOIN rdf_multi_probe p ON p.a = b.a AND p.k = b.k
ORDER BY p.a, p.k
"#
}
/// SQL for the large-table e2e test: inner-joins `rdf_large_build` with
/// `rdf_large_probe` on `k` and returns the probe side's `k` and `v`, ordered by `k`.
fn remote_dyn_filter_large_join_sql() -> &'static str {
r#"
SELECT p.k, p.v
FROM rdf_large_build b
JOIN rdf_large_probe p ON p.k = b.k
ORDER BY p.k
"#
}
/// Renders a query `Output` as text by awaiting `pretty_print` on its `data` field.
async fn output_to_pretty_string(output: Output) -> String {
output.data.pretty_print().await
}
/// Runs `sql` on `instance` with the query parallelism hint set to `"1"` and
/// returns the first statement's output.
///
/// When `remote_dyn_filter_enabled` is `false`, the query context additionally
/// carries `query.enable_remote_dynamic_filter_pushdown = "false"`; otherwise
/// the option is left unset.
///
/// Panics if no result is returned or if the first result is an error.
async fn execute_sql_with_query_parallelism_one(
    instance: &Arc<Instance>,
    sql: &str,
    remote_dyn_filter_enabled: bool,
) -> Output {
    let mut ctx = QueryContext::with_db_name(None);
    ctx.set_extension(QUERY_PARALLELISM_HINT, "1");
    if !remote_dyn_filter_enabled {
        ctx.set_extension(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN, "false");
    }

    let mut outputs = SqlQueryHandler::do_query(instance.as_ref(), sql, Arc::new(ctx)).await;
    outputs.remove(0).unwrap()
}
/// Returns every line of `explain` that describes a region `SeqScan` and
/// carries a `"dyn_filters"` entry.
///
/// Shared by the assertion helpers below so the line-matching rule is defined
/// in exactly one place.
fn seq_scan_dyn_filter_lines(explain: &str) -> Vec<&str> {
    explain
        .lines()
        .filter(|line| line.contains("SeqScan: region=") && line.contains("\"dyn_filters\""))
        .collect()
}

/// Panics if any region `SeqScan` line in `explain` mentions `dyn_filters`.
fn assert_no_seq_scan_dyn_filter(explain: &str) {
    let seq_scan_dyn_filter_lines = seq_scan_dyn_filter_lines(explain);
    assert!(
        seq_scan_dyn_filter_lines.is_empty(),
        "expected no region SeqScan line with dyn_filters; actual SeqScan dyn_filters lines:\n{}\n\nfull explain:\n{explain}",
        seq_scan_dyn_filter_lines.join("\n")
    );
}

/// Panics unless `haystack` contains `needle`, printing both on failure.
fn assert_contains(haystack: &str, needle: &str) {
    assert!(
        haystack.contains(needle),
        "expected to find {needle:?} in:\n{haystack}"
    );
}

/// Panics unless a single region `SeqScan` line with `dyn_filters` contains
/// every string in `needles`.
///
/// Fails with a dedicated message when no such `SeqScan` line exists at all, so
/// "filter missing" and "filter present but different" are easy to tell apart.
fn assert_seq_scan_dyn_filter_contains(explain: &str, needles: &[&str]) {
    let seq_scan_dyn_filter_lines = seq_scan_dyn_filter_lines(explain);
    assert!(
        !seq_scan_dyn_filter_lines.is_empty(),
        "expected at least one region SeqScan line with dyn_filters in:\n{explain}"
    );
    // All needles must match on the same line, not merely somewhere in the plan.
    let matched = seq_scan_dyn_filter_lines
        .iter()
        .any(|line| needles.iter().all(|needle| line.contains(needle)));
    assert!(
        matched,
        "expected one region SeqScan dyn_filters line containing all of {needles:?}; actual SeqScan dyn_filters lines:\n{}\n\nfull explain:\n{explain}",
        seq_scan_dyn_filter_lines.join("\n")
    );
}

/// Panics unless at least one region `SeqScan` line in `explain` mentions `dyn_filters`.
fn assert_seq_scan_has_dyn_filter(explain: &str) {
    assert!(
        !seq_scan_dyn_filter_lines(explain).is_empty(),
        "expected at least one region SeqScan line with dyn_filters in:\n{explain}"
    );
}

View File

@@ -76,6 +76,8 @@ WHERE c.tier = 'gold';
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE ,\s"dyn_filters":\s\["DynamicFilter\s\[[^"]*\]"\] , "dyn_filters": ["DynamicFilter [ REDACTED ]"]
-- SQLNESS REPLACE metrics=REDACTED\s*\| metrics=REDACTED_|
EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
@@ -94,10 +96,10 @@ WHERE c.tier = 'gold';
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "dyn_filters": ["DynamicFilter [ REDACTED ]"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+
@@ -201,6 +203,8 @@ WHERE c.tier IN ('gold', 'silver');
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE ,\s"dyn_filters":\s\["DynamicFilter\s\[[^"]*\]"\] , "dyn_filters": ["DynamicFilter [ REDACTED ]"]
-- SQLNESS REPLACE metrics=REDACTED\s*\| metrics=REDACTED_|
EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier
FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o
JOIN customers c ON o.cid = c.customer_id
@@ -220,10 +224,10 @@ WHERE c.tier IN ('gold', 'silver');
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[id@0 as id, customer_id@1 as cid, amount@2 as amount, ts@3 as ts] metrics=REDACTED_|
|_|_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "dyn_filters": ["DynamicFilter [ REDACTED ]"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+

View File

@@ -41,6 +41,8 @@ WHERE c.tier = 'gold';
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE ,\s"dyn_filters":\s\["DynamicFilter\s\[[^"]*\]"\] , "dyn_filters": ["DynamicFilter [ REDACTED ]"]
-- SQLNESS REPLACE metrics=REDACTED\s*\| metrics=REDACTED_|
EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
@@ -96,6 +98,8 @@ WHERE c.tier IN ('gold', 'silver');
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE ,\s"dyn_filters":\s\["DynamicFilter\s\[[^"]*\]"\] , "dyn_filters": ["DynamicFilter [ REDACTED ]"]
-- SQLNESS REPLACE metrics=REDACTED\s*\| metrics=REDACTED_|
EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier
FROM (SELECT "id", customer_id as cid, amount, ts FROM orders) o
JOIN customers c ON o.cid = c.customer_id

View File

@@ -92,6 +92,8 @@ WHERE c.tier IN ('gold', 'bronze');
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE ,\s"dyn_filters":\s\["DynamicFilter\s\[[^"]*\]"\] , "dyn_filters": ["DynamicFilter [ REDACTED ]"]
-- SQLNESS REPLACE metrics=REDACTED\s*\| metrics=REDACTED_|
EXPLAIN ANALYZE VERBOSE SELECT top_orders."id", top_orders.amount, c."name", c.tier
FROM (
SELECT "id", customer_id, amount, ts
@@ -116,10 +118,10 @@ WHERE c.tier IN ('gold', 'bronze');
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [amount@2 DESC], fetch=5 metrics=REDACTED_|
|_|_|_SortExec: TopK(fetch=5), expr=[amount@2 DESC], preserve_partitioning=[true] metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "dyn_filters": ["DynamicFilter [ empty ]"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "dyn_filters": ["DynamicFilter [ REDACTED ]"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "dyn_filters": ["DynamicFilter [ REDACTED ]"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+
@@ -246,6 +248,8 @@ LIMIT 4;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE ,\s"dyn_filters":\s\["DynamicFilter\s\[[^"]*\]"\] , "dyn_filters": ["DynamicFilter [ REDACTED ]"]
-- SQLNESS REPLACE metrics=REDACTED\s*\| metrics=REDACTED_|
EXPLAIN ANALYZE VERBOSE SELECT "id", customer_id, "name", tier, amount
FROM (
SELECT o."id", o.customer_id, c."name", c.tier, o.amount
@@ -272,10 +276,10 @@ LIMIT 4;
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "amount", "ts"], "dyn_filters": ["DynamicFilter [ REDACTED ]"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "dyn_filters": ["DynamicFilter [ REDACTED ]"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+
@@ -420,6 +424,8 @@ WHERE c.tier IN ('gold', 'silver')
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE ,\s"dyn_filters":\s\["DynamicFilter\s\[[^"]*\]"\] , "dyn_filters": ["DynamicFilter [ REDACTED ]"]
-- SQLNESS REPLACE metrics=REDACTED\s*\| metrics=REDACTED_|
EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category"
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
@@ -447,13 +453,13 @@ WHERE c.tier IN ('gold', 'silver')
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "product_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED |
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["id", "customer_id", "product_id", "amount", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["customer_id", "name", "tier", "ts"], "dyn_filters": ["DynamicFilter [ REDACTED ]"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
| 1_| 0_|_CooperativeExec metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["product_id", "name", "category", "ts"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_SeqScan: region=REDACTED, {"partition_count":REDACTED, "projection": ["product_id", "name", "category", "ts"], "dyn_filters": ["DynamicFilter [ REDACTED ]"], "flat_format":REDACTED, "metrics_per_partition": REDACTED metrics=REDACTED_|
|_|_|_|
|_|_| Total rows: REDACTED_|
+-+-+-+

View File

@@ -54,6 +54,8 @@ WHERE c.tier IN ('gold', 'bronze');
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE ,\s"dyn_filters":\s\["DynamicFilter\s\[[^"]*\]"\] , "dyn_filters": ["DynamicFilter [ REDACTED ]"]
-- SQLNESS REPLACE metrics=REDACTED\s*\| metrics=REDACTED_|
EXPLAIN ANALYZE VERBOSE SELECT top_orders."id", top_orders.amount, c."name", c.tier
FROM (
SELECT "id", customer_id, amount, ts
@@ -132,6 +134,8 @@ LIMIT 4;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE ,\s"dyn_filters":\s\["DynamicFilter\s\[[^"]*\]"\] , "dyn_filters": ["DynamicFilter [ REDACTED ]"]
-- SQLNESS REPLACE metrics=REDACTED\s*\| metrics=REDACTED_|
EXPLAIN ANALYZE VERBOSE SELECT "id", customer_id, "name", tier, amount
FROM (
SELECT o."id", o.customer_id, c."name", c.tier, o.amount
@@ -212,6 +216,8 @@ WHERE c.tier IN ('gold', 'silver')
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
-- SQLNESS REPLACE ,\s"dyn_filters":\s\["DynamicFilter\s\[[^"]*\]"\] , "dyn_filters": ["DynamicFilter [ REDACTED ]"]
-- SQLNESS REPLACE metrics=REDACTED\s*\| metrics=REDACTED_|
EXPLAIN ANALYZE VERBOSE SELECT o."id", o.amount, c."name", c.tier, p."name" as product_name, p."category"
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id

View File

@@ -0,0 +1,46 @@
-- Regression test for remote dynamic filter pushdown preserving a HAVING filter.
CREATE TABLE rdf_having_dim(k INTEGER, label STRING, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
CREATE TABLE rdf_having_fact(k INTEGER, v DOUBLE, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
INSERT INTO rdf_having_dim VALUES
(1, 'keep', 1000),
(2, 'drop', 2000);
Affected Rows: 2
INSERT INTO rdf_having_fact VALUES
(1, 200.0, 1000),
(1, 150.0, 2000),
(2, 100.0, 3000);
Affected Rows: 3
SELECT d.label, s.total
FROM rdf_having_dim d
INNER JOIN (
SELECT k, SUM(v) AS total
FROM rdf_having_fact
GROUP BY k
HAVING SUM(v) > 300
) s ON d.k = s.k
ORDER BY s.total DESC, d.label;
+-------+-------+
| label | total |
+-------+-------+
| keep | 350.0 |
+-------+-------+
DROP TABLE rdf_having_dim;
Affected Rows: 0
DROP TABLE rdf_having_fact;
Affected Rows: 0

View File

@@ -0,0 +1,26 @@
-- Regression test for remote dynamic filter pushdown preserving a HAVING filter.
CREATE TABLE rdf_having_dim(k INTEGER, label STRING, ts TIMESTAMP TIME INDEX);
CREATE TABLE rdf_having_fact(k INTEGER, v DOUBLE, ts TIMESTAMP TIME INDEX);
INSERT INTO rdf_having_dim VALUES
(1, 'keep', 1000),
(2, 'drop', 2000);
INSERT INTO rdf_having_fact VALUES
(1, 200.0, 1000),
(1, 150.0, 2000),
(2, 100.0, 3000);
SELECT d.label, s.total
FROM rdf_having_dim d
INNER JOIN (
SELECT k, SUM(v) AS total
FROM rdf_having_fact
GROUP BY k
HAVING SUM(v) > 300
) s ON d.k = s.k
ORDER BY s.total DESC, d.label;
DROP TABLE rdf_having_dim;
DROP TABLE rdf_having_fact;

View File

@@ -299,12 +299,16 @@ explain analyze select tag from t where num > 6 order by ts desc limit 2;
|_|_|_SortExec: TopK(fetch=2), expr=[ts@1 DESC], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
| 1_| 0_|_FilterExec: DynamicFilter [ true ] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
|_|_|_SortExec: TopK(fetch=2), expr=[ts@1 DESC], preserve_partitioning=[true] REDACTED
|_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
| 1_| 1_|_FilterExec: DynamicFilter [ true ] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
|_|_|_SortExec: TopK(fetch=2), expr=[ts@1 DESC], preserve_partitioning=[true] REDACTED
|_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED

View File

@@ -319,7 +319,7 @@ select '2008-01-01 00:00:11.1'::TIMESTAMP_US = '2008-01-01 00:00:11.1'::TIMESTAM
+----------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(µs)")) = arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(s)")) |
+----------------------------------------------------------------------------------------------------------------------------------+
| true |
| false |
+----------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11.1'::TIMESTAMP_MS = '2008-01-01 00:00:11'::TIMESTAMP_NS;
@@ -335,7 +335,7 @@ select '2008-01-01 00:00:11.1'::TIMESTAMP_MS = '2008-01-01 00:00:11'::TIMESTAMP_
+--------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(ms)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(s)")) |
+--------------------------------------------------------------------------------------------------------------------------------+
| true |
| false |
+--------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11.1'::TIMESTAMP_NS = '2008-01-01 00:00:11'::TIMESTAMP_S;
@@ -343,7 +343,7 @@ select '2008-01-01 00:00:11.1'::TIMESTAMP_NS = '2008-01-01 00:00:11'::TIMESTAMP_
+--------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(ns)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(s)")) |
+--------------------------------------------------------------------------------------------------------------------------------+
| true |
| false |
+--------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11'::TIMESTAMP_MS = '2008-01-01 00:00:11'::TIMESTAMP_NS;