chore: bump arrow, parquet, datafusion and tonic (#1386)

* bump arrow, parquet, datafusion, tonic and greptime-proto

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add analyzer and fix test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy warnings

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Ruihang Xia
2023-04-15 00:03:15 +08:00
committed by GitHub
parent a5771e2ec3
commit a6e41cdd7b
42 changed files with 294 additions and 252 deletions

Cargo.lock

@@ -185,8 +185,8 @@ dependencies = [
"greptime-proto",
"prost",
"snafu",
"tonic",
"tonic-build",
"tonic 0.9.1",
"tonic-build 0.9.1",
]
[[package]]
@@ -230,9 +230,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "arrow"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "990dfa1a9328504aa135820da1c95066537b69ad94c04881b785f64328e0fa6b"
checksum = "1aea9fcb25bbb70f7f922f95b99ca29c1013dab47f6df61a6f24861842dd7f2e"
dependencies = [
"ahash 0.8.3",
"arrow-arith",
@@ -253,9 +253,9 @@ dependencies = [
[[package]]
name = "arrow-arith"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2b2e52de0ab54173f9b08232b7184c26af82ee7ab4ac77c83396633c90199fa"
checksum = "8d967b42f7b12c91fd78acd396b20c2973b184c8866846674abbb00c963e93ab"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -268,9 +268,9 @@ dependencies = [
[[package]]
name = "arrow-array"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e10849b60c17dbabb334be1f4ef7550701aa58082b71335ce1ed586601b2f423"
checksum = "3190f208ee7aa0f3596fa0098d42911dec5e123ca88c002a08b24877ad14c71e"
dependencies = [
"ahash 0.8.3",
"arrow-buffer",
@@ -285,9 +285,9 @@ dependencies = [
[[package]]
name = "arrow-buffer"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0746ae991b186be39933147117f8339eb1c4bbbea1c8ad37e7bf5851a1a06ba"
checksum = "5d33c733c5b6c44a0fc526f29c09546e04eb56772a7a21e48e602f368be381f6"
dependencies = [
"half 2.2.1",
"num",
@@ -295,9 +295,9 @@ dependencies = [
[[package]]
name = "arrow-cast"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b88897802515d7b193e38b27ddd9d9e43923d410a9e46307582d756959ee9595"
checksum = "abd349520b6a1ed4924ae2afc9d23330a3044319e4ec3d5b124c09e4d440ae87"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -312,9 +312,9 @@ dependencies = [
[[package]]
name = "arrow-csv"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c8220d9741fc37961262710ceebd8451a5b393de57c464f0267ffdda1775c0a"
checksum = "c80af3c3e290a2a7e1cc518f1471dff331878cb4af9a5b088bf030b89debf649"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -331,9 +331,9 @@ dependencies = [
[[package]]
name = "arrow-data"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "533f937efa1aaad9dc86f6a0e382c2fa736a4943e2090c946138079bdf060cef"
checksum = "b1c8361947aaa96d331da9df3f7a08bdd8ab805a449994c97f5c4d24c4b7e2cf"
dependencies = [
"arrow-buffer",
"arrow-schema",
@@ -343,9 +343,9 @@ dependencies = [
[[package]]
name = "arrow-flight"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd1362a6a02e31734c67eb2e9a30dca1689d306cfe2fc00bc6430512091480ce"
checksum = "bd1fc687f3e4ffe91ccb7f2ffb06143ff97029448d427a9641006242bcbd0c24"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -355,17 +355,17 @@ dependencies = [
"base64 0.21.0",
"bytes",
"futures",
"paste",
"prost",
"prost-derive",
"tokio",
"tonic",
"tonic 0.9.1",
]
[[package]]
name = "arrow-ipc"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18b75296ff01833f602552dff26a423fc213db8e5049b540ca4a00b1c957e41c"
checksum = "9a46ee000b9fbd1e8db6e8b26acb8c760838512b39d8c9f9d73892cb55351d50"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -377,9 +377,9 @@ dependencies = [
[[package]]
name = "arrow-json"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e501d3de4d612c90677594896ca6c0fa075665a7ff980dc4189bb531c17e19f6"
checksum = "4bf2366607be867ced681ad7f272371a5cf1fc2941328eef7b4fee14565166fb"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -391,14 +391,15 @@ dependencies = [
"indexmap",
"lexical-core",
"num",
"serde",
"serde_json",
]
[[package]]
name = "arrow-ord"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33d2671eb3793f9410230ac3efb0e6d36307be8a2dac5fad58ac9abde8e9f01e"
checksum = "304069901c867200e21ec868ae7521165875470ef2f1f6d58f979a443d63997e"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -411,9 +412,9 @@ dependencies = [
[[package]]
name = "arrow-row"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc11fa039338cebbf4e29cf709c8ac1d6a65c7540063d4a25f991ab255ca85c8"
checksum = "0d57fe8ceef3392fdd493269d8a2d589de17bafce151aacbffbddac7a57f441a"
dependencies = [
"ahash 0.8.3",
"arrow-array",
@@ -426,9 +427,9 @@ dependencies = [
[[package]]
name = "arrow-schema"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d04f17f7b86ded0b5baf98fe6123391c4343e031acc3ccc5fa604cc180bff220"
checksum = "a16b88a93ac8350f0200b1cd336a1f887315925b8dd7aa145a37b8bdbd8497a4"
dependencies = [
"bitflags 2.0.2",
"serde",
@@ -436,9 +437,9 @@ dependencies = [
[[package]]
name = "arrow-select"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "163e35de698098ff5f5f672ada9dc1f82533f10407c7a11e2cd09f3bcf31d18a"
checksum = "98e8a4d6ca37d5212439b24caad4d80743fcbb706706200dd174bb98e68fe9d8"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -449,9 +450,9 @@ dependencies = [
[[package]]
name = "arrow-string"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfdfbed1b10209f0dc68e6aa4c43dc76079af65880965c7c3b73f641f23d4aba"
checksum = "cbb594efa397eb6a546f42b1f8df3d242ea84dbfda5232e06035dc2b2e2c8459"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -1437,7 +1438,7 @@ dependencies = [
"substrait 0.1.1",
"substrait 0.4.2",
"tokio",
"tonic",
"tonic 0.9.1",
"tracing",
"tracing-subscriber",
]
@@ -1644,7 +1645,7 @@ dependencies = [
"rand",
"snafu",
"tokio",
"tonic",
"tonic 0.9.1",
"tower",
]
@@ -1837,7 +1838,7 @@ checksum = "e57ff02e8ad8e06ab9731d5dc72dc23bef9200778eae1a89d555d8c42e5d4a86"
dependencies = [
"prost",
"prost-types",
"tonic",
"tonic 0.8.3",
"tracing-core",
]
@@ -1859,7 +1860,7 @@ dependencies = [
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tonic 0.8.3",
"tracing",
"tracing-core",
"tracing-subscriber",
@@ -2220,11 +2221,13 @@ dependencies = [
[[package]]
name = "datafusion"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
version = "22.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74a778ca6016a853a3c3add3fa8c6f12f4fe4561#74a778ca6016a853a3c3add3fa8c6f12f4fe4561"
dependencies = [
"ahash 0.8.3",
"arrow",
"arrow-array",
"arrow-schema",
"async-compression",
"async-trait",
"bytes",
@@ -2267,8 +2270,8 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
version = "22.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74a778ca6016a853a3c3add3fa8c6f12f4fe4561#74a778ca6016a853a3c3add3fa8c6f12f4fe4561"
dependencies = [
"arrow",
"arrow-array",
@@ -2281,8 +2284,8 @@ dependencies = [
[[package]]
name = "datafusion-execution"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
version = "22.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74a778ca6016a853a3c3add3fa8c6f12f4fe4561#74a778ca6016a853a3c3add3fa8c6f12f4fe4561"
dependencies = [
"dashmap",
"datafusion-common",
@@ -2298,8 +2301,8 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
version = "22.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74a778ca6016a853a3c3add3fa8c6f12f4fe4561#74a778ca6016a853a3c3add3fa8c6f12f4fe4561"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -2309,8 +2312,8 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
version = "22.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74a778ca6016a853a3c3add3fa8c6f12f4fe4561#74a778ca6016a853a3c3add3fa8c6f12f4fe4561"
dependencies = [
"arrow",
"async-trait",
@@ -2326,8 +2329,8 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
version = "22.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74a778ca6016a853a3c3add3fa8c6f12f4fe4561#74a778ca6016a853a3c3add3fa8c6f12f4fe4561"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -2357,8 +2360,8 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
version = "22.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74a778ca6016a853a3c3add3fa8c6f12f4fe4561#74a778ca6016a853a3c3add3fa8c6f12f4fe4561"
dependencies = [
"arrow",
"datafusion-common",
@@ -2368,8 +2371,8 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=21bf4ffccadfeea824ab6e29c0b872930d0e190a#21bf4ffccadfeea824ab6e29c0b872930d0e190a"
version = "22.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74a778ca6016a853a3c3add3fa8c6f12f4fe4561#74a778ca6016a853a3c3add3fa8c6f12f4fe4561"
dependencies = [
"arrow",
"arrow-schema",
@@ -2440,7 +2443,7 @@ dependencies = [
"tokio",
"tokio-stream",
"toml",
"tonic",
"tonic 0.9.1",
"tower",
"tower-http",
"url",
@@ -2781,8 +2784,8 @@ dependencies = [
"prost",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
"tonic 0.8.3",
"tonic-build 0.8.4",
"tower",
"tower-service",
]
@@ -2981,7 +2984,7 @@ dependencies = [
"table",
"tokio",
"toml",
"tonic",
"tonic 0.9.1",
"tower",
"uuid",
]
@@ -3245,11 +3248,11 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=59afacdae59eae4241cfaf851021361caaeaed21#59afacdae59eae4241cfaf851021361caaeaed21"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0bebe5f69c91cdfbce85cb8f45f9fcd28185261c#0bebe5f69c91cdfbce85cb8f45f9fcd28185261c"
dependencies = [
"prost",
"tonic",
"tonic-build",
"tonic 0.9.1",
"tonic-build 0.9.1",
]
[[package]]
@@ -4155,7 +4158,7 @@ dependencies = [
"table",
"tokio",
"tokio-stream",
"tonic",
"tonic 0.9.1",
"tower",
"tracing",
"tracing-subscriber",
@@ -4196,7 +4199,7 @@ dependencies = [
"table",
"tokio",
"tokio-stream",
"tonic",
"tonic 0.9.1",
"tower",
"tracing",
"tracing-subscriber",
@@ -4993,9 +4996,9 @@ dependencies = [
[[package]]
name = "parquet"
version = "36.0.0"
version = "37.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "321a15f8332645759f29875b07f8233d16ed8ec1b3582223de81625a9f8506b7"
checksum = "b5022d98333271f4ca3e87bab760498e61726bf5a6ca919123c80517e20ded29"
dependencies = [
"ahash 0.8.3",
"arrow-array",
@@ -7242,7 +7245,7 @@ dependencies = [
"tokio-rustls 0.24.0",
"tokio-stream",
"tokio-test",
"tonic",
"tonic 0.9.1",
"tonic-reflection",
"tower",
"tower-http",
@@ -7560,9 +7563,9 @@ dependencies = [
[[package]]
name = "sqlparser"
version = "0.32.0"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0366f270dbabb5cc2e4c88427dc4c08bba144f81e32fbd459a013f26a4d16aa0"
checksum = "355dc4d4b6207ca8a3434fc587db0a8016130a574dbcdbfb93d7f7b5bc5b211a"
dependencies = [
"log",
"sqlparser_derive",
@@ -7662,8 +7665,8 @@ dependencies = [
"table",
"tokio",
"tokio-util",
"tonic",
"tonic-build",
"tonic 0.9.1",
"tonic-build 0.9.1",
"uuid",
]
@@ -8449,9 +8452,7 @@ dependencies = [
"pin-project",
"prost",
"prost-derive",
"rustls-pemfile",
"tokio",
"tokio-rustls 0.23.4",
"tokio-stream",
"tokio-util",
"tower",
@@ -8461,6 +8462,37 @@ dependencies = [
"tracing-futures",
]
[[package]]
name = "tonic"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bd8e87955eb13c1986671838177d6792cdc52af9bffced0d2c8a9a7f741ab3"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64 0.21.0",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"rustls-pemfile",
"tokio",
"tokio-rustls 0.24.0",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.8.4"
@@ -8475,17 +8507,29 @@ dependencies = [
]
[[package]]
name = "tonic-reflection"
version = "0.6.0"
name = "tonic-build"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67494bad4dda4c9bffae901dfe14e2b2c0f760adb4706dc10beeb81799f7f7b2"
checksum = "0f60a933bbea70c95d633c04c951197ddf084958abaa2ed502a3743bdd8d8dd7"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"quote",
"syn 1.0.109",
]
[[package]]
name = "tonic-reflection"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c6980583b9694a05bb2deb0678f72c44564ff52b91cd22642f2f992cd8dd02f"
dependencies = [
"bytes",
"prost",
"prost-types",
"tokio",
"tokio-stream",
"tonic",
"tonic 0.9.1",
]
[[package]]


@@ -52,33 +52,33 @@ edition = "2021"
license = "Apache-2.0"
[workspace.dependencies]
arrow = { version = "36.0" }
arrow-array = "36.0"
arrow-flight = "36.0"
arrow-schema = { version = "36.0", features = ["serde"] }
arrow = { version = "37.0" }
arrow-array = "37.0"
arrow-flight = "37.0"
arrow-schema = { version = "37.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "21bf4ffccadfeea824ab6e29c0b872930d0e190a" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
futures = "0.3"
futures-util = "0.3"
parquet = "36.0"
parquet = "37.0"
paste = "1.0"
prost = "0.11"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
sqlparser = "0.32"
sqlparser = "0.33"
tempfile = "3"
tokio = { version = "1.24.2", features = ["full"] }
tokio-util = { version = "0.7", features = ["io-util"] }
tonic = { version = "0.8", features = ["tls"] }
tonic = { version = "0.9", features = ["tls"] }
uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }
[profile.release]


@@ -131,7 +131,7 @@ fn convert_record_batch(record_batch: RecordBatch) -> (Vec<Column>, u32) {
column_name: field.name().clone(),
values: Some(values),
null_mask: array
.data()
.to_data()
.nulls()
.map(|bitmap| bitmap.buffer().as_slice().to_vec())
.unwrap_or_default(),
@@ -225,7 +225,7 @@ fn build_values(column: &ArrayRef) -> (Values, ColumnDataType) {
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
| DataType::Union(_, _, _)
| DataType::Union(_, _)
| DataType::Dictionary(_, _)
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _)
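The two patterns in this hunk recur across the whole PR: arrow 37 deprecates `Array::data()` (which returned `&ArrayData`) in favor of the owned `Array::to_data()`, and `DataType::Union` drops from three tuple fields to two because the field list and type ids are folded into a single `UnionFields`. A minimal sketch of the `to_data` side, assuming arrow 37 and an arbitrary `Int32Array`:

```rust
use arrow::array::{Array, ArrayData, Int32Array};

fn main() {
    let array = Int32Array::from(vec![Some(1), None, Some(3)]);
    // arrow 36 spelled this `array.data().clone()`; `data()` is deprecated
    // in 37 and `to_data()` hands back an owned ArrayData directly.
    let data: ArrayData = array.to_data();
    // Null info comes out as a NullBuffer, as in the null_mask hunk above.
    assert_eq!(data.nulls().map(|n| n.null_count()), Some(1));
}
```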


@@ -10,10 +10,10 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "59afacdae59eae4241cfaf851021361caaeaed21" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0bebe5f69c91cdfbce85cb8f45f9fcd28185261c" }
prost.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
tonic.workspace = true
[build-dependencies]
tonic-build = "0.8"
tonic-build = "0.9"


@@ -212,7 +212,7 @@ fn build_calc_fn(
fn calc(input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
assert_eq!(input.len(), #num_params);
#( let #range_array_names = RangeArray::try_new(extract_array(&input[#param_numbers])?.data().clone().into())?; )*
#( let #range_array_names = RangeArray::try_new(extract_array(&input[#param_numbers])?.to_data().into())?; )*
// TODO(ruihang): add ensure!()


@@ -752,6 +752,14 @@ mod utils {
BuiltinScalarFunction::Uuid => "uuid",
BuiltinScalarFunction::Struct => "struct",
BuiltinScalarFunction::ArrowTypeof => "arrow_type_of",
BuiltinScalarFunction::Acosh => "acosh",
BuiltinScalarFunction::Asinh => "asinh",
BuiltinScalarFunction::Atanh => "atanh",
BuiltinScalarFunction::Cbrt => "cbrt",
BuiltinScalarFunction::Cosh => "cosh",
BuiltinScalarFunction::Pi => "pi",
BuiltinScalarFunction::Sinh => "sinh",
BuiltinScalarFunction::Tanh => "tanh",
}
}
}


@@ -236,7 +236,7 @@ mod tests {
(
DataType::Timestamp(
datatypes::arrow::datatypes::TimeUnit::Second,
Some("UTC".to_string()),
Some("UTC".into()),
),
true,
),
@@ -251,14 +251,14 @@ mod tests {
(
DataType::Timestamp(
datatypes::arrow::datatypes::TimeUnit::Second,
Some("UTC".to_string()),
Some("UTC".into()),
),
true,
),
(
DataType::Timestamp(
datatypes::arrow::datatypes::TimeUnit::Second,
Some("PDT".to_string()),
Some("PDT".into()),
),
true,
),
@@ -269,14 +269,14 @@ mod tests {
(
DataType::Timestamp(
datatypes::arrow::datatypes::TimeUnit::Second,
Some("UTC".to_string()),
Some("UTC".into()),
),
true,
),
(
DataType::Timestamp(
datatypes::arrow::datatypes::TimeUnit::Millisecond,
Some("UTC".to_string()),
Some("UTC".into()),
),
true,
),
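The `"UTC".to_string()` → `"UTC".into()` churn follows from arrow 37 changing the timezone payload of `DataType::Timestamp` from `Option<String>` to `Option<Arc<str>>`; the same change is why later hunks pin a bare `None` as `None::<String>` when calling `with_timezone_opt`. A small sketch, assuming arrow 37 as a direct dependency:

```rust
use arrow::datatypes::{DataType, TimeUnit};

fn main() {
    // arrow 37: the timezone is Option<Arc<str>>, so anything implementing
    // Into<Arc<str>> works; "UTC".into() replaces the old "UTC".to_string().
    let ty = DataType::Timestamp(TimeUnit::Second, Some("UTC".into()));
    println!("{ty:?}");
}
```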


@@ -10,7 +10,7 @@ test = []
[dependencies]
arrow.workspace = true
arrow-array = "36"
arrow-array.workspace = true
arrow-schema.workspace = true
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }


@@ -362,7 +362,7 @@ mod tests {
ConcreteDataType::String(_)
));
assert_eq!(
ConcreteDataType::from_arrow_type(&ArrowDataType::List(Box::new(Field::new(
ConcreteDataType::from_arrow_type(&ArrowDataType::List(Arc::new(Field::new(
"item",
ArrowDataType::Int32,
true,


@@ -272,7 +272,7 @@ impl TryFrom<Arc<ArrowSchema>> for Schema {
let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
for field in &arrow_schema.fields {
let column_schema = ColumnSchema::try_from(field)?;
let column_schema = ColumnSchema::try_from(field.as_ref())?;
name_to_index.insert(field.name().to_string(), column_schemas.len());
column_schemas.push(column_schema);
}


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow::datatypes::{DataType as ArrowDataType, Field};
use serde::{Deserialize, Serialize};
@@ -63,7 +65,7 @@ impl DataType for ListType {
}
fn as_arrow_type(&self) -> ArrowDataType {
let field = Box::new(Field::new("item", self.item_type.as_arrow_type(), true));
let field = Arc::new(Field::new("item", self.item_type.as_arrow_type(), true));
ArrowDataType::List(field)
}
@@ -94,7 +96,7 @@ mod tests {
t.default_value()
);
assert_eq!(
ArrowDataType::List(Box::new(Field::new("item", ArrowDataType::Boolean, true))),
ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Boolean, true))),
t.as_arrow_type()
);
assert_eq!(ConcreteDataType::boolean_datatype(), *t.item_type());
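Relatedly, arrow 37 stores nested fields such as the `List` item as `Arc<Field>` (aliased `FieldRef`) rather than `Box<Field>`, which accounts for the `Box::new` → `Arc::new` swaps in this file and several below. For illustration, under the same arrow 37 assumption:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field};

fn main() {
    // arrow 37: DataType::List takes Arc<Field>; Box::new no longer compiles.
    let item = Arc::new(Field::new("item", DataType::Boolean, true));
    let list_ty = DataType::List(item);
    println!("{list_ty:?}");
}
```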


@@ -15,6 +15,7 @@
use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
use arrow::datatypes::{DataType as ArrowDataType, Field};
use common_base::bytes::{Bytes, StringBytes};
@@ -271,7 +272,7 @@ fn to_null_value(output_type: &ConcreteDataType) -> ScalarValue {
ConcreteDataType::DateTime(_) => ScalarValue::Date64(None),
ConcreteDataType::Timestamp(t) => timestamp_to_scalar_value(t.unit(), None),
ConcreteDataType::List(_) => {
ScalarValue::List(None, Box::new(new_item_field(output_type.as_arrow_type())))
ScalarValue::List(None, Arc::new(new_item_field(output_type.as_arrow_type())))
}
ConcreteDataType::Dictionary(dict) => ScalarValue::Dictionary(
Box::new(dict.key_type().as_arrow_type()),
@@ -490,7 +491,7 @@ impl ListValue {
Ok(ScalarValue::List(
vs,
Box::new(new_item_field(output_type.item_type().as_arrow_type())),
Arc::new(new_item_field(output_type.item_type().as_arrow_type())),
))
}
}


@@ -217,8 +217,7 @@ macro_rules! impl_try_from_arrow_array_for_vector {
.with_context(|| crate::error::ConversionSnafu {
from: std::format!("{:?}", array.as_ref().data_type()),
})?
.data()
.clone();
.to_data();
let concrete_array = $Array::from(data);
Ok($Vector::from(concrete_array))
@@ -229,7 +228,7 @@ macro_rules! impl_try_from_arrow_array_for_vector {
macro_rules! impl_validity_for_vector {
($array: expr) => {
Validity::from_array_data($array.data())
Validity::from_array_data($array.to_data())
};
}


@@ -38,13 +38,7 @@ impl BinaryVector {
}
fn to_array_data(&self) -> ArrayData {
self.array.data().clone()
}
fn from_array_data(data: ArrayData) -> BinaryVector {
BinaryVector {
array: BinaryArray::from(data),
}
self.array.to_data()
}
}
@@ -106,8 +100,8 @@ impl Vector for BinaryVector {
}
fn slice(&self, offset: usize, length: usize) -> VectorRef {
let data = self.array.data().slice(offset, length);
Arc::new(Self::from_array_data(data))
let array = self.array.slice(offset, length);
Arc::new(Self { array })
}
fn get(&self, index: usize) -> Value {
@@ -286,7 +280,7 @@ mod tests {
#[test]
fn test_from_arrow_array() {
let arrow_array = BinaryArray::from_iter_values([vec![1, 2, 3], vec![1, 2, 3]]);
let original = BinaryArray::from(arrow_array.data().clone());
let original = BinaryArray::from(arrow_array.to_data());
let vector = BinaryVector::from(arrow_array);
assert_eq!(original, vector.array);
}


@@ -44,7 +44,7 @@ impl BooleanVector {
}
fn to_array_data(&self) -> ArrayData {
self.array.data().clone()
self.array.to_data()
}
fn from_array_data(data: ArrayData) -> BooleanVector {
@@ -132,7 +132,7 @@ impl Vector for BooleanVector {
}
fn slice(&self, offset: usize, length: usize) -> VectorRef {
let data = self.array.data().slice(offset, length);
let data = self.array.to_data().slice(offset, length);
Arc::new(Self::from_array_data(data))
}
@@ -259,7 +259,7 @@ mod tests {
assert!(!v.is_const());
assert!(v.validity().is_all_valid());
assert!(!v.only_null());
assert_eq!(64, v.memory_size());
assert_eq!(2, v.memory_size());
for (i, b) in bools.iter().enumerate() {
assert!(!v.is_null(i));


@@ -24,7 +24,7 @@ pub type DateVectorBuilder = PrimitiveVectorBuilder<DateType>;
mod tests {
use std::sync::Arc;
use arrow::array::Array;
use arrow_array::ArrayRef;
use common_time::date::Date;
use super::*;
@@ -84,7 +84,7 @@ mod tests {
#[test]
fn test_date_from_arrow() {
let vector = DateVector::from_slice([1, 2]);
let arrow = vector.as_arrow().slice(0, vector.len());
let arrow: ArrayRef = Arc::new(vector.as_arrow().slice(0, vector.len()));
let vector2 = DateVector::try_from_arrow_array(&arrow).unwrap();
assert_eq!(vector, vector2);
}
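The new `Arc::new(...)` wrapper is needed because arrow 37 makes the inherent `slice` on concrete arrays return the same concrete type rather than an `ArrayRef`, so call sites that want a dynamic array re-wrap the result. A sketch of the new shape, assuming arrow 37:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array};

fn main() {
    let a = Int32Array::from(vec![1, 2, 3]);
    // arrow 37: the inherent slice returns Int32Array (36 returned ArrayRef).
    let sliced: Int32Array = a.slice(0, 2);
    // Callers needing a dyn Array wrap it themselves, as the test above does.
    let dynamic: ArrayRef = Arc::new(sliced);
    assert_eq!(dynamic.len(), 2);
}
```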


@@ -25,6 +25,7 @@ mod tests {
use std::sync::Arc;
use arrow::array::{Array, PrimitiveArray};
use arrow_array::ArrayRef;
use common_time::DateTime;
use datafusion_common::from_slice::FromSlice;
@@ -108,8 +109,8 @@ mod tests {
#[test]
fn test_datetime_from_arrow() {
let vector = DateTimeVector::from_wrapper_slice([DateTime::new(1), DateTime::new(2)]);
let arrow = vector.as_arrow().slice(0, vector.len());
let vector2 = DateTimeVector::try_from_arrow_array(&arrow).unwrap();
let arrow: ArrayRef = Arc::new(vector.as_arrow().slice(0, vector.len())) as _;
let vector2 = DateTimeVector::try_from_arrow_array(arrow).unwrap();
assert_eq!(vector, vector2);
}
}


@@ -262,7 +262,7 @@ impl Helper {
| ArrowDataType::LargeList(_)
| ArrowDataType::FixedSizeList(_, _)
| ArrowDataType::Struct(_)
| ArrowDataType::Union(_, _, _)
| ArrowDataType::Union(_, _)
| ArrowDataType::Dictionary(_, _)
| ArrowDataType::Decimal128(_, _)
| ArrowDataType::Decimal256(_, _)
@@ -359,7 +359,7 @@ mod tests {
ScalarValue::Int32(Some(1)),
ScalarValue::Int32(Some(2)),
]),
Box::new(Field::new("item", ArrowDataType::Int32, true)),
Arc::new(Field::new("item", ArrowDataType::Int32, true)),
);
let vector = Helper::try_from_scalar_value(value, 3).unwrap();
assert_eq!(


@@ -47,7 +47,7 @@ impl ListVector {
}
fn to_array_data(&self) -> ArrayData {
self.array.data().clone()
self.array.to_data()
}
fn from_array_data_and_type(data: ArrayData, item_type: ConcreteDataType) -> Self {
@@ -106,7 +106,7 @@ impl Vector for ListVector {
}
fn slice(&self, offset: usize, length: usize) -> VectorRef {
let data = self.array.data().slice(offset, length);
let data = self.array.to_data().slice(offset, length);
Arc::new(Self::from_array_data_and_type(data, self.item_type.clone()))
}
@@ -345,7 +345,7 @@ impl ScalarVectorBuilder for ListVectorBuilder {
let len = self.len();
let values_vector = self.values_builder.to_vector();
let values_arr = values_vector.to_arrow_array();
let values_data = values_arr.data();
let values_data = values_arr.to_data();
let offset_buffer = self.offsets_builder.finish();
let null_bit_buffer = self.null_buffer_builder.finish();
@@ -355,7 +355,7 @@ impl ScalarVectorBuilder for ListVectorBuilder {
let array_data_builder = ArrayData::builder(data_type)
.len(len)
.add_buffer(offset_buffer)
.add_child_data(values_data.clone())
.add_child_data(values_data)
.null_bit_buffer(null_bit_buffer);
let array_data = unsafe { array_data_builder.build_unchecked() };


@@ -46,7 +46,7 @@ impl NullVector {
}
fn to_array_data(&self) -> ArrayData {
self.array.data().clone()
self.array.to_data()
}
}


@@ -67,8 +67,7 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
.with_context(|| error::ConversionSnafu {
from: format!("{:?}", array.as_ref().data_type()),
})?
.data()
.clone();
.to_data();
let concrete_array = PrimitiveArray::<T::ArrowPrimitive>::from(data);
Ok(Self::new(concrete_array))
}
@@ -82,30 +81,26 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.with_timezone_opt(None)
.data()
.clone(),
.with_timezone_opt(None::<String>)
.to_data(),
arrow_schema::TimeUnit::Millisecond => array
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.with_timezone_opt(None)
.data()
.clone(),
.with_timezone_opt(None::<String>)
.to_data(),
arrow_schema::TimeUnit::Microsecond => array
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.with_timezone_opt(None)
.data()
.clone(),
.with_timezone_opt(None::<String>)
.to_data(),
arrow_schema::TimeUnit::Nanosecond => array
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.with_timezone_opt(None)
.data()
.clone(),
.with_timezone_opt(None::<String>)
.to_data(),
},
_ => {
unreachable!()
@@ -146,7 +141,7 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
}
fn to_array_data(&self) -> ArrayData {
self.array.data().clone()
self.array.to_data()
}
fn from_array_data(data: ArrayData) -> Self {
@@ -157,7 +152,7 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
// To distinguish with `Vector::slice()`.
fn get_slice(&self, offset: usize, length: usize) -> Self {
let data = self.array.data().slice(offset, length);
let data = self.array.to_data().slice(offset, length);
Self::from_array_data(data)
}
}
@@ -206,7 +201,7 @@ impl<T: LogicalPrimitiveType> Vector for PrimitiveVector<T> {
}
fn slice(&self, offset: usize, length: usize) -> VectorRef {
let data = self.array.data().slice(offset, length);
let data = self.array.to_data().slice(offset, length);
Arc::new(Self::from_array_data(data))
}


@@ -38,7 +38,7 @@ impl StringVector {
}
fn to_array_data(&self) -> ArrayData {
self.array.data().clone()
self.array.to_data()
}
fn from_array_data(data: ArrayData) -> Self {
@@ -146,7 +146,7 @@ impl Vector for StringVector {
}
fn slice(&self, offset: usize, length: usize) -> VectorRef {
let data = self.array.data().slice(offset, length);
let data = self.array.to_data().slice(offset, length);
Arc::new(Self::from_array_data(data))
}


@@ -16,10 +16,10 @@ use arrow::array::ArrayData;
use arrow::buffer::NullBuffer;
#[derive(Debug, PartialEq)]
enum ValidityKind<'a> {
enum ValidityKind {
/// Whether the array slot is valid or not (null).
Slots {
bitmap: &'a NullBuffer,
bitmap: NullBuffer,
len: usize,
null_count: usize,
},
@@ -31,17 +31,17 @@ enum ValidityKind<'a> {
/// Validity of a vector.
#[derive(Debug, PartialEq)]
pub struct Validity<'a> {
kind: ValidityKind<'a>,
pub struct Validity {
kind: ValidityKind,
}
impl<'a> Validity<'a> {
impl Validity {
/// Creates a `Validity` from [`ArrayData`].
pub fn from_array_data(data: &'a ArrayData) -> Validity<'a> {
pub fn from_array_data(data: ArrayData) -> Validity {
match data.nulls() {
Some(bitmap) => Validity {
Some(null_buf) => Validity {
kind: ValidityKind::Slots {
bitmap,
bitmap: null_buf.clone(),
len: data.len(),
null_count: data.null_count(),
},
@@ -51,14 +51,14 @@ impl<'a> Validity<'a> {
}
/// Returns `Validity` that all elements are valid.
pub fn all_valid(len: usize) -> Validity<'a> {
pub fn all_valid(len: usize) -> Validity {
Validity {
kind: ValidityKind::AllValid { len },
}
}
/// Returns `Validity` that all elements are null.
pub fn all_null(len: usize) -> Validity<'a> {
pub fn all_null(len: usize) -> Validity {
Validity {
kind: ValidityKind::AllNull { len },
}
@@ -66,9 +66,9 @@ impl<'a> Validity<'a> {
/// Returns whether `i-th` bit is set.
pub fn is_set(&self, i: usize) -> bool {
match self.kind {
match &self.kind {
ValidityKind::Slots { bitmap, .. } => bitmap.is_valid(i),
ValidityKind::AllValid { len } => i < len,
ValidityKind::AllValid { len } => i < *len,
ValidityKind::AllNull { .. } => false,
}
}
@@ -136,7 +136,7 @@ mod tests {
#[test]
fn test_from_array_data() {
let array = Int32Array::from_iter([None, Some(1), None]);
let validity = Validity::from_array_data(array.data());
let validity = Validity::from_array_data(array.to_data());
assert_eq!(2, validity.null_count());
assert!(!validity.is_set(0));
assert!(validity.is_set(1));
@@ -145,13 +145,13 @@ mod tests {
assert!(!validity.is_all_valid());
let array = Int32Array::from_iter([None, None]);
let validity = Validity::from_array_data(array.data());
let validity = Validity::from_array_data(array.to_data());
assert!(validity.is_all_null());
assert!(!validity.is_all_valid());
assert_eq!(2, validity.null_count());
let array = Int32Array::from_iter_values([1, 2]);
let validity = Validity::from_array_data(array.data());
let validity = Validity::from_array_data(array.to_data());
assert!(!validity.is_all_null());
assert!(validity.is_all_valid());
assert_eq!(0, validity.null_count());
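Dropping the `'a` lifetime works because arrow's `NullBuffer` is a cheap, reference-counted handle: cloning it out of `ArrayData` lets `Validity` own its bitmap instead of borrowing it. A sketch of that access pattern, assuming arrow 37:

```rust
use arrow::array::{Array, Int32Array};
use arrow::buffer::NullBuffer;

fn main() {
    let array = Int32Array::from(vec![Some(1), None, None]);
    let data = array.to_data();
    // NullBuffer clones share the underlying bitmap, so owning one (as the
    // reworked ValidityKind::Slots now does) is inexpensive.
    let nulls: NullBuffer = data.nulls().cloned().expect("array has nulls");
    assert!(nulls.is_valid(0));
    assert_eq!(nulls.null_count(), 2);
}
```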


@@ -382,11 +382,11 @@ mod test {
use catalog::error::Result;
use catalog::remote::{KvBackend, ValueIter};
use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::DfPhysicalPlan;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::expressions::{col as physical_col, PhysicalSortExpr};
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion::sql::sqlparser;
use datafusion_expr::expr_fn::{and, binary_expr, col, or};
@@ -764,15 +764,13 @@ mod test {
let merge =
CoalescePartitionsExec::new(Arc::new(DfPhysicalPlanAdapter(table_scan.clone())));
let sort = SortExec::try_new(
let sort = SortExec::new(
vec![PhysicalSortExpr {
expr: physical_col("a", table_scan.schema().arrow_schema()).unwrap(),
options: SortOptions::default(),
}],
Arc::new(merge),
None,
)
.unwrap();
);
assert_eq!(sort.output_partitioning().partition_count(), 1);
let session_ctx = SessionContext::new();


@@ -20,7 +20,7 @@ use std::task::{Context, Poll};
use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
use datafusion::arrow::compute;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::datatypes::{Field, SchemaRef};
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{DFField, DFSchema, DFSchemaRef};
@@ -109,13 +109,13 @@ impl RangeManipulate {
let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index)? else {
return Err(datafusion::common::field_not_found(None::<TableReference>, time_index, input_schema.as_ref()))
};
let timestamp_range_field = columns[ts_col_index]
.field()
.clone()
.with_name(Self::build_timestamp_range_name(time_index));
columns.push(DFField::from(RangeArray::convert_field(
&timestamp_range_field,
)));
let ts_col_field = columns[ts_col_index].field();
let timestamp_range_field = Field::new(
Self::build_timestamp_range_name(time_index),
RangeArray::convert_field(ts_col_field).data_type().clone(),
ts_col_field.is_nullable(),
);
columns.push(DFField::from(timestamp_range_field));
// process value columns
for name in field_columns {


@@ -81,9 +81,9 @@ impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, I
// construct matrix from input
let ts_array = extract_array(&input[0])?;
let ts_range = RangeArray::try_new(ts_array.data().clone().into())?;
let ts_range = RangeArray::try_new(ts_array.to_data().into())?;
let value_array = extract_array(&input[1])?;
let value_range = RangeArray::try_new(value_array.data().clone().into())?;
let value_range = RangeArray::try_new(value_array.to_data().into())?;
let ts = extract_array(&input[2])?;
let ts = ts
.as_any()


@@ -87,8 +87,8 @@ impl HoltWinters {
let ts_array = extract_array(&input[0])?;
let value_array = extract_array(&input[1])?;
let ts_range: RangeArray = RangeArray::try_new(ts_array.data().clone().into())?;
let value_range: RangeArray = RangeArray::try_new(value_array.data().clone().into())?;
let ts_range: RangeArray = RangeArray::try_new(ts_array.to_data().into())?;
let value_range: RangeArray = RangeArray::try_new(value_array.to_data().into())?;
error::ensure(
ts_range.len() == value_range.len(),


@@ -71,8 +71,8 @@ impl<const IS_RATE: bool> IDelta<IS_RATE> {
let ts_array = extract_array(&input[0])?;
let value_array = extract_array(&input[1])?;
let ts_range: RangeArray = RangeArray::try_new(ts_array.data().clone().into())?;
let value_range: RangeArray = RangeArray::try_new(value_array.data().clone().into())?;
let ts_range: RangeArray = RangeArray::try_new(ts_array.to_data().into())?;
let value_range: RangeArray = RangeArray::try_new(value_array.to_data().into())?;
error::ensure(
ts_range.len() == value_range.len(),
DataFusionError::Execution(format!(


@@ -70,8 +70,8 @@ impl QuantileOverTime {
let ts_array = extract_array(&input[0])?;
let value_array = extract_array(&input[1])?;
let ts_range: RangeArray = RangeArray::try_new(ts_array.data().clone().into())?;
let value_range: RangeArray = RangeArray::try_new(value_array.data().clone().into())?;
let ts_range: RangeArray = RangeArray::try_new(ts_array.to_data().into())?;
let value_range: RangeArray = RangeArray::try_new(value_array.to_data().into())?;
error::ensure(
ts_range.len() == value_range.len(),
DataFusionError::Execution(format!(


@@ -14,6 +14,8 @@
//An extended "array" based on [DictionaryArray].
use std::sync::Arc;
use datafusion::arrow::buffer::NullBuffer;
use datafusion::arrow::datatypes::Field;
use datatypes::arrow::array::{Array, ArrayData, ArrayRef, DictionaryArray, Int64Array};
@@ -123,13 +125,13 @@ impl RangeArray {
Box::new(values.data_type().clone()),
))
.len(key_array.len())
.add_buffer(key_array.data().buffers()[0].clone())
.add_child_data(values.data().clone());
match key_array.data().nulls() {
Some(buffer) if key_array.data().null_count() > 0 => {
.add_buffer(key_array.to_data().buffers()[0].clone())
.add_child_data(values.to_data());
match key_array.to_data().nulls() {
Some(buffer) if key_array.to_data().null_count() > 0 => {
data = data
.nulls(Some(buffer.clone()))
.null_count(key_array.data().null_count());
.null_count(key_array.to_data().null_count());
}
_ => data = data.null_count(0),
}
@@ -217,6 +219,7 @@ impl Array for RangeArray {
self
}
#[allow(deprecated)]
fn data(&self) -> &ArrayData {
self.array.data()
}
@@ -230,7 +233,7 @@ impl Array for RangeArray {
}
fn slice(&self, offset: usize, length: usize) -> ArrayRef {
self.array.slice(offset, length)
Arc::new(self.array.slice(offset, length))
}
fn nulls(&self) -> Option<&NullBuffer> {


@@ -13,16 +13,15 @@
// limitations under the License.
use std::str::FromStr;
use std::sync::Arc;
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::optimizer::optimizer::OptimizerRule;
use datafusion::optimizer::OptimizerConfig;
use datafusion_common::tree_node::{TreeNode, TreeNodeRewriter};
use datafusion::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{DFSchemaRef, DataFusionError, Result, ScalarValue};
use datafusion_expr::{
Between, BinaryExpr, Expr, ExprSchemable, Filter, LogicalPlan, Operator, TableScan,
};
use datafusion_optimizer::analyzer::AnalyzerRule;
use datatypes::arrow::compute;
use datatypes::arrow::datatypes::DataType;
@@ -33,25 +32,20 @@ use datatypes::arrow::datatypes::DataType;
/// - string literal of boolean is converted to `Expr::Literal(ScalarValue::Boolean)`
pub struct TypeConversionRule;
impl OptimizerRule for TypeConversionRule {
impl AnalyzerRule for TypeConversionRule {
// TODO(ruihang): fix this warning
#[allow(deprecated)]
fn try_optimize(
&self,
plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let mut converter = TypeConverter {
schemas: plan.all_schemas(),
};
match plan {
fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
let schemas = plan.all_schemas().into_iter().cloned().collect::<Vec<_>>();
plan.transform(&|plan| match plan {
LogicalPlan::Filter(filter) => {
let mut converter = TypeConverter {
schemas: schemas.clone(),
};
let rewritten = filter.predicate.clone().rewrite(&mut converter)?;
let Some(plan) = self.try_optimize(&filter.input, _config)? else { return Ok(None) };
Ok(Some(LogicalPlan::Filter(Filter::try_new(
Ok(Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
rewritten,
Arc::new(plan),
filter.input,
)?)))
}
LogicalPlan::TableScan(TableScan {
@@ -62,18 +56,20 @@ impl OptimizerRule for TypeConversionRule {
filters,
fetch,
}) => {
let mut converter = TypeConverter {
schemas: schemas.clone(),
};
let rewrite_filters = filters
.clone()
.into_iter()
.map(|e| e.rewrite(&mut converter))
.collect::<Result<Vec<_>>>()?;
Ok(Some(LogicalPlan::TableScan(TableScan {
Ok(Transformed::Yes(LogicalPlan::TableScan(TableScan {
table_name: table_name.clone(),
source: source.clone(),
projection: projection.clone(),
projected_schema: projected_schema.clone(),
projection,
projected_schema,
filters: rewrite_filters,
fetch: *fetch,
fetch,
})))
}
LogicalPlan::Projection { .. }
@@ -94,20 +90,17 @@ impl OptimizerRule for TypeConversionRule {
| LogicalPlan::Distinct { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Analyze { .. } => {
let inputs = plan.inputs();
let mut new_inputs = Vec::with_capacity(inputs.len());
for input in inputs {
let Some(plan) = self.try_optimize(input, _config)? else { return Ok(None) };
new_inputs.push(plan);
}
let mut converter = TypeConverter {
schemas: plan.all_schemas().into_iter().cloned().collect(),
};
let inputs = plan.inputs().into_iter().cloned().collect::<Vec<_>>();
let expr = plan
.expressions()
.into_iter()
.map(|e| e.rewrite(&mut converter))
.collect::<Result<Vec<_>>>()?;
datafusion_expr::utils::from_plan(plan, &expr, &new_inputs).map(Some)
datafusion_expr::utils::from_plan(&plan, &expr, &inputs).map(Transformed::Yes)
}
LogicalPlan::Subquery { .. }
@@ -120,8 +113,8 @@ impl OptimizerRule for TypeConversionRule {
| LogicalPlan::Dml(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::Statement(_) => Ok(Some(plan.clone())),
}
| LogicalPlan::Statement(_) => Ok(Transformed::No(plan)),
})
}
fn name(&self) -> &str {
@@ -129,11 +122,11 @@ impl OptimizerRule for TypeConversionRule {
}
}
struct TypeConverter<'a> {
schemas: Vec<&'a DFSchemaRef>,
struct TypeConverter {
schemas: Vec<DFSchemaRef>,
}
impl<'a> TypeConverter<'a> {
impl TypeConverter {
fn column_type(&self, expr: &Expr) -> Option<DataType> {
if let Expr::Column(_) = expr {
for schema in &self.schemas {
@@ -200,7 +193,7 @@ impl<'a> TypeConverter<'a> {
}
}
impl<'a> TreeNodeRewriter for TypeConverter<'a> {
impl TreeNodeRewriter for TypeConverter {
type N = Expr;
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
@@ -299,6 +292,7 @@ fn string_to_timestamp_ms(string: &str) -> Result<ScalarValue> {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use datafusion_common::{Column, DFField, DFSchema};
use datafusion_sql::TableReference;
@@ -371,7 +365,7 @@ mod tests {
.unwrap(),
);
let mut converter = TypeConverter {
schemas: vec![&schema_ref],
schemas: vec![schema_ref],
};
assert_eq!(
@@ -404,7 +398,7 @@ mod tests {
.unwrap(),
);
let mut converter = TypeConverter {
schemas: vec![&schema_ref],
schemas: vec![schema_ref],
};
assert_eq!(

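The TypeConversionRule rewrite above also trades the hand-rolled recursion of `try_optimize` for `TreeNode::transform`, whose closure reports per node whether it rewrote anything via `Transformed::Yes` / `Transformed::No`. A minimal sketch of that pattern under the DataFusion 22 crate layout this PR uses:

```rust
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;
use datafusion_expr::LogicalPlan;

// A no-op pass: visit every node, rewrite none of them.
fn pass_through(plan: LogicalPlan) -> Result<LogicalPlan> {
    plan.transform(&|node| {
        // Return Transformed::Yes(new_node) after a rewrite; Transformed::No
        // hands the node back untouched and traversal continues.
        Ok(Transformed::No(node))
    })
}
```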

@@ -28,7 +28,7 @@ use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
use datafusion_expr::LogicalPlan as DfLogicalPlan;
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::analyzer::Analyzer;
use promql::extension_plan::PromExtensionPlanner;
use crate::datafusion::DfCatalogListAdapter;
@@ -58,16 +58,16 @@ impl QueryEngineState {
pub fn new(catalog_list: CatalogListRef, plugins: Arc<Plugins>) -> Self {
let runtime_env = Arc::new(RuntimeEnv::default());
let session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
let mut optimizer = Optimizer::new();
// Apply the type conversion rule first.
optimizer.rules.insert(0, Arc::new(TypeConversionRule {}));
let mut analyzer = Analyzer::new();
analyzer.rules.insert(0, Arc::new(TypeConversionRule));
let session_state = SessionState::with_config_rt_and_catalog_list(
session_config,
runtime_env,
Arc::new(DfCatalogListAdapter::new(catalog_list.clone())),
)
.with_optimizer_rules(optimizer.rules)
.with_analyzer_rules(analyzer.rules)
.with_query_planner(Arc::new(DfQueryPlanner::new()));
let df_context = SessionContext::with_state(session_state);


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use arrow::pyarrow::PyArrowException;
use common_telemetry::{info, timer};
@@ -278,7 +278,7 @@ pub fn try_into_columnar_value(py: Python<'_>, obj: PyObject) -> PyResult<Column
if ret.is_empty() {
return Ok(ColumnarValue::Scalar(ScalarValue::List(
None,
Box::new(new_item_field(ArrowDataType::Null)),
Arc::new(new_item_field(ArrowDataType::Null)),
)));
}
let ty = ret[0].get_datatype();
@@ -291,7 +291,7 @@ pub fn try_into_columnar_value(py: Python<'_>, obj: PyObject) -> PyResult<Column
}
Ok(ColumnarValue::Scalar(ScalarValue::List(
Some(ret),
Box::new(new_item_field(ty)),
Arc::new(new_item_field(ty)),
)))
} else {
to_rust_types!(obj,


@@ -103,7 +103,7 @@ impl PyVector {
}
fn numpy(&self, py: Python<'_>) -> PyResult<PyObject> {
let pa_arrow = self.to_arrow_array().data().to_pyarrow(py)?;
let pa_arrow = self.to_arrow_array().to_data().to_pyarrow(py)?;
let ndarray = pa_arrow.call_method0(py, "to_numpy")?;
Ok(ndarray)
}
@@ -304,7 +304,7 @@ impl PyVector {
}
/// Convert to `pyarrow` 's array
pub(crate) fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
self.to_arrow_array().data().to_pyarrow(py)
self.to_arrow_array().to_data().to_pyarrow(py)
}
/// Convert from `pyarrow`'s array
#[classmethod]


@@ -17,6 +17,8 @@
#[cfg(test)]
mod test;
use std::sync::Arc;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_expr::ColumnarValue as DFColValue;
use datafusion_physical_expr::AggregateExpr;
@@ -118,7 +120,7 @@ pub fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResul
// TODO(dennis): empty list, we set type as null.
return Ok(DFColValue::Scalar(ScalarValue::List(
None,
Box::new(new_item_field(ArrowDataType::Null)),
Arc::new(new_item_field(ArrowDataType::Null)),
)));
}
@@ -131,7 +133,7 @@ pub fn try_into_columnar_value(obj: PyObjectRef, vm: &VirtualMachine) -> PyResul
}
Ok(DFColValue::Scalar(ScalarValue::List(
Some(ret),
Box::new(new_item_field(ty)),
Arc::new(new_item_field(ty)),
)))
} else {
Err(vm.new_type_error(format!(


@@ -73,7 +73,7 @@ fn convert_scalar_to_py_obj_and_back() {
ScalarValue::Int64(Some(1)),
ScalarValue::Int64(Some(2)),
]),
Box::new(Field::new("item", ArrowDataType::Int64, false)),
Arc::new(Field::new("item", ArrowDataType::Int64, false)),
));
let to = try_into_py_obj(col, vm).unwrap();
let back = try_into_columnar_value(to, vm).unwrap();


@@ -71,7 +71,7 @@ tokio-rustls = "0.24"
tokio-stream = { version = "0.1", features = ["net"] }
tokio.workspace = true
tonic.workspace = true
tonic-reflection = "0.6"
tonic-reflection = "0.9"
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }


@@ -453,6 +453,7 @@ async fn test_query_concurrently() -> Result<()> {
Ok(())
}
#[ignore = "https://github.com/GreptimeTeam/greptimedb/issues/1385"]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_query_prepared() -> Result<()> {
common_telemetry::init_default_ut_logging();


@@ -51,7 +51,7 @@ log-store = { path = "../log-store" }
rand.workspace = true
[build-dependencies]
tonic-build = "0.8"
tonic-build = "0.9"
[[bench]]
name = "bench_main"


@@ -194,7 +194,7 @@ SELECT a-10 AS k FROM test UNION SELECT a-10 AS l FROM test ORDER BY k;
SELECT a-10 AS k FROM test UNION SELECT a-10 AS l FROM test ORDER BY l;
Error: 3000(PlanQuery), No field named "l". Valid fields are "k".
Error: 3000(PlanQuery), No field named l. Valid fields are k.
SELECT a-10 AS k FROM test UNION SELECT a-10 AS l FROM test ORDER BY 1-k;


@@ -36,7 +36,7 @@ SELECT a AS k, b FROM test UNION SELECT a AS k, b FROM test ORDER BY k;
SELECT a % 2, b FROM test UNION SELECT b, a % 2 AS k ORDER BY a % 2;
Error: 3000(PlanQuery), No field named "b".
Error: 3000(PlanQuery), No field named b.
SELECT a % 2, b FROM test UNION SELECT a % 2 AS k, b FROM test ORDER BY a % 2;


@@ -24,7 +24,7 @@ select 4 + 0.5;
select "a";
Error: 3000(PlanQuery), No field named "a".
Error: 3000(PlanQuery), No field named a.
select "A";
@@ -32,7 +32,7 @@ Error: 3000(PlanQuery), No field named "A".
select * where "a" = "A";
Error: 3000(PlanQuery), No field named "a".
Error: 3000(PlanQuery), No field named a.
select TO_UNIXTIME('2023-03-01T06:35:02Z');