diff --git a/.github/cargo-blacklist.txt b/.github/cargo-blacklist.txt index d2f071130e..32e7878a86 100644 --- a/.github/cargo-blacklist.txt +++ b/.github/cargo-blacklist.txt @@ -1,3 +1,2 @@ native-tls openssl -aws-lc-sys diff --git a/Cargo.lock b/Cargo.lock index d66fa2d686..05cccf77f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -200,9 +200,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.98" +version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" [[package]] name = "anymap2" @@ -982,6 +982,29 @@ dependencies = [ "cc", ] +[[package]] +name = "aws-lc-rs" +version = "1.16.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ec6fb3fe69024a75fa7e1bfb48aa6cf59706a101658ea01bfd33b2b248a038f" +dependencies = [ + "aws-lc-sys", + "untrusted 0.7.1", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.40.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f50037ee5e1e41e7b8f9d161680a725bd1626cb6f8c7e901f91f942850852fe7" +dependencies = [ + "cc", + "cmake", + "dunce", + "fs_extra", +] + [[package]] name = "axum" version = "0.6.20" @@ -1129,9 +1152,9 @@ dependencies = [ [[package]] name = "backon" -version = "1.5.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302eaff5357a264a2c42f127ecb8bac761cf99749fc3dc95677e2743991f99e7" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" dependencies = [ "fastrand", "gloo-timers", @@ -1252,7 +1275,7 @@ version = "0.72.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f72209734318d0b619a5e0f5129918b848c416e122a3c4ce054e03cb87b726f" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "cexpr", "clang-sys", "itertools 0.13.0", @@ -1287,11 +1310,11 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.9.1" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" dependencies = [ - "serde", + "serde_core", ] [[package]] @@ -1654,6 +1677,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f4c707c6a209cbe82d10abd08e1ea8995e9ea937d2550646e02798948992be0" +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cexpr" version = "0.6.0" @@ -1933,7 +1962,7 @@ dependencies = [ "paste", "query", "rand 0.9.1", - "reqwest", + "reqwest 0.13.2", "serde", "serde_json", "servers", @@ -2084,7 +2113,7 @@ dependencies = [ "query", "rand 0.9.1", "regex", - "reqwest", + "reqwest 0.13.2", "serde", "serde_json", "servers", @@ -2139,7 +2168,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.59.0", + "windows-sys 0.48.0", +] + +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", ] [[package]] @@ -2227,7 +2266,6 @@ dependencies = [ "futures", "lazy_static", "object-store", - "object_store_opendal", "orc-rust", "parquet", "paste", @@ -2382,7 +2420,7 @@ dependencies = [ "common-test-util", "common-version", "hyper 0.14.32", - "reqwest", + "reqwest 0.13.2", "serde", "tempfile", "tokio", @@ -3319,6 +3357,22 @@ dependencies = [ "memchr", ] +[[package]] +name = "ctor" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "424e0138278faeb2b401f174ad17e715c829512d74f3d1e81eb43365c2e0590e" +dependencies = [ + "ctor-proc-macro", + "dtor", +] + +[[package]] +name = "ctor-proc-macro" +version = "0.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52560adf09603e58c9a7ee1fe1dcb95a16927b17c127f0ac02d6e768a0e25bc1" + [[package]] name = "ctr" version = "0.9.2" @@ -3556,7 +3610,7 @@ dependencies = [ "itertools 0.14.0", "liblzma", "log", - "object_store", + "object_store 0.12.5", "parking_lot 0.12.4", "parquet", "rand 0.9.1", @@ -3588,7 +3642,7 @@ dependencies = [ "futures", "itertools 0.14.0", "log", - "object_store", + "object_store 0.12.5", "parking_lot 0.12.4", "tokio", ] @@ -3612,7 +3666,7 @@ dependencies = [ "futures", "itertools 0.14.0", "log", - "object_store", + "object_store 0.12.5", ] [[package]] @@ -3630,7 +3684,7 @@ dependencies = [ "itertools 0.14.0", "libc", "log", - "object_store", + "object_store 0.12.5", "parquet", "paste", "recursive", @@ -3675,7 +3729,7 @@ dependencies = [ "itertools 0.14.0", "liblzma", "log", - "object_store", + "object_store 0.12.5", "rand 0.9.1", "tokio", "tokio-util", @@ -3702,7 +3756,7 @@ dependencies = [ "datafusion-session", "futures", "itertools 0.14.0", - "object_store", + "object_store 0.12.5", "tokio", ] @@ -3723,7 +3777,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "futures", - "object_store", + "object_store 0.12.5", "regex", "tokio", ] @@ -3745,7 +3799,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "futures", - "object_store", + "object_store 0.12.5", "serde_json", "tokio", "tokio-stream", @@ -3774,7 +3828,7 @@ dependencies = [ "futures", "itertools 0.14.0", "log", - "object_store", + "object_store 0.12.5", "parking_lot 0.12.4", "parquet", "tokio", @@ -3800,7 +3854,7 @@ dependencies = [ "datafusion-physical-expr-common", "futures", "log", - "object_store", + "object_store 0.12.5", "parking_lot 0.12.4", "rand 0.9.1", "tempfile", @@ -4009,7 +4063,7 @@ dependencies = [ "datafusion", "futures", "futures-util", - "object_store", + "object_store 0.12.5", "orc-rust", "tokio", ] @@ -4188,7 +4242,7 @@ dependencies = [ "datafusion", "half", "itertools 0.14.0", - "object_store", + "object_store 0.12.5", "pbjson-types", "prost 0.14.1", "substrait 0.62.2", @@ -4249,7 +4303,7 @@ dependencies = [ "prometheus 0.14.0", "prost 0.14.1", "query", - "reqwest", + "reqwest 0.13.2", "serde", "serde_json", "servers", @@ -4661,6 +4715,27 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea8a8b81cacc08888170eef4d13b775126db426d0b348bee9d18c2c1eaf123cf" +[[package]] +name = "dtor" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "404d02eeb088a82cfd873006cb713fe411306c7d182c344905e101fb1167d301" +dependencies = [ + "dtor-proc-macro", +] + +[[package]] +name = "dtor-proc-macro" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f678cf4a922c215c63e0de95eb1ff08a958a81d47e485cf9da1e27bf6305cfa5" + +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "duration-str" version = "0.11.3" @@ -4964,7 +5039,6 @@ dependencies = [ "datatypes", "futures", "object-store", - "object_store_opendal", "serde", "serde_json", "snafu 0.8.6", @@ -5015,7 +5089,7 @@ version = "25.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1045398c1bfd89168b5fd3f1fc11f6e70b34f6f66300c87d44d3de849463abf1" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "rustc_version", ] @@ -5266,7 +5340,7 @@ dependencies = [ "prost 0.14.1", "query", "rand 0.9.1", - "reqwest", + "reqwest 0.13.2", "serde", "serde_json", "servers", @@ -5362,6 +5436,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "fsevent-sys" version = "4.1.0" @@ -5654,7 +5734,7 @@ version = "0.20.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b88256088d75a56f8ecfa070513a775dd9107f6530ef14919dac831af9cfe2b" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "libc", "libgit2-sys", "log", @@ -6607,7 +6687,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "inotify-sys", "libc", ] @@ -6786,6 +6866,93 @@ dependencies = [ "regex", ] +[[package]] +name = "jiff" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a3546dc96b6d42c5f24902af9e2538e82e39ad350b0c766eb3fbf2d8f3d8359" +dependencies = [ + "jiff-static", + "jiff-tzdb-platform", + "js-sys", + "log", + "portable-atomic", + "portable-atomic-util", + "serde_core", + "wasm-bindgen", + "windows-sys 0.61.2", +] + +[[package]] +name = "jiff-static" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a8c8b344124222efd714b73bb41f8b5120b27a7cc1c75593a6ff768d9d05aa4" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "jiff-tzdb" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c900ef84826f1338a557697dc8fc601df9ca9af4ac137c7fb61d4c6f2dfd3076" + +[[package]] +name = "jiff-tzdb-platform" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "875a5a69ac2bab1a891711cf5eccbec1ce0341ea805560dcd90b7a2e925132e8" +dependencies = [ + "jiff-tzdb", +] + +[[package]] +name = "jni" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" +dependencies = [ + "cesu8", + "cfg-if", + "combine", + "jni-sys 0.3.1", + "log", + "thiserror 1.0.69", + "walkdir", + "windows-sys 0.45.0", +] + +[[package]] +name = "jni-sys" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41a652e1f9b6e0275df1f15b32661cf0d4b78d4d87ddec5e0c3c20f097433258" +dependencies = [ + "jni-sys 0.4.1", +] + +[[package]] +name = "jni-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6377a88cb3910bee9b0fa88d4f42e1d2da8e79915598f65fb0c7ee14c878af2" +dependencies = [ + "jni-sys-macros", +] + +[[package]] +name = "jni-sys-macros" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38c0b942f458fe50cdac086d2f946512305e5631e720728f2a61aabcd47a6264" +dependencies = [ + "quote", + "syn 2.0.117", +] + [[package]] name = "jobserver" version = "0.1.33" @@ -6798,10 +6965,12 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.77" +version = "0.3.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" +checksum = "2964e92d1d9dc3364cae4d718d93f227e3abb088e747d92e0395bfdedf1c12ca" dependencies = [ + "cfg-if", + "futures-util", "once_cell", "wasm-bindgen", ] @@ -6886,16 +7055,18 @@ dependencies = [ [[package]] name = "jsonwebtoken" -version = "9.3.1" +version = "10.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +checksum = "0529410abe238729a60b108898784df8984c87f6054c9c4fcacc47e4803c1ce1" dependencies = [ + "aws-lc-rs", "base64 0.22.1", + "getrandom 0.2.16", "js-sys", "pem", - "ring", "serde", "serde_json", + "signature", "simple_asn1", ] @@ -7290,7 +7461,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -7325,7 +7496,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1580801010e535496706ba011c15f8532df6b42297d2e471fec38ceadd8c0638" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "libc", "redox_syscall 0.5.13", ] @@ -7679,6 +7850,15 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae960838283323069879657ca3de837e9f7bbb4c7bf6ea7f1b290d5e9476d2e0" +[[package]] +name = "mea" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6747f54621d156e1b47eb6b25f39a941b9fc347f98f67d25d8881ff99e8ed832" +dependencies = [ + "slab", +] + [[package]] name = "measure_time" version = "0.9.0" @@ -7826,7 +8006,7 @@ dependencies = [ "toml 0.8.23", "tonic 0.14.2", "tower 0.5.2", - "tower-http 0.6.6", + "tower-http 0.6.8", "tracing", "tracing-subscriber", "typetag", @@ -8249,7 +8429,7 @@ dependencies = [ "base64 0.21.7", "bigdecimal 0.4.8", "bindgen", - "bitflags 2.9.1", + "bitflags 2.11.1", "bitvec", "btoi", "byteorder", @@ -8287,7 +8467,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34a9141e735d5bb02414a7ac03add09522466d4db65bdd827069f76ae0850e58" dependencies = [ "base64 0.22.1", - "bitflags 2.9.1", + "bitflags 2.11.1", "btoi", "byteorder", "bytes", @@ -8399,7 +8579,7 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "cfg-if", "cfg_aliases 0.1.1", "libc", @@ -8411,7 +8591,7 @@ version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "cfg-if", "cfg_aliases 0.2.1", "libc", @@ -8448,7 +8628,7 @@ version = "8.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fee8403b3d66ac7b26aee6e40a897d85dc5ce26f44da36b8b73e987cc52e943" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "filetime", "fsevent-sys", "inotify", @@ -8708,7 +8888,9 @@ name = "object-store" version = "1.0.1" dependencies = [ "anyhow", + "async-trait", "bytes", + "chrono", "common-base", "common-error", "common-macro", @@ -8718,11 +8900,16 @@ dependencies = [ "futures", "humantime-serde", "lazy_static", + "object_store 0.12.5", + "object_store 0.13.2", + "object_store_opendal", "opendal", "prometheus 0.14.0", - "reqwest", + "rand 0.9.1", + "reqwest 0.13.2", "serde", "snafu 0.8.6", + "tempfile", "tokio", "uuid", ] @@ -8752,15 +8939,42 @@ dependencies = [ ] [[package]] -name = "object_store_opendal" -version = "0.54.0" +name = "object_store" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce697ee723fdc3eaf6c457abf4059034be15167022b18b619993802cd1443d5" +checksum = "622acbc9100d3c10e2ee15804b0caa40e55c933d5aa53814cd520805b7958a49" dependencies = [ "async-trait", "bytes", + "chrono", + "futures-channel", + "futures-core", + "futures-util", + "http 1.3.1", + "humantime", + "itertools 0.14.0", + "parking_lot 0.12.4", + "percent-encoding", + "thiserror 2.0.17", + "tokio", + "tracing", + "url", + "walkdir", + "wasm-bindgen-futures", + "web-time", +] + +[[package]] +name = "object_store_opendal" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "async-trait", + "bytes", + "chrono", "futures", - "object_store", + "mea", + "object_store 0.13.2", "opendal", "pin-project", "tokio", @@ -8810,7 +9024,7 @@ version = "6.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "336b9c63443aceef14bea841b899035ae3abe89b7c486aaf4c5bd8aafedac3f0" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "libc", "once_cell", "onig_sys", @@ -8840,33 +9054,229 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "opendal" -version = "0.54.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42afda58fa2cf50914402d132cc1caacff116a85d10c72ab2082bb7c50021754" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "ctor", + "opendal-core", + "opendal-layer-concurrent-limit", + "opendal-layer-logging", + "opendal-layer-prometheus", + "opendal-layer-retry", + "opendal-layer-timeout", + "opendal-layer-tracing", + "opendal-service-azblob", + "opendal-service-fs", + "opendal-service-gcs", + "opendal-service-http", + "opendal-service-oss", + "opendal-service-s3", +] + +[[package]] +name = "opendal-core" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" dependencies = [ "anyhow", - "backon", "base64 0.22.1", "bytes", - "chrono", - "crc32c", "futures", - "getrandom 0.2.16", "http 1.3.1", "http-body 1.0.1", + "jiff", "log", "md-5", + "mea", "percent-encoding", - "prometheus 0.14.0", "quick-xml 0.38.4", - "reqsign", - "reqwest", + "reqsign-core", + "reqwest 0.13.2", "serde", "serde_json", - "sha2", "tokio", - "tracing", + "url", "uuid", + "web-time", +] + +[[package]] +name = "opendal-layer-concurrent-limit" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "futures", + "http 1.3.1", + "mea", + "opendal-core", +] + +[[package]] +name = "opendal-layer-logging" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "log", + "opendal-core", +] + +[[package]] +name = "opendal-layer-observe-metrics-common" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "futures", + "http 1.3.1", + "opendal-core", +] + +[[package]] +name = "opendal-layer-prometheus" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "opendal-core", + "opendal-layer-observe-metrics-common", + "prometheus 0.14.0", +] + +[[package]] +name = "opendal-layer-retry" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "backon", + "log", + "opendal-core", +] + +[[package]] +name = "opendal-layer-timeout" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "opendal-core", + "tokio", +] + +[[package]] +name = "opendal-layer-tracing" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "futures", + "http 1.3.1", + "opendal-core", + "tracing", +] + +[[package]] +name = "opendal-service-azblob" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "base64 0.22.1", + "bytes", + "http 1.3.1", + "log", + "opendal-core", + "opendal-service-azure-common", + "quick-xml 0.38.4", + "reqsign-azure-storage", + "reqsign-core", + "reqsign-file-read-tokio", + "serde", + "sha2", + "uuid", +] + +[[package]] +name = "opendal-service-azure-common" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "http 1.3.1", + "opendal-core", +] + +[[package]] +name = "opendal-service-fs" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "bytes", + "log", + "opendal-core", + "serde", + "tokio", + "xattr", +] + +[[package]] +name = "opendal-service-gcs" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "async-trait", + "bytes", + "http 1.3.1", + "log", + "opendal-core", + "percent-encoding", + "quick-xml 0.38.4", + "reqsign-core", + "reqsign-file-read-tokio", + "reqsign-google", + "serde", + "serde_json", + "tokio", +] + +[[package]] +name = "opendal-service-http" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "http 1.3.1", + "log", + "opendal-core", + "serde", +] + +[[package]] +name = "opendal-service-oss" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "bytes", + "http 1.3.1", + "log", + "opendal-core", + "quick-xml 0.38.4", + "reqsign-aliyun-oss", + "reqsign-core", + "reqsign-file-read-tokio", + "serde", +] + +[[package]] +name = "opendal-service-s3" +version = "0.56.0" +source = "git+https://github.com/apache/opendal.git?tag=v0.56.0-rc.2#8332367dd7629bebc7759a11a5bbbd941dd060e9" +dependencies = [ + "base64 0.22.1", + "bytes", + "crc32c", + "http 1.3.1", + "log", + "md-5", + "opendal-core", + "quick-xml 0.38.4", + "reqsign-aws-v4", + "reqsign-core", + "reqsign-file-read-tokio", + "serde", + "url", ] [[package]] @@ -8940,7 +9350,7 @@ dependencies = [ "bytes", "http 1.3.1", "opentelemetry 0.30.0", - "reqwest", + "reqwest 0.12.28", ] [[package]] @@ -8955,7 +9365,7 @@ dependencies = [ "opentelemetry-proto 0.30.0", "opentelemetry_sdk 0.30.0", "prost 0.13.5", - "reqwest", + "reqwest 0.12.28", "thiserror 2.0.17", "tokio", "tonic 0.13.1", @@ -9078,7 +9488,6 @@ dependencies = [ "meter-macros", "moka", "object-store", - "object_store_opendal", "partition", "path-slash", "prometheus 0.14.0", @@ -9343,7 +9752,7 @@ dependencies = [ "num-bigint", "num-integer", "num-traits", - "object_store", + "object_store 0.12.5", "paste", "seq-macro", "simdutf8", @@ -9591,6 +10000,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24bd4e6b1bfddc5c6420dee6602ec80946700b4c31ddcb64ee190ad6d979c210" dependencies = [ "async-trait", + "aws-lc-rs", "base64 0.22.1", "bytes", "chrono", @@ -9602,7 +10012,6 @@ dependencies = [ "pg_interval_2", "postgres-types", "rand 0.10.0", - "ring", "rust_decimal", "rustls-pki-types", "ryu", @@ -9919,6 +10328,15 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" +[[package]] +name = "portable-atomic-util" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a106d1259c23fac8e543272398ae0e3c0b8d33c88ed73d0cc71b0f1d902618" +dependencies = [ + "portable-atomic", +] + [[package]] name = "postgres-protocol" version = "0.6.8" @@ -10137,7 +10555,7 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc5b72d8145275d844d4b5f6d4e1eef00c8cd889edb6035c21675d1bb1f45c9f" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "hex", "procfs-core", "rustix 0.38.44", @@ -10149,7 +10567,7 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "239df02d8349b06fc07398a3a1697b06418223b1c7725085e801e7c0fc6a12ec" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "hex", ] @@ -10244,7 +10662,7 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bee689443a2bd0a16ab0348b52ee43e3b2d1b1f931c8aa5c9f8de4c86fbe8c40" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "num-traits", "rand 0.9.1", "rand_chacha 0.9.0", @@ -10300,7 +10718,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", - "heck 0.5.0", + "heck 0.4.1", "itertools 0.12.1", "log", "multimap", @@ -10320,7 +10738,7 @@ version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ - "heck 0.5.0", + "heck 0.4.1", "itertools 0.14.0", "log", "multimap", @@ -10558,7 +10976,7 @@ dependencies = [ "async-walkdir", "auto_impl", "base64 0.22.1", - "bitflags 2.9.1", + "bitflags 2.11.1", "bytes", "common-base", "common-error", @@ -10587,7 +11005,7 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e8bbe1a966bd2f362681a44f6edce3c2310ac21e4d5067a6e7ec396297a6ea0" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "memchr", "unicase", ] @@ -10704,7 +11122,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" dependencies = [ "memchr", - "serde", ] [[package]] @@ -10717,6 +11134,16 @@ dependencies = [ "serde", ] +[[package]] +name = "quick-xml" +version = "0.39.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958f21e8e7ceb5a1aa7fa87fab28e7c75976e0bfe7e23ff069e0a260f894067d" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quinn" version = "0.11.8" @@ -10743,6 +11170,7 @@ version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ + "aws-lc-rs", "bytes", "getrandom 0.3.3", "lru-slab", @@ -11007,7 +11435,7 @@ version = "0.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d04b7d0ee6b4a0207a0a7adb104d23ecb0b47d6beae7152d0fa34b692b29fd6" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", ] [[package]] @@ -11111,42 +11539,123 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51743d3e274e2b18df81c4dc6caf8a5b8e15dbe799e0dca05c7617380094e884" [[package]] -name = "reqsign" -version = "0.16.5" +name = "reqsign-aliyun-oss" +version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43451dbf3590a7590684c25fb8d12ecdcc90ed3ac123433e500447c7d77ed701" +checksum = "57ac2757f3140aa2e213b554148ae0b52733e624fc6723f0cc6bb3d440176c95" dependencies = [ "anyhow", - "async-trait", - "base64 0.22.1", - "chrono", "form_urlencoded", - "getrandom 0.2.16", - "hex", - "hmac", - "home", "http 1.3.1", - "jsonwebtoken", "log", - "once_cell", "percent-encoding", - "quick-xml 0.37.5", - "rand 0.8.5", - "reqwest", - "rsa", + "reqsign-core", "rust-ini 0.21.1", "serde", "serde_json", +] + +[[package]] +name = "reqsign-aws-v4" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44eaca382e94505a49f1a4849658d153aebf79d9c1a58e5dd3b10361511e9f43" +dependencies = [ + "anyhow", + "bytes", + "form_urlencoded", + "http 1.3.1", + "log", + "percent-encoding", + "quick-xml 0.39.2", + "reqsign-core", + "rust-ini 0.21.1", + "serde", + "serde_json", + "serde_urlencoded", "sha1", +] + +[[package]] +name = "reqsign-azure-storage" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a321980405d596bd34aaf95c4722a3de4128a67fd19e74a81a83aa3fdf082e6" +dependencies = [ + "anyhow", + "base64 0.22.1", + "bytes", + "form_urlencoded", + "http 1.3.1", + "jsonwebtoken", + "log", + "pem", + "percent-encoding", + "reqsign-core", + "rsa", + "serde", + "serde_json", + "sha1", +] + +[[package]] +name = "reqsign-core" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b10302cf0a7d7e7352ba211fc92c3c5bebf1286153e49cc5aa87348078a8e102" +dependencies = [ + "anyhow", + "base64 0.22.1", + "bytes", + "form_urlencoded", + "futures", + "hex", + "hmac", + "http 1.3.1", + "jiff", + "log", + "percent-encoding", + "sha1", + "sha2", + "windows-sys 0.61.2", +] + +[[package]] +name = "reqsign-file-read-tokio" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d89295b3d17abea31851cc8de55d843d89c52132c864963c38d41920613dc5" +dependencies = [ + "anyhow", + "reqsign-core", + "tokio", +] + +[[package]] +name = "reqsign-google" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35cc609b49c69e76ecaceb775a03f792d1ed3e7755ab3548d4534fd801e3242e" +dependencies = [ + "form_urlencoded", + "http 1.3.1", + "jsonwebtoken", + "log", + "percent-encoding", + "reqsign-aws-v4", + "reqsign-core", + "rsa", + "serde", + "serde_json", "sha2", "tokio", ] [[package]] name = "reqwest" -version = "0.12.24" +version = "0.12.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ "base64 0.22.1", "bytes", @@ -11161,13 +11670,54 @@ dependencies = [ "hyper-util", "js-sys", "log", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 1.0.2", + "tokio", + "tokio-rustls", + "tokio-util", + "tower 0.5.2", + "tower-http 0.6.8", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams 0.4.2", + "web-sys", + "webpki-roots 1.0.1", +] + +[[package]] +name = "reqwest" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", "mime_guess", "percent-encoding", "pin-project-lite", "quinn", "rustls", - "rustls-native-certs 0.8.1", "rustls-pki-types", + "rustls-platform-verifier", "serde", "serde_json", "serde_urlencoded", @@ -11176,14 +11726,13 @@ dependencies = [ "tokio-rustls", "tokio-util", "tower 0.5.2", - "tower-http 0.6.6", + "tower-http 0.6.8", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams", + "wasm-streams 0.5.0", "web-sys", - "webpki-roots 1.0.1", ] [[package]] @@ -11205,7 +11754,7 @@ dependencies = [ "cfg-if", "getrandom 0.2.16", "libc", - "untrusted", + "untrusted 0.9.0", "windows-sys 0.52.0", ] @@ -11508,7 +12057,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "errno", "libc", "linux-raw-sys 0.4.15", @@ -11521,7 +12070,7 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "errno", "libc", "linux-raw-sys 0.9.4", @@ -11534,6 +12083,7 @@ version = "0.23.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7160e3e10bf4535308537f3c4e1641468cd0e485175d6163087c0393c7d46643" dependencies = [ + "aws-lc-rs", "log", "once_cell", "ring", @@ -11565,7 +12115,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework 3.2.0", + "security-framework 3.7.0", ] [[package]] @@ -11587,15 +12137,43 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-platform-verifier" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" +dependencies = [ + "core-foundation 0.10.1", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls", + "rustls-native-certs 0.8.1", + "rustls-platform-verifier-android", + "rustls-webpki", + "security-framework 3.7.0", + "security-framework-sys", + "webpki-root-certs", + "windows-sys 0.61.2", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + [[package]] name = "rustls-webpki" version = "0.103.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", - "untrusted", + "untrusted 0.9.0", ] [[package]] @@ -11777,7 +12355,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -11786,11 +12364,11 @@ dependencies = [ [[package]] name = "security-framework" -version = "3.2.0" +version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" +checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "core-foundation 0.10.1", "core-foundation-sys", "libc", @@ -11799,9 +12377,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.14.0" +version = "2.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" +checksum = "6ce2691df843ecc5d231c0b14ece2acc3efb62c0a398c7e1d875f3983ce020e3" dependencies = [ "core-foundation-sys", "libc", @@ -12079,7 +12657,7 @@ dependencies = [ "quoted-string", "rand 0.9.1", "regex", - "reqwest", + "reqwest 0.13.2", "rust-embed", "rust_decimal", "rustls", @@ -12109,7 +12687,7 @@ dependencies = [ "tonic 0.14.2", "tonic-reflection", "tower 0.5.2", - "tower-http 0.6.6", + "tower-http 0.6.8", "tracing", "tracing-opentelemetry", "urlencoding", @@ -12375,7 +12953,7 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1961e2ef424c1424204d3a5d6975f934f56b6d50ff5732382d84ebf460e147f7" dependencies = [ - "heck 0.5.0", + "heck 0.4.1", "proc-macro2", "quote", "syn 2.0.117", @@ -12528,7 +13106,7 @@ dependencies = [ "local-ip-address", "mysql", "num_cpus", - "reqwest", + "reqwest 0.12.28", "serde", "serde_json", "sha2", @@ -12672,7 +13250,7 @@ checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526" dependencies = [ "atoi", "base64 0.22.1", - "bitflags 2.9.1", + "bitflags 2.11.1", "byteorder", "bytes", "chrono", @@ -12716,7 +13294,7 @@ checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46" dependencies = [ "atoi", "base64 0.22.1", - "bitflags 2.9.1", + "bitflags 2.11.1", "byteorder", "chrono", "crc", @@ -13448,7 +14026,8 @@ dependencies = [ "paste", "rand 0.9.1", "rand_chacha 0.9.0", - "reqwest", + "reqwest 0.13.2", + "rustls", "schemars", "serde", "serde_json", @@ -14139,7 +14718,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ "base64 0.21.7", - "bitflags 2.9.1", + "bitflags 2.11.1", "bytes", "http 1.3.1", "http-body 1.0.1", @@ -14153,13 +14732,13 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.6" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ "async-compression", "base64 0.22.1", - "bitflags 2.9.1", + "bitflags 2.11.1", "bytes", "futures-core", "futures-util", @@ -14513,6 +15092,12 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "untrusted" version = "0.9.0" @@ -14833,48 +15418,32 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.100" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" +checksum = "0bf938a0bacb0469e83c1e148908bd7d5a6010354cf4fb73279b7447422e3a89" dependencies = [ "cfg-if", "once_cell", "rustversion", "wasm-bindgen-macro", -] - -[[package]] -name = "wasm-bindgen-backend" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" -dependencies = [ - "bumpalo", - "log", - "proc-macro2", - "quote", - "syn 2.0.117", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.50" +version = "0.4.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61" +checksum = "f371d383f2fb139252e0bfac3b81b265689bf45b6874af544ffa4c975ac1ebf8" dependencies = [ - "cfg-if", "js-sys", - "once_cell", "wasm-bindgen", - "web-sys", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.100" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" +checksum = "eeff24f84126c0ec2db7a449f0c2ec963c6a49efe0698c4242929da037ca28ed" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -14882,22 +15451,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.100" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" +checksum = "9d08065faf983b2b80a79fd87d8254c409281cf7de75fc4b773019824196c904" dependencies = [ + "bumpalo", "proc-macro2", "quote", "syn 2.0.117", - "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.100" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +checksum = "5fd04d9e306f1907bd13c6361b5c6bfc7b3b3c095ed3f8a9246390f8dbdee129" dependencies = [ "unicode-ident", ] @@ -14937,13 +15506,26 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasm-streams" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d1ec4f6517c9e11ae630e200b2b65d193279042e28edd4a2cda233e46670bbb" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", "hashbrown 0.15.4", "indexmap 2.13.0", "semver", @@ -14951,9 +15533,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.77" +version = "0.3.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" +checksum = "4f2dfbb17949fa2088e5d39408c48368947b86f7834484e87b73de55bc14d97d" dependencies = [ "js-sys", "wasm-bindgen", @@ -14976,7 +15558,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" dependencies = [ "ring", - "untrusted", + "untrusted 0.9.0", +] + +[[package]] +name = "webpki-root-certs" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31141ce3fc3e300ae89b78c0dd67f9708061d1d2eda54b8209346fd6be9a92c" +dependencies = [ + "rustls-pki-types", ] [[package]] @@ -15040,7 +15631,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] @@ -15210,6 +15801,15 @@ dependencies = [ "windows-link 0.1.3", ] +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -15246,6 +15846,21 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -15286,6 +15901,12 @@ dependencies = [ "windows-link 0.1.3", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -15298,6 +15919,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -15310,6 +15937,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -15328,6 +15961,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -15340,6 +15979,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -15352,6 +15997,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -15364,6 +16015,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -15429,7 +16086,7 @@ version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.11.1", ] [[package]] @@ -15470,7 +16127,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", - "bitflags 2.9.1", + "bitflags 2.11.1", "indexmap 2.13.0", "log", "serde", diff --git a/Cargo.toml b/Cargo.toml index aa3d3d0c6c..bc3e5bcc81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,6 +110,7 @@ arrow-schema = { version = "57.3", features = ["serde"] } async-stream = "0.3" async-trait = "0.1" # Remember to update axum-extra, axum-macros when updating axum +arrow_object_store = { package = "object_store", version = "0.13.2" } axum = "0.8" axum-extra = "0.10" axum-macros = "0.5" @@ -141,6 +142,7 @@ datafusion-physical-expr = "=52.1" datafusion-physical-plan = "=52.1" datafusion-sql = "=52.1" datafusion-substrait = "=52.1" +datafusion_object_store = { package = "object_store", version = "0.12.5" } deadpool = "0.12" deadpool-postgres = "0.14" derive_builder = "0.20" @@ -174,7 +176,7 @@ nalgebra = "0.33" nix = { version = "0.30.1", default-features = false, features = ["event", "fs", "process"] } notify = "8.0" num_cpus = "1.16" -object_store_opendal = "0.54" +object_store_opendal = { git = "https://github.com/apache/opendal.git", tag = "v0.56.0-rc.2" } once_cell = "1.18" opentelemetry-proto = { version = "0.31", features = [ "gen-tonic", @@ -201,14 +203,17 @@ rand = "0.9" ratelimit = "0.10" regex = "1.12" regex-automata = "0.4" -reqwest = { version = "0.12", default-features = false, features = [ +reqwest = { version = "0.13", default-features = false, features = [ + "form", "json", - "rustls-tls-native-roots", + "query", + "rustls", "stream", "multipart", ] } url = "2.3" # Branch: feat/request-timeout +hostname = "0.4.0" rskafka = { git = "https://github.com/GreptimeTeam/rskafka.git", rev = "f5688f83e7da591cda3f2674c2408b4c0ed4ed50", features = [ "transport-tls", ] } @@ -216,8 +221,6 @@ rstest = "0.25" rstest_reuse = "0.7" rust_decimal = "1.33" rustc-hash = "2.0" -# It is worth noting that we should try to avoid using aws-lc-rs until it can be compiled on various platforms. -hostname = "0.4.0" rustls = { version = "0.23.25", default-features = false } sea-query = "0.32" serde = { version = "1.0", features = ["derive"] } @@ -232,7 +235,8 @@ sqlx = { version = "0.8", default-features = false, features = [ "any", "macros", "json", - "runtime-tokio-rustls", + "runtime-tokio", + "tls-rustls-aws-lc-rs", "rust_decimal", ] } strum = { version = "0.27", features = ["derive"] } @@ -244,7 +248,7 @@ tokio-rustls = { version = "0.26.2", default-features = false } tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["io-util", "compat"] } toml = "0.8.8" -tonic = { version = "0.14", features = ["tls-ring", "gzip", "zstd"] } +tonic = { version = "0.14", features = ["tls-aws-lc", "gzip", "zstd"] } tower = "0.5" tower-http = "0.6" tracing = "0.1" diff --git a/src/cli/src/data/snapshot_storage.rs b/src/cli/src/data/snapshot_storage.rs index 6bc71153df..be94b197df 100644 --- a/src/cli/src/data/snapshot_storage.rs +++ b/src/cli/src/data/snapshot_storage.rs @@ -486,7 +486,8 @@ impl SnapshotStorage for OpenDalStorage { async fn delete_snapshot(&self) -> Result<()> { self.object_store - .remove_all("/") + .delete_with("/") + .recursive(true) .await .context(StorageOperationSnafu { operation: "delete snapshot", diff --git a/src/cli/src/metadata/snapshot.rs b/src/cli/src/metadata/snapshot.rs index 648d3a687d..59b2599ad9 100644 --- a/src/cli/src/metadata/snapshot.rs +++ b/src/cli/src/metadata/snapshot.rs @@ -16,7 +16,7 @@ use async_trait::async_trait; use clap::{Parser, Subcommand}; use common_error::ext::BoxedError; use common_meta::snapshot::MetadataSnapshotManager; -use object_store::{ObjectStore, Scheme}; +use object_store::{ObjectStore, services}; use crate::Tool; use crate::common::{ObjectStoreConfig, StoreConfig, new_fs_object_store}; @@ -276,7 +276,7 @@ fn build_object_store_and_resolve_file_path( None => new_fs_object_store(fs_root)?, }; - let file_path = if matches!(object_store.info().scheme(), Scheme::Fs) { + let file_path = if object_store.info().scheme() == services::FS_SCHEME { resolve_relative_path_with_current_dir(file_path).map_err(BoxedError::new)? } else { file_path.to_string() diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index 9c48d1fd6a..8c791bddcf 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -22,7 +22,7 @@ use cmd::options::GlobalOptions; use cmd::{App, cli, datanode, flownode, frontend, metasrv, standalone}; use common_base::Plugins; use common_version::{product_name, verbose_version, version}; -use servers::install_ring_crypto_provider; +use servers::install_default_crypto_provider; #[derive(Parser)] #[command(name = product_name(), author, version, long_version = verbose_version(), about)] @@ -98,7 +98,7 @@ async fn main() -> Result<()> { async fn main_body() -> Result<()> { setup_human_panic(); - install_ring_crypto_provider().map_err(|msg| InitTlsProviderSnafu { msg }.build())?; + install_default_crypto_provider().map_err(|msg| InitTlsProviderSnafu { msg }.build())?; start(Command::parse()).await } diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index ae81c6ba98..7df293d6d3 100644 --- a/src/common/datasource/Cargo.toml +++ b/src/common/datasource/Cargo.toml @@ -32,7 +32,6 @@ datatypes.workspace = true futures.workspace = true lazy_static.workspace = true object-store.workspace = true -object_store_opendal.workspace = true orc-rust = { version = "0.7", default-features = false, features = ["async"] } parquet.workspace = true paste.workspace = true diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs index 7f4a7c65b4..a6a358c9e4 100644 --- a/src/common/datasource/src/file_format.rs +++ b/src/common/datasource/src/file_format.rs @@ -42,7 +42,6 @@ use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{StreamExt, TryStreamExt}; use object_store::ObjectStore; -use object_store_opendal::OpendalStore; use snafu::ResultExt; use tokio::io::AsyncWriteExt; use tokio_util::compat::FuturesAsyncWriteCompatExt; @@ -317,7 +316,7 @@ pub async fn file_to_stream( .with_file_compression_type(df_compression) .build(); - let store = Arc::new(OpendalStore::new(store.clone())); + let store = Arc::new(object_store::compat::OpendalStore::new(store.clone())); let file_opener = config.file_source().create_file_opener(store, &config, 0)?; let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())?; diff --git a/src/common/datasource/src/file_format/tests.rs b/src/common/datasource/src/file_format/tests.rs index 75d74b53cd..6ef669ab7b 100644 --- a/src/common/datasource/src/file_format/tests.rs +++ b/src/common/datasource/src/file_format/tests.rs @@ -44,7 +44,7 @@ struct Test<'a> { impl Test<'_> { async fn run(self, store: &ObjectStore) { - let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone())); + let store = Arc::new(object_store::compat::OpendalStore::new(store.clone())); let file_opener = self .file_source .create_file_opener(store, &self.config, 0) diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs index ea2b0c768c..0a13d9c6e8 100644 --- a/src/common/datasource/src/test_util.rs +++ b/src/common/datasource/src/test_util.rs @@ -103,7 +103,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi test_util::TEST_BATCH_SIZE, schema.clone(), FileCompressionType::UNCOMPRESSED, - Arc::new(object_store_opendal::OpendalStore::new(store.clone())), + Arc::new(object_store::compat::OpendalStore::new(store.clone())), true, ); @@ -157,7 +157,7 @@ pub async fn setup_stream_to_csv_test( let csv_opener = csv_source .create_file_opener( - Arc::new(object_store_opendal::OpendalStore::new(store.clone())), + Arc::new(object_store::compat::OpendalStore::new(store.clone())), &config, 0, ) diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 136bc268f0..5134c40a80 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -15,7 +15,7 @@ pg_kvbackend = [ "dep:rustls-native-certs", "dep:rustls", ] -mysql_kvbackend = ["dep:sqlx"] +mysql_kvbackend = ["dep:sqlx", "dep:rustls"] enterprise = ["prost-types"] [lints] @@ -67,7 +67,12 @@ prost-types = { workspace = true, optional = true } rand.workspace = true regex.workspace = true rskafka.workspace = true -rustls = { workspace = true, default-features = false, features = ["ring", "logging", "std", "tls12"], optional = true } +rustls = { workspace = true, default-features = false, features = [ + "aws_lc_rs", + "logging", + "std", + "tls12", +], optional = true } rustls-native-certs = { version = "0.7", optional = true } rustls-pemfile = { version = "2.0", optional = true } serde.workspace = true diff --git a/src/common/meta/src/kv_backend/rds.rs b/src/common/meta/src/kv_backend/rds.rs index fd88496bc3..1e94741dd8 100644 --- a/src/common/meta/src/kv_backend/rds.rs +++ b/src/common/meta/src/kv_backend/rds.rs @@ -15,12 +15,14 @@ use std::any::Any; use std::collections::HashMap; use std::marker::PhantomData; +#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] +use std::sync::OnceLock; use std::time::Duration; use backon::{BackoffBuilder, ExponentialBuilder}; -use common_telemetry::debug; +use common_telemetry::{debug, info}; -use crate::error::{Error, RdsTransactionRetryFailedSnafu, Result}; +use crate::error::{Error, RdsTransactionRetryFailedSnafu, Result, UnexpectedSnafu}; use crate::kv_backend::txn::{ Compare, Txn as KvTxn, TxnOp, TxnOpResponse, TxnResponse as KvTxnResponse, }; @@ -51,6 +53,38 @@ pub use mysql::MySqlStore; const RDS_STORE_TXN_RETRY_COUNT: usize = 3; +#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] +static RUSTLS_CRYPTO_PROVIDER_INIT: OnceLock> = OnceLock::new(); + +#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] +pub(crate) fn ensure_rustls_crypto_provider_installed() -> Result<()> { + RUSTLS_CRYPTO_PROVIDER_INIT + .get_or_init(|| { + if rustls::crypto::CryptoProvider::get_default().is_some() { + return Ok(()); + } + + match rustls::crypto::CryptoProvider::install_default( + rustls::crypto::aws_lc_rs::default_provider(), + ) { + Ok(()) => Ok(()), + Err(_provider) if rustls::crypto::CryptoProvider::get_default().is_some() => { + Ok(()) + } + Err(provider) => Err(format!( + "Failed to install rustls CryptoProvider, existing default: {:?}, attempted provider: {:?}", + rustls::crypto::CryptoProvider::get_default(), + provider + )), + } + }) + .clone() + .map_err(|err_msg| { + info!("Failed to install rustls crypto provider: {err_msg}"); + UnexpectedSnafu { err_msg }.build() + }) +} + /// Query executor for rds. It can execute queries or generate a transaction executor. #[async_trait::async_trait] pub trait Executor: Send + Sync { diff --git a/src/common/meta/src/kv_backend/rds/mysql.rs b/src/common/meta/src/kv_backend/rds/mysql.rs index ea8bab4704..6791d58f93 100644 --- a/src/common/meta/src/kv_backend/rds/mysql.rs +++ b/src/common/meta/src/kv_backend/rds/mysql.rs @@ -32,6 +32,7 @@ use crate::kv_backend::rds::{ Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RDS_STORE_OP_BATCH_DELETE, RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT, RDS_STORE_OP_RANGE_DELETE, RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT, RdsStore, Transaction, + ensure_rustls_crypto_provider_installed, }; use crate::rpc::KeyValue; use crate::rpc::store::{ @@ -620,6 +621,7 @@ impl MySqlStore { /// Create [MySqlStore] impl of [KvBackendRef] from url. pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result { + ensure_rustls_crypto_provider_installed()?; let pool = MySqlPool::connect(url) .await .context(CreateMySqlPoolSnafu)?; @@ -687,6 +689,7 @@ mod tests { if endpoints.is_empty() { return None; } + ensure_rustls_crypto_provider_installed().unwrap(); Some(MySqlPool::connect(&endpoints).await.unwrap()) } @@ -984,6 +987,7 @@ mod tests { async fn test_mysql_with_tls() { common_telemetry::init_default_ut_logging(); maybe_skip_mysql_integration_test!(); + ensure_rustls_crypto_provider_installed().unwrap(); let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap(); let opts = endpoint @@ -998,6 +1002,7 @@ mod tests { async fn test_mysql_with_mtls() { common_telemetry::init_default_ut_logging(); maybe_skip_mysql_integration_test!(); + ensure_rustls_crypto_provider_installed().unwrap(); let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap(); let certs_dir = test_certs_dir(); @@ -1015,6 +1020,7 @@ mod tests { async fn test_mysql_with_tls_verify_ca() { common_telemetry::init_default_ut_logging(); maybe_skip_mysql_integration_test!(); + ensure_rustls_crypto_provider_installed().unwrap(); let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap(); let certs_dir = test_certs_dir(); @@ -1033,6 +1039,7 @@ mod tests { async fn test_mysql_with_tls_verify_ident() { common_telemetry::init_default_ut_logging(); maybe_skip_mysql_integration_test!(); + ensure_rustls_crypto_provider_installed().unwrap(); let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap(); let certs_dir = test_certs_dir(); diff --git a/src/common/meta/src/kv_backend/rds/postgres.rs b/src/common/meta/src/kv_backend/rds/postgres.rs index b8d0cd4a6d..4a2f8b1356 100644 --- a/src/common/meta/src/kv_backend/rds/postgres.rs +++ b/src/common/meta/src/kv_backend/rds/postgres.rs @@ -41,6 +41,7 @@ use crate::kv_backend::rds::{ Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RDS_STORE_OP_BATCH_DELETE, RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT, RDS_STORE_OP_RANGE_DELETE, RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT, RdsStore, Transaction, + ensure_rustls_crypto_provider_installed, }; use crate::rpc::KeyValue; use crate::rpc::store::{ @@ -422,6 +423,7 @@ pub fn create_postgres_tls_connector(tls_config: &TlsOption) -> Result { @@ -516,7 +518,7 @@ impl ServerCertVerifier for AcceptAnyVerifier { fn supported_verify_schemes(&self) -> Vec { // Support all signature schemes - rustls::crypto::ring::default_provider() + rustls::crypto::aws_lc_rs::default_provider() .signature_verification_algorithms .supported_schemes() } @@ -544,7 +546,7 @@ impl ServerCertVerifier for NoHostnameVerification { &self.roots, intermediates, now, - rustls::crypto::ring::default_provider() + rustls::crypto::aws_lc_rs::default_provider() .signature_verification_algorithms .all, )?; @@ -562,7 +564,7 @@ impl ServerCertVerifier for NoHostnameVerification { message, cert, dss, - &rustls::crypto::ring::default_provider().signature_verification_algorithms, + &rustls::crypto::aws_lc_rs::default_provider().signature_verification_algorithms, ) } @@ -576,13 +578,13 @@ impl ServerCertVerifier for NoHostnameVerification { message, cert, dss, - &rustls::crypto::ring::default_provider().signature_verification_algorithms, + &rustls::crypto::aws_lc_rs::default_provider().signature_verification_algorithms, ) } fn supported_verify_schemes(&self) -> Vec { // Support all signature schemes - rustls::crypto::ring::default_provider() + rustls::crypto::aws_lc_rs::default_provider() .signature_verification_algorithms .supported_schemes() } diff --git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml index 8332f2341f..b9ca39efca 100644 --- a/src/common/wal/Cargo.toml +++ b/src/common/wal/Cargo.toml @@ -19,7 +19,7 @@ futures-util.workspace = true humantime-serde.workspace = true num_cpus.workspace = true rskafka.workspace = true -rustls = { workspace = true, default-features = false, features = ["ring", "logging", "std", "tls12"] } +rustls = { workspace = true, default-features = false, features = ["aws_lc_rs", "logging", "std", "tls12"] } rustls-native-certs = "0.7" rustls-pemfile = "2.1" serde.workspace = true diff --git a/src/file-engine/Cargo.toml b/src/file-engine/Cargo.toml index 1ef7f8ea52..8d69512fa1 100644 --- a/src/file-engine/Cargo.toml +++ b/src/file-engine/Cargo.toml @@ -30,7 +30,6 @@ datafusion-orc.workspace = true datatypes.workspace = true futures.workspace = true object-store.workspace = true -object_store_opendal.workspace = true serde = { version = "1.0", features = ["derive"] } serde_json.workspace = true snafu.workspace = true diff --git a/src/file-engine/src/query/file_stream.rs b/src/file-engine/src/query/file_stream.rs index 73453589dc..931dfabe62 100644 --- a/src/file-engine/src/query/file_stream.rs +++ b/src/file-engine/src/query/file_stream.rs @@ -61,7 +61,7 @@ fn build_record_batch_stream( .with_file_group(FileGroup::new(files)) .build(); - let store = Arc::new(object_store_opendal::OpendalStore::new( + let store = Arc::new(object_store::compat::OpendalStore::new( scan_plan_config.store.clone(), )); diff --git a/src/metric-engine/src/engine/flush.rs b/src/metric-engine/src/engine/flush.rs index 8c0f33aaf3..47120555ff 100644 --- a/src/metric-engine/src/engine/flush.rs +++ b/src/metric-engine/src/engine/flush.rs @@ -151,6 +151,8 @@ ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, .map(|mut e| { let i = e.file_path.rfind('/').unwrap(); e.file_path.replace_range(i..(i + 37), "/"); + e.file_size = None; + e.last_modified_ms = None; format!("\n{:?}", e) }) .sorted() diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 33180ebf46..b6adb6eb59 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -499,7 +499,7 @@ impl AccessLayer { let file_size = if file_size == 0 { None } else { Some(file_size) }; let last_modified_ms = metadata .last_modified() - .map(|ts| Timestamp::new_millisecond(ts.timestamp_millis())); + .map(|ts| Timestamp::new_millisecond(ts.into_inner().as_millisecond())); let entry = StorageSstEntry { file_path: path.to_string(), diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 5c2bd4fd4e..7b8def8207 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -970,6 +970,8 @@ async fn test_list_ssts_with_format( .map(|mut e| { let i = e.file_path.rfind('/').unwrap(); e.file_path.replace_range(i..(i + 37), "/"); + e.file_size = None; + e.last_modified_ms = None; format!("\n{:?}", e) }) .sorted() diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs index 9846933d1f..1b5a6538d8 100644 --- a/src/mito2/src/engine/staging_test.rs +++ b/src/mito2/src/engine/staging_test.rs @@ -26,8 +26,8 @@ use common_recordbatch::RecordBatches; use datatypes::value::Value; use object_store::Buffer; use object_store::layers::mock::{ - Entry, Error as MockError, ErrorKind, List, Lister, Metadata, MockLayerBuilder, - Result as MockResult, Write, Writer, + Delete, Deleter, Entry, Error as MockError, ErrorKind, List, Lister, Metadata, + MockLayerBuilder, OpDelete, Result as MockResult, Write, Writer, }; use partition::expr::{PartitionExpr, col}; use store_api::region_engine::{ @@ -1152,6 +1152,23 @@ impl Write for MockWriter { } } +struct MockDeleter { + inner: Deleter, +} + +impl Delete for MockDeleter { + async fn delete(&mut self, path: &str, args: OpDelete) -> MockResult<()> { + if path.contains("staging") { + return Err(MockError::new(ErrorKind::Unexpected, "mock error")); + } + self.inner.delete(path, args).await + } + + async fn close(&mut self) -> MockResult<()> { + self.inner.close().await + } +} + async fn test_enter_staging_error(env: &mut TestEnv, flat_format: bool) { let partition_expr = default_partition_expr(); let engine = env @@ -1201,6 +1218,7 @@ async fn test_enter_staging_clean_staging_manifest_error_with_format(flat_format inner: lister, }) })) + .deleter_factory(Arc::new(|deleter| Box::new(MockDeleter { inner: deleter }))) .build() .unwrap(); let mut env = TestEnv::new().await.with_mock_layer(mock_layer); diff --git a/src/mito2/src/manifest/storage/staging.rs b/src/mito2/src/manifest/storage/staging.rs index 95ad00fb8c..dd0105a091 100644 --- a/src/mito2/src/manifest/storage/staging.rs +++ b/src/mito2/src/manifest/storage/staging.rs @@ -191,13 +191,15 @@ impl StagingStorage { pub(crate) async fn clear(&self) -> Result<()> { self.delta_storage .object_store() - .remove_all(self.delta_storage.path()) + .delete_with(self.delta_storage.path()) + .recursive(true) .await .context(OpenDalSnafu)?; self.blob_storage .object_store() - .remove_all(self.blob_storage.path()) + .delete_with(self.blob_storage.path()) + .recursive(true) .await .context(OpenDalSnafu)?; diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 89f53e5627..64b4785eae 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -495,23 +495,23 @@ async fn test_checkpoint_bypass_in_staging_mode() { assert_eq!(last_version, 16); } -/// A deleter that fails on `flush`, simulating the S3 batch-delete failure +/// A deleter that fails on `close`, simulating the S3 batch-delete failure /// described in issue #7986. struct FailingDeleter { inner: oio::Deleter, - flush_calls: Arc, + close_calls: Arc, } impl oio::Delete for FailingDeleter { - fn delete(&mut self, path: &str, args: OpDelete) -> MockResult<()> { - self.inner.delete(path, args) + async fn delete(&mut self, path: &str, args: OpDelete) -> MockResult<()> { + self.inner.delete(path, args).await } - async fn flush(&mut self) -> MockResult { - self.flush_calls.fetch_add(1, Ordering::Relaxed); + async fn close(&mut self) -> MockResult<()> { + self.close_calls.fetch_add(1, Ordering::Relaxed); Err(MockError::new( ErrorKind::Unexpected, - "mock manifest delete flush failure", + "mock manifest delete close failure", )) } } @@ -520,13 +520,13 @@ impl oio::Delete for FailingDeleter { async fn checkpoint_advances_and_recovery_works_when_delete_fails() { common_telemetry::init_default_ut_logging(); - let flush_calls = Arc::new(AtomicUsize::new(0)); - let factory_flush_calls = flush_calls.clone(); + let close_calls = Arc::new(AtomicUsize::new(0)); + let factory_close_calls = close_calls.clone(); let mock_layer = MockLayerBuilder::default() .deleter_factory(Arc::new(move |inner| { Box::new(FailingDeleter { inner, - flush_calls: factory_flush_calls.clone(), + close_calls: factory_close_calls.clone(), }) })) .build() @@ -548,7 +548,7 @@ async fn checkpoint_advances_and_recovery_works_when_delete_fails() { } // The checkpointer must have attempted to delete stale files at least once. - assert!(flush_calls.load(Ordering::Relaxed) > 0); + assert!(close_calls.load(Ordering::Relaxed) > 0); // Despite delete failures, the in-memory checkpoint marker advances so // subsequent `maybe_do_checkpoint` calls compute correct ranges. diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index c1240c3829..ffdaba9154 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -1007,7 +1007,7 @@ async fn preload_parquet_meta_cache_for_files( return 0; } - let allow_direct_load = matches!(object_store.info().scheme(), object_store::Scheme::Fs); + let allow_direct_load = object_store.info().scheme() == object_store::services::FS_SCHEME; // Sort by time range so we can prefer preloading newer files first. files.sort_by_key(|b| std::cmp::Reverse(b.meta_ref().time_range.1)); diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 7c1598787e..b4ee3801bf 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -70,20 +70,14 @@ impl fmt::Debug for LocalFilePurger { #[cfg(not(debug_assertions))] /// Whether to enable GC for the file purger. -pub fn should_enable_gc( - global_gc_enabled: bool, - object_store_scheme: object_store::Scheme, -) -> bool { - global_gc_enabled && object_store_scheme != object_store::Scheme::Fs +pub fn should_enable_gc(global_gc_enabled: bool, object_store_scheme: &'static str) -> bool { + global_gc_enabled && object_store_scheme != object_store::services::FS_SCHEME } #[cfg(debug_assertions)] /// For debug build, we may use Fs as the object store scheme, /// so we need to enable GC for local file system. -pub fn should_enable_gc( - global_gc_enabled: bool, - _object_store_scheme: object_store::Scheme, -) -> bool { +pub fn should_enable_gc(global_gc_enabled: bool, _object_store_scheme: &'static str) -> bool { global_gc_enabled } diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs index 1662c6d876..62a5981298 100644 --- a/src/mito2/src/sst/index/store.rs +++ b/src/mito2/src/sst/index/store.rs @@ -142,10 +142,11 @@ impl InstrumentedStore { Ok(list) } - /// Proxies to [`ObjectStore::remove_all`]. + /// Recursively deletes all objects under the given path. pub async fn remove_all(&self, path: &str) -> Result<()> { self.object_store - .remove_all(path) + .delete_with(path) + .recursive(true) .await .context(OpenDalSnafu) } diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 5d7149768c..00a96b0d16 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -290,7 +290,8 @@ pub(crate) async fn remove_region_dir_once( .context(OpenDalSnafu)?; // then remove the marker with this dir object_store - .remove_all(region_path) + .delete_with(region_path) + .recursive(true) .await .context(OpenDalSnafu)?; Ok(true) diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 7247c5892c..65a3688024 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -12,16 +12,19 @@ services-memory = ["opendal/services-memory"] testing = ["derive_builder"] [dependencies] +async-trait.workspace = true bytes.workspace = true +chrono.workspace = true common-base.workspace = true common-error.workspace = true common-macro.workspace = true common-telemetry.workspace = true +datafusion_object_store.workspace = true derive_builder = { workspace = true, optional = true } futures.workspace = true humantime-serde.workspace = true lazy_static.workspace = true -opendal = { version = "0.54", features = [ +opendal = { git = "https://github.com/apache/opendal.git", tag = "v0.56.0-rc.2", features = [ "layers-tracing", "layers-prometheus", "services-azblob", @@ -40,6 +43,10 @@ uuid.workspace = true [dev-dependencies] anyhow = "1.0" +arrow_object_store.workspace = true common-telemetry.workspace = true common-test-util.workspace = true +object_store_opendal.workspace = true +rand.workspace = true +tempfile.workspace = true tokio.workspace = true diff --git a/src/object-store/src/compat.rs b/src/object-store/src/compat.rs new file mode 100644 index 0000000000..3e46355bd1 --- /dev/null +++ b/src/object-store/src/compat.rs @@ -0,0 +1,1053 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::fmt::{self, Debug, Display, Formatter}; +use std::future::IntoFuture; +use std::io; +use std::ops::Range; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use datafusion_object_store::path::Path; +use datafusion_object_store::{ + Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, + MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore, PutMode, PutMultipartOptions, + PutOptions, PutPayload, PutResult, UploadPart, +}; +use futures::stream::BoxStream; +use futures::{FutureExt, StreamExt, TryStreamExt}; +use opendal::options::CopyOptions; +use opendal::raw::percent_decode_path; +use opendal::{Buffer, Operator, OperatorInfo, Writer}; +use tokio::sync::{Mutex, oneshot}; + +/// OpendalStore implements ObjectStore trait by using opendal. +/// +/// This allows users to use opendal as an object store without extra cost. +/// +/// Visit [`opendal::services`] for more information about supported services. +/// +/// ```no_run +/// use std::sync::Arc; +/// +/// use bytes::Bytes; +/// use object_store::path::Path; +/// use object_store::ObjectStore; +/// use object_store_opendal::OpendalStore; +/// use opendal::services::S3; +/// use opendal::{Builder, Operator}; +/// +/// #[tokio::main] +/// async fn main() { +/// let builder = S3::default() +/// .access_key_id("my_access_key") +/// .secret_access_key("my_secret_key") +/// .endpoint("my_endpoint") +/// .region("my_region"); +/// +/// // Create a new operator +/// let operator = Operator::new(builder).unwrap().finish(); +/// +/// // Create a new object store +/// let object_store = Arc::new(OpendalStore::new(operator)); +/// +/// let path = Path::from("data/nested/test.txt"); +/// let bytes = Bytes::from_static(b"hello, world! I am nested."); +/// +/// object_store.put(&path, bytes.clone().into()).await.unwrap(); +/// +/// let content = object_store +/// .get(&path) +/// .await +/// .unwrap() +/// .bytes() +/// .await +/// .unwrap(); +/// +/// assert_eq!(content, bytes); +/// } +/// ``` +#[derive(Clone)] +pub struct OpendalStore { + info: Arc, + inner: Operator, +} + +impl OpendalStore { + /// Create OpendalStore by given Operator. + pub fn new(op: Operator) -> Self { + Self { + info: op.info().into(), + inner: op, + } + } + + /// Get the Operator info. + pub fn info(&self) -> &OperatorInfo { + self.info.as_ref() + } + + /// Copy a file from one location to another. + async fn copy_request( + &self, + from: &Path, + to: &Path, + if_not_exists: bool, + ) -> datafusion_object_store::Result<()> { + let mut copy_options = CopyOptions::default(); + if if_not_exists { + copy_options.if_not_exists = true; + } + + // Perform the copy operation + self.inner + .copy_options( + &percent_decode_path(from.as_ref()), + &percent_decode_path(to.as_ref()), + copy_options, + ) + .await + .map_err(|err| { + if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists { + datafusion_object_store::Error::AlreadyExists { + path: to.to_string(), + source: Box::new(err), + } + } else { + format_object_store_error(err, from.as_ref()) + } + })?; + + Ok(()) + } +} + +impl Debug for OpendalStore { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("OpendalStore") + .field("scheme", &self.info.scheme()) + .field("name", &self.info.name()) + .field("root", &self.info.root()) + .field("capability", &self.info.full_capability()) + .finish() + } +} + +impl Display for OpendalStore { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let info = self.inner.info(); + write!( + f, + "Opendal({}, bucket={}, root={})", + info.scheme(), + info.name(), + info.root() + ) + } +} + +impl From for OpendalStore { + fn from(value: Operator) -> Self { + Self::new(value) + } +} + +#[async_trait] +impl ArrowObjectStore for OpendalStore { + async fn put_opts( + &self, + location: &Path, + bytes: PutPayload, + opts: PutOptions, + ) -> datafusion_object_store::Result { + let decoded_location = percent_decode_path(location.as_ref()); + let mut future_write = self + .inner + .write_with(&decoded_location, Buffer::from_iter(bytes)); + let opts_mode = opts.mode.clone(); + match opts.mode { + PutMode::Overwrite => {} + PutMode::Create => { + future_write = future_write.if_not_exists(true); + } + PutMode::Update(update_version) => { + let Some(etag) = update_version.e_tag else { + return Err(datafusion_object_store::Error::NotSupported { + source: Box::new(opendal::Error::new( + opendal::ErrorKind::Unsupported, + "etag is required for conditional put", + )), + }); + }; + future_write = future_write.if_match(etag.as_str()); + } + } + let rp = future_write.await.map_err(|err| { + match format_object_store_error(err, location.as_ref()) { + datafusion_object_store::Error::Precondition { path, source } + if opts_mode == PutMode::Create => + { + datafusion_object_store::Error::AlreadyExists { path, source } + } + e => e, + } + })?; + + let e_tag = rp.etag().map(|s| s.to_string()); + let version = rp.version().map(|s| s.to_string()); + + Ok(PutResult { e_tag, version }) + } + + async fn put_multipart( + &self, + location: &Path, + ) -> datafusion_object_store::Result> { + let decoded_location = percent_decode_path(location.as_ref()); + let writer = self + .inner + .writer_with(&decoded_location) + .concurrent(8) + .await + .map_err(|err| format_object_store_error(err, location.as_ref()))?; + let upload = OpendalMultipartUpload::new(writer, location.clone()); + + Ok(Box::new(upload)) + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> datafusion_object_store::Result> { + const DEFAULT_CONCURRENT: usize = 8; + + let mut options = opendal::options::WriteOptions { + concurrent: DEFAULT_CONCURRENT, + ..Default::default() + }; + + let mut user_metadata = HashMap::new(); + + for (key, value) in opts.attributes.iter() { + match key { + Attribute::CacheControl => { + options.cache_control = Some(value.to_string()); + } + Attribute::ContentDisposition => { + options.content_disposition = Some(value.to_string()); + } + Attribute::ContentEncoding => { + options.content_encoding = Some(value.to_string()); + } + Attribute::ContentLanguage => continue, + Attribute::ContentType => { + options.content_type = Some(value.to_string()); + } + Attribute::Metadata(k) => { + user_metadata.insert(k.to_string(), value.to_string()); + } + _ => {} + } + } + + if !user_metadata.is_empty() { + options.user_metadata = Some(user_metadata); + } + + let decoded_location = percent_decode_path(location.as_ref()); + let writer = self + .inner + .writer_options(&decoded_location, options) + .await + .map_err(|err| format_object_store_error(err, location.as_ref()))?; + let upload = OpendalMultipartUpload::new(writer, location.clone()); + + Ok(Box::new(upload)) + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> datafusion_object_store::Result { + let raw_location = percent_decode_path(location.as_ref()); + let meta = { + let mut s = self.inner.stat_with(&raw_location); + if let Some(version) = &options.version { + s = s.version(version.as_str()) + } + if let Some(if_match) = &options.if_match { + s = s.if_match(if_match.as_str()); + } + if let Some(if_none_match) = &options.if_none_match { + s = s.if_none_match(if_none_match.as_str()); + } + if let Some(if_modified_since) = + options.if_modified_since.and_then(datetime_to_timestamp) + { + s = s.if_modified_since(if_modified_since); + } + if let Some(if_unmodified_since) = + options.if_unmodified_since.and_then(datetime_to_timestamp) + { + s = s.if_unmodified_since(if_unmodified_since); + } + s.await + .map_err(|err| format_object_store_error(err, location.as_ref()))? + }; + + let mut attributes = Attributes::new(); + if let Some(user_meta) = meta.user_metadata() { + for (key, value) in user_meta { + attributes.insert( + Attribute::Metadata(key.clone().into()), + value.clone().into(), + ); + } + } + + let meta = ObjectMeta { + location: location.clone(), + last_modified: meta + .last_modified() + .and_then(timestamp_to_datetime) + .unwrap_or_default(), + size: meta.content_length(), + e_tag: meta.etag().map(|x| x.to_string()), + version: meta.version().map(|x| x.to_string()), + }; + + if options.head { + return Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())), + range: 0..0, + meta, + attributes, + }); + } + + let reader = { + let mut r = self.inner.reader_with(raw_location.as_ref()); + if let Some(version) = options.version { + r = r.version(version.as_str()); + } + if let Some(if_match) = options.if_match { + r = r.if_match(if_match.as_str()); + } + if let Some(if_none_match) = options.if_none_match { + r = r.if_none_match(if_none_match.as_str()); + } + if let Some(if_modified_since) = + options.if_modified_since.and_then(datetime_to_timestamp) + { + r = r.if_modified_since(if_modified_since); + } + if let Some(if_unmodified_since) = + options.if_unmodified_since.and_then(datetime_to_timestamp) + { + r = r.if_unmodified_since(if_unmodified_since); + } + r.await + .map_err(|err| format_object_store_error(err, location.as_ref()))? + }; + + let read_range = match options.range { + Some(GetRange::Bounded(r)) => { + if r.start >= r.end || r.start >= meta.size { + 0..0 + } else { + let end = r.end.min(meta.size); + r.start..end + } + } + Some(GetRange::Offset(r)) => { + if r < meta.size { + r..meta.size + } else { + 0..0 + } + } + Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size, + _ => 0..meta.size, + }; + + let stream = reader + .into_bytes_stream(read_range.start..read_range.end) + .await + .map_err(|err| format_object_store_error(err, location.as_ref()))? + .map_ok(|buf| buf) + .map_err(|err: io::Error| datafusion_object_store::Error::Generic { + store: "IoError", + source: Box::new(err), + }); + + Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(stream)), + range: read_range.start..read_range.end, + meta, + attributes, + }) + } + + async fn get_range( + &self, + location: &Path, + range: Range, + ) -> datafusion_object_store::Result { + let raw_location = percent_decode_path(location.as_ref()); + let reader = self + .inner + .reader_with(&raw_location) + .await + .map_err(|err| format_object_store_error(err, location.as_ref()))?; + + reader + .read(range.start..range.end) + .await + .map(|buf| buf.to_bytes()) + .map_err(|err| format_object_store_error(err, location.as_ref())) + } + + async fn delete(&self, location: &Path) -> datafusion_object_store::Result<()> { + let decoded_location = percent_decode_path(location.as_ref()); + self.inner + .delete(&decoded_location) + .await + .map_err(|err| format_object_store_error(err, location.as_ref()))?; + + Ok(()) + } + + fn list( + &self, + prefix: Option<&Path>, + ) -> BoxStream<'static, datafusion_object_store::Result> { + // object_store `Path` always removes trailing slash + // need to add it back + let path = prefix.map_or("".into(), |x| { + format!("{}/", percent_decode_path(x.as_ref())) + }); + + let this = self.clone(); + let fut = async move { + let stream = this + .inner + .lister_with(&path) + .recursive(true) + .await + .map_err(|err| format_object_store_error(err, &path))?; + + let stream = stream.then(|res| async { + let entry = res.map_err(|err| format_object_store_error(err, ""))?; + let meta = entry.metadata(); + + Ok(format_object_meta(entry.path(), meta)) + }); + Ok::<_, datafusion_object_store::Error>(stream) + }; + + fut.into_stream().try_flatten().boxed() + } + + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'static, datafusion_object_store::Result> { + let path = prefix.map_or("".into(), |x| { + format!("{}/", percent_decode_path(x.as_ref())) + }); + let offset = offset.clone(); + + // clone self for 'static lifetime + // clone self is cheap + let this = self.clone(); + + let fut = async move { + let list_with_start_after = this.inner.info().full_capability().list_with_start_after; + let mut fut = this.inner.lister_with(&path).recursive(true); + + // Use native start_after support if possible. + if list_with_start_after { + fut = fut.start_after(offset.as_ref()); + } + + let lister = fut + .await + .map_err(|err| format_object_store_error(err, &path))? + .then(move |entry| { + let path = path.clone(); + let this = this.clone(); + async move { + let entry = entry.map_err(|err| format_object_store_error(err, &path))?; + let (path, metadata) = entry.into_parts(); + + // If it's a dir or last_modified is present, we can use it directly. + if metadata.is_dir() || metadata.last_modified().is_some() { + let object_meta = format_object_meta(&path, &metadata); + return Ok(object_meta); + } + + let metadata = this + .inner + .stat(&path) + .await + .map_err(|err| format_object_store_error(err, &path))?; + let object_meta = format_object_meta(&path, &metadata); + Ok::<_, datafusion_object_store::Error>(object_meta) + } + }) + .boxed(); + + let stream = if list_with_start_after { + lister + } else { + lister + .try_filter(move |entry| futures::future::ready(entry.location > offset)) + .boxed() + }; + + Ok::<_, datafusion_object_store::Error>(stream) + }; + + fut.into_stream().try_flatten().boxed() + } + + async fn list_with_delimiter( + &self, + prefix: Option<&Path>, + ) -> datafusion_object_store::Result { + let path = prefix.map_or("".into(), |x| { + format!("{}/", percent_decode_path(x.as_ref())) + }); + let mut stream = self + .inner + .lister_with(&path) + .into_future() + .await + .map_err(|err| format_object_store_error(err, &path))?; + + let mut common_prefixes = Vec::new(); + let mut objects = Vec::new(); + + while let Some(res) = stream.next().await { + let entry = res.map_err(|err| format_object_store_error(err, ""))?; + let meta = entry.metadata(); + + if meta.is_dir() { + common_prefixes.push(entry.path().into()); + } else if meta.last_modified().is_some() { + objects.push(format_object_meta(entry.path(), meta)); + } else { + let meta = self + .inner + .stat(entry.path()) + .await + .map_err(|err| format_object_store_error(err, entry.path()))?; + objects.push(format_object_meta(entry.path(), &meta)); + } + } + + Ok(ListResult { + common_prefixes, + objects, + }) + } + + async fn copy(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> { + self.copy_request(from, to, false).await + } + + async fn rename(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> { + self.inner + .rename( + &percent_decode_path(from.as_ref()), + &percent_decode_path(to.as_ref()), + ) + .await + .map_err(|err| format_object_store_error(err, from.as_ref()))?; + + Ok(()) + } + + async fn copy_if_not_exists( + &self, + from: &Path, + to: &Path, + ) -> datafusion_object_store::Result<()> { + self.copy_request(from, to, true).await + } +} + +/// `MultipartUpload` implementation based on `Writer` in opendal. +/// +/// # Notes +/// +/// OpenDAL writer can handle concurrent internally we don't generate real `UploadPart` like existing +/// implementation do. Instead, we just write the part and notify the next task to be written. +/// +/// The lock here doesn't really involve the write process, it's just for the notify mechanism. +struct OpendalMultipartUpload { + writer: Arc>, + location: Path, + next_notify: oneshot::Receiver<()>, +} + +impl OpendalMultipartUpload { + fn new(writer: Writer, location: Path) -> Self { + // an immediately dropped sender for the first part to write without waiting + let (_, rx) = oneshot::channel(); + + Self { + writer: Arc::new(Mutex::new(writer)), + location, + next_notify: rx, + } + } +} + +#[async_trait] +impl MultipartUpload for OpendalMultipartUpload { + fn put_part(&mut self, data: PutPayload) -> UploadPart { + let writer = self.writer.clone(); + let location = self.location.clone(); + + // Generate next notify which will be notified after the current part is written. + let (tx, rx) = oneshot::channel(); + // Fetch the notify for current part to wait for it to be written. + let last_rx = std::mem::replace(&mut self.next_notify, rx); + + async move { + // Wait for the previous part to be written + let _ = last_rx.await; + + let mut writer = writer.lock().await; + let result = writer + .write(Buffer::from_iter(data)) + .await + .map_err(|err| format_object_store_error(err, location.as_ref())); + + // Notify the next part to be written + drop(tx); + + result + } + .boxed() + } + + async fn complete(&mut self) -> datafusion_object_store::Result { + let mut writer = self.writer.lock().await; + let metadata = writer + .close() + .await + .map_err(|err| format_object_store_error(err, self.location.as_ref()))?; + + let e_tag = metadata.etag().map(|s| s.to_string()); + let version = metadata.version().map(|s| s.to_string()); + + Ok(PutResult { e_tag, version }) + } + + async fn abort(&mut self) -> datafusion_object_store::Result<()> { + let mut writer = self.writer.lock().await; + writer + .abort() + .await + .map_err(|err| format_object_store_error(err, self.location.as_ref())) + } +} + +impl Debug for OpendalMultipartUpload { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("OpendalMultipartUpload") + .field("location", &self.location) + .finish() + } +} + +fn format_object_store_error(err: opendal::Error, path: &str) -> datafusion_object_store::Error { + match err.kind() { + opendal::ErrorKind::NotFound => datafusion_object_store::Error::NotFound { + path: path.to_string(), + source: Box::new(err), + }, + opendal::ErrorKind::Unsupported => datafusion_object_store::Error::NotSupported { + source: Box::new(err), + }, + opendal::ErrorKind::AlreadyExists => datafusion_object_store::Error::AlreadyExists { + path: path.to_string(), + source: Box::new(err), + }, + opendal::ErrorKind::ConditionNotMatch => datafusion_object_store::Error::Precondition { + path: path.to_string(), + source: Box::new(err), + }, + kind => datafusion_object_store::Error::Generic { + store: kind.into_static(), + source: Box::new(err), + }, + } +} + +fn format_object_meta(path: &str, meta: &opendal::Metadata) -> ObjectMeta { + ObjectMeta { + location: path.into(), + last_modified: meta + .last_modified() + .and_then(timestamp_to_datetime) + .unwrap_or_default(), + size: meta.content_length(), + e_tag: meta.etag().map(|x| x.to_string()), + version: meta.version().map(|x| x.to_string()), + } +} + +fn timestamp_to_datetime(ts: opendal::raw::Timestamp) -> Option> { + let ts = ts.into_inner(); + chrono::DateTime::::from_timestamp(ts.as_second(), ts.subsec_nanosecond() as u32) +} + +fn datetime_to_timestamp(dt: chrono::DateTime) -> Option { + opendal::raw::Timestamp::new(dt.timestamp(), dt.timestamp_subsec_nanos() as i32).ok() +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use bytes::Bytes; + use datafusion_object_store::path::Path; + use datafusion_object_store::{ObjectStore as ArrowObjectStore, WriteMultipart}; + use opendal::{Operator, services}; + use rand::{Rng, RngCore}; + + use super::*; + + async fn create_test_object_store() -> Arc { + let op = Operator::new(services::Memory::default()).unwrap().finish(); + let object_store = Arc::new(OpendalStore::new(op)); + + let path: Path = "data/test.txt".into(); + let bytes = Bytes::from_static(b"hello, world!"); + object_store.put(&path, bytes.into()).await.unwrap(); + + let path: Path = "data/nested/test.txt".into(); + let bytes = Bytes::from_static(b"hello, world! I am nested."); + object_store.put(&path, bytes.into()).await.unwrap(); + + object_store + } + + #[tokio::test] + async fn test_basic() { + let op = Operator::new(services::Memory::default()).unwrap().finish(); + let object_store: Arc = Arc::new(OpendalStore::new(op)); + + // Retrieve a specific file + let path: Path = "data/test.txt".into(); + + let bytes = Bytes::from_static(b"hello, world!"); + object_store.put(&path, bytes.clone().into()).await.unwrap(); + + let meta = object_store.head(&path).await.unwrap(); + + assert_eq!(meta.size, 13); + + assert_eq!( + object_store + .get(&path) + .await + .unwrap() + .bytes() + .await + .unwrap(), + bytes + ); + } + + #[tokio::test] + async fn test_put_multipart() { + let op = Operator::new(services::Memory::default()).unwrap().finish(); + let object_store: Arc = Arc::new(OpendalStore::new(op)); + + let mut rng = rand::rng(); + + // Case complete + let path: Path = "data/test_complete.txt".into(); + let upload = object_store.put_multipart(&path).await.unwrap(); + + let mut write = WriteMultipart::new(upload); + + let mut all_bytes = vec![]; + let round = rng.random_range(1..=1024); + for _ in 0..round { + let size = rng.random_range(1..=1024); + let mut bytes = vec![0; size]; + rng.fill_bytes(&mut bytes); + + all_bytes.extend_from_slice(&bytes); + write.put(bytes.into()); + } + + let _ = write.finish().await.unwrap(); + + let meta = object_store.head(&path).await.unwrap(); + + assert_eq!(meta.size, all_bytes.len() as u64); + + assert_eq!( + object_store + .get(&path) + .await + .unwrap() + .bytes() + .await + .unwrap(), + Bytes::from(all_bytes) + ); + + // Case abort + let path: Path = "data/test_abort.txt".into(); + let mut upload = object_store.put_multipart(&path).await.unwrap(); + upload.put_part(vec![1; 1024].into()).await.unwrap(); + upload.abort().await.unwrap(); + + let res = object_store.head(&path).await; + let err = res.unwrap_err(); + + assert!(matches!( + err, + datafusion_object_store::Error::NotFound { .. } + )) + } + + #[tokio::test] + async fn test_list() { + let object_store = create_test_object_store().await; + let path: Path = "data/".into(); + let results = object_store.list(Some(&path)).collect::>().await; + assert_eq!(results.len(), 2); + let mut locations = results + .iter() + .map(|x| x.as_ref().unwrap().location.as_ref()) + .collect::>(); + + let expected_files = vec![ + ( + "data/nested/test.txt", + Bytes::from_static(b"hello, world! I am nested."), + ), + ("data/test.txt", Bytes::from_static(b"hello, world!")), + ]; + + let expected_locations = expected_files.iter().map(|x| x.0).collect::>(); + + locations.sort(); + assert_eq!(locations, expected_locations); + + for (location, bytes) in expected_files { + let path: Path = location.into(); + assert_eq!( + object_store + .get(&path) + .await + .unwrap() + .bytes() + .await + .unwrap(), + bytes + ); + } + } + + #[tokio::test] + async fn test_list_with_delimiter() { + let object_store = create_test_object_store().await; + let path: Path = "data/".into(); + let result = object_store.list_with_delimiter(Some(&path)).await.unwrap(); + assert_eq!(result.objects.len(), 1); + assert_eq!(result.common_prefixes.len(), 1); + assert_eq!(result.objects[0].location.as_ref(), "data/test.txt"); + assert_eq!(result.common_prefixes[0].as_ref(), "data/nested"); + } + + #[tokio::test] + async fn test_list_with_offset() { + let object_store = create_test_object_store().await; + let path: Path = "data/".into(); + let offset: Path = "data/nested/test.txt".into(); + let result = object_store + .list_with_offset(Some(&path), &offset) + .collect::>() + .await; + assert_eq!(result.len(), 1); + assert_eq!( + result[0].as_ref().unwrap().location.as_ref(), + "data/test.txt" + ); + } + + mod stat_counter { + use std::sync::atomic::{AtomicUsize, Ordering}; + + use super::*; + + #[derive(Debug, Clone)] + pub struct StatCounterLayer { + count: Arc, + } + + impl StatCounterLayer { + pub fn new(count: Arc) -> Self { + Self { count } + } + } + + impl opendal::raw::Layer for StatCounterLayer { + type LayeredAccess = StatCounterAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccess { + StatCounterAccessor { + inner, + count: self.count.clone(), + } + } + } + + #[derive(Debug, Clone)] + pub struct StatCounterAccessor { + inner: A, + count: Arc, + } + + impl opendal::raw::LayeredAccess for StatCounterAccessor { + type Inner = A; + type Reader = A::Reader; + type Writer = A::Writer; + type Lister = A::Lister; + type Deleter = A::Deleter; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + async fn stat( + &self, + path: &str, + args: opendal::raw::OpStat, + ) -> opendal::Result { + self.count.fetch_add(1, Ordering::SeqCst); + self.inner.stat(path, args).await + } + + async fn read( + &self, + path: &str, + args: opendal::raw::OpRead, + ) -> opendal::Result<(opendal::raw::RpRead, Self::Reader)> { + self.inner.read(path, args).await + } + + async fn write( + &self, + path: &str, + args: opendal::raw::OpWrite, + ) -> opendal::Result<(opendal::raw::RpWrite, Self::Writer)> { + self.inner.write(path, args).await + } + + async fn delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::Deleter)> { + self.inner.delete().await + } + + async fn list( + &self, + path: &str, + args: opendal::raw::OpList, + ) -> opendal::Result<(opendal::raw::RpList, Self::Lister)> { + self.inner.list(path, args).await + } + + async fn copy( + &self, + from: &str, + to: &str, + args: opendal::raw::OpCopy, + ) -> opendal::Result { + self.inner.copy(from, to, args).await + } + + async fn rename( + &self, + from: &str, + to: &str, + args: opendal::raw::OpRename, + ) -> opendal::Result { + self.inner.rename(from, to, args).await + } + } + } + + #[tokio::test] + async fn test_get_range_no_stat() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + // Create a stat counter and operator with tracking layer + let stat_count = Arc::new(AtomicUsize::new(0)); + let op = Operator::new(opendal::services::Memory::default()) + .unwrap() + .layer(stat_counter::StatCounterLayer::new(stat_count.clone())) + .finish(); + let store = OpendalStore::new(op); + + // Create a test file + let location = "test_get_range.txt".into(); + let value = Bytes::from_static(b"Hello, world!"); + store.put(&location, value.clone().into()).await.unwrap(); + + // Reset counter after put + stat_count.store(0, Ordering::SeqCst); + + // Test 1: get_range should NOT call stat() + let ret = store.get_range(&location, 0..5).await.unwrap(); + assert_eq!(Bytes::from_static(b"Hello"), ret); + assert_eq!( + stat_count.load(Ordering::SeqCst), + 0, + "get_range should not call stat()" + ); + + // Reset counter + stat_count.store(0, Ordering::SeqCst); + + // Test 2: get_opts SHOULD call stat() to get metadata + let opts = datafusion_object_store::GetOptions { + range: Some(datafusion_object_store::GetRange::Bounded(0..5)), + ..Default::default() + }; + let ret = store.get_opts(&location, opts).await.unwrap(); + let data = ret.bytes().await.unwrap(); + assert_eq!(Bytes::from_static(b"Hello"), data); + assert!( + stat_count.load(Ordering::SeqCst) > 0, + "get_opts should call stat() to get metadata" + ); + + // Cleanup + store.delete(&location).await.unwrap(); + } +} diff --git a/src/object-store/src/layers/mock.rs b/src/object-store/src/layers/mock.rs index e55af3bfe0..3df8aae535 100644 --- a/src/object-store/src/layers/mock.rs +++ b/src/object-store/src/layers/mock.rs @@ -131,12 +131,12 @@ pub struct MockDeleter { } impl oio::Delete for MockDeleter { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { - self.inner.delete(path, args) + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.inner.delete(path, args).await } - async fn flush(&mut self) -> Result { - self.inner.flush().await + async fn close(&mut self) -> Result<()> { + self.inner.close().await } } diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index e7ee6afcdb..3a5f72c5ce 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -15,9 +15,10 @@ pub use opendal::raw::{Access, HttpClient}; pub use opendal::{ Buffer, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, FuturesAsyncReader, - FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result, Scheme, Writer, services, + FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result, Writer, services, }; +pub mod compat; pub mod config; pub mod error; pub mod factory; diff --git a/src/object-store/src/test_util.rs b/src/object-store/src/test_util.rs index 3279db7b7c..af68978b0e 100644 --- a/src/object-store/src/test_util.rs +++ b/src/object-store/src/test_util.rs @@ -32,7 +32,7 @@ impl TempFolder { } pub async fn remove_all(&self) -> Result<()> { - self.store.remove_all(&self.path).await + self.store.delete_with(&self.path).recursive(true).await } } diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index a402b8237c..f43e5a8d83 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -131,7 +131,7 @@ pub fn normalize_path(path: &str) -> String { pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore { object_store .layer(LoggingLayer::new(DefaultLoggingInterceptor)) - .layer(TracingLayer) + .layer(TracingLayer::new()) .layer(crate::layers::build_prometheus_metrics_layer(path_label)) } diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 3dac15da46..4ade4fc383 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -13,17 +13,22 @@ // limitations under the License. use std::env; +use std::sync::Arc; use anyhow::Result; +use arrow_object_store::path::Path; +use arrow_object_store::{ObjectStore as ArrowObjectStore, ObjectStoreExt}; +use bytes::Bytes; use common_telemetry::info; -use common_test_util::temp_dir::create_temp_dir; use futures::TryStreamExt; use object_store::ObjectStore; use object_store::services::{Fs, S3}; use object_store::test_util::TempFolder; +use object_store_opendal::OpendalStore; use opendal::EntryMode; -use opendal::services::{Azblob, Gcs, Oss}; +use opendal::services::{Azblob, Gcs, Memory, Oss}; use prometheus::{Encoder, TextEncoder}; +use tempfile::TempDir; async fn test_object_crud(store: &ObjectStore) -> Result<()> { // Create object handler. @@ -220,10 +225,39 @@ fn assert_opendal_metrics() { ); } +fn create_temp_dir(prefix: &str) -> Result { + Ok(tempfile::Builder::new().prefix(prefix).tempdir()?) +} + +#[tokio::test] +async fn test_opendal_memory_smoke() -> Result<()> { + let op = opendal::Operator::new(Memory::default())?.finish(); + let store: OpendalStore = OpendalStore::new(op); + assert_eq!("memory", store.info().scheme()); + assert!(format!("{store}").contains("memory")); + let store: Arc = Arc::new(store); + let location = Path::from("smoke/test.txt"); + store + .put(&location, Bytes::from_static(b"hello, memory").into()) + .await?; + + let content = store.get(&location).await?.bytes().await?; + assert_eq!(content, Bytes::from_static(b"hello, memory")); + + let listed = store + .list(Some(&Path::from("smoke"))) + .try_collect::>() + .await?; + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].location, location); + + Ok(()) +} + #[tokio::test] async fn test_fs_backend() -> Result<()> { - let data_dir = create_temp_dir("test_fs_backend"); - let tmp_dir = create_temp_dir("test_fs_backend"); + let data_dir = create_temp_dir("test_fs_backend")?; + let tmp_dir = create_temp_dir("test_fs_backend")?; let builder = Fs::default() .root(&data_dir.path().to_string_lossy()) .atomic_write_dir(&tmp_dir.path().to_string_lossy()); diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index 139d08378e..ea353f4ced 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -55,7 +55,6 @@ meter-core.workspace = true meter-macros.workspace = true moka = { workspace = true, features = ["future"] } object-store.workspace = true -object_store_opendal.workspace = true partition.workspace = true prometheus.workspace = true prost.workspace = true diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index d4366e5d1f..ddf4cc7210 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -94,7 +94,7 @@ parking_lot.workspace = true partition.workspace = true pg_interval = { version = "0.5.2", package = "pg_interval_2" } pgwire = { version = "0.38.3", default-features = false, features = [ - "server-api-ring", + "server-api-aws-lc-rs", "pg-ext-types", ] } pin-project = "1.0" @@ -110,7 +110,7 @@ regex.workspace = true reqwest.workspace = true rust-embed = { version = "6.6", optional = true, features = ["debug-embed"] } rust_decimal = { workspace = true, features = ["db-postgres"] } -rustls = { workspace = true, default-features = false, features = ["ring", "logging", "std", "tls12"] } +rustls = { workspace = true, default-features = false, features = ["aws_lc_rs", "logging", "std", "tls12"] } rustls-pemfile = "2.0" rustls-pki-types = "1.0" serde.workspace = true @@ -153,7 +153,7 @@ common-test-util.workspace = true criterion = "0.5" json5 = "0.4" mysql_async = { version = "0.35", default-features = false, features = [ - "default-rustls-ring", + "default-rustls", ] } permutation = "0.4" rand.workspace = true diff --git a/src/servers/build.rs b/src/servers/build.rs index 5f7081d024..0e38a7f83f 100644 --- a/src/servers/build.rs +++ b/src/servers/build.rs @@ -19,6 +19,7 @@ fn main() { #[cfg(feature = "dashboard")] fn fetch_dashboard_assets() { + use std::path::PathBuf; use std::process::{Command, Stdio}; let message = "Failed to fetch dashboard assets"; @@ -30,7 +31,16 @@ or it's a network error, just try again or enable/disable some proxy."#; let mut dir = std::env::current_dir().unwrap(); dir.pop(); dir.pop(); - dir.push("scripts"); + let scripts_dir = dir.join("scripts"); + let dashboard_dist = dir.join(PathBuf::from("src/servers/dashboard/dist")); + + if dashboard_dist.join("index.html").exists() { + println!("cargo:rerun-if-changed=dashboard/VERSION"); + println!("cargo:rerun-if-changed=dashboard/dist"); + return; + } + + dir = scripts_dir; let out_dir = std::env::var("OUT_DIR").unwrap(); diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index ab4caafc34..6ddd14c69d 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -50,6 +50,7 @@ use tonic::{Request, Response, Status}; use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer}; use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu}; +use crate::install_default_crypto_provider; use crate::metrics::MetricsMiddlewareLayer; use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler}; use crate::query_handler::OpenTelemetryProtocolHandlerRef; @@ -357,6 +358,11 @@ impl Server for GrpcServer { let mut builder = tonic::transport::Server::builder().layer(metrics_layer); if let Some(tls_config) = self.tls_config.clone() { + // tonic builds the underlying rustls server config here, which requires a + // process-level crypto provider to be installed first. + if let Err(err) = install_default_crypto_provider() { + warn!("Failed to install default rustls crypto provider: {err}"); + } builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?; } diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 44587783be..242d63e35e 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -64,13 +64,13 @@ pub struct SqlPlan { schema: Option, } -/// Install the ring crypto provider for rustls process-wide. see: +/// Install the default crypto provider for rustls process-wide. see: /// /// https://docs.rs/rustls/latest/rustls/crypto/struct.CryptoProvider.html#using-the-per-process-default-cryptoprovider /// /// for more information. -pub fn install_ring_crypto_provider() -> Result<(), String> { - rustls::crypto::CryptoProvider::install_default(rustls::crypto::ring::default_provider()) +pub fn install_default_crypto_provider() -> Result<(), String> { + rustls::crypto::CryptoProvider::install_default(rustls::crypto::aws_lc_rs::default_provider()) .map_err(|ret| { format!( "CryptoProvider already installed as: {:?}, but providing {:?}", diff --git a/src/servers/src/tls.rs b/src/servers/src/tls.rs index 5f461731ef..51f5818b49 100644 --- a/src/servers/src/tls.rs +++ b/src/servers/src/tls.rs @@ -239,7 +239,7 @@ pub fn maybe_watch_server_tls_config( #[cfg(test)] mod tests { use super::*; - use crate::install_ring_crypto_provider; + use crate::install_default_crypto_provider; use crate::tls::TlsMode::Disable; #[test] @@ -510,7 +510,7 @@ mod tests { #[test] fn test_tls_file_change_watch() { common_telemetry::init_default_ut_logging(); - let _ = install_ring_crypto_provider(); + let _ = install_default_crypto_provider(); let dir = tempfile::tempdir().unwrap(); let cert_path = dir.path().join("server.crt"); diff --git a/src/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs index 94fcebf13c..e0cb086dda 100644 --- a/src/servers/tests/mysql/mysql_server_test.rs +++ b/src/servers/tests/mysql/mysql_server_test.rs @@ -27,7 +27,7 @@ use datatypes::value::Value; use mysql_async::prelude::*; use mysql_async::{Conn, Row, SslOpts}; use servers::error::Result; -use servers::install_ring_crypto_provider; +use servers::install_default_crypto_provider; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::server::Server; use servers::tls::{ReloadableTlsServerConfig, TlsOption}; @@ -45,7 +45,7 @@ struct MysqlOpts<'a> { } fn create_mysql_server(table: TableRef, opts: MysqlOpts<'_>) -> Result> { - let _ = install_ring_crypto_provider(); + let _ = install_default_crypto_provider(); let query_handler = create_testing_sql_query_handler(table); let io_runtime = RuntimeBuilder::default() .worker_threads(4) diff --git a/src/servers/tests/postgres/mod.rs b/src/servers/tests/postgres/mod.rs index 311e630e0e..5a3e97e3e7 100644 --- a/src/servers/tests/postgres/mod.rs +++ b/src/servers/tests/postgres/mod.rs @@ -27,7 +27,7 @@ use rustls::client::danger::{ServerCertVerified, ServerCertVerifier}; use rustls::{Error, SignatureScheme}; use rustls_pki_types::{CertificateDer, ServerName}; use servers::error::Result; -use servers::install_ring_crypto_provider; +use servers::install_default_crypto_provider; use servers::postgres::PostgresServer; use servers::server::Server; use servers::tls::{ReloadableTlsServerConfig, TlsOption}; @@ -365,7 +365,7 @@ async fn test_extended_query() -> Result<()> { async fn start_test_server(server_tls: TlsOption) -> Result { common_telemetry::init_default_ut_logging(); - let _ = install_ring_crypto_provider(); + let _ = install_default_crypto_provider(); let table = MemTable::default_numbers_table(); let mut pg_server = create_postgres_server(table, false, server_tls, None)?; diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml index bc687092c0..a2717c0534 100644 --- a/tests-fuzz/Cargo.toml +++ b/tests-fuzz/Cargo.toml @@ -46,6 +46,7 @@ paste.workspace = true rand = { workspace = true } rand_chacha = "0.9" reqwest = { workspace = true } +rustls = { workspace = true, default-features = false, features = ["aws_lc_rs", "std", "tls12"] } schemars = "0.8" serde = { workspace = true } serde_json = { workspace = true } diff --git a/tests-fuzz/src/lib.rs b/tests-fuzz/src/lib.rs index 8900350224..1ecf4909f3 100644 --- a/tests-fuzz/src/lib.rs +++ b/tests-fuzz/src/lib.rs @@ -21,5 +21,19 @@ pub mod translator; pub mod utils; pub mod validator; +use std::sync::OnceLock; + #[cfg(test)] pub mod test_utils; + +static RUSTLS_CRYPTO_PROVIDER_INIT: OnceLock<()> = OnceLock::new(); + +pub fn install_rustls_crypto_provider() { + RUSTLS_CRYPTO_PROVIDER_INIT.get_or_init(|| { + if rustls::crypto::CryptoProvider::get_default().is_none() { + let _ = rustls::crypto::CryptoProvider::install_default( + rustls::crypto::aws_lc_rs::default_provider(), + ); + } + }); +} diff --git a/tests-fuzz/src/utils.rs b/tests-fuzz/src/utils.rs index d55abab3c2..0f2f982262 100644 --- a/tests-fuzz/src/utils.rs +++ b/tests-fuzz/src/utils.rs @@ -52,6 +52,8 @@ const GT_MYSQL_ADDR: &str = "GT_MYSQL_ADDR"; /// Connects to GreptimeDB via env variables. pub async fn init_greptime_connections_via_env() -> Connections { + crate::install_rustls_crypto_provider(); + let _ = dotenv::dotenv(); let mysql = if let Ok(addr) = env::var(GT_MYSQL_ADDR) { Some(addr) diff --git a/tests/cases/standalone/common/information_schema/ssts.result b/tests/cases/standalone/common/information_schema/ssts.result index bf0642f667..46d52c4204 100644 --- a/tests/cases/standalone/common/information_schema/ssts.result +++ b/tests/cases/standalone/common/information_schema/ssts.result @@ -129,19 +129,20 @@ SELECT * FROM information_schema.ssts_index_meta ORDER BY meta_json; -- SQLNESS REPLACE (\s+\d+\s+) -- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}) -- SQLNESS REPLACE (/public/\d+/\d+_\d+) /public//_ SELECT * FROM information_schema.ssts_storage order by file_path; -+---------------------------------------------------------------------------------------------+-----------+------------------+---------+ -| file_path | file_size | last_modified_ms | node_id | -+---------------------------------------------------------------------------------------------+-----------+------------------+---------+ -| data/greptime/public//_/.parquet | | || -| data/greptime/public//_/index/.puffin | | || -| data/greptime/public//_/.parquet | | || -| data/greptime/public//_/index/.puffin | | || -| data/greptime/public//_/.parquet | | || -| data/greptime/public//_/index/.puffin | | || -+---------------------------------------------------------------------------------------------+-----------+------------------+---------+ ++---------------------------------------------------------------------------------------------+-----------+-------------------------+---------+ +| file_path | file_size | last_modified_ms | node_id | ++---------------------------------------------------------------------------------------------+-----------+-------------------------+---------+ +| data/greptime/public//_/.parquet || || +| data/greptime/public//_/index/.puffin || || +| data/greptime/public//_/.parquet || || +| data/greptime/public//_/index/.puffin || || +| data/greptime/public//_/.parquet || || +| data/greptime/public//_/index/.puffin || || ++---------------------------------------------------------------------------------------------+-----------+-------------------------+---------+ INSERT INTO sst_case VALUES (24, 'foo', 'foo', 100), @@ -205,23 +206,24 @@ SELECT * FROM information_schema.ssts_index_meta ORDER BY meta_json; -- SQLNESS REPLACE (\s+\d+\s+) -- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}) -- SQLNESS REPLACE (/public/\d+/\d+_\d+) /public//_ SELECT * FROM information_schema.ssts_storage order by file_path; -+---------------------------------------------------------------------------------------------+-----------+------------------+---------+ -| file_path | file_size | last_modified_ms | node_id | -+---------------------------------------------------------------------------------------------+-----------+------------------+---------+ -| data/greptime/public//_/.parquet | | || -| data/greptime/public//_/.parquet | | || -| data/greptime/public//_/index/.puffin | | || -| data/greptime/public//_/index/.puffin | | || -| data/greptime/public//_/.parquet | | || -| data/greptime/public//_/.parquet | | || -| data/greptime/public//_/index/.puffin | | || -| data/greptime/public//_/index/.puffin | | || -| data/greptime/public//_/.parquet | | || -| data/greptime/public//_/index/.puffin | | || -+---------------------------------------------------------------------------------------------+-----------+------------------+---------+ ++---------------------------------------------------------------------------------------------+-----------+-------------------------+---------+ +| file_path | file_size | last_modified_ms | node_id | ++---------------------------------------------------------------------------------------------+-----------+-------------------------+---------+ +| data/greptime/public//_/.parquet || || +| data/greptime/public//_/.parquet || || +| data/greptime/public//_/index/.puffin || || +| data/greptime/public//_/index/.puffin || || +| data/greptime/public//_/.parquet || || +| data/greptime/public//_/.parquet || || +| data/greptime/public//_/index/.puffin || || +| data/greptime/public//_/index/.puffin || || +| data/greptime/public//_/.parquet || || +| data/greptime/public//_/index/.puffin || || ++---------------------------------------------------------------------------------------------+-----------+-------------------------+---------+ DROP TABLE sst_case; diff --git a/tests/cases/standalone/common/information_schema/ssts.sql b/tests/cases/standalone/common/information_schema/ssts.sql index f411c8f67a..8aed21f1c1 100644 --- a/tests/cases/standalone/common/information_schema/ssts.sql +++ b/tests/cases/standalone/common/information_schema/ssts.sql @@ -40,6 +40,7 @@ SELECT * FROM information_schema.ssts_index_meta ORDER BY meta_json; -- SQLNESS REPLACE (\s+\d+\s+) -- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}) -- SQLNESS REPLACE (/public/\d+/\d+_\d+) /public//_ SELECT * FROM information_schema.ssts_storage order by file_path; @@ -67,6 +68,7 @@ SELECT * FROM information_schema.ssts_index_meta ORDER BY meta_json; -- SQLNESS REPLACE (\s+\d+\s+) -- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}) -- SQLNESS REPLACE (/public/\d+/\d+_\d+) /public//_ SELECT * FROM information_schema.ssts_storage order by file_path; diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index db1d1402aa..6b41e5a61e 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -19,9 +19,9 @@ datatypes = { workspace = true } flate2 = "1.0" hex = "0.4" local-ip-address = "0.6" -mysql = { version = "26", default-features = false, features = ["minimal", "rustls-tls-ring"] } +mysql = { version = "26", default-features = false, features = ["minimal", "rustls-tls"] } num_cpus = "1.16" -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "stream"] } serde.workspace = true serde_json.workspace = true sha2 = "0.10"