chore: upgrade Arrow to version 28, and DataFusion to 15 (#771)

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-12-21 17:02:11 +08:00
committed by GitHub
parent 539ead5460
commit 77182f5024
41 changed files with 328 additions and 158 deletions

264
Cargo.lock generated
View File

@@ -194,38 +194,35 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "arrow"
version = "26.0.0"
version = "28.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e24e2bcd431a4aa0ff003fdd2dc21c78cfb42f31459c89d2312c2746fe17a5ac"
checksum = "aed9849f86164fad5cb66ce4732782b15f1bc97f8febab04e782c20cce9d4b6c"
dependencies = [
"ahash 0.8.2",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-csv",
"arrow-data",
"arrow-ipc",
"arrow-json",
"arrow-schema",
"arrow-select",
"bitflags",
"chrono",
"comfy-table",
"csv",
"flatbuffers",
"half 2.1.0",
"hashbrown 0.12.3",
"indexmap",
"lazy_static",
"lexical-core",
"hashbrown 0.13.1",
"multiversion",
"num",
"regex",
"regex-syntax",
"serde_json",
]
[[package]]
name = "arrow-array"
version = "26.0.0"
version = "28.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9044300874385f19e77cbf90911e239bd23630d8f23bb0f948f9067998a13b7"
checksum = "6b8504cf0a6797e908eecf221a865e7d339892720587f87c8b90262863015b08"
dependencies = [
"ahash 0.8.2",
"arrow-buffer",
@@ -233,25 +230,59 @@ dependencies = [
"arrow-schema",
"chrono",
"half 2.1.0",
"hashbrown 0.12.3",
"hashbrown 0.13.1",
"num",
]
[[package]]
name = "arrow-buffer"
version = "26.0.0"
version = "28.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78476cbe9e3f808dcecab86afe42d573863c63e149c62e6e379ed2522743e626"
checksum = "d6de64a27cea684b24784647d9608314bc80f7c4d55acb44a425e05fab39d916"
dependencies = [
"half 2.1.0",
"num",
]
[[package]]
name = "arrow-data"
version = "26.0.0"
name = "arrow-cast"
version = "28.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d916feee158c485dad4f701cba31bc9a90a8db87d9df8e2aa8adc0c20a2bbb9"
checksum = "bec4a54502eefe05923c385c90a005d69474fa06ca7aa2a2b123c9f9532f6178"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-select",
"chrono",
"lexical-core",
"num",
]
[[package]]
name = "arrow-csv"
version = "28.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7902bbf8127eac48554fe902775303377047ad49a9fd473c2b8cb399d092080"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-schema",
"chrono",
"csv",
"lazy_static",
"lexical-core",
"regex",
]
[[package]]
name = "arrow-data"
version = "28.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e4882efe617002449d5c6b5de9ddb632339074b36df8a96ea7147072f1faa8a"
dependencies = [
"arrow-buffer",
"arrow-schema",
@@ -260,19 +291,51 @@ dependencies = [
]
[[package]]
name = "arrow-schema"
version = "26.0.0"
name = "arrow-ipc"
version = "28.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f9406eb7834ca6bd8350d1baa515d18b9fcec487eddacfb62f5e19511f7bd37"
checksum = "fa0703a6de2785828561b03a4d7793ecd333233e1b166316b4bfc7cfce55a4a7"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-schema",
"flatbuffers",
]
[[package]]
name = "arrow-json"
version = "28.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bd23fc8c6d251f96cd63b96fece56bbb9710ce5874a627cb786e2600673595a"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-schema",
"chrono",
"half 2.1.0",
"indexmap",
"num",
"serde_json",
]
[[package]]
name = "arrow-schema"
version = "28.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da9f143882a80be168538a60e298546314f50f11f2a288c8d73e11108da39d26"
dependencies = [
"serde",
]
[[package]]
name = "arrow-select"
version = "26.0.0"
version = "28.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6593a01586751c74498495d2f5a01fcd438102b52965c11dd98abf4ebcacef37"
checksum = "520406331d4ad60075359524947ebd804e479816439af82bcb17f8d280d9b38c"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -334,6 +397,7 @@ dependencies = [
"memchr",
"pin-project-lite",
"tokio",
"xz2",
]
[[package]]
@@ -1904,9 +1968,9 @@ dependencies = [
[[package]]
name = "datafusion"
version = "14.0.0"
version = "15.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7a8411475928479fe57af18698626f0a44f3c29153e051dce45f7455c08a6d5"
checksum = "b75a088adf79515b04fd3895c1a14dc249c8f7a7f27b59870a05546fe9a55542"
dependencies = [
"ahash 0.8.2",
"arrow",
@@ -1915,6 +1979,7 @@ dependencies = [
"bytes",
"bzip2",
"chrono",
"dashmap",
"datafusion-common",
"datafusion-expr",
"datafusion-optimizer",
@@ -1924,13 +1989,12 @@ dependencies = [
"flate2",
"futures",
"glob",
"hashbrown 0.12.3",
"hashbrown 0.13.1",
"itertools",
"lazy_static",
"log",
"num_cpus",
"object_store",
"ordered-float 3.4.0",
"parking_lot",
"parquet",
"paste",
@@ -1938,6 +2002,7 @@ dependencies = [
"pin-project-lite",
"rand 0.8.5",
"smallvec",
"sqllogictest",
"sqlparser",
"tempfile",
"tokio",
@@ -1945,27 +2010,27 @@ dependencies = [
"tokio-util",
"url",
"uuid",
"xz2",
]
[[package]]
name = "datafusion-common"
version = "14.0.0"
version = "15.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f1ffcbc1f040c9ab99f41db1c743d95aff267bb2e7286aaa010738b7402251"
checksum = "7b17262b899f79afdf502846d1138a8b48441afe24dc6e07c922105289248137"
dependencies = [
"arrow",
"chrono",
"object_store",
"ordered-float 3.4.0",
"parquet",
"sqlparser",
]
[[package]]
name = "datafusion-expr"
version = "14.0.0"
version = "15.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1883d9590d303ef38fa295567e7fdb9f8f5f511fcc167412d232844678cd295c"
checksum = "533d2226b4636a1306d1f6f4ac02e436947c5d6e8bfc85f6d8f91a425c10a407"
dependencies = [
"ahash 0.8.2",
"arrow",
@@ -1976,9 +2041,9 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "14.0.0"
version = "15.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2127d46d566ab3463d70da9675fc07b9d634be8d17e80d0e1ce79600709fe651"
checksum = "ce7ba274267b6baf1714a67727249aa56d648c8814b0f4c43387fbe6d147e619"
dependencies = [
"arrow",
"async-trait",
@@ -1986,15 +2051,15 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datafusion-physical-expr",
"hashbrown 0.12.3",
"hashbrown 0.13.1",
"log",
]
[[package]]
name = "datafusion-physical-expr"
version = "14.0.0"
version = "15.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d108b6fe8eeb317ecad1d74619e8758de49cccc8c771b56c97962fd52eaae23"
checksum = "f35cb53e6c2f9c40accdf45aef2be7fde030ea3051b1145a059d96109e65b0bf"
dependencies = [
"ahash 0.8.2",
"arrow",
@@ -2007,12 +2072,11 @@ dependencies = [
"datafusion-expr",
"datafusion-row",
"half 2.1.0",
"hashbrown 0.12.3",
"hashbrown 0.13.1",
"itertools",
"lazy_static",
"md-5",
"num-traits",
"ordered-float 3.4.0",
"paste",
"rand 0.8.5",
"regex",
@@ -2023,9 +2087,9 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "14.0.0"
version = "15.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43537b6377d506e4788bf21e9ed943340e076b48ca4d077e6ea4405ca5e54a1c"
checksum = "27c77b1229ae5cf6a6e0e2ba43ed4e98131dbf1cc4a97fad17c94230b32e0812"
dependencies = [
"arrow",
"datafusion-common",
@@ -2035,11 +2099,11 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "14.0.0"
version = "15.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "244d08d4710e1088d9c0949c9b5b8d68d9cf2cde7203134a4cc389e870fe2354"
checksum = "569423fa8a50db39717080949e3b4f8763582b87baf393cc3fcf27cc21467ba7"
dependencies = [
"arrow",
"arrow-schema",
"datafusion-common",
"datafusion-expr",
"sqlparser",
@@ -2166,6 +2230,12 @@ version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8"
[[package]]
name = "difference"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198"
[[package]]
name = "digest"
version = "0.10.6"
@@ -2857,6 +2927,15 @@ dependencies = [
"ahash 0.7.6",
]
[[package]]
name = "hashbrown"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038"
dependencies = [
"ahash 0.8.2",
]
[[package]]
name = "hdrhistogram"
version = "7.5.2"
@@ -3373,6 +3452,17 @@ version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"
[[package]]
name = "libtest-mimic"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7b603516767d1ab23d0de09d023e62966c3322f7148297c35cf3d97aa8b37fa"
dependencies = [
"clap 4.0.29",
"termcolor",
"threadpool",
]
[[package]]
name = "libz-sys"
version = "1.1.8"
@@ -3535,6 +3625,17 @@ dependencies = [
"twox-hash",
]
[[package]]
name = "lzma-sys"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fda04ab3764e6cde78b9974eec4f779acaba7c4e84b36eca3cf77c581b85d27"
dependencies = [
"cc",
"libc",
"pkg-config",
]
[[package]]
name = "mac_address"
version = "1.1.4"
@@ -4344,6 +4445,15 @@ dependencies = [
"num-traits",
]
[[package]]
name = "ordered-float"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87"
dependencies = [
"num-traits",
]
[[package]]
name = "ordered-float"
version = "3.4.0"
@@ -4430,26 +4540,34 @@ dependencies = [
[[package]]
name = "parquet"
version = "26.0.0"
version = "28.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bf8fa7ab6572791325a8595f55dc532dde88b996ae10a5ca8a2db746784ecc4"
checksum = "21433e9209111bb3720b747f2f137e0d115af1af0420a7a1c26b6e88227fa353"
dependencies = [
"ahash 0.8.2",
"arrow",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-ipc",
"arrow-schema",
"arrow-select",
"base64 0.13.1",
"brotli",
"bytes",
"chrono",
"flate2",
"futures",
"hashbrown 0.12.3",
"hashbrown 0.13.1",
"lz4",
"num",
"num-bigint",
"paste",
"seq-macro",
"snap",
"thrift 0.16.0",
"thrift 0.17.0",
"tokio",
"twox-hash",
"zstd",
]
@@ -6571,6 +6689,25 @@ dependencies = [
"sqlparser",
]
[[package]]
name = "sqllogictest"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba41e01d229d7725401de371e323851f82d839d68732a06162405362b60852fe"
dependencies = [
"async-trait",
"difference",
"futures",
"glob",
"humantime",
"itertools",
"libtest-mimic",
"regex",
"tempfile",
"thiserror",
"tracing",
]
[[package]]
name = "sqlness"
version = "0.1.0"
@@ -6600,9 +6737,9 @@ dependencies = [
[[package]]
name = "sqlparser"
version = "0.26.0"
version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86be66ea0b2b22749cfa157d16e2e84bf793e626a3375f4d378dc289fa03affb"
checksum = "aba319938d4bfe250a769ac88278b629701024fe16f34257f9563bc628081970"
dependencies = [
"log",
]
@@ -7132,13 +7269,13 @@ dependencies = [
[[package]]
name = "thrift"
version = "0.16.0"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09678c4cdbb4eed72e18b7c2af1329c69825ed16fcbac62d083fc3e2b0590ff0"
checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09"
dependencies = [
"byteorder",
"integer-encoding",
"ordered-float 1.1.1",
"ordered-float 2.10.0",
]
[[package]]
@@ -8326,19 +8463,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3"
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"
name = "xz2"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4"
checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2"
dependencies = [
"lzma-sys",
]
[[package]]
name = "zstd"
version = "0.12.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c947d2adc84ff9a59f2e3c03b81aa4128acf28d6ad7d56273f7e8af14e47bea"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "5.0.2+zstd.1.5.2"
version = "6.0.2+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db"
checksum = "a6cf39f730b440bab43da8fb5faf5f254574462f73f260f85f7987f32154ff17"
dependencies = [
"libc",
"zstd-sys",

View File

@@ -5,10 +5,10 @@ edition = "2021"
license = "Apache-2.0"
[dependencies]
arrow = "26.0.0"
arrow = "28.0"
clap = { version = "4.0", features = ["derive"] }
client = { path = "../src/client" }
indicatif = "0.17.1"
itertools = "0.10.5"
parquet = "26.0.0"
parquet = "28.0"
tokio = { version = "1.21", features = ["full"] }

View File

@@ -19,7 +19,7 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = "14.0.0"
datafusion = "15.0"
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util = "0.3"

View File

@@ -61,7 +61,7 @@ impl Table for SystemCatalogTable {
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> table::Result<PhysicalPlanRef> {
@@ -129,7 +129,7 @@ impl SystemCatalogTable {
let ctx = SessionContext::new();
let scan = self
.table
.scan(&full_projection, &[], None)
.scan(full_projection, &[], None)
.await
.context(error::SystemCatalogTableScanSnafu)?;
let stream = scan

View File

@@ -77,7 +77,7 @@ impl Table for Tables {
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> table::error::Result<PhysicalPlanRef> {
@@ -370,7 +370,7 @@ mod tests {
.unwrap();
let tables = Tables::new(catalog_list, "test_engine".to_string());
let tables_stream = tables.scan(&None, &[], None).await.unwrap();
let tables_stream = tables.scan(None, &[], None).await.unwrap();
let session_ctx = SessionContext::new();
let mut tables_stream = tables_stream.execute(0, session_ctx.task_ctx()).unwrap();

View File

@@ -15,7 +15,7 @@ common-grpc-expr = { path = "../common/grpc-expr" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-time = { path = "../common/time" }
datafusion = "14.0.0"
datafusion = "15.0"
datatypes = { path = "../datatypes" }
enum_dispatch = "0.3"
parking_lot = "0.12"

View File

@@ -11,7 +11,7 @@ common-error = { path = "../error" }
common-function-macro = { path = "../function-macro" }
common-query = { path = "../query" }
common-time = { path = "../time" }
datafusion-common = "14.0.0"
datafusion-common = "15.0"
datatypes = { path = "../../datatypes" }
libc = "0.2"
num = "0.4"

View File

@@ -725,7 +725,7 @@ mod tests {
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {

View File

@@ -13,7 +13,7 @@ common-query = { path = "../query" }
common-recordbatch = { path = "../recordbatch" }
common-runtime = { path = "../runtime" }
dashmap = "5.4"
datafusion = "14.0.0"
datafusion = "15.0"
datatypes = { path = "../../datatypes" }
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.0", features = ["full"] }

View File

@@ -9,9 +9,9 @@ async-trait = "0.1"
common-error = { path = "../error" }
common-recordbatch = { path = "../recordbatch" }
common-time = { path = "../time" }
datafusion = "14.0.0"
datafusion-common = "14.0.0"
datafusion-expr = "14.0.0"
datafusion = "15.0"
datafusion-common = "15.0"
datafusion-expr = "15.0"
datatypes = { path = "../../datatypes" }
snafu = { version = "0.7", features = ["backtraces"] }
statrs = "0.15"

View File

@@ -175,4 +175,9 @@ impl DfAccumulator for DfAccumulatorAdaptor {
.map_err(Error::from)?;
Ok(scalar_value)
}
fn size(&self) -> usize {
// TODO(LFC): Implement new "size" method for Accumulator.
0
}
}

View File

@@ -233,7 +233,7 @@ mod test {
async fn scan(
&self,
_ctx: &SessionState,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> DfResult<Arc<dyn DfPhysicalPlan>> {

View File

@@ -6,8 +6,8 @@ license = "Apache-2.0"
[dependencies]
common-error = { path = "../error" }
datafusion = "14.0.0"
datafusion-common = "14.0.0"
datafusion = "15.0"
datafusion-common = "15.0"
datatypes = { path = "../../datatypes" }
futures = "0.3"
paste = "1.0"

View File

@@ -227,7 +227,7 @@ mod tests {
let output = serde_json::to_string(&batch).unwrap();
assert_eq!(
r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#,
r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#,
output
);
}

View File

@@ -10,8 +10,8 @@ catalog = { path = "../../catalog" }
common-catalog = { path = "../catalog" }
common-error = { path = "../error" }
common-telemetry = { path = "../telemetry" }
datafusion = "14.0.0"
datafusion-expr = "14.0.0"
datafusion = "15.0"
datafusion-expr = "15.0"
datatypes = { path = "../../datatypes" }
futures = "0.3"
prost = "0.9"

View File

@@ -19,7 +19,7 @@ use catalog::CatalogManagerRef;
use common_error::prelude::BoxedError;
use common_telemetry::debug;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::common::ToDFSchema;
use datafusion::common::{DFField, DFSchema};
use datafusion::datasource::DefaultTableSource;
use datafusion::physical_plan::project_schema;
use datafusion_expr::{Filter, LogicalPlan, TableScan, TableSource};
@@ -262,10 +262,20 @@ impl DFLogicalSubstraitConvertor {
};
// Calculate the projected schema
let projected_schema = project_schema(&stored_schema, projection.as_ref())
.context(DFInternalSnafu)?
.to_dfschema_ref()
.context(DFInternalSnafu)?;
let qualified = &format!("{}.{}.{}", catalog_name, schema_name, table_name);
let projected_schema = Arc::new(
project_schema(&stored_schema, projection.as_ref())
.and_then(|x| {
DFSchema::new_with_metadata(
x.fields()
.iter()
.map(|f| DFField::from_qualified(qualified, f.clone()))
.collect(),
x.metadata().clone(),
)
})
.context(DFInternalSnafu)?,
);
ctx.set_df_schema(projected_schema.clone());

View File

@@ -25,7 +25,7 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = "14.0.0"
datafusion = "15.0"
datatypes = { path = "../datatypes" }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
@@ -57,5 +57,5 @@ tower-http = { version = "0.3", features = ["full"] }
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
client = { path = "../client" }
common-query = { path = "../common/query" }
datafusion-common = "14.0.0"
datafusion-common = "15.0"
tempdir = "0.3"

View File

@@ -176,7 +176,7 @@ mod tests {
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {

View File

@@ -9,12 +9,12 @@ default = []
test = []
[dependencies]
arrow = { version = "26.0" }
arrow-schema = { version = "26.0", features = ["serde"] }
arrow = "28.0"
arrow-schema = { version = "28.0", features = ["serde"] }
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datafusion-common = "14.0"
datafusion-common = "15.0"
enum_dispatch = "0.3"
num = "0.4"
num-traits = "0.2"

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::collections::HashMap;
use arrow::datatypes::Field;
use serde::{Deserialize, Serialize};
@@ -23,7 +23,7 @@ use crate::error::{self, Error, Result};
use crate::schema::constraint::ColumnDefaultConstraint;
use crate::vectors::VectorRef;
pub type Metadata = BTreeMap<String, String>;
pub type Metadata = HashMap<String, String>;
/// Key used to store whether the column is time index in arrow field's metadata.
const TIME_INDEX_KEY: &str = "greptime:time_index";
@@ -131,7 +131,7 @@ impl TryFrom<&Field> for ColumnSchema {
fn try_from(field: &Field) -> Result<ColumnSchema> {
let data_type = ConcreteDataType::try_from(field.data_type())?;
let mut metadata = field.metadata().cloned().unwrap_or_default();
let mut metadata = field.metadata().clone();
let default_constraint = match metadata.remove(DEFAULT_CONSTRAINT_KEY) {
Some(json) => {
Some(serde_json::from_str(&json).context(error::DeserializeSnafu { json })?)
@@ -176,7 +176,7 @@ impl TryFrom<&ColumnSchema> for Field {
column_schema.data_type.as_arrow_type(),
column_schema.is_nullable(),
)
.with_metadata(Some(metadata)))
.with_metadata(metadata))
}
}
@@ -215,11 +215,7 @@ mod tests {
assert!(field.is_nullable());
assert_eq!(
"{\"Value\":{\"Int32\":99}}",
field
.metadata()
.unwrap()
.get(DEFAULT_CONSTRAINT_KEY)
.unwrap()
field.metadata().get(DEFAULT_CONSTRAINT_KEY).unwrap()
);
let new_column_schema = ColumnSchema::try_from(&field).unwrap();
@@ -241,12 +237,8 @@ mod tests {
.is_none());
let field = Field::try_from(&column_schema).unwrap();
assert_eq!("v1", field.metadata().unwrap().get("k1").unwrap());
assert!(field
.metadata()
.unwrap()
.get(DEFAULT_CONSTRAINT_KEY)
.is_some());
assert_eq!("v1", field.metadata().get("k1").unwrap());
assert!(field.metadata().get(DEFAULT_CONSTRAINT_KEY).is_some());
let new_column_schema = ColumnSchema::try_from(&field).unwrap();
assert_eq!(column_schema, new_column_schema);

View File

@@ -544,12 +544,15 @@ impl TryFrom<ScalarValue> for Value {
.map(|x| Value::Timestamp(Timestamp::new(x, TimeUnit::Nanosecond)))
.unwrap_or(Value::Null),
ScalarValue::Decimal128(_, _, _)
| ScalarValue::Time64(_)
| ScalarValue::IntervalYearMonth(_)
| ScalarValue::IntervalDayTime(_)
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Struct(_, _)
| ScalarValue::Dictionary(_, _) => {
| ScalarValue::Dictionary(_, _)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_) => {
return error::UnsupportedArrowTypeSnafu {
arrow_type: v.get_datatype(),
}

View File

@@ -195,12 +195,15 @@ impl Helper {
ConstantVector::new(Arc::new(TimestampNanosecondVector::from(vec![v])), length)
}
ScalarValue::Decimal128(_, _, _)
| ScalarValue::Time64(_)
| ScalarValue::IntervalYearMonth(_)
| ScalarValue::IntervalDayTime(_)
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Struct(_, _)
| ScalarValue::Dictionary(_, _) => {
| ScalarValue::Dictionary(_, _)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_) => {
return error::ConversionSnafu {
from: format!("Unsupported scalar value: {}", value),
}

View File

@@ -22,9 +22,9 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = "14.0.0"
datafusion-common = "14.0.0"
datafusion-expr = "14.0.0"
datafusion = "15.0"
datafusion-common = "15.0"
datafusion-expr = "15.0"
datanode = { path = "../datanode" }
datatypes = { path = "../datatypes" }
futures = "0.3"

View File

@@ -97,7 +97,7 @@ impl Table for DistTable {
async fn scan(
&self,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> table::Result<PhysicalPlanRef> {
@@ -121,7 +121,7 @@ impl Table for DistTable {
partition_execs.push(Arc::new(PartitionExec {
table_name: self.table_name.clone(),
datanode_instance,
projection: projection.clone(),
projection: projection.cloned(),
filters: filters.to_vec(),
limit,
batches: Arc::new(RwLock::new(None)),
@@ -385,8 +385,8 @@ impl DistTable {
}
}
fn project_schema(table_schema: SchemaRef, projection: &Option<Vec<usize>>) -> SchemaRef {
if let Some(projection) = &projection {
fn project_schema(table_schema: SchemaRef, projection: Option<&Vec<usize>>) -> SchemaRef {
if let Some(projection) = projection {
let columns = table_schema.column_schemas();
let projected = projection
.iter()
@@ -864,7 +864,7 @@ mod test {
) {
let expected_output = expected_output.into_iter().join("\n");
let table_scan = table
.scan(&projection, filters.as_slice(), None)
.scan(projection.as_ref(), filters.as_slice(), None)
.await
.unwrap();
assert_eq!(

View File

@@ -19,8 +19,8 @@ common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = "14.0.0"
datafusion-common = "14.0.0"
datafusion = "15.0"
datafusion-common = "15.0"
datatypes = { path = "../datatypes" }
futures = "0.3"
log-store = { path = "../log-store" }

View File

@@ -611,7 +611,7 @@ mod tests {
assert_eq!(2, table.insert(insert_req).await.unwrap());
let session_ctx = SessionContext::new();
let stream = table.scan(&None, &[], None).await.unwrap();
let stream = table.scan(None, &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
@@ -643,7 +643,7 @@ mod tests {
assert_eq!(2, table.insert(insert_req).await.unwrap());
let session_ctx = SessionContext::new();
let stream = table.scan(&None, &[], None).await.unwrap();
let stream = table.scan(None, &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
@@ -737,7 +737,7 @@ mod tests {
assert_eq!(2, table.insert(insert_req).await.unwrap());
let session_ctx = SessionContext::new();
let stream = table.scan(&None, &[], None).await.unwrap();
let stream = table.scan(None, &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
@@ -758,7 +758,7 @@ mod tests {
assert_eq!(tss, *batch.column(3));
// Scan with projections: cpu and memory
let stream = table.scan(&Some(vec![1, 2]), &[], None).await.unwrap();
let stream = table.scan(Some(&vec![1, 2]), &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
@@ -776,7 +776,7 @@ mod tests {
assert_eq!(memories, *batch.column(1));
// Scan with projections: only ts
let stream = table.scan(&Some(vec![3]), &[], None).await.unwrap();
let stream = table.scan(Some(&vec![3]), &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
@@ -819,7 +819,7 @@ mod tests {
assert_eq!(test_batch_size, table.insert(insert_req).await.unwrap());
let session_ctx = SessionContext::new();
let stream = table.scan(&None, &[], None).await.unwrap();
let stream = table.scan(None, &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let batches = util::collect(stream).await.unwrap();
let mut total = 0;

View File

@@ -166,14 +166,14 @@ impl<R: Region> Table for MitoTable<R> {
async fn scan(
&self,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {
let read_ctx = ReadContext::default();
let snapshot = self.region.snapshot(&read_ctx).map_err(TableError::new)?;
let projection = self.transform_projection(&self.region, projection.clone())?;
let projection = self.transform_projection(&self.region, projection.cloned())?;
let filters = filters.into();
let scan_request = ScanRequest {
projection,

View File

@@ -15,12 +15,12 @@ common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = "14.0.0"
datafusion-common = "14.0.0"
datafusion-expr = "14.0.0"
datafusion-optimizer = "14.0.0"
datafusion-physical-expr = "14.0.0"
datafusion-sql = "14.0.0"
datafusion = "15.0"
datafusion-common = "15.0"
datafusion-expr = "15.0"
datafusion-optimizer = "15.0"
datafusion-physical-expr = "15.0"
datafusion-sql = "15.0"
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util = "0.3"

View File

@@ -21,6 +21,7 @@ use datafusion::error::Result as DfResult;
use datafusion::physical_plan::udaf::AggregateUDF;
use datafusion::physical_plan::udf::ScalarUDF;
use datafusion::sql::planner::{ContextProvider, SqlToRel};
use datafusion_common::ScalarValue;
use datafusion_expr::TableSource;
use datatypes::arrow::datatypes::DataType;
use session::context::QueryContextRef;
@@ -126,4 +127,8 @@ impl ContextProvider for DfContextProviderAdapter {
fn get_variable_type(&self, variable_names: &[String]) -> Option<DataType> {
self.state.get_variable_type(variable_names)
}
fn get_config_option(&self, variable: &str) -> Option<ScalarValue> {
self.state.get_config_option(variable)
}
}

View File

@@ -27,6 +27,7 @@ use datafusion::execution::context::{SessionConfig, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::udf::ScalarUDF;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::ScalarValue;
use datafusion_expr::{LogicalPlan as DfLogicalPlan, TableSource};
use datafusion_optimizer::optimizer::{Optimizer, OptimizerConfig};
use datafusion_sql::planner::ContextProvider;
@@ -144,6 +145,11 @@ impl QueryEngineState {
state.get_variable_type(variable_names)
}
pub(crate) fn get_config_option(&self, variable: &str) -> Option<ScalarValue> {
let state = self.df_context.state.read();
state.get_config_option(variable)
}
pub(crate) fn optimize(&self, plan: &DfLogicalPlan) -> DfResult<DfLogicalPlan> {
self.df_context.optimize(plan)
}

View File

@@ -33,10 +33,10 @@ common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
console = "0.15"
datafusion = { version = "14.0.0", optional = true }
datafusion-common = { version = "14.0.0", optional = true }
datafusion-expr = { version = "14.0.0", optional = true }
datafusion-physical-expr = { version = "14.0.0", optional = true }
datafusion = { version = "15.0", optional = true }
datafusion-common = { version = "15.0", optional = true }
datafusion-expr = { version = "15.0", optional = true }
datafusion-physical-expr = { version = "15.0", optional = true }
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util = "0.3"

View File

@@ -15,4 +15,4 @@ itertools = "0.10"
mito = { path = "../mito" }
once_cell = "1.10"
snafu = { version = "0.7", features = ["backtraces"] }
sqlparser = "0.26"
sqlparser = "0.27"

View File

@@ -253,7 +253,7 @@ impl<'a> ParserContext<'a> {
.parse_column_def()
.context(SyntaxSnafu { sql: self.sql })?;
if !matches!(column.data_type, DataType::Timestamp(_))
if !matches!(column.data_type, DataType::Timestamp(_, _))
|| matches!(self.parser.peek_token(), Token::Comma)
{
columns.push(column);
@@ -967,7 +967,7 @@ ENGINE=mito";
assert!(result
.unwrap_err()
.to_string()
.contains("sql parser error: Expected a concrete value, found: MAXVALU"));
.contains("Please provide an extra partition that is bounded by 'MAXVALUE'."));
}
fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {

View File

@@ -300,7 +300,7 @@ pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<Co
SqlDataType::Double => Ok(ConcreteDataType::float64_datatype()),
SqlDataType::Boolean => Ok(ConcreteDataType::boolean_datatype()),
SqlDataType::Date => Ok(ConcreteDataType::date_datatype()),
SqlDataType::Custom(obj_name) => match &obj_name.0[..] {
SqlDataType::Custom(obj_name, _) => match &obj_name.0[..] {
[type_name] => {
if type_name
.value
@@ -319,7 +319,7 @@ pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<Co
}
.fail(),
},
SqlDataType::Timestamp(_) => Ok(ConcreteDataType::timestamp_millisecond_datatype()),
SqlDataType::Timestamp(_, _) => Ok(ConcreteDataType::timestamp_millisecond_datatype()),
_ => error::SqlTypeNotSupportedSnafu {
t: data_type.clone(),
}
@@ -373,11 +373,11 @@ mod tests {
check_type(SqlDataType::Boolean, ConcreteDataType::boolean_datatype());
check_type(SqlDataType::Date, ConcreteDataType::date_datatype());
check_type(
SqlDataType::Custom(ObjectName(vec![Ident::new("datetime")])),
SqlDataType::Custom(ObjectName(vec![Ident::new("datetime")]), vec![]),
ConcreteDataType::datetime_datatype(),
);
check_type(
SqlDataType::Timestamp(TimezoneInfo::None),
SqlDataType::Timestamp(None, TimezoneInfo::None),
ConcreteDataType::timestamp_millisecond_datatype(),
);
}

View File

@@ -21,7 +21,7 @@ futures = "0.3"
futures-util = "0.3"
lazy_static = "1.4"
object-store = { path = "../object-store" }
parquet = { version = "26", features = ["async"] }
parquet = { version = "28.0", features = ["async"] }
paste = "1.0"
planus = "0.2"
prost = "0.11"

View File

@@ -12,9 +12,9 @@ common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
datafusion = "14.0.0"
datafusion-common = "14.0.0"
datafusion-expr = "14.0.0"
datafusion = "15.0"
datafusion-common = "15.0"
datafusion-expr = "15.0"
datatypes = { path = "../datatypes" }
derive_builder = "0.11"
futures = "0.3"
@@ -26,7 +26,7 @@ store-api = { path = "../store-api" }
tokio = { version = "1.18", features = ["full"] }
[dev-dependencies]
datafusion-expr = "14.0.0"
parquet = { version = "26", features = ["async"] }
datafusion-expr = "15.0"
parquet = { version = "28.0", features = ["async"] }
tempdir = "0.3"
tokio-util = { version = "0.7", features = ["compat"] }

View File

@@ -54,7 +54,7 @@ pub trait Table: Send + Sync {
/// Scan the table and returns a SendableRecordBatchStream.
async fn scan(
&self,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
// limit can be used to reduce the amount scanned
// from the datasource as a performance optimization.

View File

@@ -69,7 +69,7 @@ impl TableProvider for DfTableProviderAdapter {
async fn scan(
&self,
_ctx: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[DfExpr],
limit: Option<usize>,
) -> DfResult<Arc<dyn DfPhysicalPlan>> {
@@ -134,7 +134,7 @@ impl Table for TableAdapter {
async fn scan(
&self,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<PhysicalPlanRef> {

View File

@@ -99,7 +99,7 @@ impl Table for NumbersTable {
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
limit: Option<usize>,
) -> Result<PhysicalPlanRef> {

View File

@@ -71,7 +71,7 @@ impl Table for EmptyTable {
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[common_query::prelude::Expr],
_limit: Option<usize>,
) -> Result<PhysicalPlanRef> {

View File

@@ -140,7 +140,7 @@ impl Table for MemTable {
async fn scan(
&self,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
limit: Option<usize>,
) -> Result<PhysicalPlanRef> {
@@ -211,7 +211,7 @@ mod test {
let ctx = SessionContext::new();
let table = build_testing_table();
let scan_stream = table.scan(&Some(vec![1]), &[], None).await.unwrap();
let scan_stream = table.scan(Some(&vec![1]), &[], None).await.unwrap();
let scan_stream = scan_stream.execute(0, ctx.task_ctx()).unwrap();
let recordbatch = util::collect(scan_stream).await.unwrap();
assert_eq!(1, recordbatch.len());
@@ -232,7 +232,7 @@ mod test {
let ctx = SessionContext::new();
let table = build_testing_table();
let scan_stream = table.scan(&None, &[], Some(2)).await.unwrap();
let scan_stream = table.scan(None, &[], Some(2)).await.unwrap();
let scan_stream = scan_stream.execute(0, ctx.task_ctx()).unwrap();
let recordbatch = util::collect(scan_stream).await.unwrap();
assert_eq!(1, recordbatch.len());