From 77182f50245d6a2facc0ec5f943b02f566bdab7d Mon Sep 17 00:00:00 2001 From: LFC Date: Wed, 21 Dec 2022 17:02:11 +0800 Subject: [PATCH] chore: upgrade Arrow to version 28, and DataFusion to 15 (#771) Co-authored-by: luofucong --- Cargo.lock | 264 ++++++++++++++---- benchmarks/Cargo.toml | 4 +- src/catalog/Cargo.toml | 2 +- src/catalog/src/system.rs | 4 +- src/catalog/src/tables.rs | 4 +- src/client/Cargo.toml | 2 +- src/common/function/Cargo.toml | 2 +- src/common/grpc-expr/src/insert.rs | 2 +- src/common/grpc/Cargo.toml | 2 +- src/common/query/Cargo.toml | 6 +- .../query/src/logical_plan/accumulator.rs | 5 + src/common/query/src/physical_plan.rs | 2 +- src/common/recordbatch/Cargo.toml | 4 +- src/common/recordbatch/src/recordbatch.rs | 2 +- src/common/substrait/Cargo.toml | 4 +- src/common/substrait/src/df_logical.rs | 20 +- src/datanode/Cargo.toml | 4 +- src/datanode/src/sql.rs | 2 +- src/datatypes/Cargo.toml | 6 +- src/datatypes/src/schema/column_schema.rs | 22 +- src/datatypes/src/value.rs | 7 +- src/datatypes/src/vectors/helper.rs | 7 +- src/frontend/Cargo.toml | 6 +- src/frontend/src/table.rs | 10 +- src/mito/Cargo.toml | 4 +- src/mito/src/engine.rs | 12 +- src/mito/src/table.rs | 4 +- src/query/Cargo.toml | 12 +- src/query/src/datafusion/planner.rs | 5 + src/query/src/query_engine/state.rs | 6 + src/script/Cargo.toml | 8 +- src/sql/Cargo.toml | 2 +- src/sql/src/parsers/create_parser.rs | 4 +- src/sql/src/statements.rs | 8 +- src/storage/Cargo.toml | 2 +- src/table/Cargo.toml | 10 +- src/table/src/table.rs | 2 +- src/table/src/table/adapter.rs | 4 +- src/table/src/table/numbers.rs | 2 +- src/table/src/test_util/empty_table.rs | 2 +- src/table/src/test_util/memtable.rs | 6 +- 41 files changed, 328 insertions(+), 158 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 866d173e86..70ac528e77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -194,38 +194,35 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "26.0.0" +version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e24e2bcd431a4aa0ff003fdd2dc21c78cfb42f31459c89d2312c2746fe17a5ac" +checksum = "aed9849f86164fad5cb66ce4732782b15f1bc97f8febab04e782c20cce9d4b6c" dependencies = [ "ahash 0.8.2", "arrow-array", "arrow-buffer", + "arrow-cast", + "arrow-csv", "arrow-data", + "arrow-ipc", + "arrow-json", "arrow-schema", "arrow-select", - "bitflags", "chrono", "comfy-table", - "csv", - "flatbuffers", "half 2.1.0", - "hashbrown 0.12.3", - "indexmap", - "lazy_static", - "lexical-core", + "hashbrown 0.13.1", "multiversion", "num", "regex", "regex-syntax", - "serde_json", ] [[package]] name = "arrow-array" -version = "26.0.0" +version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9044300874385f19e77cbf90911e239bd23630d8f23bb0f948f9067998a13b7" +checksum = "6b8504cf0a6797e908eecf221a865e7d339892720587f87c8b90262863015b08" dependencies = [ "ahash 0.8.2", "arrow-buffer", @@ -233,25 +230,59 @@ dependencies = [ "arrow-schema", "chrono", "half 2.1.0", - "hashbrown 0.12.3", + "hashbrown 0.13.1", "num", ] [[package]] name = "arrow-buffer" -version = "26.0.0" +version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78476cbe9e3f808dcecab86afe42d573863c63e149c62e6e379ed2522743e626" +checksum = "d6de64a27cea684b24784647d9608314bc80f7c4d55acb44a425e05fab39d916" dependencies = [ "half 2.1.0", "num", ] [[package]] -name = "arrow-data" -version = "26.0.0" +name = "arrow-cast" +version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d916feee158c485dad4f701cba31bc9a90a8db87d9df8e2aa8adc0c20a2bbb9" +checksum = "bec4a54502eefe05923c385c90a005d69474fa06ca7aa2a2b123c9f9532f6178" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "chrono", + "lexical-core", + "num", +] + +[[package]] +name = "arrow-csv" +version = "28.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7902bbf8127eac48554fe902775303377047ad49a9fd473c2b8cb399d092080" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "csv", + "lazy_static", + "lexical-core", + "regex", +] + +[[package]] +name = "arrow-data" +version = "28.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e4882efe617002449d5c6b5de9ddb632339074b36df8a96ea7147072f1faa8a" dependencies = [ "arrow-buffer", "arrow-schema", @@ -260,19 +291,51 @@ dependencies = [ ] [[package]] -name = "arrow-schema" -version = "26.0.0" +name = "arrow-ipc" +version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f9406eb7834ca6bd8350d1baa515d18b9fcec487eddacfb62f5e19511f7bd37" +checksum = "fa0703a6de2785828561b03a4d7793ecd333233e1b166316b4bfc7cfce55a4a7" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "flatbuffers", +] + +[[package]] +name = "arrow-json" +version = "28.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bd23fc8c6d251f96cd63b96fece56bbb9710ce5874a627cb786e2600673595a" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "half 2.1.0", + "indexmap", + "num", + "serde_json", +] + +[[package]] +name = "arrow-schema" +version = "28.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da9f143882a80be168538a60e298546314f50f11f2a288c8d73e11108da39d26" dependencies = [ "serde", ] [[package]] name = "arrow-select" -version = "26.0.0" +version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6593a01586751c74498495d2f5a01fcd438102b52965c11dd98abf4ebcacef37" +checksum = "520406331d4ad60075359524947ebd804e479816439af82bcb17f8d280d9b38c" dependencies = [ "arrow-array", "arrow-buffer", @@ -334,6 +397,7 @@ dependencies = [ "memchr", "pin-project-lite", "tokio", + "xz2", ] [[package]] @@ -1904,9 +1968,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "14.0.0" +version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7a8411475928479fe57af18698626f0a44f3c29153e051dce45f7455c08a6d5" +checksum = "b75a088adf79515b04fd3895c1a14dc249c8f7a7f27b59870a05546fe9a55542" dependencies = [ "ahash 0.8.2", "arrow", @@ -1915,6 +1979,7 @@ dependencies = [ "bytes", "bzip2", "chrono", + "dashmap", "datafusion-common", "datafusion-expr", "datafusion-optimizer", @@ -1924,13 +1989,12 @@ dependencies = [ "flate2", "futures", "glob", - "hashbrown 0.12.3", + "hashbrown 0.13.1", "itertools", "lazy_static", "log", "num_cpus", "object_store", - "ordered-float 3.4.0", "parking_lot", "parquet", "paste", @@ -1938,6 +2002,7 @@ dependencies = [ "pin-project-lite", "rand 0.8.5", "smallvec", + "sqllogictest", "sqlparser", "tempfile", "tokio", @@ -1945,27 +2010,27 @@ dependencies = [ "tokio-util", "url", "uuid", + "xz2", ] [[package]] name = "datafusion-common" -version = "14.0.0" +version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15f1ffcbc1f040c9ab99f41db1c743d95aff267bb2e7286aaa010738b7402251" +checksum = "7b17262b899f79afdf502846d1138a8b48441afe24dc6e07c922105289248137" dependencies = [ "arrow", "chrono", "object_store", - "ordered-float 3.4.0", "parquet", "sqlparser", ] [[package]] name = "datafusion-expr" -version = "14.0.0" +version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1883d9590d303ef38fa295567e7fdb9f8f5f511fcc167412d232844678cd295c" +checksum = "533d2226b4636a1306d1f6f4ac02e436947c5d6e8bfc85f6d8f91a425c10a407" dependencies = [ "ahash 0.8.2", "arrow", @@ -1976,9 +2041,9 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "14.0.0" +version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2127d46d566ab3463d70da9675fc07b9d634be8d17e80d0e1ce79600709fe651" +checksum = "ce7ba274267b6baf1714a67727249aa56d648c8814b0f4c43387fbe6d147e619" dependencies = [ "arrow", "async-trait", @@ -1986,15 +2051,15 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "hashbrown 0.12.3", + "hashbrown 0.13.1", "log", ] [[package]] name = "datafusion-physical-expr" -version = "14.0.0" +version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d108b6fe8eeb317ecad1d74619e8758de49cccc8c771b56c97962fd52eaae23" +checksum = "f35cb53e6c2f9c40accdf45aef2be7fde030ea3051b1145a059d96109e65b0bf" dependencies = [ "ahash 0.8.2", "arrow", @@ -2007,12 +2072,11 @@ dependencies = [ "datafusion-expr", "datafusion-row", "half 2.1.0", - "hashbrown 0.12.3", + "hashbrown 0.13.1", "itertools", "lazy_static", "md-5", "num-traits", - "ordered-float 3.4.0", "paste", "rand 0.8.5", "regex", @@ -2023,9 +2087,9 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "14.0.0" +version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43537b6377d506e4788bf21e9ed943340e076b48ca4d077e6ea4405ca5e54a1c" +checksum = "27c77b1229ae5cf6a6e0e2ba43ed4e98131dbf1cc4a97fad17c94230b32e0812" dependencies = [ "arrow", "datafusion-common", @@ -2035,11 +2099,11 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "14.0.0" +version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "244d08d4710e1088d9c0949c9b5b8d68d9cf2cde7203134a4cc389e870fe2354" +checksum = "569423fa8a50db39717080949e3b4f8763582b87baf393cc3fcf27cc21467ba7" dependencies = [ - "arrow", + "arrow-schema", "datafusion-common", "datafusion-expr", "sqlparser", @@ -2166,6 +2230,12 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" +[[package]] +name = "difference" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198" + [[package]] name = "digest" version = "0.10.6" @@ -2857,6 +2927,15 @@ dependencies = [ "ahash 0.7.6", ] +[[package]] +name = "hashbrown" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" +dependencies = [ + "ahash 0.8.2", +] + [[package]] name = "hdrhistogram" version = "7.5.2" @@ -3373,6 +3452,17 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" +[[package]] +name = "libtest-mimic" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7b603516767d1ab23d0de09d023e62966c3322f7148297c35cf3d97aa8b37fa" +dependencies = [ + "clap 4.0.29", + "termcolor", + "threadpool", +] + [[package]] name = "libz-sys" version = "1.1.8" @@ -3535,6 +3625,17 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "lzma-sys" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fda04ab3764e6cde78b9974eec4f779acaba7c4e84b36eca3cf77c581b85d27" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "mac_address" version = "1.1.4" @@ -4344,6 +4445,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-float" version = "3.4.0" @@ -4430,26 +4540,34 @@ dependencies = [ [[package]] name = "parquet" -version = "26.0.0" +version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bf8fa7ab6572791325a8595f55dc532dde88b996ae10a5ca8a2db746784ecc4" +checksum = "21433e9209111bb3720b747f2f137e0d115af1af0420a7a1c26b6e88227fa353" dependencies = [ "ahash 0.8.2", - "arrow", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", "base64 0.13.1", "brotli", "bytes", "chrono", "flate2", "futures", - "hashbrown 0.12.3", + "hashbrown 0.13.1", "lz4", "num", "num-bigint", + "paste", "seq-macro", "snap", - "thrift 0.16.0", + "thrift 0.17.0", "tokio", + "twox-hash", "zstd", ] @@ -6571,6 +6689,25 @@ dependencies = [ "sqlparser", ] +[[package]] +name = "sqllogictest" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba41e01d229d7725401de371e323851f82d839d68732a06162405362b60852fe" +dependencies = [ + "async-trait", + "difference", + "futures", + "glob", + "humantime", + "itertools", + "libtest-mimic", + "regex", + "tempfile", + "thiserror", + "tracing", +] + [[package]] name = "sqlness" version = "0.1.0" @@ -6600,9 +6737,9 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.26.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86be66ea0b2b22749cfa157d16e2e84bf793e626a3375f4d378dc289fa03affb" +checksum = "aba319938d4bfe250a769ac88278b629701024fe16f34257f9563bc628081970" dependencies = [ "log", ] @@ -7132,13 +7269,13 @@ dependencies = [ [[package]] name = "thrift" -version = "0.16.0" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09678c4cdbb4eed72e18b7c2af1329c69825ed16fcbac62d083fc3e2b0590ff0" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" dependencies = [ "byteorder", "integer-encoding", - "ordered-float 1.1.1", + "ordered-float 2.10.0", ] [[package]] @@ -8326,19 +8463,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3" [[package]] -name = "zstd" -version = "0.11.2+zstd.1.5.2" +name = "xz2" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" +checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2" +dependencies = [ + "lzma-sys", +] + +[[package]] +name = "zstd" +version = "0.12.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c947d2adc84ff9a59f2e3c03b81aa4128acf28d6ad7d56273f7e8af14e47bea" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "5.0.2+zstd.1.5.2" +version = "6.0.2+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +checksum = "a6cf39f730b440bab43da8fb5faf5f254574462f73f260f85f7987f32154ff17" dependencies = [ "libc", "zstd-sys", diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index ea8d78ef52..b64fff1b67 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -5,10 +5,10 @@ edition = "2021" license = "Apache-2.0" [dependencies] -arrow = "26.0.0" +arrow = "28.0" clap = { version = "4.0", features = ["derive"] } client = { path = "../src/client" } indicatif = "0.17.1" itertools = "0.10.5" -parquet = "26.0.0" +parquet = "28.0" tokio = { version = "1.21", features = ["full"] } diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 90adcf8e8a..f13cf2d321 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -19,7 +19,7 @@ common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } -datafusion = "14.0.0" +datafusion = "15.0" datatypes = { path = "../datatypes" } futures = "0.3" futures-util = "0.3" diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 960be1fa24..a845c08143 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -61,7 +61,7 @@ impl Table for SystemCatalogTable { async fn scan( &self, - _projection: &Option>, + _projection: Option<&Vec>, _filters: &[Expr], _limit: Option, ) -> table::Result { @@ -129,7 +129,7 @@ impl SystemCatalogTable { let ctx = SessionContext::new(); let scan = self .table - .scan(&full_projection, &[], None) + .scan(full_projection, &[], None) .await .context(error::SystemCatalogTableScanSnafu)?; let stream = scan diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 8dd59fb1bf..09e7779a16 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -77,7 +77,7 @@ impl Table for Tables { async fn scan( &self, - _projection: &Option>, + _projection: Option<&Vec>, _filters: &[Expr], _limit: Option, ) -> table::error::Result { @@ -370,7 +370,7 @@ mod tests { .unwrap(); let tables = Tables::new(catalog_list, "test_engine".to_string()); - let tables_stream = tables.scan(&None, &[], None).await.unwrap(); + let tables_stream = tables.scan(None, &[], None).await.unwrap(); let session_ctx = SessionContext::new(); let mut tables_stream = tables_stream.execute(0, session_ctx.task_ctx()).unwrap(); diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 5c19f89970..f85e76b7a2 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -15,7 +15,7 @@ common-grpc-expr = { path = "../common/grpc-expr" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-time = { path = "../common/time" } -datafusion = "14.0.0" +datafusion = "15.0" datatypes = { path = "../datatypes" } enum_dispatch = "0.3" parking_lot = "0.12" diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index ce49cb5e5b..3c215614d3 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -11,7 +11,7 @@ common-error = { path = "../error" } common-function-macro = { path = "../function-macro" } common-query = { path = "../query" } common-time = { path = "../time" } -datafusion-common = "14.0.0" +datafusion-common = "15.0" datatypes = { path = "../../datatypes" } libc = "0.2" num = "0.4" diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index b4d2cbf4ba..a8273c62b6 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -725,7 +725,7 @@ mod tests { async fn scan( &self, - _projection: &Option>, + _projection: Option<&Vec>, _filters: &[Expr], _limit: Option, ) -> TableResult { diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml index b1b5a25b6e..0b3a0bcb7a 100644 --- a/src/common/grpc/Cargo.toml +++ b/src/common/grpc/Cargo.toml @@ -13,7 +13,7 @@ common-query = { path = "../query" } common-recordbatch = { path = "../recordbatch" } common-runtime = { path = "../runtime" } dashmap = "5.4" -datafusion = "14.0.0" +datafusion = "15.0" datatypes = { path = "../../datatypes" } snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.0", features = ["full"] } diff --git a/src/common/query/Cargo.toml b/src/common/query/Cargo.toml index bd8f0bbf3a..a36cd4418a 100644 --- a/src/common/query/Cargo.toml +++ b/src/common/query/Cargo.toml @@ -9,9 +9,9 @@ async-trait = "0.1" common-error = { path = "../error" } common-recordbatch = { path = "../recordbatch" } common-time = { path = "../time" } -datafusion = "14.0.0" -datafusion-common = "14.0.0" -datafusion-expr = "14.0.0" +datafusion = "15.0" +datafusion-common = "15.0" +datafusion-expr = "15.0" datatypes = { path = "../../datatypes" } snafu = { version = "0.7", features = ["backtraces"] } statrs = "0.15" diff --git a/src/common/query/src/logical_plan/accumulator.rs b/src/common/query/src/logical_plan/accumulator.rs index cce139094e..4b83a7efa8 100644 --- a/src/common/query/src/logical_plan/accumulator.rs +++ b/src/common/query/src/logical_plan/accumulator.rs @@ -175,4 +175,9 @@ impl DfAccumulator for DfAccumulatorAdaptor { .map_err(Error::from)?; Ok(scalar_value) } + + fn size(&self) -> usize { + // TODO(LFC): Implement new "size" method for Accumulator. + 0 + } } diff --git a/src/common/query/src/physical_plan.rs b/src/common/query/src/physical_plan.rs index 42bb70087e..35b73be4d9 100644 --- a/src/common/query/src/physical_plan.rs +++ b/src/common/query/src/physical_plan.rs @@ -233,7 +233,7 @@ mod test { async fn scan( &self, _ctx: &SessionState, - _projection: &Option>, + _projection: Option<&Vec>, _filters: &[Expr], _limit: Option, ) -> DfResult> { diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml index 634ec64410..94a32a965c 100644 --- a/src/common/recordbatch/Cargo.toml +++ b/src/common/recordbatch/Cargo.toml @@ -6,8 +6,8 @@ license = "Apache-2.0" [dependencies] common-error = { path = "../error" } -datafusion = "14.0.0" -datafusion-common = "14.0.0" +datafusion = "15.0" +datafusion-common = "15.0" datatypes = { path = "../../datatypes" } futures = "0.3" paste = "1.0" diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 6b24a9c5a9..0ecbca104c 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -227,7 +227,7 @@ mod tests { let output = serde_json::to_string(&batch).unwrap(); assert_eq!( - r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#, + r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#, output ); } diff --git a/src/common/substrait/Cargo.toml b/src/common/substrait/Cargo.toml index 815a986d1e..931f6f8150 100644 --- a/src/common/substrait/Cargo.toml +++ b/src/common/substrait/Cargo.toml @@ -10,8 +10,8 @@ catalog = { path = "../../catalog" } common-catalog = { path = "../catalog" } common-error = { path = "../error" } common-telemetry = { path = "../telemetry" } -datafusion = "14.0.0" -datafusion-expr = "14.0.0" +datafusion = "15.0" +datafusion-expr = "15.0" datatypes = { path = "../../datatypes" } futures = "0.3" prost = "0.9" diff --git a/src/common/substrait/src/df_logical.rs b/src/common/substrait/src/df_logical.rs index a6a81fb6f5..c1323abc66 100644 --- a/src/common/substrait/src/df_logical.rs +++ b/src/common/substrait/src/df_logical.rs @@ -19,7 +19,7 @@ use catalog::CatalogManagerRef; use common_error::prelude::BoxedError; use common_telemetry::debug; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; -use datafusion::common::ToDFSchema; +use datafusion::common::{DFField, DFSchema}; use datafusion::datasource::DefaultTableSource; use datafusion::physical_plan::project_schema; use datafusion_expr::{Filter, LogicalPlan, TableScan, TableSource}; @@ -262,10 +262,20 @@ impl DFLogicalSubstraitConvertor { }; // Calculate the projected schema - let projected_schema = project_schema(&stored_schema, projection.as_ref()) - .context(DFInternalSnafu)? - .to_dfschema_ref() - .context(DFInternalSnafu)?; + let qualified = &format!("{}.{}.{}", catalog_name, schema_name, table_name); + let projected_schema = Arc::new( + project_schema(&stored_schema, projection.as_ref()) + .and_then(|x| { + DFSchema::new_with_metadata( + x.fields() + .iter() + .map(|f| DFField::from_qualified(qualified, f.clone())) + .collect(), + x.metadata().clone(), + ) + }) + .context(DFInternalSnafu)?, + ); ctx.set_df_schema(projected_schema.clone()); diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index a245340ad8..38f887251a 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -25,7 +25,7 @@ common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } -datafusion = "14.0.0" +datafusion = "15.0" datatypes = { path = "../datatypes" } futures = "0.3" hyper = { version = "0.14", features = ["full"] } @@ -57,5 +57,5 @@ tower-http = { version = "0.3", features = ["full"] } axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" } client = { path = "../client" } common-query = { path = "../common/query" } -datafusion-common = "14.0.0" +datafusion-common = "15.0" tempdir = "0.3" diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index e578bec1e9..b336ef8177 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -176,7 +176,7 @@ mod tests { async fn scan( &self, - _projection: &Option>, + _projection: Option<&Vec>, _filters: &[Expr], _limit: Option, ) -> TableResult { diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index 2841decb67..1e3e13155a 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -9,12 +9,12 @@ default = [] test = [] [dependencies] -arrow = { version = "26.0" } -arrow-schema = { version = "26.0", features = ["serde"] } +arrow = "28.0" +arrow-schema = { version = "28.0", features = ["serde"] } common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } -datafusion-common = "14.0" +datafusion-common = "15.0" enum_dispatch = "0.3" num = "0.4" num-traits = "0.2" diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index 0577ca6aff..94fbba87ec 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::HashMap; use arrow::datatypes::Field; use serde::{Deserialize, Serialize}; @@ -23,7 +23,7 @@ use crate::error::{self, Error, Result}; use crate::schema::constraint::ColumnDefaultConstraint; use crate::vectors::VectorRef; -pub type Metadata = BTreeMap; +pub type Metadata = HashMap; /// Key used to store whether the column is time index in arrow field's metadata. const TIME_INDEX_KEY: &str = "greptime:time_index"; @@ -131,7 +131,7 @@ impl TryFrom<&Field> for ColumnSchema { fn try_from(field: &Field) -> Result { let data_type = ConcreteDataType::try_from(field.data_type())?; - let mut metadata = field.metadata().cloned().unwrap_or_default(); + let mut metadata = field.metadata().clone(); let default_constraint = match metadata.remove(DEFAULT_CONSTRAINT_KEY) { Some(json) => { Some(serde_json::from_str(&json).context(error::DeserializeSnafu { json })?) @@ -176,7 +176,7 @@ impl TryFrom<&ColumnSchema> for Field { column_schema.data_type.as_arrow_type(), column_schema.is_nullable(), ) - .with_metadata(Some(metadata))) + .with_metadata(metadata)) } } @@ -215,11 +215,7 @@ mod tests { assert!(field.is_nullable()); assert_eq!( "{\"Value\":{\"Int32\":99}}", - field - .metadata() - .unwrap() - .get(DEFAULT_CONSTRAINT_KEY) - .unwrap() + field.metadata().get(DEFAULT_CONSTRAINT_KEY).unwrap() ); let new_column_schema = ColumnSchema::try_from(&field).unwrap(); @@ -241,12 +237,8 @@ mod tests { .is_none()); let field = Field::try_from(&column_schema).unwrap(); - assert_eq!("v1", field.metadata().unwrap().get("k1").unwrap()); - assert!(field - .metadata() - .unwrap() - .get(DEFAULT_CONSTRAINT_KEY) - .is_some()); + assert_eq!("v1", field.metadata().get("k1").unwrap()); + assert!(field.metadata().get(DEFAULT_CONSTRAINT_KEY).is_some()); let new_column_schema = ColumnSchema::try_from(&field).unwrap(); assert_eq!(column_schema, new_column_schema); diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 457c774606..7201ffbac4 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -544,12 +544,15 @@ impl TryFrom for Value { .map(|x| Value::Timestamp(Timestamp::new(x, TimeUnit::Nanosecond))) .unwrap_or(Value::Null), ScalarValue::Decimal128(_, _, _) - | ScalarValue::Time64(_) | ScalarValue::IntervalYearMonth(_) | ScalarValue::IntervalDayTime(_) | ScalarValue::IntervalMonthDayNano(_) | ScalarValue::Struct(_, _) - | ScalarValue::Dictionary(_, _) => { + | ScalarValue::Dictionary(_, _) + | ScalarValue::Time32Second(_) + | ScalarValue::Time32Millisecond(_) + | ScalarValue::Time64Microsecond(_) + | ScalarValue::Time64Nanosecond(_) => { return error::UnsupportedArrowTypeSnafu { arrow_type: v.get_datatype(), } diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs index f3236ca0ec..e3a2eaaa58 100644 --- a/src/datatypes/src/vectors/helper.rs +++ b/src/datatypes/src/vectors/helper.rs @@ -195,12 +195,15 @@ impl Helper { ConstantVector::new(Arc::new(TimestampNanosecondVector::from(vec![v])), length) } ScalarValue::Decimal128(_, _, _) - | ScalarValue::Time64(_) | ScalarValue::IntervalYearMonth(_) | ScalarValue::IntervalDayTime(_) | ScalarValue::IntervalMonthDayNano(_) | ScalarValue::Struct(_, _) - | ScalarValue::Dictionary(_, _) => { + | ScalarValue::Dictionary(_, _) + | ScalarValue::Time32Second(_) + | ScalarValue::Time32Millisecond(_) + | ScalarValue::Time64Microsecond(_) + | ScalarValue::Time64Nanosecond(_) => { return error::ConversionSnafu { from: format!("Unsupported scalar value: {}", value), } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 90c7120671..207ef93a00 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -22,9 +22,9 @@ common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } -datafusion = "14.0.0" -datafusion-common = "14.0.0" -datafusion-expr = "14.0.0" +datafusion = "15.0" +datafusion-common = "15.0" +datafusion-expr = "15.0" datanode = { path = "../datanode" } datatypes = { path = "../datatypes" } futures = "0.3" diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 12bbbdd49d..9b26dc2d0e 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -97,7 +97,7 @@ impl Table for DistTable { async fn scan( &self, - projection: &Option>, + projection: Option<&Vec>, filters: &[Expr], limit: Option, ) -> table::Result { @@ -121,7 +121,7 @@ impl Table for DistTable { partition_execs.push(Arc::new(PartitionExec { table_name: self.table_name.clone(), datanode_instance, - projection: projection.clone(), + projection: projection.cloned(), filters: filters.to_vec(), limit, batches: Arc::new(RwLock::new(None)), @@ -385,8 +385,8 @@ impl DistTable { } } -fn project_schema(table_schema: SchemaRef, projection: &Option>) -> SchemaRef { - if let Some(projection) = &projection { +fn project_schema(table_schema: SchemaRef, projection: Option<&Vec>) -> SchemaRef { + if let Some(projection) = projection { let columns = table_schema.column_schemas(); let projected = projection .iter() @@ -864,7 +864,7 @@ mod test { ) { let expected_output = expected_output.into_iter().join("\n"); let table_scan = table - .scan(&projection, filters.as_slice(), None) + .scan(projection.as_ref(), filters.as_slice(), None) .await .unwrap(); assert_eq!( diff --git a/src/mito/Cargo.toml b/src/mito/Cargo.toml index 583ff9f3f5..de743086a1 100644 --- a/src/mito/Cargo.toml +++ b/src/mito/Cargo.toml @@ -19,8 +19,8 @@ common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } -datafusion = "14.0.0" -datafusion-common = "14.0.0" +datafusion = "15.0" +datafusion-common = "15.0" datatypes = { path = "../datatypes" } futures = "0.3" log-store = { path = "../log-store" } diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 480dd16cea..2c2f092617 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -611,7 +611,7 @@ mod tests { assert_eq!(2, table.insert(insert_req).await.unwrap()); let session_ctx = SessionContext::new(); - let stream = table.scan(&None, &[], None).await.unwrap(); + let stream = table.scan(None, &[], None).await.unwrap(); let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); let batches = util::collect(stream).await.unwrap(); assert_eq!(1, batches.len()); @@ -643,7 +643,7 @@ mod tests { assert_eq!(2, table.insert(insert_req).await.unwrap()); let session_ctx = SessionContext::new(); - let stream = table.scan(&None, &[], None).await.unwrap(); + let stream = table.scan(None, &[], None).await.unwrap(); let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); let batches = util::collect(stream).await.unwrap(); assert_eq!(1, batches.len()); @@ -737,7 +737,7 @@ mod tests { assert_eq!(2, table.insert(insert_req).await.unwrap()); let session_ctx = SessionContext::new(); - let stream = table.scan(&None, &[], None).await.unwrap(); + let stream = table.scan(None, &[], None).await.unwrap(); let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); let batches = util::collect(stream).await.unwrap(); assert_eq!(1, batches.len()); @@ -758,7 +758,7 @@ mod tests { assert_eq!(tss, *batch.column(3)); // Scan with projections: cpu and memory - let stream = table.scan(&Some(vec![1, 2]), &[], None).await.unwrap(); + let stream = table.scan(Some(&vec![1, 2]), &[], None).await.unwrap(); let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); let batches = util::collect(stream).await.unwrap(); assert_eq!(1, batches.len()); @@ -776,7 +776,7 @@ mod tests { assert_eq!(memories, *batch.column(1)); // Scan with projections: only ts - let stream = table.scan(&Some(vec![3]), &[], None).await.unwrap(); + let stream = table.scan(Some(&vec![3]), &[], None).await.unwrap(); let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); let batches = util::collect(stream).await.unwrap(); assert_eq!(1, batches.len()); @@ -819,7 +819,7 @@ mod tests { assert_eq!(test_batch_size, table.insert(insert_req).await.unwrap()); let session_ctx = SessionContext::new(); - let stream = table.scan(&None, &[], None).await.unwrap(); + let stream = table.scan(None, &[], None).await.unwrap(); let stream = stream.execute(0, session_ctx.task_ctx()).unwrap(); let batches = util::collect(stream).await.unwrap(); let mut total = 0; diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index d5f554a994..463e7c866e 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -166,14 +166,14 @@ impl Table for MitoTable { async fn scan( &self, - projection: &Option>, + projection: Option<&Vec>, filters: &[Expr], _limit: Option, ) -> TableResult { let read_ctx = ReadContext::default(); let snapshot = self.region.snapshot(&read_ctx).map_err(TableError::new)?; - let projection = self.transform_projection(&self.region, projection.clone())?; + let projection = self.transform_projection(&self.region, projection.cloned())?; let filters = filters.into(); let scan_request = ScanRequest { projection, diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 1bb9da358a..a69e34cd25 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -15,12 +15,12 @@ common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } -datafusion = "14.0.0" -datafusion-common = "14.0.0" -datafusion-expr = "14.0.0" -datafusion-optimizer = "14.0.0" -datafusion-physical-expr = "14.0.0" -datafusion-sql = "14.0.0" +datafusion = "15.0" +datafusion-common = "15.0" +datafusion-expr = "15.0" +datafusion-optimizer = "15.0" +datafusion-physical-expr = "15.0" +datafusion-sql = "15.0" datatypes = { path = "../datatypes" } futures = "0.3" futures-util = "0.3" diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs index 4c87654e3c..f796c2a53d 100644 --- a/src/query/src/datafusion/planner.rs +++ b/src/query/src/datafusion/planner.rs @@ -21,6 +21,7 @@ use datafusion::error::Result as DfResult; use datafusion::physical_plan::udaf::AggregateUDF; use datafusion::physical_plan::udf::ScalarUDF; use datafusion::sql::planner::{ContextProvider, SqlToRel}; +use datafusion_common::ScalarValue; use datafusion_expr::TableSource; use datatypes::arrow::datatypes::DataType; use session::context::QueryContextRef; @@ -126,4 +127,8 @@ impl ContextProvider for DfContextProviderAdapter { fn get_variable_type(&self, variable_names: &[String]) -> Option { self.state.get_variable_type(variable_names) } + + fn get_config_option(&self, variable: &str) -> Option { + self.state.get_config_option(variable) + } } diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index a72b0203e3..2819b579f5 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -27,6 +27,7 @@ use datafusion::execution::context::{SessionConfig, SessionState}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::physical_plan::udf::ScalarUDF; use datafusion::physical_plan::ExecutionPlan; +use datafusion_common::ScalarValue; use datafusion_expr::{LogicalPlan as DfLogicalPlan, TableSource}; use datafusion_optimizer::optimizer::{Optimizer, OptimizerConfig}; use datafusion_sql::planner::ContextProvider; @@ -144,6 +145,11 @@ impl QueryEngineState { state.get_variable_type(variable_names) } + pub(crate) fn get_config_option(&self, variable: &str) -> Option { + let state = self.df_context.state.read(); + state.get_config_option(variable) + } + pub(crate) fn optimize(&self, plan: &DfLogicalPlan) -> DfResult { self.df_context.optimize(plan) } diff --git a/src/script/Cargo.toml b/src/script/Cargo.toml index 3c36632647..eae3550e5b 100644 --- a/src/script/Cargo.toml +++ b/src/script/Cargo.toml @@ -33,10 +33,10 @@ common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } console = "0.15" -datafusion = { version = "14.0.0", optional = true } -datafusion-common = { version = "14.0.0", optional = true } -datafusion-expr = { version = "14.0.0", optional = true } -datafusion-physical-expr = { version = "14.0.0", optional = true } +datafusion = { version = "15.0", optional = true } +datafusion-common = { version = "15.0", optional = true } +datafusion-expr = { version = "15.0", optional = true } +datafusion-physical-expr = { version = "15.0", optional = true } datatypes = { path = "../datatypes" } futures = "0.3" futures-util = "0.3" diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index ebdd0f172b..1d4679bc87 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -15,4 +15,4 @@ itertools = "0.10" mito = { path = "../mito" } once_cell = "1.10" snafu = { version = "0.7", features = ["backtraces"] } -sqlparser = "0.26" +sqlparser = "0.27" diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 12dc8fa58e..362963a4dd 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -253,7 +253,7 @@ impl<'a> ParserContext<'a> { .parse_column_def() .context(SyntaxSnafu { sql: self.sql })?; - if !matches!(column.data_type, DataType::Timestamp(_)) + if !matches!(column.data_type, DataType::Timestamp(_, _)) || matches!(self.parser.peek_token(), Token::Comma) { columns.push(column); @@ -967,7 +967,7 @@ ENGINE=mito"; assert!(result .unwrap_err() .to_string() - .contains("sql parser error: Expected a concrete value, found: MAXVALU")); + .contains("Please provide an extra partition that is bounded by 'MAXVALUE'.")); } fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) { diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index c9a4d58484..24b1527819 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -300,7 +300,7 @@ pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result Ok(ConcreteDataType::float64_datatype()), SqlDataType::Boolean => Ok(ConcreteDataType::boolean_datatype()), SqlDataType::Date => Ok(ConcreteDataType::date_datatype()), - SqlDataType::Custom(obj_name) => match &obj_name.0[..] { + SqlDataType::Custom(obj_name, _) => match &obj_name.0[..] { [type_name] => { if type_name .value @@ -319,7 +319,7 @@ pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result Ok(ConcreteDataType::timestamp_millisecond_datatype()), + SqlDataType::Timestamp(_, _) => Ok(ConcreteDataType::timestamp_millisecond_datatype()), _ => error::SqlTypeNotSupportedSnafu { t: data_type.clone(), } @@ -373,11 +373,11 @@ mod tests { check_type(SqlDataType::Boolean, ConcreteDataType::boolean_datatype()); check_type(SqlDataType::Date, ConcreteDataType::date_datatype()); check_type( - SqlDataType::Custom(ObjectName(vec![Ident::new("datetime")])), + SqlDataType::Custom(ObjectName(vec![Ident::new("datetime")]), vec![]), ConcreteDataType::datetime_datatype(), ); check_type( - SqlDataType::Timestamp(TimezoneInfo::None), + SqlDataType::Timestamp(None, TimezoneInfo::None), ConcreteDataType::timestamp_millisecond_datatype(), ); } diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 32c220cbab..19e78e8bed 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -21,7 +21,7 @@ futures = "0.3" futures-util = "0.3" lazy_static = "1.4" object-store = { path = "../object-store" } -parquet = { version = "26", features = ["async"] } +parquet = { version = "28.0", features = ["async"] } paste = "1.0" planus = "0.2" prost = "0.11" diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 9adc0e625f..bc42aa5c02 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -12,9 +12,9 @@ common-error = { path = "../common/error" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } -datafusion = "14.0.0" -datafusion-common = "14.0.0" -datafusion-expr = "14.0.0" +datafusion = "15.0" +datafusion-common = "15.0" +datafusion-expr = "15.0" datatypes = { path = "../datatypes" } derive_builder = "0.11" futures = "0.3" @@ -26,7 +26,7 @@ store-api = { path = "../store-api" } tokio = { version = "1.18", features = ["full"] } [dev-dependencies] -datafusion-expr = "14.0.0" -parquet = { version = "26", features = ["async"] } +datafusion-expr = "15.0" +parquet = { version = "28.0", features = ["async"] } tempdir = "0.3" tokio-util = { version = "0.7", features = ["compat"] } diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 9aff8a061f..d5efd33b2e 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -54,7 +54,7 @@ pub trait Table: Send + Sync { /// Scan the table and returns a SendableRecordBatchStream. async fn scan( &self, - projection: &Option>, + projection: Option<&Vec>, filters: &[Expr], // limit can be used to reduce the amount scanned // from the datasource as a performance optimization. diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index 98ff82d08a..d9eb5a41e1 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -69,7 +69,7 @@ impl TableProvider for DfTableProviderAdapter { async fn scan( &self, _ctx: &SessionState, - projection: &Option>, + projection: Option<&Vec>, filters: &[DfExpr], limit: Option, ) -> DfResult> { @@ -134,7 +134,7 @@ impl Table for TableAdapter { async fn scan( &self, - projection: &Option>, + projection: Option<&Vec>, filters: &[Expr], limit: Option, ) -> Result { diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index 7664d8f0fd..473d80dd0a 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -99,7 +99,7 @@ impl Table for NumbersTable { async fn scan( &self, - _projection: &Option>, + _projection: Option<&Vec>, _filters: &[Expr], limit: Option, ) -> Result { diff --git a/src/table/src/test_util/empty_table.rs b/src/table/src/test_util/empty_table.rs index 55b5e4f25d..e9f085000e 100644 --- a/src/table/src/test_util/empty_table.rs +++ b/src/table/src/test_util/empty_table.rs @@ -71,7 +71,7 @@ impl Table for EmptyTable { async fn scan( &self, - _projection: &Option>, + _projection: Option<&Vec>, _filters: &[common_query::prelude::Expr], _limit: Option, ) -> Result { diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index c0cd028f45..af92f40eb9 100644 --- a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -140,7 +140,7 @@ impl Table for MemTable { async fn scan( &self, - projection: &Option>, + projection: Option<&Vec>, _filters: &[Expr], limit: Option, ) -> Result { @@ -211,7 +211,7 @@ mod test { let ctx = SessionContext::new(); let table = build_testing_table(); - let scan_stream = table.scan(&Some(vec![1]), &[], None).await.unwrap(); + let scan_stream = table.scan(Some(&vec![1]), &[], None).await.unwrap(); let scan_stream = scan_stream.execute(0, ctx.task_ctx()).unwrap(); let recordbatch = util::collect(scan_stream).await.unwrap(); assert_eq!(1, recordbatch.len()); @@ -232,7 +232,7 @@ mod test { let ctx = SessionContext::new(); let table = build_testing_table(); - let scan_stream = table.scan(&None, &[], Some(2)).await.unwrap(); + let scan_stream = table.scan(None, &[], Some(2)).await.unwrap(); let scan_stream = scan_stream.execute(0, ctx.task_ctx()).unwrap(); let recordbatch = util::collect(scan_stream).await.unwrap(); assert_eq!(1, recordbatch.len());