From 2cc1c72e4fe459f6518d6f683d765200db5d77ef Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 13 May 2026 10:53:42 +0800 Subject: [PATCH] feat: bump datafusion to 53 Signed-off-by: Ruihang Xia --- Cargo.lock | 577 +++++++++--------- Cargo.toml | 92 +-- src/common/datasource/Cargo.toml | 4 +- src/common/datasource/src/file_format/orc.rs | 1 + .../datasource/src/file_format/parquet.rs | 9 +- .../datasource/src/file_format/tests.rs | 2 +- src/file-engine/Cargo.toml | 1 - src/file-engine/src/query/file_stream.rs | 2 +- src/mito2/src/cache/test_util.rs | 5 +- src/mito2/src/memtable/bulk/part.rs | 2 +- src/mito2/src/memtable/partition_tree/data.rs | 2 +- src/mito2/src/sst/parquet.rs | 2 +- src/mito2/src/sst/parquet/writer.rs | 2 +- src/object-store/Cargo.toml | 1 - src/object-store/src/compat.rs | 116 ++-- src/object-store/tests/object_store_test.rs | 4 +- src/query/src/part_sort.rs | 555 +++++------------ src/servers/Cargo.toml | 2 +- src/table/src/predicate.rs | 2 +- 19 files changed, 582 insertions(+), 799 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d35a7e9bb..5c96b4e60b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -214,7 +214,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" name = "api" version = "1.0.0" dependencies = [ - "arrow-schema 57.3.0", + "arrow-schema 58.3.0", "common-base", "common-decimal", "common-error", @@ -316,23 +316,23 @@ dependencies = [ [[package]] name = "arrow" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4754a624e5ae42081f464514be454b39711daae0458906dacde5f4c632f33a8" +checksum = "378530e55cd479eda3c14eb345310799717e6f76d0c332041e8487022166b471" dependencies = [ - "arrow-arith 57.3.0", - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-cast 57.3.0", - "arrow-csv 57.3.0", - "arrow-data 57.3.0", - "arrow-ipc 57.3.0", - "arrow-json 57.3.0", - "arrow-ord 57.3.0", - "arrow-row 57.3.0", - "arrow-schema 57.3.0", - "arrow-select 57.3.0", - "arrow-string 57.3.0", + "arrow-arith 58.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-cast 58.3.0", + "arrow-csv 58.3.0", + "arrow-data 58.3.0", + "arrow-ipc 58.3.0", + "arrow-json 58.3.0", + "arrow-ord 58.3.0", + "arrow-row 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", + "arrow-string 58.3.0", ] [[package]] @@ -351,14 +351,14 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7b3141e0ec5145a22d8694ea8b6d6f69305971c4fa1c1a13ef0195aef2d678b" +checksum = "a0ab212d2c1886e802f51c5212d78ebbcbb0bec980fff9dadc1eb8d45cd0b738" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", "chrono", "num-traits", ] @@ -381,18 +381,18 @@ dependencies = [ [[package]] name = "arrow-array" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c8955af33b25f3b175ee10af580577280b4bd01f7e823d94c7cdef7cf8c9aef" +checksum = "cfd33d3e92f207444098c75b42de99d329562be0cf686b307b097cc52b4e999e" dependencies = [ "ahash 0.8.12", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", "chrono", "chrono-tz", "half", - "hashbrown 0.16.1", + "hashbrown 0.17.1", "num-complex", "num-integer", "num-traits", @@ -411,9 +411,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c697ddca96183182f35b3a18e50b9110b11e916d7b7799cbfd4d34662f2c56c2" +checksum = "0c6cd424c2693bcdbc150d843dc9d4d137dd2de4782ce6df491ad11a3a0416c0" dependencies = [ "bytes", "half", @@ -443,16 +443,16 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "646bbb821e86fd57189c10b4fcdaa941deaf4181924917b0daa92735baa6ada5" +checksum = "4c5aefb56a2c02e9e2b30746241058b85f8983f0fcff2ba0c6d09006e1cded7f" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-ord 57.3.0", - "arrow-schema 57.3.0", - "arrow-select 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-ord 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", "atoi", "base64 0.22.1", "chrono", @@ -480,13 +480,13 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8da746f4180004e3ce7b83c977daf6394d768332349d3d913998b10a120b790a" +checksum = "e94e8cf7e517657a52b91ea1263acf38c4ca62a84655d72458a3359b12ab97de" dependencies = [ - "arrow-array 57.3.0", - "arrow-cast 57.3.0", - "arrow-schema 57.3.0", + "arrow-array 58.3.0", + "arrow-cast 58.3.0", + "arrow-schema 58.3.0", "chrono", "csv", "csv-core", @@ -507,12 +507,12 @@ dependencies = [ [[package]] name = "arrow-data" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fdd994a9d28e6365aa78e15da3f3950c0fdcea6b963a12fa1c391afb637b304" +checksum = "3c88210023a2bfee1896af366309a3028fc3bcbd6515fa29a7990ee1baa08ee0" dependencies = [ - "arrow-buffer 57.3.0", - "arrow-schema 57.3.0", + "arrow-buffer 58.3.0", + "arrow-schema 58.3.0", "half", "num-integer", "num-traits", @@ -520,15 +520,15 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58c5b083668e6230eae3eab2fc4b5fb989974c845d0aa538dde61a4327c78675" +checksum = "28abfe8bf9f124e5fc83b334af4fa58f8d0323ad25312ccb2d1da50178415704" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-cast 57.3.0", - "arrow-ipc 57.3.0", - "arrow-schema 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-cast 58.3.0", + "arrow-ipc 58.3.0", + "arrow-schema 58.3.0", "base64 0.22.1", "bytes", "futures", @@ -555,17 +555,17 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abf7df950701ab528bf7c0cf7eeadc0445d03ef5d6ffc151eaae6b38a58feff1" +checksum = "238438f0834483703d88896db6fe5a7138b2230debc31b34c0336c2996e3c64f" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", - "arrow-select 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", "flatbuffers", - "lz4_flex 0.12.1", + "lz4_flex 0.13.1", "zstd", ] @@ -593,15 +593,16 @@ dependencies = [ [[package]] name = "arrow-json" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ff8357658bedc49792b13e2e862b80df908171275f8e6e075c460da5ee4bf86" +checksum = "205ca2119e6d679d5c133c6f30e68f027738d95ed948cf77677ea69c7800036b" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-cast 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-cast 58.3.0", + "arrow-ord 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", "chrono", "half", "indexmap 2.13.0", @@ -630,27 +631,28 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d8f1870e03d4cbed632959498bcc84083b5a24bded52905ae1695bd29da45b" +checksum = "1bffd8fd2579286a5d63bac898159873e5094a79009940bcb42bbfce4f19f1d0" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", - "arrow-select 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", ] [[package]] name = "arrow-pg" -version = "0.12.2" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87077429e1f81c2a024d0a62eaec0bbc101913c0ba7ce676bf90d21a4dbc79f0" +checksum = "34ec6f5d8b2025c5950e554ec2b3b4c4d6bd55b4d59b9f50c2b5eed4906c0f64" dependencies = [ - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "bytes", "chrono", + "datafusion", "futures", "pg_interval_2", "pgwire", @@ -673,14 +675,14 @@ dependencies = [ [[package]] name = "arrow-row" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18228633bad92bff92a95746bbeb16e5fc318e8382b75619dec26db79e4de4c0" +checksum = "bab5994731204603c73ba69267616c50f80780774c6bb0476f1f830625115e0c" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", "half", ] @@ -692,9 +694,9 @@ checksum = "b3aa9e59c611ebc291c28582077ef25c97f1975383f1479b12f3b9ffee2ffabe" [[package]] name = "arrow-schema" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c872d36b7bf2a6a6a2b40de9156265f0242910791db366a2c17476ba8330d68" +checksum = "f633dbfdf39c039ada1bf9e34c694816eb71fbb7dc78f613993b7245e078a1ed" dependencies = [ "serde", "serde_core", @@ -717,15 +719,15 @@ dependencies = [ [[package]] name = "arrow-select" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68bf3e3efbd1278f770d67e5dc410257300b161b93baedb3aae836144edcaf4b" +checksum = "8cd065c54172ac787cf3f2f8d4107e0d3fdc26edba76fdf4f4cc170258942222" dependencies = [ "ahash 0.8.12", - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", "num-traits", ] @@ -748,15 +750,15 @@ dependencies = [ [[package]] name = "arrow-string" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85e968097061b3c0e9fe3079cf2e703e487890700546b5b0647f60fca1b5a8d8" +checksum = "29dd7cda3ab9692f43a2e4acc444d760cc17b12bb6d8232ddf64e9bab7c06b42" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", - "arrow-select 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", "memchr", "num-traits", "regex", @@ -1585,8 +1587,8 @@ name = "catalog" version = "1.0.0" dependencies = [ "api", - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "async-stream", "async-trait", "bytes", @@ -2169,7 +2171,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -2248,8 +2250,8 @@ dependencies = [ name = "common-datasource" version = "1.0.0" dependencies = [ - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "async-compression", "async-trait", "bytes", @@ -2355,9 +2357,9 @@ dependencies = [ "api", "approx 0.5.1", "arc-swap", - "arrow 57.3.0", - "arrow-cast 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-cast 58.3.0", + "arrow-schema 58.3.0", "async-trait", "bincode", "catalog", @@ -2748,7 +2750,7 @@ dependencies = [ name = "common-sql" version = "1.0.0" dependencies = [ - "arrow-schema 57.3.0", + "arrow-schema 58.3.0", "common-base", "common-decimal", "common-error", @@ -2825,7 +2827,7 @@ dependencies = [ name = "common-time" version = "1.0.0" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "chrono", "chrono-tz", "common-error", @@ -3571,11 +3573,11 @@ dependencies = [ [[package]] name = "datafusion" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "async-trait", "bytes", "bzip2", @@ -3610,7 +3612,7 @@ dependencies = [ "itertools 0.14.0", "liblzma", "log", - "object_store 0.12.5", + "object_store", "parking_lot 0.12.4", "parquet", "rand 0.9.1", @@ -3625,10 +3627,10 @@ dependencies = [ [[package]] name = "datafusion-catalog" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "dashmap", "datafusion-common", @@ -3642,17 +3644,17 @@ dependencies = [ "futures", "itertools 0.14.0", "log", - "object_store 0.12.5", + "object_store", "parking_lot 0.12.4", "tokio", ] [[package]] name = "datafusion-catalog-listing" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "datafusion-catalog", "datafusion-common", @@ -3666,17 +3668,17 @@ dependencies = [ "futures", "itertools 0.14.0", "log", - "object_store 0.12.5", + "object_store", ] [[package]] name = "datafusion-common" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "ahash 0.8.12", - "arrow 57.3.0", - "arrow-ipc 57.3.0", + "arrow 58.3.0", + "arrow-ipc 58.3.0", "chrono", "half", "hashbrown 0.16.1", @@ -3684,7 +3686,7 @@ dependencies = [ "itertools 0.14.0", "libc", "log", - "object_store 0.12.5", + "object_store", "parquet", "paste", "recursive", @@ -3695,8 +3697,8 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "futures", "log", @@ -3705,10 +3707,10 @@ dependencies = [ [[package]] name = "datafusion-datasource" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-compression", "async-trait", "bytes", @@ -3729,7 +3731,7 @@ dependencies = [ "itertools 0.14.0", "liblzma", "log", - "object_store 0.12.5", + "object_store", "rand 0.9.1", "tokio", "tokio-util", @@ -3739,11 +3741,11 @@ dependencies = [ [[package]] name = "datafusion-datasource-arrow" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", - "arrow-ipc 57.3.0", + "arrow 58.3.0", + "arrow-ipc 58.3.0", "async-trait", "bytes", "datafusion-common", @@ -3756,16 +3758,16 @@ dependencies = [ "datafusion-session", "futures", "itertools 0.14.0", - "object_store 0.12.5", + "object_store", "tokio", ] [[package]] name = "datafusion-datasource-csv" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "bytes", "datafusion-common", @@ -3777,17 +3779,17 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "futures", - "object_store 0.12.5", + "object_store", "regex", "tokio", ] [[package]] name = "datafusion-datasource-json" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "bytes", "datafusion-common", @@ -3799,7 +3801,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "futures", - "object_store 0.12.5", + "object_store", "serde_json", "tokio", "tokio-stream", @@ -3807,10 +3809,10 @@ dependencies = [ [[package]] name = "datafusion-datasource-parquet" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "bytes", "datafusion-common", @@ -3828,7 +3830,7 @@ dependencies = [ "futures", "itertools 0.14.0", "log", - "object_store 0.12.5", + "object_store", "parking_lot 0.12.4", "parquet", "tokio", @@ -3836,16 +3838,16 @@ dependencies = [ [[package]] name = "datafusion-doc" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" [[package]] name = "datafusion-execution" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", - "arrow-buffer 57.3.0", + "arrow 58.3.0", + "arrow-buffer 58.3.0", "async-trait", "chrono", "dashmap", @@ -3854,7 +3856,7 @@ dependencies = [ "datafusion-physical-expr-common", "futures", "log", - "object_store 0.12.5", + "object_store", "parking_lot 0.12.4", "rand 0.9.1", "tempfile", @@ -3863,10 +3865,10 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "chrono", "datafusion-common", @@ -3885,10 +3887,10 @@ dependencies = [ [[package]] name = "datafusion-expr-common" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "indexmap 2.13.0", "itertools 0.14.0", @@ -3897,11 +3899,11 @@ dependencies = [ [[package]] name = "datafusion-functions" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", - "arrow-buffer 57.3.0", + "arrow 58.3.0", + "arrow-buffer 58.3.0", "base64 0.22.1", "blake2", "blake3", @@ -3928,11 +3930,11 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "ahash 0.8.12", - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-doc", "datafusion-execution", @@ -3949,11 +3951,11 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "ahash 0.8.12", - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-expr-common", "datafusion-physical-expr-common", @@ -3961,11 +3963,11 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", - "arrow-ord 57.3.0", + "arrow 58.3.0", + "arrow-ord 58.3.0", "datafusion-common", "datafusion-doc", "datafusion-execution", @@ -3978,16 +3980,17 @@ dependencies = [ "datafusion-physical-expr-common", "hashbrown 0.16.1", "itertools 0.14.0", + "itoa", "log", "paste", ] [[package]] name = "datafusion-functions-table" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "datafusion-catalog", "datafusion-common", @@ -3999,10 +4002,10 @@ dependencies = [ [[package]] name = "datafusion-functions-window" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-doc", "datafusion-expr", @@ -4016,8 +4019,8 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -4025,8 +4028,8 @@ dependencies = [ [[package]] name = "datafusion-macros" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "datafusion-doc", "quote", @@ -4035,10 +4038,10 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "chrono", "datafusion-common", "datafusion-expr", @@ -4054,26 +4057,26 @@ dependencies = [ [[package]] name = "datafusion-orc" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3afe28e177f33d54b7841add7679176c4354934ff3db350f43c013b4f9b0bad8" +version = "0.8.0" +source = "git+https://github.com/datafusion-contrib/datafusion-orc.git?rev=73a7036a68dcc277b76d4e29d1d9a4a1fffae70c#73a7036a68dcc277b76d4e29d1d9a4a1fffae70c" dependencies = [ "async-trait", "bytes", "datafusion", "futures", "futures-util", - "object_store 0.12.5", + "object_store", "orc-rust", "tokio", ] [[package]] name = "datafusion-pg-catalog" -version = "0.15.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69e170232116b48e2445e29127bd4834a694d68256c338749d7362421509e889" +checksum = "6970b964fdfc8698359860880cf1b3bee0032b5dffa3d2e4785739c99c879cae" dependencies = [ + "arrow-pg", "async-trait", "datafusion", "futures", @@ -4084,11 +4087,11 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "ahash 0.8.12", - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-expr", "datafusion-expr-common", @@ -4107,10 +4110,10 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-adapter" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-expr", "datafusion-functions", @@ -4121,11 +4124,11 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "ahash 0.8.12", - "arrow 57.3.0", + "arrow 58.3.0", "chrono", "datafusion-common", "datafusion-expr-common", @@ -4137,10 +4140,10 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-execution", "datafusion-expr", @@ -4155,13 +4158,13 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "ahash 0.8.12", - "arrow 57.3.0", - "arrow-ord 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-ord 58.3.0", + "arrow-schema 58.3.0", "async-trait", "datafusion-common", "datafusion-common-runtime", @@ -4186,10 +4189,10 @@ dependencies = [ [[package]] name = "datafusion-pruning" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-datasource", "datafusion-expr-common", @@ -4202,8 +4205,8 @@ dependencies = [ [[package]] name = "datafusion-session" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "async-trait", "datafusion-common", @@ -4215,10 +4218,10 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "bigdecimal 0.4.8", "chrono", "datafusion-common", @@ -4233,8 +4236,8 @@ dependencies = [ [[package]] name = "datafusion-substrait" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "async-recursion", "async-trait", @@ -4242,7 +4245,7 @@ dependencies = [ "datafusion", "half", "itertools 0.14.0", - "object_store 0.12.5", + "object_store", "pbjson-types", "prost 0.14.1", "substrait 0.62.2", @@ -4322,9 +4325,9 @@ dependencies = [ name = "datatypes" version = "1.0.0" dependencies = [ - "arrow 57.3.0", - "arrow-array 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-array 58.3.0", + "arrow-schema 58.3.0", "common-base", "common-decimal", "common-error", @@ -5035,7 +5038,6 @@ dependencies = [ "common-time", "datafusion", "datafusion-expr", - "datafusion-orc", "datatypes", "futures", "object-store", @@ -5153,8 +5155,8 @@ name = "flow" version = "1.0.0" dependencies = [ "api", - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "async-recursion", "async-trait", "auth", @@ -5914,6 +5916,12 @@ dependencies = [ "foldhash 0.2.0", ] +[[package]] +name = "hashbrown" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" + [[package]] name = "hashlink" version = "0.10.0" @@ -7462,7 +7470,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -7768,9 +7776,9 @@ dependencies = [ [[package]] name = "lz4_flex" -version = "0.12.1" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98c23545df7ecf1b16c303910a69b079e8e251d60f7dd2cc9b4177f2afaf1746" +checksum = "7ef0d4ed8669f8f8826eb00dc878084aa8f253506c4fd5e8f58f5bce72ddb97e" dependencies = [ "twox-hash", ] @@ -8167,7 +8175,7 @@ version = "1.0.0" dependencies = [ "api", "aquamarine", - "arrow-schema 57.3.0", + "arrow-schema 58.3.0", "async-channel 1.9.0", "async-stream", "async-trait", @@ -8904,8 +8912,7 @@ dependencies = [ "futures", "humantime-serde", "lazy_static", - "object_store 0.12.5", - "object_store 0.13.2", + "object_store", "object_store_opendal", "opendal", "prometheus 0.14.0", @@ -8918,30 +8925,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "object_store" -version = "0.12.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbfbfff40aeccab00ec8a910b57ca8ecf4319b335c542f2edcd19dd25a1e2a00" -dependencies = [ - "async-trait", - "bytes", - "chrono", - "futures", - "http 1.3.1", - "humantime", - "itertools 0.14.0", - "parking_lot 0.12.4", - "percent-encoding", - "thiserror 2.0.17", - "tokio", - "tracing", - "url", - "walkdir", - "wasm-bindgen-futures", - "web-time", -] - [[package]] name = "object_store" version = "0.13.2" @@ -8979,7 +8962,7 @@ dependencies = [ "chrono", "futures", "mea", - "object_store 0.13.2", + "object_store", "opendal", "pin-project", "tokio", @@ -9473,8 +9456,8 @@ version = "1.0.0" dependencies = [ "ahash 0.8.12", "api", - "arrow 57.3.0", - "arrow-ipc 57.3.0", + "arrow 58.3.0", + "arrow-ipc 58.3.0", "async-stream", "async-trait", "bytes", @@ -9538,11 +9521,11 @@ dependencies = [ [[package]] name = "orc-rust" -version = "0.7.1" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8f1a357fb58dd9aab45ca21166d852f7894320d67ebeaa6d6b3fe557a9356c8" +checksum = "32b9867c4e7343218682ba11aceae1310c80735ec2beaf6124a0f8f848dad197" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "bytemuck", "bytes", @@ -9552,6 +9535,7 @@ dependencies = [ "flate2", "futures", "futures-util", + "log", "lz4_flex 0.11.6", "lzokay-native", "num", @@ -9755,18 +9739,17 @@ dependencies = [ [[package]] name = "parquet" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ee96b29972a257b855ff2341b37e61af5f12d6af1158b6dcdb5b31ea07bb3cb" +checksum = "5dafa7d01085b62a47dd0c1829550a0a36710ea9c4fe358a05a85477cec8a908" dependencies = [ "ahash 0.8.12", - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-cast 57.3.0", - "arrow-data 57.3.0", - "arrow-ipc 57.3.0", - "arrow-schema 57.3.0", - "arrow-select 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-ipc 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", "base64 0.22.1", "brotli", "bytes", @@ -9774,12 +9757,12 @@ dependencies = [ "flate2", "futures", "half", - "hashbrown 0.16.1", - "lz4_flex 0.12.1", + "hashbrown 0.17.1", + "lz4_flex 0.13.1", "num-bigint", "num-integer", "num-traits", - "object_store 0.12.5", + "object_store", "paste", "seq-macro", "simdutf8", @@ -10164,8 +10147,8 @@ version = "1.0.0" dependencies = [ "ahash 0.8.12", "api", - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "async-trait", "catalog", "chrono", @@ -10745,7 +10728,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", - "heck 0.4.1", + "heck 0.5.0", "itertools 0.12.1", "log", "multimap", @@ -10765,7 +10748,7 @@ version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "itertools 0.14.0", "log", "multimap", @@ -11063,8 +11046,8 @@ dependencies = [ "ahash 0.8.12", "api", "arc-swap", - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "async-recursion", "async-stream", "async-trait", @@ -12037,9 +12020,9 @@ dependencies = [ [[package]] name = "rust_decimal" -version = "1.40.0" +version = "1.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61f703d19852dbf87cbc513643fa81428361eb6940f1ac14fd58155d295a3eb0" +checksum = "0c5108e3d4d903e21aac27f12ba5377b6b34f9f44b325e4894c7924169d06995" dependencies = [ "arrayvec", "borsh", @@ -12050,6 +12033,7 @@ dependencies = [ "rkyv", "serde", "serde_json", + "wasm-bindgen", ] [[package]] @@ -12594,11 +12578,11 @@ version = "1.0.0" dependencies = [ "ahash 0.8.12", "api", - "arrow 57.3.0", + "arrow 58.3.0", "arrow-flight", - "arrow-ipc 57.3.0", + "arrow-ipc 58.3.0", "arrow-pg", - "arrow-schema 57.3.0", + "arrow-schema 58.3.0", "async-trait", "auth", "axum 0.8.4", @@ -12982,7 +12966,7 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1961e2ef424c1424204d3a5d6975f934f56b6d50ff5732382d84ebf460e147f7" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.117", @@ -13062,7 +13046,7 @@ name = "sql" version = "1.0.0" dependencies = [ "api", - "arrow-buffer 57.3.0", + "arrow-buffer 58.3.0", "chrono", "common-base", "common-catalog", @@ -14348,9 +14332,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.49.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72a2903cd7736441aac9df9d7688bd0ce48edccaadf181c3b90be801e81d3d86" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" dependencies = [ "bytes", "libc", @@ -15474,6 +15458,7 @@ dependencies = [ "cfg-if", "once_cell", "rustversion", + "serde", "wasm-bindgen-macro", "wasm-bindgen-shared", ] @@ -15680,7 +15665,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 969cd7da38..b8a6e8e70c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,17 +100,16 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] } # See for more detaiils: https://github.com/rust-lang/cargo/issues/11329 ahash = { version = "0.8", features = ["compile-time-rng"] } aquamarine = "0.6" -arrow = { version = "57.3", features = ["prettyprint"] } -arrow-array = { version = "57.3", default-features = false, features = ["chrono-tz"] } -arrow-buffer = "57.3" -arrow-cast = "57.3" -arrow-flight = "57.3" -arrow-ipc = { version = "57.3", default-features = false, features = ["lz4", "zstd"] } -arrow-schema = { version = "57.3", features = ["serde"] } +arrow = { version = "58.3", features = ["prettyprint"] } +arrow-array = { version = "58.3", default-features = false, features = ["chrono-tz"] } +arrow-buffer = "58.3" +arrow-cast = "58.3" +arrow-flight = "58.3" +arrow-ipc = { version = "58.3", default-features = false, features = ["lz4", "zstd"] } +arrow-schema = { version = "58.3", features = ["serde"] } async-stream = "0.3" async-trait = "0.1" # Remember to update axum-extra, axum-macros when updating axum -arrow_object_store = { package = "object_store", version = "0.13.2" } axum = "0.8" axum-extra = "0.10" axum-macros = "0.5" @@ -128,21 +127,21 @@ const_format = "0.2" criterion = "0.7" crossbeam-utils = "0.8" dashmap = "6.1" -datafusion = "=52.1" -datafusion-common = "=52.1" -datafusion-datasource = "=52.1" -datafusion-expr = "=52.1" -datafusion-functions = "=52.1" -datafusion-functions-aggregate-common = "=52.1" -datafusion-functions-window-common = "=52.1" -datafusion-optimizer = "=52.1" -datafusion-orc = "0.7" -datafusion-pg-catalog = "0.15.1" -datafusion-physical-expr = "=52.1" -datafusion-physical-plan = "=52.1" -datafusion-sql = "=52.1" -datafusion-substrait = "=52.1" -datafusion_object_store = { package = "object_store", version = "0.12.5" } +datafusion = "=53.1.0" +datafusion-common = "=53.1.0" +datafusion-datasource = "=53.1.0" +datafusion-expr = "=53.1.0" +datafusion-functions = "=53.1.0" +datafusion-functions-aggregate-common = "=53.1.0" +datafusion-functions-window-common = "=53.1.0" +datafusion-optimizer = "=53.1.0" +datafusion-orc = { git = "https://github.com/datafusion-contrib/datafusion-orc.git", rev = "73a7036a68dcc277b76d4e29d1d9a4a1fffae70c" } +datafusion-pg-catalog = "0.16" +datafusion-physical-expr = "=53.1.0" +datafusion-physical-plan = "=53.1.0" +datafusion-sql = "=53.1.0" +datafusion-substrait = "=53.1.0" +datafusion_object_store = { package = "object_store", version = "0.13.2" } deadpool = "0.12" deadpool-postgres = "0.14" derive_builder = "0.20" @@ -191,7 +190,7 @@ otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "5 "server", ] } parking_lot = "0.12" -parquet = { version = "57.3", default-features = false, features = ["arrow", "async", "object_store"] } +parquet = { version = "58.3", default-features = false, features = ["arrow", "async", "object_store"] } paste = "1.0" pin-project = "1.0" pretty_assertions = "1.4.0" @@ -337,19 +336,38 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git" rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" [patch.crates-io] -datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-functions-window-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } +datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-catalog = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-catalog-listing = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-common-runtime = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-datasource-arrow = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-datasource-csv = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-datasource-json = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-datasource-parquet = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-doc = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-execution = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-functions-aggregate = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-functions-nested = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-functions-table = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-functions-window = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-functions-window-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-macros = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-physical-expr-adapter = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-physical-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-pruning = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-session = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "2aefa08a8d69c96eec2d6d6703598a009bba6e4c" } # on branch v0.61.x [profile.release] diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index 7df293d6d3..470b5371f7 100644 --- a/src/common/datasource/Cargo.toml +++ b/src/common/datasource/Cargo.toml @@ -28,11 +28,12 @@ common-runtime.workspace = true common-telemetry.workspace = true datafusion.workspace = true datafusion-datasource.workspace = true +datafusion-orc.workspace = true datatypes.workspace = true futures.workspace = true lazy_static.workspace = true object-store.workspace = true -orc-rust = { version = "0.7", default-features = false, features = ["async"] } +orc-rust = { version = "0.8", default-features = false, features = ["async"] } parquet.workspace = true paste.workspace = true regex.workspace = true @@ -45,4 +46,3 @@ url.workspace = true [dev-dependencies] common-test-util.workspace = true -datafusion-orc.workspace = true diff --git a/src/common/datasource/src/file_format/orc.rs b/src/common/datasource/src/file_format/orc.rs index 6b00b41036..0a6b90ccc7 100644 --- a/src/common/datasource/src/file_format/orc.rs +++ b/src/common/datasource/src/file_format/orc.rs @@ -15,6 +15,7 @@ use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; +pub use datafusion_orc::OrcSource; use futures::FutureExt; use futures::future::BoxFuture; use object_store::ObjectStore; diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index 9c8e8d6ce8..43b281077e 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -176,11 +176,14 @@ impl AsyncFileReader for LazyParquetFileReader { .map_err(|e| ParquetError::External(Box::new(e)))?; let metadata_opts = options.map(|o| o.metadata_options().clone()); + let column_index_policy = + options.map_or(PageIndexPolicy::Skip, |o| o.column_index_policy()); + let offset_index_policy = + options.map_or(PageIndexPolicy::Skip, |o| o.offset_index_policy()); let metadata_reader = ParquetMetaDataReader::new() .with_metadata_options(metadata_opts) - .with_page_index_policy(PageIndexPolicy::from( - options.is_some_and(|o| o.page_index()), - )) + .with_column_index_policy(column_index_policy) + .with_offset_index_policy(offset_index_policy) .with_prefetch_hint(self.metadata_size_hint); let metadata = metadata_reader diff --git a/src/common/datasource/src/file_format/tests.rs b/src/common/datasource/src/file_format/tests.rs index 6ef669ab7b..93ab3b4409 100644 --- a/src/common/datasource/src/file_format/tests.rs +++ b/src/common/datasource/src/file_format/tests.rs @@ -26,11 +26,11 @@ use datafusion::execution::context::TaskContext; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::prelude::SessionContext; -use datafusion_orc::OrcSource; use futures::StreamExt; use object_store::ObjectStore; use super::FORMAT_TYPE; +use crate::file_format::orc::OrcSource; use crate::file_format::parquet::DefaultParquetFileReaderFactory; use crate::file_format::{FileFormat, Format, OrcFormat}; use crate::test_util::{basic_schema_with_time_format, scan_config, test_basic_schema, test_store}; diff --git a/src/file-engine/Cargo.toml b/src/file-engine/Cargo.toml index 8d69512fa1..6c8c9e887d 100644 --- a/src/file-engine/Cargo.toml +++ b/src/file-engine/Cargo.toml @@ -26,7 +26,6 @@ common-test-util = { workspace = true, optional = true } common-time.workspace = true datafusion.workspace = true datafusion-expr.workspace = true -datafusion-orc.workspace = true datatypes.workspace = true futures.workspace = true object-store.workspace = true diff --git a/src/file-engine/src/query/file_stream.rs b/src/file-engine/src/query/file_stream.rs index 931dfabe62..eec8f8961d 100644 --- a/src/file-engine/src/query/file_stream.rs +++ b/src/file-engine/src/query/file_stream.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use common_datasource::file_format::Format; use common_datasource::file_format::csv::CsvFormat; +use common_datasource::file_format::orc::OrcSource; use common_datasource::file_format::parquet::DefaultParquetFileReaderFactory; use datafusion::common::ToDFSchema; use datafusion::config::CsvOptions; @@ -34,7 +35,6 @@ use datafusion::physical_plan::{ use datafusion::prelude::SessionContext; use datafusion_expr::expr::Expr; use datafusion_expr::utils::conjunction; -use datafusion_orc::OrcSource; use datatypes::schema::SchemaRef; use object_store::ObjectStore; use snafu::ResultExt; diff --git a/src/mito2/src/cache/test_util.rs b/src/mito2/src/cache/test_util.rs index ef3d8e9315..c5baae3c75 100644 --- a/src/mito2/src/cache/test_util.rs +++ b/src/mito2/src/cache/test_util.rs @@ -127,7 +127,10 @@ pub(crate) fn assert_parquet_metadata_equal(x: Arc, y: Arc DataPartEncoder<'a> { fn writer_props(self) -> WriterProperties { let mut builder = WriterProperties::builder(); if let Some(row_group_size) = self.row_group_size { - builder = builder.set_max_row_group_size(row_group_size) + builder = builder.set_max_row_group_row_count(Some(row_group_size)) } let ts_col = ColumnPath::new(vec![self.timestamp_column_name]); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index d8d1f91e3d..b27f883d5b 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -590,7 +590,7 @@ mod tests { .set_key_value_metadata(Some(vec![key_value_meta])) .set_compression(Compression::ZSTD(ZstdLevel::default())) .set_encoding(Encoding::PLAIN) - .set_max_row_group_size(write_opts.row_group_size); + .set_max_row_group_row_count(Some(write_opts.row_group_size)); let writer_props = props_builder.build(); diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 13005ff9fc..373f488f04 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -425,7 +425,7 @@ where .set_key_value_metadata(Some(vec![key_value_meta])) .set_compression(Compression::ZSTD(ZstdLevel::default())) .set_encoding(Encoding::PLAIN) - .set_max_row_group_size(opts.row_group_size) + .set_max_row_group_row_count(Some(opts.row_group_size)) .set_column_index_truncate_length(None) .set_statistics_truncate_length(None); diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 5334d0f30a..372fef8f9a 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -43,7 +43,6 @@ uuid.workspace = true [dev-dependencies] anyhow = "1.0" -arrow_object_store.workspace = true common-telemetry.workspace = true common-test-util.workspace = true object_store_opendal.workspace = true diff --git a/src/object-store/src/compat.rs b/src/object-store/src/compat.rs index 3e46355bd1..4498f8f3be 100644 --- a/src/object-store/src/compat.rs +++ b/src/object-store/src/compat.rs @@ -23,13 +23,13 @@ use async_trait::async_trait; use bytes::Bytes; use datafusion_object_store::path::Path; use datafusion_object_store::{ - Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, - MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore, PutMode, PutMultipartOptions, - PutOptions, PutPayload, PutResult, UploadPart, + Attribute, Attributes, CopyMode, CopyOptions, GetOptions, GetRange, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore, + PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, UploadPart, }; use futures::stream::BoxStream; use futures::{FutureExt, StreamExt, TryStreamExt}; -use opendal::options::CopyOptions; +use opendal::options::CopyOptions as OpendalCopyOptions; use opendal::raw::percent_decode_path; use opendal::{Buffer, Operator, OperatorInfo, Writer}; use tokio::sync::{Mutex, oneshot}; @@ -107,7 +107,7 @@ impl OpendalStore { to: &Path, if_not_exists: bool, ) -> datafusion_object_store::Result<()> { - let mut copy_options = CopyOptions::default(); + let mut copy_options = OpendalCopyOptions::default(); if if_not_exists { copy_options.if_not_exists = true; } @@ -212,22 +212,6 @@ impl ArrowObjectStore for OpendalStore { Ok(PutResult { e_tag, version }) } - async fn put_multipart( - &self, - location: &Path, - ) -> datafusion_object_store::Result> { - let decoded_location = percent_decode_path(location.as_ref()); - let writer = self - .inner - .writer_with(&decoded_location) - .concurrent(8) - .await - .map_err(|err| format_object_store_error(err, location.as_ref()))?; - let upload = OpendalMultipartUpload::new(writer, location.clone()); - - Ok(Box::new(upload)) - } - async fn put_multipart_opts( &self, location: &Path, @@ -403,33 +387,49 @@ impl ArrowObjectStore for OpendalStore { }) } - async fn get_range( + async fn get_ranges( &self, location: &Path, - range: Range, - ) -> datafusion_object_store::Result { + ranges: &[Range], + ) -> datafusion_object_store::Result> { + if ranges.is_empty() { + return Ok(Vec::new()); + } + let raw_location = percent_decode_path(location.as_ref()); let reader = self .inner - .reader_with(&raw_location) + .reader_with(raw_location.as_ref()) + .await + .map_err(|err| format_object_store_error(err, location.as_ref()))?; + let buffers = reader + .fetch(ranges.to_vec()) .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; - reader - .read(range.start..range.end) - .await - .map(|buf| buf.to_bytes()) - .map_err(|err| format_object_store_error(err, location.as_ref())) + Ok(buffers.into_iter().map(|buf| buf.to_bytes()).collect()) } - async fn delete(&self, location: &Path) -> datafusion_object_store::Result<()> { - let decoded_location = percent_decode_path(location.as_ref()); - self.inner - .delete(&decoded_location) - .await - .map_err(|err| format_object_store_error(err, location.as_ref()))?; - - Ok(()) + fn delete_stream( + &self, + locations: BoxStream<'static, datafusion_object_store::Result>, + ) -> BoxStream<'static, datafusion_object_store::Result> { + let this = self.clone(); + locations + .map(move |location| { + let this = this.clone(); + async move { + let location = location?; + let decoded_location = percent_decode_path(location.as_ref()); + this.inner + .delete(&decoded_location) + .await + .map_err(|err| format_object_store_error(err, location.as_ref()))?; + Ok(location) + } + }) + .buffered(10) + .boxed() } fn list( @@ -568,28 +568,14 @@ impl ArrowObjectStore for OpendalStore { }) } - async fn copy(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> { - self.copy_request(from, to, false).await - } - - async fn rename(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> { - self.inner - .rename( - &percent_decode_path(from.as_ref()), - &percent_decode_path(to.as_ref()), - ) - .await - .map_err(|err| format_object_store_error(err, from.as_ref()))?; - - Ok(()) - } - - async fn copy_if_not_exists( + async fn copy_opts( &self, from: &Path, to: &Path, + options: CopyOptions, ) -> datafusion_object_store::Result<()> { - self.copy_request(from, to, true).await + let if_not_exists = options.mode == CopyMode::Create; + self.copy_request(from, to, if_not_exists).await } } @@ -731,7 +717,9 @@ mod tests { use bytes::Bytes; use datafusion_object_store::path::Path; - use datafusion_object_store::{ObjectStore as ArrowObjectStore, WriteMultipart}; + use datafusion_object_store::{ + ObjectStore as ArrowObjectStore, ObjectStoreExt, WriteMultipart, + }; use opendal::{Operator, services}; use rand::{Rng, RngCore}; @@ -1003,7 +991,7 @@ mod tests { } #[tokio::test] - async fn test_get_range_no_stat() { + async fn test_get_ranges_no_stat() { use std::sync::atomic::{AtomicUsize, Ordering}; // Create a stat counter and operator with tracking layer @@ -1022,13 +1010,17 @@ mod tests { // Reset counter after put stat_count.store(0, Ordering::SeqCst); - // Test 1: get_range should NOT call stat() - let ret = store.get_range(&location, 0..5).await.unwrap(); - assert_eq!(Bytes::from_static(b"Hello"), ret); + // Test 1: get_ranges should NOT call stat() + let range = 0..5; + let ret = store + .get_ranges(&location, std::slice::from_ref(&range)) + .await + .unwrap(); + assert_eq!(vec![Bytes::from_static(b"Hello")], ret); assert_eq!( stat_count.load(Ordering::SeqCst), 0, - "get_range should not call stat()" + "get_ranges should not call stat()" ); // Reset counter diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 4ade4fc383..d373972473 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -16,10 +16,10 @@ use std::env; use std::sync::Arc; use anyhow::Result; -use arrow_object_store::path::Path; -use arrow_object_store::{ObjectStore as ArrowObjectStore, ObjectStoreExt}; use bytes::Bytes; use common_telemetry::info; +use datafusion_object_store::path::Path; +use datafusion_object_store::{ObjectStore as ArrowObjectStore, ObjectStoreExt}; use futures::TryStreamExt; use object_store::ObjectStore; use object_store::services::{Fs, S3}; diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 933f0e47cf..72dd3d5d4c 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -23,16 +23,11 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use arrow::array::{ - ArrayRef, AsArray, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, -}; +use arrow::array::{Array, ArrayRef}; use arrow::compute::{concat, concat_batches, take_record_batch}; -use arrow_schema::{Schema, SchemaRef}; +use arrow_schema::SchemaRef; use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream}; -use common_telemetry::warn; use common_time::Timestamp; -use common_time::timestamp::TimeUnit; use datafusion::common::arrow::compute::sort_to_indices; use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion::execution::{RecordBatchStream, TaskContext}; @@ -42,16 +37,13 @@ use datafusion::physical_plan::filter_pushdown::{ }; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, TopK, - TopKDynamicFilters, + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, }; -use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{DataFusionError, internal_err}; -use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit}; +use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; -use futures::{Stream, StreamExt}; +use futures::Stream; use itertools::Itertools; -use parking_lot::RwLock; use snafu::location; use store_api::region_engine::PartitionRange; @@ -115,10 +107,7 @@ pub struct PartSortExec { metrics: ExecutionPlanMetricsSet, partition_ranges: Vec>, properties: Arc, - /// Filter matching the state of the sort for dynamic filter pushdown. - /// If `limit` is `Some`, this will also be set and a TopK operator may be used. - /// If `limit` is `None`, this will be `None`. - filter: Option>>, + dynamic_filter: Option>, } impl PartSortExec { @@ -139,9 +128,7 @@ impl PartSortExec { properties.boundedness, )); - let filter = limit - .is_some() - .then(|| Self::create_filter(expression.expr.clone())); + let dynamic_filter = Self::new_dynamic_filter(&expression, limit); Ok(Self { expression, @@ -150,15 +137,20 @@ impl PartSortExec { metrics, partition_ranges, properties, - filter, + dynamic_filter, }) } - /// Add or reset `self.filter` to a new `TopKDynamicFilters`. - fn create_filter(expr: Arc) -> Arc> { - Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new( - DynamicFilterPhysicalExpr::new(vec![expr], lit(true)), - )))) + fn new_dynamic_filter( + expression: &PhysicalSortExpr, + limit: Option, + ) -> Option> { + limit.map(|_| { + Arc::new(DynamicFilterPhysicalExpr::new( + vec![expression.expr.clone()], + lit(true), + )) + }) } pub fn to_stream( @@ -185,7 +177,6 @@ impl PartSortExec { input_stream, self.partition_ranges[partition].clone(), partition, - self.filter.clone(), )?) as _; Ok(df_stream) @@ -276,28 +267,24 @@ impl ExecutionPlan for PartSortExec { &self, phase: FilterPushdownPhase, parent_filters: Vec>, - _config: &datafusion::config::ConfigOptions, + config: &datafusion::config::ConfigOptions, ) -> datafusion_common::Result { if !matches!(phase, FilterPushdownPhase::Post) { return FilterDescription::from_children(parent_filters, &self.children()); } let mut child = ChildFilterDescription::from_child(&parent_filters, &self.input)?; - - if let Some(filter) = &self.filter { - child = child.with_self_filter(filter.read().expr()); + if let Some(filter) = &self.dynamic_filter + && config.optimizer.enable_topk_dynamic_filter_pushdown + { + let filter: Arc = filter.clone(); + child = child.with_self_filter(filter); } - Ok(FilterDescription::new().with_child(child)) } fn reset_state(self: Arc) -> datafusion_common::Result> { - // shared dynamic filter needs to be reset - let new_filter = self - .limit - .is_some() - .then(|| Self::create_filter(self.expression.expr.clone())); - + let dynamic_filter = Self::new_dynamic_filter(&self.expression, self.limit); Ok(Arc::new(Self { expression: self.expression.clone(), limit: self.limit, @@ -305,25 +292,30 @@ impl ExecutionPlan for PartSortExec { metrics: self.metrics.clone(), partition_ranges: self.partition_ranges.clone(), properties: self.properties.clone(), - filter: new_filter, + dynamic_filter, })) } } enum PartSortBuffer { All(Vec), - /// TopK buffer with row count. - /// - /// Given this heap only keeps k element, the capacity of this buffer - /// is not accurate, and is only used for empty check. - Top(TopK, usize), +} + +enum TopKThreshold { + Null, + Value(i64), } impl PartSortBuffer { pub fn is_empty(&self) -> bool { match self { PartSortBuffer::All(v) => v.is_empty(), - PartSortBuffer::Top(_, cnt) => *cnt == 0, + } + } + + pub fn num_rows(&self) -> usize { + match self { + PartSortBuffer::All(v) => v.iter().map(|batch| batch.num_rows()).sum(), } } } @@ -343,16 +335,12 @@ struct PartSortStream { cur_part_idx: usize, evaluating_batch: Option, metrics: BaselineMetrics, - context: Arc, - root_metrics: ExecutionPlanMetricsSet, + dynamic_filter: Option>, /// Groups of ranges by primary end: (primary_end, start_idx_inclusive, end_idx_exclusive). /// Ranges in the same group must be processed together before outputting results. range_groups: Vec<(Timestamp, usize, usize)>, /// Current group being processed (index into range_groups). cur_group_idx: usize, - /// Dynamic Filter for all TopK instance, notice the `PartSortExec`/`PartSortStream`/`TopK` must share the same filter - /// so that updates from each `TopK` can be seen by others(and by the table scan operator). - filter: Option>>, } impl PartSortStream { @@ -363,33 +351,8 @@ impl PartSortStream { input: DfSendableRecordBatchStream, partition_ranges: Vec, partition: usize, - filter: Option>>, ) -> datafusion_common::Result { - let buffer = if let Some(limit) = limit { - let Some(filter) = filter.clone() else { - return internal_err!( - "TopKDynamicFilters must be provided when limit is set at {}", - snafu::location!() - ); - }; - - PartSortBuffer::Top( - TopK::try_new( - partition, - sort.schema().clone(), - vec![], - [sort.expression.clone()].into(), - limit, - context.session_config().batch_size(), - context.runtime_env(), - &sort.metrics, - filter.clone(), - )?, - 0, - ) - } else { - PartSortBuffer::All(Vec::new()) - }; + let buffer = PartSortBuffer::All(Vec::new()); // Compute range groups by primary end let descending = sort.expression.options.descending; @@ -409,11 +372,9 @@ impl PartSortStream { cur_part_idx: 0, evaluating_batch: None, metrics: BaselineMetrics::new(&sort.metrics, partition), - context, - root_metrics: sort.metrics.clone(), + dynamic_filter: sort.dynamic_filter.clone(), range_groups, cur_group_idx: 0, - filter, }) } } @@ -456,6 +417,20 @@ macro_rules! array_check_helper { }}; } +macro_rules! threshold_helper { + ($t:ty, $unit:expr, $arr:expr, $threshold_idx:expr) => {{ + let arr = $arr + .as_any() + .downcast_ref::>() + .unwrap(); + if arr.is_null($threshold_idx) { + TopKThreshold::Null + } else { + TopKThreshold::Value(arr.value($threshold_idx)) + } + }}; +} + impl PartSortStream { /// check whether the sort column's min/max value is within the current group's effective range. /// For group-based processing, data from multiple ranges with the same primary end @@ -535,95 +510,96 @@ impl PartSortStream { fn push_buffer(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> { match &mut self.buffer { PartSortBuffer::All(v) => v.push(batch), - PartSortBuffer::Top(top, cnt) => { - *cnt += batch.num_rows(); - top.insert_batch(batch)?; - } } Ok(()) } - /// Stop read earlier when current group do not overlap with any of those next group - /// If not overlap, we can stop read further input as current top k is final - /// Use dynamic filter to evaluate the next group's primary end - fn can_stop_early(&mut self, schema: &Arc) -> datafusion_common::Result { - let topk_cnt = match &self.buffer { - PartSortBuffer::Top(_, cnt) => *cnt, - _ => return Ok(false), + fn topk_threshold( + &self, + sort_data_type: &arrow_schema::DataType, + ) -> datafusion_common::Result> { + let Some(limit) = self.limit else { + return Ok(None); }; - // not fulfill topk yet - if Some(topk_cnt) < self.limit { + + if self.buffer.num_rows() < limit { + return Ok(None); + } + + let PartSortBuffer::All(buffer) = &self.buffer; + let mut sort_columns = Vec::with_capacity(buffer.len()); + let mut opt = None; + for batch in buffer { + let sort_column = self.expression.evaluate_to_sort_column(batch)?; + opt = opt.or(sort_column.options); + sort_columns.push(sort_column.values); + } + + let sort_column = + concat(&sort_columns.iter().map(|a| a.as_ref()).collect_vec()).map_err(|e| { + DataFusionError::ArrowError( + Box::new(e), + Some(format!("Fail to concat sort columns at {}", location!())), + ) + })?; + + let indices = sort_to_indices(&sort_column, opt, Some(limit)).map_err(|e| { + DataFusionError::ArrowError( + Box::new(e), + Some(format!("Fail to sort to indices at {}", location!())), + ) + })?; + + if indices.len() < limit { + return Ok(None); + } + + let threshold_idx = indices.value(indices.len() - 1) as usize; + let threshold = downcast_ts_array!( + sort_data_type => (threshold_helper, sort_column, threshold_idx), + _ => internal_err!( + "Unsupported data type for sort column: {:?}", + sort_data_type + )?, + ); + + Ok(Some(threshold)) + } + + /// Returns true when all rows in the next group are guaranteed to be worse + /// than the current top-k threshold. + fn can_stop_before_group( + &self, + group_idx: usize, + sort_data_type: &arrow_schema::DataType, + ) -> datafusion_common::Result { + if group_idx >= self.range_groups.len() { return Ok(false); } - let next_group_primary_end = if self.cur_group_idx + 1 < self.range_groups.len() { - self.range_groups[self.cur_group_idx + 1].0 - } else { - // no next group + + let Some(threshold) = self.topk_threshold(sort_data_type)? else { return Ok(false); }; - // dyn filter is updated based on the last value of topk heap("threshold") - // it's a max-heap for a ASC TopK operator - // so can use dyn filter to prune data range - let filter = self - .filter - .as_ref() - .expect("TopKDynamicFilters must be provided when limit is set"); - let filter = filter.read().expr().current()?; - let mut ts_index = None; - // invariant: the filter must contain only the same column expr that's time index column - let filter = filter - .transform_down(|c| { - // rewrite all column's index as 0 - if let Some(column) = c.as_any().downcast_ref::() { - ts_index = Some(column.index()); - Ok(Transformed::yes( - Arc::new(Column::new(column.name(), 0)) as Arc - )) + let (_, start_idx, _) = self.range_groups[group_idx]; + let next_range = + project_partition_range_for_sort(self.partition_ranges[start_idx], sort_data_type)?; + let descending = self.expression.options.descending; + let next_primary = get_primary_end(&next_range, descending).value(); + + let can_stop = match threshold { + TopKThreshold::Null => self.expression.options.nulls_first, + TopKThreshold::Value(value) => { + if descending { + value >= next_primary } else { - Ok(Transformed::no(c)) + value < next_primary } - })? - .data; - let Some(ts_index) = ts_index else { - return Ok(false); // dyn filter is still true, cannot decide, continue read + } }; - let field = if schema.fields().len() <= ts_index { - warn!( - "Schema mismatch when evaluating dynamic filter for PartSortExec at {}, schema: {:?}, ts_index: {}", - self.partition, schema, ts_index - ); - return Ok(false); // schema mismatch, cannot decide, continue read - } else { - schema.field(ts_index) - }; - let schema = Arc::new(Schema::new(vec![field.clone()])); - // convert next_group_primary_end to array&filter, if eval to false, means no overlap, can stop early - let primary_end_array = match next_group_primary_end.unit() { - TimeUnit::Second => Arc::new(TimestampSecondArray::from(vec![ - next_group_primary_end.value(), - ])) as ArrayRef, - TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(vec![ - next_group_primary_end.value(), - ])) as ArrayRef, - TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(vec![ - next_group_primary_end.value(), - ])) as ArrayRef, - TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(vec![ - next_group_primary_end.value(), - ])) as ArrayRef, - }; - let primary_end_batch = DfRecordBatch::try_new(schema, vec![primary_end_array])?; - let res = filter.evaluate(&primary_end_batch)?; - let array = res.into_array(primary_end_batch.num_rows())?; - let filter = array.as_boolean().clone(); - let overlap = filter.iter().next().flatten(); - if let Some(false) = overlap { - Ok(true) - } else { - Ok(false) - } + + Ok(can_stop) } /// Check if the given partition index is within the current group. @@ -685,17 +661,13 @@ impl PartSortStream { fn sort_buffer(&mut self) -> datafusion_common::Result { match &mut self.buffer { PartSortBuffer::All(_) => self.sort_all_buffer(), - PartSortBuffer::Top(_, _) => self.sort_top_buffer(), } } /// Internal method for sorting `All` buffer (without limit). fn sort_all_buffer(&mut self) -> datafusion_common::Result { let PartSortBuffer::All(buffer) = - std::mem::replace(&mut self.buffer, PartSortBuffer::All(Vec::new())) - else { - unreachable!("buffer type is checked before and should be All variant") - }; + std::mem::replace(&mut self.buffer, PartSortBuffer::All(Vec::new())); if buffer.is_empty() { return Ok(DfRecordBatch::new_empty(self.schema.clone())); @@ -726,23 +698,25 @@ impl PartSortStream { return Ok(DfRecordBatch::new_empty(self.schema.clone())); } - self.check_in_range( - &sort_column, - ( - indices.value(0) as usize, - indices.value(indices.len() - 1) as usize, - ), - ) - .inspect_err(|_e| { - #[cfg(debug_assertions)] - common_telemetry::error!( - "Fail to check sort column in range at {}, current_idx: {}, num_rows: {}, err: {}", - self.partition, - self.cur_part_idx, - sort_column.len(), - _e - ); - })?; + if self.limit.is_none() { + self.check_in_range( + &sort_column, + ( + indices.value(0) as usize, + indices.value(indices.len() - 1) as usize, + ), + ) + .inspect_err(|_e| { + #[cfg(debug_assertions)] + common_telemetry::error!( + "Fail to check sort column in range at {}, current_idx: {}, num_rows: {}, err: {}", + self.partition, + self.cur_part_idx, + sort_column.len(), + _e + ); + })?; + } // reserve memory for the concat input and sorted output let total_mem: usize = buffer.iter().map(|r| r.get_array_memory_size()).sum(); @@ -774,65 +748,6 @@ impl PartSortStream { Ok(sorted) } - /// Internal method for sorting `Top` buffer (with limit). - fn sort_top_buffer(&mut self) -> datafusion_common::Result { - let Some(filter) = self.filter.clone() else { - return internal_err!( - "TopKDynamicFilters must be provided when sorting with limit at {}", - snafu::location!() - ); - }; - - let new_top_buffer = TopK::try_new( - self.partition, - self.schema().clone(), - vec![], - [self.expression.clone()].into(), - self.limit.unwrap(), - self.context.session_config().batch_size(), - self.context.runtime_env(), - &self.root_metrics, - filter, - )?; - let PartSortBuffer::Top(top_k, _) = - std::mem::replace(&mut self.buffer, PartSortBuffer::Top(new_top_buffer, 0)) - else { - unreachable!("buffer type is checked before and should be Top variant") - }; - - let mut result_stream = top_k.emit()?; - let mut placeholder_ctx = std::task::Context::from_waker(futures::task::noop_waker_ref()); - let mut results = vec![]; - // according to the current implementation of `TopK`, the result stream will always be ready - loop { - match result_stream.poll_next_unpin(&mut placeholder_ctx) { - Poll::Ready(Some(batch)) => { - let batch = batch?; - results.push(batch); - } - Poll::Pending => { - #[cfg(debug_assertions)] - unreachable!("TopK result stream should always be ready") - } - Poll::Ready(None) => { - break; - } - } - } - - let concat_batch = concat_batches(&self.schema, &results).map_err(|e| { - DataFusionError::ArrowError( - Box::new(e), - Some(format!( - "Fail to concat top k result record batch when sorting at {}", - location!() - )), - ) - })?; - - Ok(concat_batch) - } - /// Sorts current buffer and returns `None` when there is nothing to emit. fn sorted_buffer_if_non_empty(&mut self) -> datafusion_common::Result> { if self.buffer.is_empty() { @@ -847,6 +762,12 @@ impl PartSortStream { } } + fn mark_dynamic_filter_complete(&self) { + if let Some(filter) = &self.dynamic_filter { + filter.mark_complete(); + } + } + /// Try to split the input batch if it contains data that exceeds the current partition range. /// /// When the input batch contains data that exceeds the current partition range, this function @@ -860,14 +781,14 @@ impl PartSortStream { /// /// Returns `None` if the input batch is empty or fully within the current partition range /// (or we're still collecting data within the same group), and `Some(batch)` when we've - /// completed a group and have sorted output. When operating in TopK (limit) mode, this + /// completed a group and have sorted output. When operating in limit mode, this /// function will not emit intermediate batches; it only prepares state for a single final /// output. fn split_batch( &mut self, batch: DfRecordBatch, ) -> datafusion_common::Result> { - if matches!(self.buffer, PartSortBuffer::Top(_, _)) { + if self.limit.is_some() { self.split_batch_topk(batch)?; return Ok(None); } @@ -875,9 +796,9 @@ impl PartSortStream { self.split_batch_all(batch) } - /// Specialized splitting logic for TopK (limit) mode. + /// Specialized splitting logic for limit mode. /// - /// We only emit once when the TopK buffer is fulfilled or when input is fully consumed. + /// We only emit once when input is fully consumed. /// When the buffer is fulfilled and we are about to enter a new group, we stop consuming /// further ranges. fn split_batch_topk(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> { @@ -917,16 +838,12 @@ impl PartSortStream { // Check if we're still in the same group let in_same_group = self.is_in_current_group(self.cur_part_idx); - // When TopK is fulfilled and we are switching to a new group, stop consuming further ranges if possible. - // read from topk heap and determine whether we can stop earlier. - if !in_same_group && self.can_stop_early(&batch.schema())? { - self.input_complete = true; - self.evaluating_batch = None; - return Ok(()); - } - - // Transition to a new group if needed if !in_same_group { + let next_group_idx = self.cur_group_idx + 1; + if self.can_stop_before_group(next_group_idx, sort_column.data_type())? { + self.input_complete = true; + return Ok(()); + } self.advance_to_next_group(); } @@ -1027,8 +944,10 @@ impl PartSortStream { loop { if self.input_complete { if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? { + self.mark_dynamic_filter_complete(); return Poll::Ready(Some(Ok(sorted_batch))); } + self.mark_dynamic_filter_complete(); return Poll::Ready(None); } @@ -1041,8 +960,10 @@ impl PartSortStream { if self.cur_part_idx >= self.partition_ranges.len() { // All partitions processed, discard remaining data if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? { + self.mark_dynamic_filter_complete(); return Poll::Ready(Some(Ok(sorted_batch))); } + self.mark_dynamic_filter_complete(); return Poll::Ready(None); } @@ -1107,60 +1028,6 @@ mod test { use super::*; use crate::test_util::{MockInputExec, new_ts_array}; - #[tokio::test] - async fn test_can_stop_early_with_empty_topk_buffer() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - // Build a minimal PartSortExec and stream, but inject a dynamic filter that - // always evaluates to false so TopK will filter out all rows internally. - let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone())); - let exec = PartSortExec::try_new( - PhysicalSortExpr { - expr: Arc::new(Column::new("ts", 0)), - options: SortOptions { - descending: true, - ..Default::default() - }, - }, - Some(3), - vec![vec![]], - mock_input.clone(), - ) - .unwrap(); - - let filter = Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new( - DynamicFilterPhysicalExpr::new(vec![], lit(false)), - )))); - - let input_stream = mock_input - .execute(0, Arc::new(TaskContext::default())) - .unwrap(); - let mut stream = PartSortStream::new( - Arc::new(TaskContext::default()), - &exec, - Some(3), - input_stream, - vec![], - 0, - Some(filter), - ) - .unwrap(); - - // Push 3 rows so the external counter reaches `limit`, while TopK keeps no rows. - let batch = DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2, 3])]) - .unwrap(); - stream.push_buffer(batch).unwrap(); - - // The TopK result buffer is empty, so we cannot determine early-stop. - // Ensure this path returns `Ok(false)` (and, importantly, does not panic). - assert!(!stream.can_stop_early(&schema).unwrap()); - } - #[ignore = "hard to gen expected data correctly here, TODO(discord9): fix it later"] #[tokio::test] async fn fuzzy_test() { @@ -2918,88 +2785,4 @@ mod test { ) .await; } - - /// First group: [0,20), data: [0, 5, 15] - /// Second group: [10, 30), data: [21, 25, 29] - /// after first group, calling early stop manually, and check if filter is updated - #[tokio::test] - async fn test_early_stop_check_update_dyn_filter() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone())); - let exec = PartSortExec::try_new( - PhysicalSortExpr { - expr: Arc::new(Column::new("ts", 0)), - options: SortOptions { - descending: false, - ..Default::default() - }, - }, - Some(3), - vec![vec![ - PartitionRange { - start: Timestamp::new(0, unit.into()), - end: Timestamp::new(20, unit.into()), - num_rows: 3, - identifier: 1, - }, - PartitionRange { - start: Timestamp::new(10, unit.into()), - end: Timestamp::new(30, unit.into()), - num_rows: 3, - identifier: 1, - }, - ]], - mock_input.clone(), - ) - .unwrap(); - - let filter = exec.filter.clone().unwrap(); - let input_stream = mock_input - .execute(0, Arc::new(TaskContext::default())) - .unwrap(); - let mut stream = PartSortStream::new( - Arc::new(TaskContext::default()), - &exec, - Some(3), - input_stream, - vec![], - 0, - Some(filter.clone()), - ) - .unwrap(); - - // initially, snapshot_generation is 1 - assert_eq!(filter.read().expr().snapshot_generation(), 1); - let batch = - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![0, 5, 15])]) - .unwrap(); - stream.push_buffer(batch).unwrap(); - - // after pushing first batch, snapshot_generation is updated to 2 - assert_eq!(filter.read().expr().snapshot_generation(), 2); - assert!(!stream.can_stop_early(&schema).unwrap()); - // still two as not updated - assert_eq!(filter.read().expr().snapshot_generation(), 2); - - let _ = stream.sort_top_buffer().unwrap(); - - let batch = - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![21, 25, 29])]) - .unwrap(); - stream.push_buffer(batch).unwrap(); - // still two as not updated - assert_eq!(filter.read().expr().snapshot_generation(), 2); - let new = stream.sort_top_buffer().unwrap(); - // still two as not updated - assert_eq!(filter.read().expr().snapshot_generation(), 2); - - // dyn filter kick in, and filter out all rows >= 15(the filter is rows<15) - assert_eq!(new.num_rows(), 0) - } } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 93d3689b1e..b35b30968a 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -24,7 +24,7 @@ api.workspace = true arrow.workspace = true arrow-flight.workspace = true arrow-ipc.workspace = true -arrow-pg = "0.12" +arrow-pg = "0.13" arrow-schema.workspace = true async-trait.workspace = true auth.workspace = true diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index daf404d5d6..e559e2c296 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -605,7 +605,7 @@ mod tests { .unwrap(); let write_props = WriterProperties::builder() - .set_max_row_group_size(10) + .set_max_row_group_row_count(Some(10)) .build(); let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(write_props)).unwrap();