diff --git a/Cargo.lock b/Cargo.lock index 49871a94c1..a9332267f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -214,7 +214,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" name = "api" version = "1.0.0" dependencies = [ - "arrow-schema 57.3.0", + "arrow-schema 58.3.0", "common-base", "common-decimal", "common-error", @@ -316,23 +316,23 @@ dependencies = [ [[package]] name = "arrow" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4754a624e5ae42081f464514be454b39711daae0458906dacde5f4c632f33a8" +checksum = "378530e55cd479eda3c14eb345310799717e6f76d0c332041e8487022166b471" dependencies = [ - "arrow-arith 57.3.0", - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-cast 57.3.0", - "arrow-csv 57.3.0", - "arrow-data 57.3.0", - "arrow-ipc 57.3.0", - "arrow-json 57.3.0", - "arrow-ord 57.3.0", - "arrow-row 57.3.0", - "arrow-schema 57.3.0", - "arrow-select 57.3.0", - "arrow-string 57.3.0", + "arrow-arith 58.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-cast 58.3.0", + "arrow-csv 58.3.0", + "arrow-data 58.3.0", + "arrow-ipc 58.3.0", + "arrow-json 58.3.0", + "arrow-ord 58.3.0", + "arrow-row 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", + "arrow-string 58.3.0", ] [[package]] @@ -351,14 +351,14 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7b3141e0ec5145a22d8694ea8b6d6f69305971c4fa1c1a13ef0195aef2d678b" +checksum = "a0ab212d2c1886e802f51c5212d78ebbcbb0bec980fff9dadc1eb8d45cd0b738" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", "chrono", "num-traits", ] @@ -381,18 +381,18 @@ dependencies = [ [[package]] name = "arrow-array" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c8955af33b25f3b175ee10af580577280b4bd01f7e823d94c7cdef7cf8c9aef" +checksum = "cfd33d3e92f207444098c75b42de99d329562be0cf686b307b097cc52b4e999e" dependencies = [ "ahash 0.8.12", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", "chrono", "chrono-tz", "half", - "hashbrown 0.16.1", + "hashbrown 0.17.1", "num-complex", "num-integer", "num-traits", @@ -411,9 +411,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c697ddca96183182f35b3a18e50b9110b11e916d7b7799cbfd4d34662f2c56c2" +checksum = "0c6cd424c2693bcdbc150d843dc9d4d137dd2de4782ce6df491ad11a3a0416c0" dependencies = [ "bytes", "half", @@ -443,16 +443,16 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "646bbb821e86fd57189c10b4fcdaa941deaf4181924917b0daa92735baa6ada5" +checksum = "4c5aefb56a2c02e9e2b30746241058b85f8983f0fcff2ba0c6d09006e1cded7f" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-ord 57.3.0", - "arrow-schema 57.3.0", - "arrow-select 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-ord 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", "atoi", "base64 0.22.1", "chrono", @@ -480,13 +480,13 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8da746f4180004e3ce7b83c977daf6394d768332349d3d913998b10a120b790a" +checksum = "e94e8cf7e517657a52b91ea1263acf38c4ca62a84655d72458a3359b12ab97de" dependencies = [ - "arrow-array 57.3.0", - "arrow-cast 57.3.0", - "arrow-schema 57.3.0", + "arrow-array 58.3.0", + "arrow-cast 58.3.0", + "arrow-schema 58.3.0", "chrono", "csv", "csv-core", @@ -507,12 +507,12 @@ dependencies = [ [[package]] name = "arrow-data" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fdd994a9d28e6365aa78e15da3f3950c0fdcea6b963a12fa1c391afb637b304" +checksum = "3c88210023a2bfee1896af366309a3028fc3bcbd6515fa29a7990ee1baa08ee0" dependencies = [ - "arrow-buffer 57.3.0", - "arrow-schema 57.3.0", + "arrow-buffer 58.3.0", + "arrow-schema 58.3.0", "half", "num-integer", "num-traits", @@ -520,15 +520,15 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58c5b083668e6230eae3eab2fc4b5fb989974c845d0aa538dde61a4327c78675" +checksum = "28abfe8bf9f124e5fc83b334af4fa58f8d0323ad25312ccb2d1da50178415704" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-cast 57.3.0", - "arrow-ipc 57.3.0", - "arrow-schema 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-cast 58.3.0", + "arrow-ipc 58.3.0", + "arrow-schema 58.3.0", "base64 0.22.1", "bytes", "futures", @@ -555,17 +555,17 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abf7df950701ab528bf7c0cf7eeadc0445d03ef5d6ffc151eaae6b38a58feff1" +checksum = "238438f0834483703d88896db6fe5a7138b2230debc31b34c0336c2996e3c64f" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", - "arrow-select 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", "flatbuffers", - "lz4_flex 0.12.1", + "lz4_flex 0.13.1", "zstd", ] @@ -593,15 +593,16 @@ dependencies = [ [[package]] name = "arrow-json" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ff8357658bedc49792b13e2e862b80df908171275f8e6e075c460da5ee4bf86" +checksum = "205ca2119e6d679d5c133c6f30e68f027738d95ed948cf77677ea69c7800036b" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-cast 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-cast 58.3.0", + "arrow-ord 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", "chrono", "half", "indexmap 2.13.0", @@ -630,27 +631,28 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d8f1870e03d4cbed632959498bcc84083b5a24bded52905ae1695bd29da45b" +checksum = "1bffd8fd2579286a5d63bac898159873e5094a79009940bcb42bbfce4f19f1d0" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", - "arrow-select 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", ] [[package]] name = "arrow-pg" -version = "0.12.2" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87077429e1f81c2a024d0a62eaec0bbc101913c0ba7ce676bf90d21a4dbc79f0" +checksum = "34ec6f5d8b2025c5950e554ec2b3b4c4d6bd55b4d59b9f50c2b5eed4906c0f64" dependencies = [ - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "bytes", "chrono", + "datafusion", "futures", "pg_interval_2", "pgwire", @@ -673,14 +675,14 @@ dependencies = [ [[package]] name = "arrow-row" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18228633bad92bff92a95746bbeb16e5fc318e8382b75619dec26db79e4de4c0" +checksum = "bab5994731204603c73ba69267616c50f80780774c6bb0476f1f830625115e0c" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", "half", ] @@ -692,9 +694,9 @@ checksum = "b3aa9e59c611ebc291c28582077ef25c97f1975383f1479b12f3b9ffee2ffabe" [[package]] name = "arrow-schema" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c872d36b7bf2a6a6a2b40de9156265f0242910791db366a2c17476ba8330d68" +checksum = "f633dbfdf39c039ada1bf9e34c694816eb71fbb7dc78f613993b7245e078a1ed" dependencies = [ "serde", "serde_core", @@ -717,15 +719,15 @@ dependencies = [ [[package]] name = "arrow-select" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68bf3e3efbd1278f770d67e5dc410257300b161b93baedb3aae836144edcaf4b" +checksum = "8cd065c54172ac787cf3f2f8d4107e0d3fdc26edba76fdf4f4cc170258942222" dependencies = [ "ahash 0.8.12", - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", "num-traits", ] @@ -748,15 +750,15 @@ dependencies = [ [[package]] name = "arrow-string" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85e968097061b3c0e9fe3079cf2e703e487890700546b5b0647f60fca1b5a8d8" +checksum = "29dd7cda3ab9692f43a2e4acc444d760cc17b12bb6d8232ddf64e9bab7c06b42" dependencies = [ - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-data 57.3.0", - "arrow-schema 57.3.0", - "arrow-select 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", "memchr", "num-traits", "regex", @@ -1585,8 +1587,8 @@ name = "catalog" version = "1.0.0" dependencies = [ "api", - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "async-stream", "async-trait", "bytes", @@ -2169,7 +2171,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -2248,8 +2250,8 @@ dependencies = [ name = "common-datasource" version = "1.0.0" dependencies = [ - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "async-compression", "async-trait", "bytes", @@ -2355,9 +2357,9 @@ dependencies = [ "api", "approx 0.5.1", "arc-swap", - "arrow 57.3.0", - "arrow-cast 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-cast 58.3.0", + "arrow-schema 58.3.0", "async-trait", "bincode", "catalog", @@ -2752,7 +2754,7 @@ dependencies = [ name = "common-sql" version = "1.0.0" dependencies = [ - "arrow-schema 57.3.0", + "arrow-schema 58.3.0", "common-base", "common-decimal", "common-error", @@ -2829,7 +2831,7 @@ dependencies = [ name = "common-time" version = "1.0.0" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "chrono", "chrono-tz", "common-error", @@ -3575,11 +3577,11 @@ dependencies = [ [[package]] name = "datafusion" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "async-trait", "bytes", "bzip2", @@ -3614,7 +3616,7 @@ dependencies = [ "itertools 0.14.0", "liblzma", "log", - "object_store 0.12.5", + "object_store", "parking_lot 0.12.4", "parquet", "rand 0.9.1", @@ -3629,10 +3631,10 @@ dependencies = [ [[package]] name = "datafusion-catalog" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "dashmap", "datafusion-common", @@ -3646,17 +3648,17 @@ dependencies = [ "futures", "itertools 0.14.0", "log", - "object_store 0.12.5", + "object_store", "parking_lot 0.12.4", "tokio", ] [[package]] name = "datafusion-catalog-listing" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "datafusion-catalog", "datafusion-common", @@ -3670,17 +3672,17 @@ dependencies = [ "futures", "itertools 0.14.0", "log", - "object_store 0.12.5", + "object_store", ] [[package]] name = "datafusion-common" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "ahash 0.8.12", - "arrow 57.3.0", - "arrow-ipc 57.3.0", + "arrow 58.3.0", + "arrow-ipc 58.3.0", "chrono", "half", "hashbrown 0.16.1", @@ -3688,7 +3690,7 @@ dependencies = [ "itertools 0.14.0", "libc", "log", - "object_store 0.12.5", + "object_store", "parquet", "paste", "recursive", @@ -3699,8 +3701,8 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "futures", "log", @@ -3709,10 +3711,10 @@ dependencies = [ [[package]] name = "datafusion-datasource" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-compression", "async-trait", "bytes", @@ -3733,7 +3735,7 @@ dependencies = [ "itertools 0.14.0", "liblzma", "log", - "object_store 0.12.5", + "object_store", "rand 0.9.1", "tokio", "tokio-util", @@ -3743,11 +3745,11 @@ dependencies = [ [[package]] name = "datafusion-datasource-arrow" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", - "arrow-ipc 57.3.0", + "arrow 58.3.0", + "arrow-ipc 58.3.0", "async-trait", "bytes", "datafusion-common", @@ -3760,16 +3762,16 @@ dependencies = [ "datafusion-session", "futures", "itertools 0.14.0", - "object_store 0.12.5", + "object_store", "tokio", ] [[package]] name = "datafusion-datasource-csv" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "bytes", "datafusion-common", @@ -3781,17 +3783,17 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "futures", - "object_store 0.12.5", + "object_store", "regex", "tokio", ] [[package]] name = "datafusion-datasource-json" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "bytes", "datafusion-common", @@ -3803,7 +3805,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "futures", - "object_store 0.12.5", + "object_store", "serde_json", "tokio", "tokio-stream", @@ -3811,10 +3813,10 @@ dependencies = [ [[package]] name = "datafusion-datasource-parquet" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "bytes", "datafusion-common", @@ -3832,7 +3834,7 @@ dependencies = [ "futures", "itertools 0.14.0", "log", - "object_store 0.12.5", + "object_store", "parking_lot 0.12.4", "parquet", "tokio", @@ -3840,16 +3842,16 @@ dependencies = [ [[package]] name = "datafusion-doc" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" [[package]] name = "datafusion-execution" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", - "arrow-buffer 57.3.0", + "arrow 58.3.0", + "arrow-buffer 58.3.0", "async-trait", "chrono", "dashmap", @@ -3858,7 +3860,7 @@ dependencies = [ "datafusion-physical-expr-common", "futures", "log", - "object_store 0.12.5", + "object_store", "parking_lot 0.12.4", "rand 0.9.1", "tempfile", @@ -3867,10 +3869,10 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "chrono", "datafusion-common", @@ -3889,10 +3891,10 @@ dependencies = [ [[package]] name = "datafusion-expr-common" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "indexmap 2.13.0", "itertools 0.14.0", @@ -3901,11 +3903,11 @@ dependencies = [ [[package]] name = "datafusion-functions" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", - "arrow-buffer 57.3.0", + "arrow 58.3.0", + "arrow-buffer 58.3.0", "base64 0.22.1", "blake2", "blake3", @@ -3932,11 +3934,11 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "ahash 0.8.12", - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-doc", "datafusion-execution", @@ -3953,11 +3955,11 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "ahash 0.8.12", - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-expr-common", "datafusion-physical-expr-common", @@ -3965,11 +3967,11 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", - "arrow-ord 57.3.0", + "arrow 58.3.0", + "arrow-ord 58.3.0", "datafusion-common", "datafusion-doc", "datafusion-execution", @@ -3982,16 +3984,17 @@ dependencies = [ "datafusion-physical-expr-common", "hashbrown 0.16.1", "itertools 0.14.0", + "itoa", "log", "paste", ] [[package]] name = "datafusion-functions-table" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "datafusion-catalog", "datafusion-common", @@ -4003,10 +4006,10 @@ dependencies = [ [[package]] name = "datafusion-functions-window" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-doc", "datafusion-expr", @@ -4020,8 +4023,8 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -4029,8 +4032,8 @@ dependencies = [ [[package]] name = "datafusion-macros" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "datafusion-doc", "quote", @@ -4039,10 +4042,10 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "chrono", "datafusion-common", "datafusion-expr", @@ -4058,26 +4061,26 @@ dependencies = [ [[package]] name = "datafusion-orc" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3afe28e177f33d54b7841add7679176c4354934ff3db350f43c013b4f9b0bad8" +version = "0.8.0" +source = "git+https://github.com/datafusion-contrib/datafusion-orc.git?rev=6c07fa282dc8d62db2aa4ded06ab55485efc811a#6c07fa282dc8d62db2aa4ded06ab55485efc811a" dependencies = [ "async-trait", "bytes", "datafusion", "futures", "futures-util", - "object_store 0.12.5", + "object_store", "orc-rust", "tokio", ] [[package]] name = "datafusion-pg-catalog" -version = "0.15.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69e170232116b48e2445e29127bd4834a694d68256c338749d7362421509e889" +checksum = "6970b964fdfc8698359860880cf1b3bee0032b5dffa3d2e4785739c99c879cae" dependencies = [ + "arrow-pg", "async-trait", "datafusion", "futures", @@ -4088,11 +4091,11 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "ahash 0.8.12", - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-expr", "datafusion-expr-common", @@ -4111,10 +4114,10 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-adapter" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-expr", "datafusion-functions", @@ -4125,11 +4128,11 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "ahash 0.8.12", - "arrow 57.3.0", + "arrow 58.3.0", "chrono", "datafusion-common", "datafusion-expr-common", @@ -4141,10 +4144,10 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-execution", "datafusion-expr", @@ -4159,13 +4162,13 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "ahash 0.8.12", - "arrow 57.3.0", - "arrow-ord 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-ord 58.3.0", + "arrow-schema 58.3.0", "async-trait", "datafusion-common", "datafusion-common-runtime", @@ -4190,10 +4193,10 @@ dependencies = [ [[package]] name = "datafusion-proto" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "chrono", "datafusion-catalog", "datafusion-catalog-listing", @@ -4210,27 +4213,27 @@ dependencies = [ "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-proto-common", - "object_store 0.12.5", + "object_store", "prost 0.14.1", "rand 0.9.1", ] [[package]] name = "datafusion-proto-common" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "prost 0.14.1", ] [[package]] name = "datafusion-pruning" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "datafusion-common", "datafusion-datasource", "datafusion-expr-common", @@ -4243,8 +4246,8 @@ dependencies = [ [[package]] name = "datafusion-session" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "async-trait", "datafusion-common", @@ -4256,10 +4259,10 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "bigdecimal 0.4.8", "chrono", "datafusion-common", @@ -4274,8 +4277,8 @@ dependencies = [ [[package]] name = "datafusion-substrait" -version = "52.1.0" -source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=02b82535e0160c4545667f36a03e1ff9d1d2e51f#02b82535e0160c4545667f36a03e1ff9d1d2e51f" +version = "53.1.0" +source = "git+https://github.com/GreptimeTeam/datafusion.git?rev=9c1ed8d9242408aad0a6d444c7c339bcb62f9be4#9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" dependencies = [ "async-recursion", "async-trait", @@ -4283,7 +4286,7 @@ dependencies = [ "datafusion", "half", "itertools 0.14.0", - "object_store 0.12.5", + "object_store", "pbjson-types", "prost 0.14.1", "substrait 0.62.2", @@ -4363,9 +4366,9 @@ dependencies = [ name = "datatypes" version = "1.0.0" dependencies = [ - "arrow 57.3.0", - "arrow-array 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-array 58.3.0", + "arrow-schema 58.3.0", "common-base", "common-decimal", "common-error", @@ -5076,7 +5079,6 @@ dependencies = [ "common-time", "datafusion", "datafusion-expr", - "datafusion-orc", "datatypes", "futures", "object-store", @@ -5194,8 +5196,8 @@ name = "flow" version = "1.0.0" dependencies = [ "api", - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "async-recursion", "async-trait", "auth", @@ -5955,6 +5957,12 @@ dependencies = [ "foldhash 0.2.0", ] +[[package]] +name = "hashbrown" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" + [[package]] name = "hashlink" version = "0.10.0" @@ -7503,7 +7511,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -7809,9 +7817,9 @@ dependencies = [ [[package]] name = "lz4_flex" -version = "0.12.1" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98c23545df7ecf1b16c303910a69b079e8e251d60f7dd2cc9b4177f2afaf1746" +checksum = "7ef0d4ed8669f8f8826eb00dc878084aa8f253506c4fd5e8f58f5bce72ddb97e" dependencies = [ "twox-hash", ] @@ -8208,7 +8216,7 @@ version = "1.0.0" dependencies = [ "api", "aquamarine", - "arrow-schema 57.3.0", + "arrow-schema 58.3.0", "async-channel 1.9.0", "async-stream", "async-trait", @@ -8945,8 +8953,7 @@ dependencies = [ "futures", "humantime-serde", "lazy_static", - "object_store 0.12.5", - "object_store 0.13.2", + "object_store", "object_store_opendal", "opendal", "prometheus 0.14.0", @@ -8959,30 +8966,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "object_store" -version = "0.12.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbfbfff40aeccab00ec8a910b57ca8ecf4319b335c542f2edcd19dd25a1e2a00" -dependencies = [ - "async-trait", - "bytes", - "chrono", - "futures", - "http 1.3.1", - "humantime", - "itertools 0.14.0", - "parking_lot 0.12.4", - "percent-encoding", - "thiserror 2.0.17", - "tokio", - "tracing", - "url", - "walkdir", - "wasm-bindgen-futures", - "web-time", -] - [[package]] name = "object_store" version = "0.13.2" @@ -9020,7 +9003,7 @@ dependencies = [ "chrono", "futures", "mea", - "object_store 0.13.2", + "object_store", "opendal", "pin-project", "tokio", @@ -9514,8 +9497,8 @@ version = "1.0.0" dependencies = [ "ahash 0.8.12", "api", - "arrow 57.3.0", - "arrow-ipc 57.3.0", + "arrow 58.3.0", + "arrow-ipc 58.3.0", "async-stream", "async-trait", "bytes", @@ -9579,11 +9562,11 @@ dependencies = [ [[package]] name = "orc-rust" -version = "0.7.1" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8f1a357fb58dd9aab45ca21166d852f7894320d67ebeaa6d6b3fe557a9356c8" +checksum = "32b9867c4e7343218682ba11aceae1310c80735ec2beaf6124a0f8f848dad197" dependencies = [ - "arrow 57.3.0", + "arrow 58.3.0", "async-trait", "bytemuck", "bytes", @@ -9593,6 +9576,7 @@ dependencies = [ "flate2", "futures", "futures-util", + "log", "lz4_flex 0.11.6", "lzokay-native", "num", @@ -9796,18 +9780,17 @@ dependencies = [ [[package]] name = "parquet" -version = "57.3.0" +version = "58.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ee96b29972a257b855ff2341b37e61af5f12d6af1158b6dcdb5b31ea07bb3cb" +checksum = "5dafa7d01085b62a47dd0c1829550a0a36710ea9c4fe358a05a85477cec8a908" dependencies = [ "ahash 0.8.12", - "arrow-array 57.3.0", - "arrow-buffer 57.3.0", - "arrow-cast 57.3.0", - "arrow-data 57.3.0", - "arrow-ipc 57.3.0", - "arrow-schema 57.3.0", - "arrow-select 57.3.0", + "arrow-array 58.3.0", + "arrow-buffer 58.3.0", + "arrow-data 58.3.0", + "arrow-ipc 58.3.0", + "arrow-schema 58.3.0", + "arrow-select 58.3.0", "base64 0.22.1", "brotli", "bytes", @@ -9815,12 +9798,12 @@ dependencies = [ "flate2", "futures", "half", - "hashbrown 0.16.1", - "lz4_flex 0.12.1", + "hashbrown 0.17.1", + "lz4_flex 0.13.1", "num-bigint", "num-integer", "num-traits", - "object_store 0.12.5", + "object_store", "paste", "seq-macro", "simdutf8", @@ -10205,8 +10188,8 @@ version = "1.0.0" dependencies = [ "ahash 0.8.12", "api", - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "async-trait", "catalog", "chrono", @@ -10807,7 +10790,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ "heck 0.5.0", - "itertools 0.11.0", + "itertools 0.14.0", "log", "multimap", "once_cell", @@ -10855,7 +10838,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.117", @@ -10868,7 +10851,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.117", @@ -11104,8 +11087,8 @@ dependencies = [ "ahash 0.8.12", "api", "arc-swap", - "arrow 57.3.0", - "arrow-schema 57.3.0", + "arrow 58.3.0", + "arrow-schema 58.3.0", "async-recursion", "async-stream", "async-trait", @@ -12079,9 +12062,9 @@ dependencies = [ [[package]] name = "rust_decimal" -version = "1.40.0" +version = "1.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61f703d19852dbf87cbc513643fa81428361eb6940f1ac14fd58155d295a3eb0" +checksum = "0c5108e3d4d903e21aac27f12ba5377b6b34f9f44b325e4894c7924169d06995" dependencies = [ "arrayvec", "borsh", @@ -12092,6 +12075,7 @@ dependencies = [ "rkyv", "serde", "serde_json", + "wasm-bindgen", ] [[package]] @@ -12636,11 +12620,11 @@ version = "1.0.0" dependencies = [ "ahash 0.8.12", "api", - "arrow 57.3.0", + "arrow 58.3.0", "arrow-flight", - "arrow-ipc 57.3.0", + "arrow-ipc 58.3.0", "arrow-pg", - "arrow-schema 57.3.0", + "arrow-schema 58.3.0", "async-trait", "auth", "axum 0.8.4", @@ -13105,7 +13089,7 @@ name = "sql" version = "1.0.0" dependencies = [ "api", - "arrow-buffer 57.3.0", + "arrow-buffer 58.3.0", "chrono", "common-base", "common-catalog", @@ -14392,9 +14376,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.49.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72a2903cd7736441aac9df9d7688bd0ce48edccaadf181c3b90be801e81d3d86" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" dependencies = [ "bytes", "libc", @@ -15518,6 +15502,7 @@ dependencies = [ "cfg-if", "once_cell", "rustversion", + "serde", "wasm-bindgen-macro", "wasm-bindgen-shared", ] @@ -15724,7 +15709,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6778e98c61..20b2d983c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,17 +100,16 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] } # See for more detaiils: https://github.com/rust-lang/cargo/issues/11329 ahash = { version = "0.8", features = ["compile-time-rng"] } aquamarine = "0.6" -arrow = { version = "57.3", features = ["prettyprint"] } -arrow-array = { version = "57.3", default-features = false, features = ["chrono-tz"] } -arrow-buffer = "57.3" -arrow-cast = "57.3" -arrow-flight = "57.3" -arrow-ipc = { version = "57.3", default-features = false, features = ["lz4", "zstd"] } -arrow-schema = { version = "57.3", features = ["serde"] } +arrow = { version = "58.3", features = ["prettyprint"] } +arrow-array = { version = "58.3", default-features = false, features = ["chrono-tz"] } +arrow-buffer = "58.3" +arrow-cast = "58.3" +arrow-flight = "58.3" +arrow-ipc = { version = "58.3", default-features = false, features = ["lz4", "zstd"] } +arrow-schema = { version = "58.3", features = ["serde"] } async-stream = "0.3" async-trait = "0.1" # Remember to update axum-extra, axum-macros when updating axum -arrow_object_store = { package = "object_store", version = "0.13.2" } axum = "0.8" axum-extra = "0.10" axum-macros = "0.5" @@ -128,23 +127,23 @@ const_format = "0.2" criterion = "0.7" crossbeam-utils = "0.8" dashmap = "6.1" -datafusion = "=52.1" -datafusion-common = "=52.1" -datafusion-datasource = "=52.1" -datafusion-expr = "=52.1" -datafusion-expr-common = "=52.1" -datafusion-functions = "=52.1" -datafusion-functions-aggregate-common = "=52.1" -datafusion-functions-window-common = "=52.1" -datafusion-optimizer = "=52.1" -datafusion-orc = "0.7" -datafusion-pg-catalog = "0.15.1" -datafusion-physical-expr = "=52.1" -datafusion-physical-plan = "=52.1" -datafusion-proto = "=52.1" -datafusion-sql = "=52.1" -datafusion-substrait = "=52.1" -datafusion_object_store = { package = "object_store", version = "0.12.5" } +datafusion = "=53.1.0" +datafusion-common = "=53.1.0" +datafusion-datasource = "=53.1.0" +datafusion-expr = "=53.1.0" +datafusion-expr-common = "=53.1.0" +datafusion-functions = "=53.1.0" +datafusion-functions-aggregate-common = "=53.1.0" +datafusion-functions-window-common = "=53.1.0" +datafusion-optimizer = "=53.1.0" +datafusion-orc = { git = "https://github.com/datafusion-contrib/datafusion-orc.git", rev = "6c07fa282dc8d62db2aa4ded06ab55485efc811a" } +datafusion-pg-catalog = "0.16" +datafusion-physical-expr = "=53.1.0" +datafusion-physical-plan = "=53.1.0" +datafusion-proto = "=53.1.0" +datafusion-sql = "=53.1.0" +datafusion-substrait = "=53.1.0" +datafusion_object_store = { package = "object_store", version = "0.13.2" } deadpool = "0.12" deadpool-postgres = "0.14" derive_builder = "0.20" @@ -193,7 +192,7 @@ otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "5 "server", ] } parking_lot = "0.12" -parquet = { version = "57.3", default-features = false, features = ["arrow", "async", "object_store"] } +parquet = { version = "58.3", default-features = false, features = ["arrow", "async", "object_store"] } paste = "1.0" pin-project = "1.0" pretty_assertions = "1.4.0" @@ -339,21 +338,21 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git" rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" [patch.crates-io] -datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-functions-window-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-proto = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } -datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" } +datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-functions-window-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-proto = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } +datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" } sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "2aefa08a8d69c96eec2d6d6703598a009bba6e4c" } # on branch v0.61.x [profile.release] diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index 7df293d6d3..470b5371f7 100644 --- a/src/common/datasource/Cargo.toml +++ b/src/common/datasource/Cargo.toml @@ -28,11 +28,12 @@ common-runtime.workspace = true common-telemetry.workspace = true datafusion.workspace = true datafusion-datasource.workspace = true +datafusion-orc.workspace = true datatypes.workspace = true futures.workspace = true lazy_static.workspace = true object-store.workspace = true -orc-rust = { version = "0.7", default-features = false, features = ["async"] } +orc-rust = { version = "0.8", default-features = false, features = ["async"] } parquet.workspace = true paste.workspace = true regex.workspace = true @@ -45,4 +46,3 @@ url.workspace = true [dev-dependencies] common-test-util.workspace = true -datafusion-orc.workspace = true diff --git a/src/common/datasource/src/file_format/orc.rs b/src/common/datasource/src/file_format/orc.rs index 6b00b41036..0a6b90ccc7 100644 --- a/src/common/datasource/src/file_format/orc.rs +++ b/src/common/datasource/src/file_format/orc.rs @@ -15,6 +15,7 @@ use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; +pub use datafusion_orc::OrcSource; use futures::FutureExt; use futures::future::BoxFuture; use object_store::ObjectStore; diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index 9c8e8d6ce8..43b281077e 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -176,11 +176,14 @@ impl AsyncFileReader for LazyParquetFileReader { .map_err(|e| ParquetError::External(Box::new(e)))?; let metadata_opts = options.map(|o| o.metadata_options().clone()); + let column_index_policy = + options.map_or(PageIndexPolicy::Skip, |o| o.column_index_policy()); + let offset_index_policy = + options.map_or(PageIndexPolicy::Skip, |o| o.offset_index_policy()); let metadata_reader = ParquetMetaDataReader::new() .with_metadata_options(metadata_opts) - .with_page_index_policy(PageIndexPolicy::from( - options.is_some_and(|o| o.page_index()), - )) + .with_column_index_policy(column_index_policy) + .with_offset_index_policy(offset_index_policy) .with_prefetch_hint(self.metadata_size_hint); let metadata = metadata_reader diff --git a/src/common/datasource/src/file_format/tests.rs b/src/common/datasource/src/file_format/tests.rs index 6ef669ab7b..93ab3b4409 100644 --- a/src/common/datasource/src/file_format/tests.rs +++ b/src/common/datasource/src/file_format/tests.rs @@ -26,11 +26,11 @@ use datafusion::execution::context::TaskContext; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::prelude::SessionContext; -use datafusion_orc::OrcSource; use futures::StreamExt; use object_store::ObjectStore; use super::FORMAT_TYPE; +use crate::file_format::orc::OrcSource; use crate::file_format::parquet::DefaultParquetFileReaderFactory; use crate::file_format::{FileFormat, Format, OrcFormat}; use crate::test_util::{basic_schema_with_time_format, scan_config, test_basic_schema, test_store}; diff --git a/src/file-engine/Cargo.toml b/src/file-engine/Cargo.toml index 8d69512fa1..6c8c9e887d 100644 --- a/src/file-engine/Cargo.toml +++ b/src/file-engine/Cargo.toml @@ -26,7 +26,6 @@ common-test-util = { workspace = true, optional = true } common-time.workspace = true datafusion.workspace = true datafusion-expr.workspace = true -datafusion-orc.workspace = true datatypes.workspace = true futures.workspace = true object-store.workspace = true diff --git a/src/file-engine/src/query/file_stream.rs b/src/file-engine/src/query/file_stream.rs index 931dfabe62..eec8f8961d 100644 --- a/src/file-engine/src/query/file_stream.rs +++ b/src/file-engine/src/query/file_stream.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use common_datasource::file_format::Format; use common_datasource::file_format::csv::CsvFormat; +use common_datasource::file_format::orc::OrcSource; use common_datasource::file_format::parquet::DefaultParquetFileReaderFactory; use datafusion::common::ToDFSchema; use datafusion::config::CsvOptions; @@ -34,7 +35,6 @@ use datafusion::physical_plan::{ use datafusion::prelude::SessionContext; use datafusion_expr::expr::Expr; use datafusion_expr::utils::conjunction; -use datafusion_orc::OrcSource; use datatypes::schema::SchemaRef; use object_store::ObjectStore; use snafu::ResultExt; diff --git a/src/mito2/src/cache/test_util.rs b/src/mito2/src/cache/test_util.rs index ef3d8e9315..c5baae3c75 100644 --- a/src/mito2/src/cache/test_util.rs +++ b/src/mito2/src/cache/test_util.rs @@ -127,7 +127,10 @@ pub(crate) fn assert_parquet_metadata_equal(x: Arc, y: Arc DataPartEncoder<'a> { fn writer_props(self) -> WriterProperties { let mut builder = WriterProperties::builder(); if let Some(row_group_size) = self.row_group_size { - builder = builder.set_max_row_group_size(row_group_size) + builder = builder.set_max_row_group_row_count(Some(row_group_size)) } let ts_col = ColumnPath::new(vec![self.timestamp_column_name]); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index cd22d2e551..4f4387fda6 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -590,7 +590,7 @@ mod tests { .set_key_value_metadata(Some(vec![key_value_meta])) .set_compression(Compression::ZSTD(ZstdLevel::default())) .set_encoding(Encoding::PLAIN) - .set_max_row_group_size(write_opts.row_group_size); + .set_max_row_group_row_count(Some(write_opts.row_group_size)); let writer_props = props_builder.build(); diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 13005ff9fc..373f488f04 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -425,7 +425,7 @@ where .set_key_value_metadata(Some(vec![key_value_meta])) .set_compression(Compression::ZSTD(ZstdLevel::default())) .set_encoding(Encoding::PLAIN) - .set_max_row_group_size(opts.row_group_size) + .set_max_row_group_row_count(Some(opts.row_group_size)) .set_column_index_truncate_length(None) .set_statistics_truncate_length(None); diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 5334d0f30a..372fef8f9a 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -43,7 +43,6 @@ uuid.workspace = true [dev-dependencies] anyhow = "1.0" -arrow_object_store.workspace = true common-telemetry.workspace = true common-test-util.workspace = true object_store_opendal.workspace = true diff --git a/src/object-store/src/compat.rs b/src/object-store/src/compat.rs index 3e46355bd1..4498f8f3be 100644 --- a/src/object-store/src/compat.rs +++ b/src/object-store/src/compat.rs @@ -23,13 +23,13 @@ use async_trait::async_trait; use bytes::Bytes; use datafusion_object_store::path::Path; use datafusion_object_store::{ - Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, - MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore, PutMode, PutMultipartOptions, - PutOptions, PutPayload, PutResult, UploadPart, + Attribute, Attributes, CopyMode, CopyOptions, GetOptions, GetRange, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore, + PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, UploadPart, }; use futures::stream::BoxStream; use futures::{FutureExt, StreamExt, TryStreamExt}; -use opendal::options::CopyOptions; +use opendal::options::CopyOptions as OpendalCopyOptions; use opendal::raw::percent_decode_path; use opendal::{Buffer, Operator, OperatorInfo, Writer}; use tokio::sync::{Mutex, oneshot}; @@ -107,7 +107,7 @@ impl OpendalStore { to: &Path, if_not_exists: bool, ) -> datafusion_object_store::Result<()> { - let mut copy_options = CopyOptions::default(); + let mut copy_options = OpendalCopyOptions::default(); if if_not_exists { copy_options.if_not_exists = true; } @@ -212,22 +212,6 @@ impl ArrowObjectStore for OpendalStore { Ok(PutResult { e_tag, version }) } - async fn put_multipart( - &self, - location: &Path, - ) -> datafusion_object_store::Result> { - let decoded_location = percent_decode_path(location.as_ref()); - let writer = self - .inner - .writer_with(&decoded_location) - .concurrent(8) - .await - .map_err(|err| format_object_store_error(err, location.as_ref()))?; - let upload = OpendalMultipartUpload::new(writer, location.clone()); - - Ok(Box::new(upload)) - } - async fn put_multipart_opts( &self, location: &Path, @@ -403,33 +387,49 @@ impl ArrowObjectStore for OpendalStore { }) } - async fn get_range( + async fn get_ranges( &self, location: &Path, - range: Range, - ) -> datafusion_object_store::Result { + ranges: &[Range], + ) -> datafusion_object_store::Result> { + if ranges.is_empty() { + return Ok(Vec::new()); + } + let raw_location = percent_decode_path(location.as_ref()); let reader = self .inner - .reader_with(&raw_location) + .reader_with(raw_location.as_ref()) + .await + .map_err(|err| format_object_store_error(err, location.as_ref()))?; + let buffers = reader + .fetch(ranges.to_vec()) .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; - reader - .read(range.start..range.end) - .await - .map(|buf| buf.to_bytes()) - .map_err(|err| format_object_store_error(err, location.as_ref())) + Ok(buffers.into_iter().map(|buf| buf.to_bytes()).collect()) } - async fn delete(&self, location: &Path) -> datafusion_object_store::Result<()> { - let decoded_location = percent_decode_path(location.as_ref()); - self.inner - .delete(&decoded_location) - .await - .map_err(|err| format_object_store_error(err, location.as_ref()))?; - - Ok(()) + fn delete_stream( + &self, + locations: BoxStream<'static, datafusion_object_store::Result>, + ) -> BoxStream<'static, datafusion_object_store::Result> { + let this = self.clone(); + locations + .map(move |location| { + let this = this.clone(); + async move { + let location = location?; + let decoded_location = percent_decode_path(location.as_ref()); + this.inner + .delete(&decoded_location) + .await + .map_err(|err| format_object_store_error(err, location.as_ref()))?; + Ok(location) + } + }) + .buffered(10) + .boxed() } fn list( @@ -568,28 +568,14 @@ impl ArrowObjectStore for OpendalStore { }) } - async fn copy(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> { - self.copy_request(from, to, false).await - } - - async fn rename(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> { - self.inner - .rename( - &percent_decode_path(from.as_ref()), - &percent_decode_path(to.as_ref()), - ) - .await - .map_err(|err| format_object_store_error(err, from.as_ref()))?; - - Ok(()) - } - - async fn copy_if_not_exists( + async fn copy_opts( &self, from: &Path, to: &Path, + options: CopyOptions, ) -> datafusion_object_store::Result<()> { - self.copy_request(from, to, true).await + let if_not_exists = options.mode == CopyMode::Create; + self.copy_request(from, to, if_not_exists).await } } @@ -731,7 +717,9 @@ mod tests { use bytes::Bytes; use datafusion_object_store::path::Path; - use datafusion_object_store::{ObjectStore as ArrowObjectStore, WriteMultipart}; + use datafusion_object_store::{ + ObjectStore as ArrowObjectStore, ObjectStoreExt, WriteMultipart, + }; use opendal::{Operator, services}; use rand::{Rng, RngCore}; @@ -1003,7 +991,7 @@ mod tests { } #[tokio::test] - async fn test_get_range_no_stat() { + async fn test_get_ranges_no_stat() { use std::sync::atomic::{AtomicUsize, Ordering}; // Create a stat counter and operator with tracking layer @@ -1022,13 +1010,17 @@ mod tests { // Reset counter after put stat_count.store(0, Ordering::SeqCst); - // Test 1: get_range should NOT call stat() - let ret = store.get_range(&location, 0..5).await.unwrap(); - assert_eq!(Bytes::from_static(b"Hello"), ret); + // Test 1: get_ranges should NOT call stat() + let range = 0..5; + let ret = store + .get_ranges(&location, std::slice::from_ref(&range)) + .await + .unwrap(); + assert_eq!(vec![Bytes::from_static(b"Hello")], ret); assert_eq!( stat_count.load(Ordering::SeqCst), 0, - "get_range should not call stat()" + "get_ranges should not call stat()" ); // Reset counter diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 4ade4fc383..d373972473 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -16,10 +16,10 @@ use std::env; use std::sync::Arc; use anyhow::Result; -use arrow_object_store::path::Path; -use arrow_object_store::{ObjectStore as ArrowObjectStore, ObjectStoreExt}; use bytes::Bytes; use common_telemetry::info; +use datafusion_object_store::path::Path; +use datafusion_object_store::{ObjectStore as ArrowObjectStore, ObjectStoreExt}; use futures::TryStreamExt; use object_store::ObjectStore; use object_store::services::{Fs, S3}; diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 933f0e47cf..f1574ec334 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -23,16 +23,11 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use arrow::array::{ - ArrayRef, AsArray, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, -}; +use arrow::array::{Array, ArrayRef}; use arrow::compute::{concat, concat_batches, take_record_batch}; -use arrow_schema::{Schema, SchemaRef}; +use arrow_schema::{DataType, SchemaRef, TimeUnit}; use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream}; -use common_telemetry::warn; use common_time::Timestamp; -use common_time::timestamp::TimeUnit; use datafusion::common::arrow::compute::sort_to_indices; use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion::execution::{RecordBatchStream, TaskContext}; @@ -42,16 +37,16 @@ use datafusion::physical_plan::filter_pushdown::{ }; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, TopK, - TopKDynamicFilters, + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, +}; +use datafusion_common::{DataFusionError, ScalarValue, internal_err}; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::{ + BinaryExpr, DynamicFilterPhysicalExpr, is_not_null, is_null, lit, }; -use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::{DataFusionError, internal_err}; -use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit}; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; -use futures::{Stream, StreamExt}; +use futures::Stream; use itertools::Itertools; -use parking_lot::RwLock; use snafu::location; use store_api::region_engine::PartitionRange; @@ -102,9 +97,11 @@ fn group_ranges_by_primary_end( /// Sort input within given PartitionRange /// -/// Input is assumed to be segmented by empty RecordBatch, which indicates a new `PartitionRange` is starting +/// Partition range transitions are detected by comparing sort column values against +/// the current range boundaries (via [`PartSortStream::try_find_next_range`]). +/// Empty RecordBatches from upstream are tolerated but do not serve as range delimiters. /// -/// and this operator will sort each partition independently within the partition. +/// This operator sorts each partition independently. #[derive(Debug, Clone)] pub struct PartSortExec { /// Physical sort expressions(that is, sort by timestamp) @@ -115,10 +112,7 @@ pub struct PartSortExec { metrics: ExecutionPlanMetricsSet, partition_ranges: Vec>, properties: Arc, - /// Filter matching the state of the sort for dynamic filter pushdown. - /// If `limit` is `Some`, this will also be set and a TopK operator may be used. - /// If `limit` is `None`, this will be `None`. - filter: Option>>, + dynamic_filter: Option>, } impl PartSortExec { @@ -139,9 +133,7 @@ impl PartSortExec { properties.boundedness, )); - let filter = limit - .is_some() - .then(|| Self::create_filter(expression.expr.clone())); + let dynamic_filter = Self::new_dynamic_filter(&expression, limit); Ok(Self { expression, @@ -150,15 +142,20 @@ impl PartSortExec { metrics, partition_ranges, properties, - filter, + dynamic_filter, }) } - /// Add or reset `self.filter` to a new `TopKDynamicFilters`. - fn create_filter(expr: Arc) -> Arc> { - Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new( - DynamicFilterPhysicalExpr::new(vec![expr], lit(true)), - )))) + fn new_dynamic_filter( + expression: &PhysicalSortExpr, + limit: Option, + ) -> Option> { + limit.map(|_| { + Arc::new(DynamicFilterPhysicalExpr::new( + vec![expression.expr.clone()], + lit(true), + )) + }) } pub fn to_stream( @@ -185,7 +182,6 @@ impl PartSortExec { input_stream, self.partition_ranges[partition].clone(), partition, - self.filter.clone(), )?) as _; Ok(df_stream) @@ -276,28 +272,24 @@ impl ExecutionPlan for PartSortExec { &self, phase: FilterPushdownPhase, parent_filters: Vec>, - _config: &datafusion::config::ConfigOptions, + config: &datafusion::config::ConfigOptions, ) -> datafusion_common::Result { if !matches!(phase, FilterPushdownPhase::Post) { return FilterDescription::from_children(parent_filters, &self.children()); } let mut child = ChildFilterDescription::from_child(&parent_filters, &self.input)?; - - if let Some(filter) = &self.filter { - child = child.with_self_filter(filter.read().expr()); + if let Some(filter) = &self.dynamic_filter + && config.optimizer.enable_topk_dynamic_filter_pushdown + { + let filter: Arc = filter.clone(); + child = child.with_self_filter(filter); } - Ok(FilterDescription::new().with_child(child)) } fn reset_state(self: Arc) -> datafusion_common::Result> { - // shared dynamic filter needs to be reset - let new_filter = self - .limit - .is_some() - .then(|| Self::create_filter(self.expression.expr.clone())); - + let dynamic_filter = Self::new_dynamic_filter(&self.expression, self.limit); Ok(Arc::new(Self { expression: self.expression.clone(), limit: self.limit, @@ -305,25 +297,34 @@ impl ExecutionPlan for PartSortExec { metrics: self.metrics.clone(), partition_ranges: self.partition_ranges.clone(), properties: self.properties.clone(), - filter: new_filter, + dynamic_filter, })) } } enum PartSortBuffer { All(Vec), - /// TopK buffer with row count. - /// - /// Given this heap only keeps k element, the capacity of this buffer - /// is not accurate, and is only used for empty check. - Top(TopK, usize), + TopK(Vec), +} + +#[derive(Clone, Debug, PartialEq, Eq)] +enum TopKThreshold { + Null, + Value(i64), } impl PartSortBuffer { pub fn is_empty(&self) -> bool { match self { PartSortBuffer::All(v) => v.is_empty(), - PartSortBuffer::Top(_, cnt) => *cnt == 0, + PartSortBuffer::TopK(v) => v.is_empty(), + } + } + + pub fn num_rows(&self) -> usize { + match self { + PartSortBuffer::All(v) => v.iter().map(|batch| batch.num_rows()).sum(), + PartSortBuffer::TopK(v) => v.iter().map(|batch| batch.num_rows()).sum(), } } } @@ -343,16 +344,13 @@ struct PartSortStream { cur_part_idx: usize, evaluating_batch: Option, metrics: BaselineMetrics, - context: Arc, - root_metrics: ExecutionPlanMetricsSet, + dynamic_filter: Option>, + dynamic_filter_threshold: Option, /// Groups of ranges by primary end: (primary_end, start_idx_inclusive, end_idx_exclusive). /// Ranges in the same group must be processed together before outputting results. range_groups: Vec<(Timestamp, usize, usize)>, /// Current group being processed (index into range_groups). cur_group_idx: usize, - /// Dynamic Filter for all TopK instance, notice the `PartSortExec`/`PartSortStream`/`TopK` must share the same filter - /// so that updates from each `TopK` can be seen by others(and by the table scan operator). - filter: Option>>, } impl PartSortStream { @@ -363,30 +361,9 @@ impl PartSortStream { input: DfSendableRecordBatchStream, partition_ranges: Vec, partition: usize, - filter: Option>>, ) -> datafusion_common::Result { - let buffer = if let Some(limit) = limit { - let Some(filter) = filter.clone() else { - return internal_err!( - "TopKDynamicFilters must be provided when limit is set at {}", - snafu::location!() - ); - }; - - PartSortBuffer::Top( - TopK::try_new( - partition, - sort.schema().clone(), - vec![], - [sort.expression.clone()].into(), - limit, - context.session_config().batch_size(), - context.runtime_env(), - &sort.metrics, - filter.clone(), - )?, - 0, - ) + let buffer = if limit.is_some() { + PartSortBuffer::TopK(Vec::new()) } else { PartSortBuffer::All(Vec::new()) }; @@ -409,11 +386,10 @@ impl PartSortStream { cur_part_idx: 0, evaluating_batch: None, metrics: BaselineMetrics::new(&sort.metrics, partition), - context, - root_metrics: sort.metrics.clone(), + dynamic_filter: sort.dynamic_filter.clone(), + dynamic_filter_threshold: None, range_groups, cur_group_idx: 0, - filter, }) } } @@ -456,6 +432,20 @@ macro_rules! array_check_helper { }}; } +macro_rules! threshold_helper { + ($t:ty, $unit:expr, $arr:expr, $threshold_idx:expr) => {{ + let arr = $arr + .as_any() + .downcast_ref::>() + .unwrap(); + if arr.is_null($threshold_idx) { + TopKThreshold::Null + } else { + TopKThreshold::Value(arr.value($threshold_idx)) + } + }}; +} + impl PartSortStream { /// check whether the sort column's min/max value is within the current group's effective range. /// For group-based processing, data from multiple ranges with the same primary end @@ -532,98 +522,270 @@ impl PartSortStream { Ok(None) } - fn push_buffer(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> { + fn push_buffer( + &mut self, + batch: DfRecordBatch, + sort_data_type: &DataType, + ) -> datafusion_common::Result<()> { + let topk = matches!(self.buffer, PartSortBuffer::TopK(_)); match &mut self.buffer { PartSortBuffer::All(v) => v.push(batch), - PartSortBuffer::Top(top, cnt) => { - *cnt += batch.num_rows(); - top.insert_batch(batch)?; - } + PartSortBuffer::TopK(v) => v.push(batch), + } + + if topk { + let threshold = self.compact_topk_buffer(sort_data_type)?; + self.update_dynamic_filter(sort_data_type, threshold)?; } Ok(()) } - /// Stop read earlier when current group do not overlap with any of those next group - /// If not overlap, we can stop read further input as current top k is final - /// Use dynamic filter to evaluate the next group's primary end - fn can_stop_early(&mut self, schema: &Arc) -> datafusion_common::Result { - let topk_cnt = match &self.buffer { - PartSortBuffer::Top(_, cnt) => *cnt, - _ => return Ok(false), - }; - // not fulfill topk yet - if Some(topk_cnt) < self.limit { - return Ok(false); - } - let next_group_primary_end = if self.cur_group_idx + 1 < self.range_groups.len() { - self.range_groups[self.cur_group_idx + 1].0 - } else { - // no next group - return Ok(false); + fn compact_topk_buffer( + &mut self, + sort_data_type: &DataType, + ) -> datafusion_common::Result> { + let Some(limit) = self.limit else { + return Ok(None); }; - // dyn filter is updated based on the last value of topk heap("threshold") - // it's a max-heap for a ASC TopK operator - // so can use dyn filter to prune data range - let filter = self - .filter - .as_ref() - .expect("TopKDynamicFilters must be provided when limit is set"); - let filter = filter.read().expr().current()?; - let mut ts_index = None; - // invariant: the filter must contain only the same column expr that's time index column - let filter = filter - .transform_down(|c| { - // rewrite all column's index as 0 - if let Some(column) = c.as_any().downcast_ref::() { - ts_index = Some(column.index()); - Ok(Transformed::yes( - Arc::new(Column::new(column.name(), 0)) as Arc - )) - } else { - Ok(Transformed::no(c)) - } - })? - .data; - let Some(ts_index) = ts_index else { - return Ok(false); // dyn filter is still true, cannot decide, continue read + let PartSortBuffer::TopK(buffer) = + std::mem::replace(&mut self.buffer, PartSortBuffer::TopK(Vec::new())) + else { + return Ok(None); }; - let field = if schema.fields().len() <= ts_index { - warn!( - "Schema mismatch when evaluating dynamic filter for PartSortExec at {}, schema: {:?}, ts_index: {}", - self.partition, schema, ts_index - ); - return Ok(false); // schema mismatch, cannot decide, continue read - } else { - schema.field(ts_index) - }; - let schema = Arc::new(Schema::new(vec![field.clone()])); - // convert next_group_primary_end to array&filter, if eval to false, means no overlap, can stop early - let primary_end_array = match next_group_primary_end.unit() { - TimeUnit::Second => Arc::new(TimestampSecondArray::from(vec![ - next_group_primary_end.value(), - ])) as ArrayRef, - TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(vec![ - next_group_primary_end.value(), - ])) as ArrayRef, - TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(vec![ - next_group_primary_end.value(), - ])) as ArrayRef, - TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(vec![ - next_group_primary_end.value(), - ])) as ArrayRef, - }; - let primary_end_batch = DfRecordBatch::try_new(schema, vec![primary_end_array])?; - let res = filter.evaluate(&primary_end_batch)?; - let array = res.into_array(primary_end_batch.num_rows())?; - let filter = array.as_boolean().clone(); - let overlap = filter.iter().next().flatten(); - if let Some(false) = overlap { - Ok(true) - } else { - Ok(false) + + if limit == 0 || buffer.is_empty() { + self.buffer = PartSortBuffer::TopK(Vec::new()); + return Ok(None); } + + let total_rows: usize = buffer.iter().map(|batch| batch.num_rows()).sum(); + if total_rows <= limit { + self.buffer = PartSortBuffer::TopK(buffer); + return Ok(None); + } + + let topk = self.sort_record_batches(&buffer, Some(limit), false)?; + let threshold = self.threshold_from_sorted_batch(&topk, sort_data_type)?; + self.buffer = if topk.num_rows() == 0 { + PartSortBuffer::TopK(Vec::new()) + } else { + PartSortBuffer::TopK(vec![topk]) + }; + + Ok(threshold) + } + + fn threshold_from_sorted_batch( + &self, + batch: &DfRecordBatch, + sort_data_type: &DataType, + ) -> datafusion_common::Result> { + if batch.num_rows() == 0 { + return Ok(None); + } + + let threshold_idx = batch.num_rows() - 1; + let sort_column = self.expression.evaluate_to_sort_column(batch)?.values; + let threshold = downcast_ts_array!( + sort_data_type => (threshold_helper, sort_column, threshold_idx), + _ => internal_err!( + "Unsupported data type for sort column: {:?}", + sort_data_type + )?, + ); + + Ok(Some(threshold)) + } + + fn topk_threshold( + &self, + sort_data_type: &arrow_schema::DataType, + ) -> datafusion_common::Result> { + let Some(limit) = self.limit else { + return Ok(None); + }; + + if limit == 0 || self.buffer.num_rows() < limit { + return Ok(None); + } + + let buffer = match &self.buffer { + PartSortBuffer::All(buffer) | PartSortBuffer::TopK(buffer) => buffer, + }; + let mut sort_columns = Vec::with_capacity(buffer.len()); + let mut opt = None; + for batch in buffer { + let sort_column = self.expression.evaluate_to_sort_column(batch)?; + opt = opt.or(sort_column.options); + sort_columns.push(sort_column.values); + } + + let sort_column = + concat(&sort_columns.iter().map(|a| a.as_ref()).collect_vec()).map_err(|e| { + DataFusionError::ArrowError( + Box::new(e), + Some(format!("Fail to concat sort columns at {}", location!())), + ) + })?; + + let indices = sort_to_indices(&sort_column, opt, Some(limit)).map_err(|e| { + DataFusionError::ArrowError( + Box::new(e), + Some(format!("Fail to sort to indices at {}", location!())), + ) + })?; + + if indices.len() < limit { + return Ok(None); + } + + let threshold_idx = indices.value(indices.len() - 1) as usize; + let threshold = downcast_ts_array!( + sort_data_type => (threshold_helper, sort_column, threshold_idx), + _ => internal_err!( + "Unsupported data type for sort column: {:?}", + sort_data_type + )?, + ); + + Ok(Some(threshold)) + } + + fn threshold_scalar_value( + sort_data_type: &DataType, + threshold: &TopKThreshold, + ) -> datafusion_common::Result { + let value = match threshold { + TopKThreshold::Null => None, + TopKThreshold::Value(value) => Some(*value), + }; + + let scalar = match sort_data_type { + DataType::Timestamp(TimeUnit::Second, tz) => { + ScalarValue::TimestampSecond(value, tz.clone()) + } + DataType::Timestamp(TimeUnit::Millisecond, tz) => { + ScalarValue::TimestampMillisecond(value, tz.clone()) + } + DataType::Timestamp(TimeUnit::Microsecond, tz) => { + ScalarValue::TimestampMicrosecond(value, tz.clone()) + } + DataType::Timestamp(TimeUnit::Nanosecond, tz) => { + ScalarValue::TimestampNanosecond(value, tz.clone()) + } + _ => internal_err!( + "Unsupported data type for sort column: {:?}", + sort_data_type + )?, + }; + + Ok(scalar) + } + + fn build_dynamic_filter_expr( + &self, + sort_data_type: &DataType, + threshold: &TopKThreshold, + ) -> datafusion_common::Result> { + let op = if self.expression.options.descending { + Operator::Gt + } else { + Operator::Lt + }; + let value_null = matches!(threshold, TopKThreshold::Null); + let value = Self::threshold_scalar_value(sort_data_type, threshold)?; + let comparison: Arc = Arc::new(BinaryExpr::new( + self.expression.expr.clone(), + op, + lit(value), + )); + + match (self.expression.options.nulls_first, value_null) { + (true, true) => Ok(lit(false)), + (true, false) => Ok(Arc::new(BinaryExpr::new( + is_null(self.expression.expr.clone())?, + Operator::Or, + comparison, + ))), + (false, true) => is_not_null(self.expression.expr.clone()), + (false, false) => Ok(comparison), + } + } + + fn update_dynamic_filter( + &mut self, + sort_data_type: &DataType, + threshold: Option, + ) -> datafusion_common::Result<()> { + let Some(filter) = &self.dynamic_filter else { + return Ok(()); + }; + + let threshold = if let Some(threshold) = threshold { + threshold + } else { + let Some(threshold) = self.topk_threshold(sort_data_type)? else { + return Ok(()); + }; + threshold + }; + + if self.dynamic_filter_threshold.as_ref() == Some(&threshold) { + return Ok(()); + } + + let predicate = self.build_dynamic_filter_expr(sort_data_type, &threshold)?; + filter.update(predicate)?; + self.dynamic_filter_threshold = Some(threshold); + + Ok(()) + } + + /// Returns true when all rows in the next group are guaranteed to be worse + /// than the current top-k threshold. + fn can_stop_before_group( + &self, + group_idx: usize, + sort_data_type: &arrow_schema::DataType, + ) -> datafusion_common::Result { + if group_idx >= self.range_groups.len() { + return Ok(false); + } + + let threshold = if let Some(threshold) = &self.dynamic_filter_threshold { + threshold.clone() + } else { + let Some(threshold) = self.topk_threshold(sort_data_type)? else { + return Ok(false); + }; + threshold + }; + + let (_, start_idx, _) = self.range_groups[group_idx]; + let next_range = + project_partition_range_for_sort(self.partition_ranges[start_idx], sort_data_type)?; + let descending = self.expression.options.descending; + let next_primary = get_primary_end(&next_range, descending).value(); + + let can_stop = match threshold { + // When the k-th element is NULL: + // - nulls_first=true: nulls are the best values (Arrow sorts NULLs first + // regardless of ASC/DESC), so top-k is already optimal → stop early. + // - nulls_first=false: nulls are the worst, non-null values from the + // next group could displace them → continue reading. + TopKThreshold::Null => self.expression.options.nulls_first, + TopKThreshold::Value(value) => { + if descending { + value >= next_primary + } else { + value < next_primary + } + } + }; + + Ok(can_stop) } /// Check if the given partition index is within the current group. @@ -685,21 +847,20 @@ impl PartSortStream { fn sort_buffer(&mut self) -> datafusion_common::Result { match &mut self.buffer { PartSortBuffer::All(_) => self.sort_all_buffer(), - PartSortBuffer::Top(_, _) => self.sort_top_buffer(), + PartSortBuffer::TopK(_) => self.sort_topk_buffer(), } } - /// Internal method for sorting `All` buffer (without limit). - fn sort_all_buffer(&mut self) -> datafusion_common::Result { - let PartSortBuffer::All(buffer) = - std::mem::replace(&mut self.buffer, PartSortBuffer::All(Vec::new())) - else { - unreachable!("buffer type is checked before and should be All variant") - }; - + fn sort_record_batches( + &mut self, + buffer: &[DfRecordBatch], + limit: Option, + check_range: bool, + ) -> datafusion_common::Result { if buffer.is_empty() { return Ok(DfRecordBatch::new_empty(self.schema.clone())); } + let mut sort_columns = Vec::with_capacity(buffer.len()); let mut opt = None; for batch in buffer.iter() { @@ -716,7 +877,7 @@ impl PartSortStream { ) })?; - let indices = sort_to_indices(&sort_column, opt, self.limit).map_err(|e| { + let indices = sort_to_indices(&sort_column, opt, limit).map_err(|e| { DataFusionError::ArrowError( Box::new(e), Some(format!("Fail to sort to indices at {}", location!())), @@ -726,29 +887,31 @@ impl PartSortStream { return Ok(DfRecordBatch::new_empty(self.schema.clone())); } - self.check_in_range( - &sort_column, - ( - indices.value(0) as usize, - indices.value(indices.len() - 1) as usize, - ), - ) - .inspect_err(|_e| { - #[cfg(debug_assertions)] - common_telemetry::error!( - "Fail to check sort column in range at {}, current_idx: {}, num_rows: {}, err: {}", - self.partition, - self.cur_part_idx, - sort_column.len(), - _e - ); - })?; + if check_range { + self.check_in_range( + &sort_column, + ( + indices.value(0) as usize, + indices.value(indices.len() - 1) as usize, + ), + ) + .inspect_err(|_e| { + #[cfg(debug_assertions)] + common_telemetry::error!( + "Fail to check sort column in range at {}, current_idx: {}, num_rows: {}, err: {}", + self.partition, + self.cur_part_idx, + sort_column.len(), + _e + ); + })?; + } // reserve memory for the concat input and sorted output let total_mem: usize = buffer.iter().map(|r| r.get_array_memory_size()).sum(); self.reservation.try_grow(total_mem * 2)?; - let full_input = concat_batches(&self.schema, &buffer).map_err(|e| { + let full_input = concat_batches(&self.schema, buffer).map_err(|e| { DataFusionError::ArrowError( Box::new(e), Some(format!( @@ -774,63 +937,25 @@ impl PartSortStream { Ok(sorted) } - /// Internal method for sorting `Top` buffer (with limit). - fn sort_top_buffer(&mut self) -> datafusion_common::Result { - let Some(filter) = self.filter.clone() else { - return internal_err!( - "TopKDynamicFilters must be provided when sorting with limit at {}", - snafu::location!() - ); - }; - - let new_top_buffer = TopK::try_new( - self.partition, - self.schema().clone(), - vec![], - [self.expression.clone()].into(), - self.limit.unwrap(), - self.context.session_config().batch_size(), - self.context.runtime_env(), - &self.root_metrics, - filter, - )?; - let PartSortBuffer::Top(top_k, _) = - std::mem::replace(&mut self.buffer, PartSortBuffer::Top(new_top_buffer, 0)) + /// Internal method for sorting `All` buffer (without limit). + fn sort_all_buffer(&mut self) -> datafusion_common::Result { + let PartSortBuffer::All(buffer) = + std::mem::replace(&mut self.buffer, PartSortBuffer::All(Vec::new())) else { - unreachable!("buffer type is checked before and should be Top variant") + unreachable!() }; - let mut result_stream = top_k.emit()?; - let mut placeholder_ctx = std::task::Context::from_waker(futures::task::noop_waker_ref()); - let mut results = vec![]; - // according to the current implementation of `TopK`, the result stream will always be ready - loop { - match result_stream.poll_next_unpin(&mut placeholder_ctx) { - Poll::Ready(Some(batch)) => { - let batch = batch?; - results.push(batch); - } - Poll::Pending => { - #[cfg(debug_assertions)] - unreachable!("TopK result stream should always be ready") - } - Poll::Ready(None) => { - break; - } - } - } + self.sort_record_batches(&buffer, self.limit, self.limit.is_none()) + } - let concat_batch = concat_batches(&self.schema, &results).map_err(|e| { - DataFusionError::ArrowError( - Box::new(e), - Some(format!( - "Fail to concat top k result record batch when sorting at {}", - location!() - )), - ) - })?; + fn sort_topk_buffer(&mut self) -> datafusion_common::Result { + let PartSortBuffer::TopK(buffer) = + std::mem::replace(&mut self.buffer, PartSortBuffer::TopK(Vec::new())) + else { + unreachable!() + }; - Ok(concat_batch) + self.sort_record_batches(&buffer, self.limit, false) } /// Sorts current buffer and returns `None` when there is nothing to emit. @@ -847,6 +972,12 @@ impl PartSortStream { } } + fn mark_dynamic_filter_complete(&self) { + if let Some(filter) = &self.dynamic_filter { + filter.mark_complete(); + } + } + /// Try to split the input batch if it contains data that exceeds the current partition range. /// /// When the input batch contains data that exceeds the current partition range, this function @@ -860,14 +991,14 @@ impl PartSortStream { /// /// Returns `None` if the input batch is empty or fully within the current partition range /// (or we're still collecting data within the same group), and `Some(batch)` when we've - /// completed a group and have sorted output. When operating in TopK (limit) mode, this + /// completed a group and have sorted output. When operating in limit mode, this /// function will not emit intermediate batches; it only prepares state for a single final /// output. fn split_batch( &mut self, batch: DfRecordBatch, ) -> datafusion_common::Result> { - if matches!(self.buffer, PartSortBuffer::Top(_, _)) { + if self.limit.is_some() { self.split_batch_topk(batch)?; return Ok(None); } @@ -875,9 +1006,9 @@ impl PartSortStream { self.split_batch_all(batch) } - /// Specialized splitting logic for TopK (limit) mode. + /// Specialized splitting logic for limit mode. /// - /// We only emit once when the TopK buffer is fulfilled or when input is fully consumed. + /// We only emit once when input is fully consumed. /// When the buffer is fulfilled and we are about to enter a new group, we stop consuming /// further ranges. fn split_batch_topk(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> { @@ -893,7 +1024,7 @@ impl PartSortStream { let next_range_idx = self.try_find_next_range(&sort_column)?; let Some(idx) = next_range_idx else { - self.push_buffer(batch)?; + self.push_buffer(batch, sort_column.data_type())?; // keep polling input for next batch return Ok(()); }; @@ -901,7 +1032,7 @@ impl PartSortStream { let this_range = batch.slice(0, idx); let remaining_range = batch.slice(idx, batch.num_rows() - idx); if this_range.num_rows() != 0 { - self.push_buffer(this_range)?; + self.push_buffer(this_range, sort_column.data_type())?; } // Step to next proper PartitionRange @@ -917,16 +1048,12 @@ impl PartSortStream { // Check if we're still in the same group let in_same_group = self.is_in_current_group(self.cur_part_idx); - // When TopK is fulfilled and we are switching to a new group, stop consuming further ranges if possible. - // read from topk heap and determine whether we can stop earlier. - if !in_same_group && self.can_stop_early(&batch.schema())? { - self.input_complete = true; - self.evaluating_batch = None; - return Ok(()); - } - - // Transition to a new group if needed if !in_same_group { + let next_group_idx = self.cur_group_idx + 1; + if self.can_stop_before_group(next_group_idx, sort_column.data_type())? { + self.input_complete = true; + return Ok(()); + } self.advance_to_next_group(); } @@ -938,7 +1065,7 @@ impl PartSortStream { } else if remaining_range.num_rows() != 0 { // remaining batch is within the current partition range // push to the buffer and continue polling - self.push_buffer(remaining_range)?; + self.push_buffer(remaining_range, sort_column.data_type())?; } Ok(()) @@ -960,7 +1087,7 @@ impl PartSortStream { let next_range_idx = self.try_find_next_range(&sort_column)?; let Some(idx) = next_range_idx else { - self.push_buffer(batch)?; + self.push_buffer(batch, sort_column.data_type())?; // keep polling input for next batch return Ok(None); }; @@ -968,7 +1095,7 @@ impl PartSortStream { let this_range = batch.slice(0, idx); let remaining_range = batch.slice(idx, batch.num_rows() - idx); if this_range.num_rows() != 0 { - self.push_buffer(this_range)?; + self.push_buffer(this_range, sort_column.data_type())?; } // Step to next proper PartitionRange @@ -993,7 +1120,7 @@ impl PartSortStream { } else { // remaining batch is within the current partition range if remaining_range.num_rows() != 0 { - self.push_buffer(remaining_range)?; + self.push_buffer(remaining_range, sort_column.data_type())?; } } // Return None to continue collecting within the same group @@ -1013,7 +1140,7 @@ impl PartSortStream { // remaining batch is within the current partition range // push to the buffer and continue polling if remaining_range.num_rows() != 0 { - self.push_buffer(remaining_range)?; + self.push_buffer(remaining_range, sort_column.data_type())?; } } @@ -1027,8 +1154,10 @@ impl PartSortStream { loop { if self.input_complete { if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? { + self.mark_dynamic_filter_complete(); return Poll::Ready(Some(Ok(sorted_batch))); } + self.mark_dynamic_filter_complete(); return Poll::Ready(None); } @@ -1041,8 +1170,10 @@ impl PartSortStream { if self.cur_part_idx >= self.partition_ranges.len() { // All partitions processed, discard remaining data if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? { + self.mark_dynamic_filter_complete(); return Poll::Ready(Some(Ok(sorted_batch))); } + self.mark_dynamic_filter_complete(); return Poll::Ready(None); } @@ -1094,8 +1225,8 @@ mod test { use std::sync::Arc; use arrow::array::{ - TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, - TimestampSecondArray, + BooleanArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, }; use arrow::json::ArrayWriter; use arrow_schema::{DataType, Field, Schema, SortOptions, TimeUnit}; @@ -1107,60 +1238,6 @@ mod test { use super::*; use crate::test_util::{MockInputExec, new_ts_array}; - #[tokio::test] - async fn test_can_stop_early_with_empty_topk_buffer() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - // Build a minimal PartSortExec and stream, but inject a dynamic filter that - // always evaluates to false so TopK will filter out all rows internally. - let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone())); - let exec = PartSortExec::try_new( - PhysicalSortExpr { - expr: Arc::new(Column::new("ts", 0)), - options: SortOptions { - descending: true, - ..Default::default() - }, - }, - Some(3), - vec![vec![]], - mock_input.clone(), - ) - .unwrap(); - - let filter = Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new( - DynamicFilterPhysicalExpr::new(vec![], lit(false)), - )))); - - let input_stream = mock_input - .execute(0, Arc::new(TaskContext::default())) - .unwrap(); - let mut stream = PartSortStream::new( - Arc::new(TaskContext::default()), - &exec, - Some(3), - input_stream, - vec![], - 0, - Some(filter), - ) - .unwrap(); - - // Push 3 rows so the external counter reaches `limit`, while TopK keeps no rows. - let batch = DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2, 3])]) - .unwrap(); - stream.push_buffer(batch).unwrap(); - - // The TopK result buffer is empty, so we cannot determine early-stop. - // Ensure this path returns `Ok(false)` (and, importantly, does not panic). - assert!(!stream.can_stop_early(&schema).unwrap()); - } - #[ignore = "hard to gen expected data correctly here, TODO(discord9): fix it later"] #[tokio::test] async fn fuzzy_test() { @@ -1770,6 +1847,130 @@ mod test { .await; } + #[test] + fn test_topk_buffer_is_bounded_and_updates_dynamic_filter() { + let unit = TimeUnit::Millisecond; + let schema = Arc::new(Schema::new(vec![Field::new( + "ts", + DataType::Timestamp(unit, None), + false, + )])); + let sort_data_type = DataType::Timestamp(unit, None); + let partition_range = PartitionRange { + start: Timestamp::new(0, unit.into()), + end: Timestamp::new(10, unit.into()), + num_rows: 9, + identifier: 0, + }; + let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone())); + let exec = PartSortExec::try_new( + PhysicalSortExpr { + expr: Arc::new(Column::new("ts", 0)), + options: SortOptions { + descending: true, + ..Default::default() + }, + }, + Some(3), + vec![vec![partition_range]], + mock_input.clone(), + ) + .unwrap(); + let input_stream = mock_input + .execute(0, Arc::new(TaskContext::default())) + .unwrap(); + let mut stream = PartSortStream::new( + Arc::new(TaskContext::default()), + &exec, + Some(3), + input_stream, + vec![partition_range], + 0, + ) + .unwrap(); + + for batch in [vec![1, 2, 3], vec![4, 5, 6], vec![0, 7, 8]] { + stream + .push_buffer( + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, batch)]) + .unwrap(), + &sort_data_type, + ) + .unwrap(); + assert_eq!(stream.buffer.num_rows(), 3); + } + + let dynamic_filter = stream.dynamic_filter.as_ref().unwrap().clone(); + let probe = DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![5, 6, 7])]) + .unwrap(); + let predicate = dynamic_filter.current().unwrap(); + let result = predicate + .evaluate(&probe) + .unwrap() + .into_array(probe.num_rows()) + .unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result, &BooleanArray::from(vec![false, false, true])); + + let expected = + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![8, 7, 6])]) + .unwrap(); + assert_eq!(stream.sort_buffer().unwrap(), expected); + } + + #[test] + fn test_topk_limit_zero_clears_buffer_without_threshold() { + let unit = TimeUnit::Millisecond; + let schema = Arc::new(Schema::new(vec![Field::new( + "ts", + DataType::Timestamp(unit, None), + false, + )])); + let sort_data_type = DataType::Timestamp(unit, None); + let partition_range = PartitionRange { + start: Timestamp::new(0, unit.into()), + end: Timestamp::new(10, unit.into()), + num_rows: 3, + identifier: 0, + }; + let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone())); + let exec = PartSortExec::try_new( + PhysicalSortExpr { + expr: Arc::new(Column::new("ts", 0)), + options: SortOptions { + descending: true, + ..Default::default() + }, + }, + Some(0), + vec![vec![partition_range]], + mock_input.clone(), + ) + .unwrap(); + let input_stream = mock_input + .execute(0, Arc::new(TaskContext::default())) + .unwrap(); + let mut stream = PartSortStream::new( + Arc::new(TaskContext::default()), + &exec, + Some(0), + input_stream, + vec![partition_range], + 0, + ) + .unwrap(); + + stream + .push_buffer( + DfRecordBatch::try_new(schema, vec![new_ts_array(unit, vec![1, 2, 3])]).unwrap(), + &sort_data_type, + ) + .unwrap(); + + assert_eq!(stream.buffer.num_rows(), 0); + assert_eq!(stream.dynamic_filter_threshold, None); + } + /// Test that verifies early termination behavior. /// Once we've produced limit * num_partitions rows, we should stop /// pulling from input stream. @@ -2918,88 +3119,4 @@ mod test { ) .await; } - - /// First group: [0,20), data: [0, 5, 15] - /// Second group: [10, 30), data: [21, 25, 29] - /// after first group, calling early stop manually, and check if filter is updated - #[tokio::test] - async fn test_early_stop_check_update_dyn_filter() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone())); - let exec = PartSortExec::try_new( - PhysicalSortExpr { - expr: Arc::new(Column::new("ts", 0)), - options: SortOptions { - descending: false, - ..Default::default() - }, - }, - Some(3), - vec![vec![ - PartitionRange { - start: Timestamp::new(0, unit.into()), - end: Timestamp::new(20, unit.into()), - num_rows: 3, - identifier: 1, - }, - PartitionRange { - start: Timestamp::new(10, unit.into()), - end: Timestamp::new(30, unit.into()), - num_rows: 3, - identifier: 1, - }, - ]], - mock_input.clone(), - ) - .unwrap(); - - let filter = exec.filter.clone().unwrap(); - let input_stream = mock_input - .execute(0, Arc::new(TaskContext::default())) - .unwrap(); - let mut stream = PartSortStream::new( - Arc::new(TaskContext::default()), - &exec, - Some(3), - input_stream, - vec![], - 0, - Some(filter.clone()), - ) - .unwrap(); - - // initially, snapshot_generation is 1 - assert_eq!(filter.read().expr().snapshot_generation(), 1); - let batch = - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![0, 5, 15])]) - .unwrap(); - stream.push_buffer(batch).unwrap(); - - // after pushing first batch, snapshot_generation is updated to 2 - assert_eq!(filter.read().expr().snapshot_generation(), 2); - assert!(!stream.can_stop_early(&schema).unwrap()); - // still two as not updated - assert_eq!(filter.read().expr().snapshot_generation(), 2); - - let _ = stream.sort_top_buffer().unwrap(); - - let batch = - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![21, 25, 29])]) - .unwrap(); - stream.push_buffer(batch).unwrap(); - // still two as not updated - assert_eq!(filter.read().expr().snapshot_generation(), 2); - let new = stream.sort_top_buffer().unwrap(); - // still two as not updated - assert_eq!(filter.read().expr().snapshot_generation(), 2); - - // dyn filter kick in, and filter out all rows >= 15(the filter is rows<15) - assert_eq!(new.num_rows(), 0) - } } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 93d3689b1e..b35b30968a 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -24,7 +24,7 @@ api.workspace = true arrow.workspace = true arrow-flight.workspace = true arrow-ipc.workspace = true -arrow-pg = "0.12" +arrow-pg = "0.13" arrow-schema.workspace = true async-trait.workspace = true auth.workspace = true diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index daf404d5d6..e559e2c296 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -605,7 +605,7 @@ mod tests { .unwrap(); let write_props = WriterProperties::builder() - .set_max_row_group_size(10) + .set_max_row_group_row_count(Some(10)) .build(); let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(write_props)).unwrap();