mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-26 08:00:01 +00:00)
Compare commits: v0.8.0-nig ... v0.8.0 (8 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 258675b75e | |
| | 11a08cb272 | |
| | e9b178b8b9 | |
| | 3477fde0e5 | |
| | 9baa431656 | |
| | e2a1cb5840 | |
| | f696f41a02 | |
| | 0168d43d60 | |
2 .github/workflows/release.yml vendored
@@ -91,7 +91,7 @@ env:
   # The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
   NIGHTLY_RELEASE_PREFIX: nightly
   # Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
-  NEXT_RELEASE_VERSION: v0.8.0
+  NEXT_RELEASE_VERSION: v0.9.0
 
 jobs:
   allocate-runners:
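As the comment spells out, a scheduled build is tagged `${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD`. A standalone sketch of that composition (using chrono for the date part; the helper and its arguments are illustrative, not part of the workflow):

```rust
use chrono::Utc;

// Compose a nightly tag such as "v0.9.0-nightly-20240511" from the two
// workflow variables above. Both arguments are supplied by the caller here,
// whereas the workflow reads them from its `env` block.
fn nightly_tag(next_release_version: &str, nightly_prefix: &str) -> String {
    format!(
        "{next_release_version}-{nightly_prefix}-{}",
        Utc::now().format("%Y%m%d")
    )
}

fn main() {
    // e.g. "v0.9.0-nightly-20240511"
    println!("{}", nightly_tag("v0.9.0", "nightly"));
}
```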
155 Cargo.lock generated
@@ -214,7 +214,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
|
||||
|
||||
[[package]]
|
||||
name = "api"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-decimal",
|
||||
@@ -703,7 +703,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "auth"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -877,7 +877,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "benchmarks"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow",
|
||||
@@ -1220,7 +1220,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cache"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"catalog",
|
||||
"common-error",
|
||||
@@ -1254,7 +1254,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
|
||||
|
||||
[[package]]
|
||||
name = "catalog"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow",
|
||||
@@ -1540,7 +1540,7 @@ checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce"
|
||||
|
||||
[[package]]
|
||||
name = "client"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -1569,7 +1569,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"snafu 0.8.2",
|
||||
"substrait 0.17.1",
|
||||
"substrait 0.7.2",
|
||||
"substrait 0.8.0",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic 0.11.0",
|
||||
@@ -1599,7 +1599,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cmd"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"auth",
|
||||
@@ -1655,7 +1655,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.2",
|
||||
"store-api",
|
||||
"substrait 0.7.2",
|
||||
"substrait 0.8.0",
|
||||
"table",
|
||||
"temp-env",
|
||||
"tempfile",
|
||||
@@ -1699,7 +1699,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
|
||||
|
||||
[[package]]
|
||||
name = "common-base"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"anymap",
|
||||
"bitvec",
|
||||
@@ -1715,7 +1715,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-catalog"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"common-error",
|
||||
@@ -1726,7 +1726,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-config"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
@@ -1749,7 +1749,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-datasource"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-schema",
|
||||
@@ -1781,7 +1781,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-decimal"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"bigdecimal",
|
||||
"common-error",
|
||||
@@ -1794,7 +1794,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-error"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"snafu 0.8.2",
|
||||
"strum 0.25.0",
|
||||
@@ -1802,7 +1802,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-frontend"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -1817,7 +1817,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-function"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -1850,7 +1850,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-greptimedb-telemetry"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-runtime",
|
||||
@@ -1867,7 +1867,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -1893,7 +1893,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc-expr"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"common-base",
|
||||
@@ -1910,7 +1910,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-macro"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-query",
|
||||
@@ -1925,7 +1925,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-mem-prof"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"common-error",
|
||||
"common-macro",
|
||||
@@ -1938,7 +1938,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-meta"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"anymap2",
|
||||
"api",
|
||||
@@ -1991,11 +1991,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-plugins"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -2020,7 +2020,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure-test"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-procedure",
|
||||
@@ -2028,7 +2028,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-query"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -2043,7 +2043,7 @@ dependencies = [
|
||||
"datatypes",
|
||||
"serde",
|
||||
"snafu 0.8.2",
|
||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
|
||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)",
|
||||
"sqlparser_derive 0.1.1",
|
||||
"statrs",
|
||||
"tokio",
|
||||
@@ -2051,7 +2051,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-recordbatch"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-error",
|
||||
@@ -2070,7 +2070,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-runtime"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-error",
|
||||
@@ -2090,7 +2090,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-telemetry"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"backtrace",
|
||||
@@ -2117,7 +2117,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-test-util"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"client",
|
||||
"common-query",
|
||||
@@ -2129,7 +2129,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-time"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"chrono",
|
||||
@@ -2145,7 +2145,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-version"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"build-data",
|
||||
"schemars",
|
||||
@@ -2154,7 +2154,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-wal"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
@@ -3154,7 +3154,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datanode"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -3203,7 +3203,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.2",
|
||||
"store-api",
|
||||
"substrait 0.7.2",
|
||||
"substrait 0.8.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"toml 0.8.12",
|
||||
@@ -3212,7 +3212,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datatypes"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -3723,7 +3723,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "file-engine"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -3825,7 +3825,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "flow"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -3835,8 +3835,11 @@ dependencies = [
|
||||
"common-decimal",
|
||||
"common-error",
|
||||
"common-frontend",
|
||||
"common-function",
|
||||
"common-macro",
|
||||
"common-meta",
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
"common-time",
|
||||
@@ -3863,7 +3866,7 @@ dependencies = [
|
||||
"snafu 0.8.2",
|
||||
"store-api",
|
||||
"strum 0.25.0",
|
||||
"substrait 0.7.2",
|
||||
"substrait 0.8.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"tonic 0.11.0",
|
||||
@@ -3901,7 +3904,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
|
||||
|
||||
[[package]]
|
||||
name = "frontend"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -3947,7 +3950,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.2",
|
||||
"sql",
|
||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
|
||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)",
|
||||
"store-api",
|
||||
"strfmt",
|
||||
"table",
|
||||
@@ -4719,7 +4722,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "index"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"asynchronous-codec",
|
||||
@@ -5286,7 +5289,7 @@ checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
|
||||
|
||||
[[package]]
|
||||
name = "log-store"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -5582,7 +5585,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-client"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -5608,7 +5611,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-srv"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -5684,7 +5687,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "metric-engine"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -5766,7 +5769,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "mito2"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -6385,7 +6388,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "object-store"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -6626,7 +6629,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "operator"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -6670,9 +6673,9 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.2",
|
||||
"sql",
|
||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
|
||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)",
|
||||
"store-api",
|
||||
"substrait 0.7.2",
|
||||
"substrait 0.8.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"tonic 0.11.0",
|
||||
@@ -6916,7 +6919,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "partition"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -6932,7 +6935,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"snafu 0.8.2",
|
||||
"sql",
|
||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
|
||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)",
|
||||
"store-api",
|
||||
"table",
|
||||
]
|
||||
@@ -7262,7 +7265,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "plugins"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"auth",
|
||||
"common-base",
|
||||
@@ -7540,7 +7543,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "promql"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"async-recursion",
|
||||
@@ -7753,7 +7756,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "puffin"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bitflags 2.5.0",
|
||||
@@ -7864,7 +7867,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "query"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -7921,7 +7924,7 @@ dependencies = [
|
||||
"stats-cli",
|
||||
"store-api",
|
||||
"streaming-stats",
|
||||
"substrait 0.7.2",
|
||||
"substrait 0.8.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -9228,7 +9231,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "script"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -9498,7 +9501,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "servers"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"aide",
|
||||
"api",
|
||||
@@ -9602,7 +9605,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "session"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -9880,7 +9883,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sql"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"chrono",
|
||||
@@ -9903,7 +9906,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"regex",
|
||||
"snafu 0.8.2",
|
||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
|
||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)",
|
||||
"sqlparser_derive 0.1.1",
|
||||
"table",
|
||||
]
|
||||
@@ -9936,7 +9939,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sqlness-runner"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"clap 4.5.4",
|
||||
@@ -9967,13 +9970,13 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "sqlparser"
|
||||
version = "0.44.0"
|
||||
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0#c919990bf62ad38d2b0c0a3bc90b26ad919d51b0"
|
||||
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d#e4e496b8d62416ad50ce70a1b460c7313610cf5d"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"log",
|
||||
"regex",
|
||||
"sqlparser 0.44.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"sqlparser_derive 0.2.2 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
|
||||
"sqlparser_derive 0.2.2 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -10001,7 +10004,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "sqlparser_derive"
|
||||
version = "0.2.2"
|
||||
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0#c919990bf62ad38d2b0c0a3bc90b26ad919d51b0"
|
||||
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d#e4e496b8d62416ad50ce70a1b460c7313610cf5d"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -10154,7 +10157,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "store-api"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -10320,7 +10323,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "substrait"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
@@ -10511,7 +10514,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "table"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
@@ -10620,7 +10623,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
|
||||
|
||||
[[package]]
|
||||
name = "tests-fuzz"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"arbitrary",
|
||||
"async-trait",
|
||||
@@ -10645,7 +10648,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"snafu 0.8.2",
|
||||
"sql",
|
||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
|
||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)",
|
||||
"sqlx",
|
||||
"tinytemplate",
|
||||
"tokio",
|
||||
@@ -10653,7 +10656,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tests-integration"
|
||||
version = "0.7.2"
|
||||
version = "0.8.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -10712,7 +10715,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlx",
|
||||
"store-api",
|
||||
"substrait 0.7.2",
|
||||
"substrait 0.8.0",
|
||||
"table",
|
||||
"tempfile",
|
||||
"time",
|
||||
|
||||
@@ -64,7 +64,7 @@ members = [
 resolver = "2"
 
 [workspace.package]
-version = "0.7.2"
+version = "0.8.0"
 edition = "2021"
 license = "Apache-2.0"
 
@@ -159,7 +159,7 @@ smallvec = { version = "1", features = ["serde"] }
 snafu = "0.8"
 sysinfo = "0.30"
 # on branch v0.44.x
-sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "c919990bf62ad38d2b0c0a3bc90b26ad919d51b0", features = [
+sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "e4e496b8d62416ad50ce70a1b460c7313610cf5d", features = [
     "visitor",
 ] }
 strum = { version = "0.25", features = ["derive"] }
@@ -18,7 +18,7 @@ use common_error::ext::{BoxedError, ErrorExt};
 use common_error::status_code::StatusCode;
 use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG};
 use common_macro::stack_trace_debug;
-use snafu::{Location, Snafu};
+use snafu::{location, Location, Snafu};
 use tonic::{Code, Status};
 
 #[derive(Snafu)]
@@ -83,14 +83,28 @@ pub enum Error {
     },
 
     #[snafu(display("Failed to request RegionServer, code: {}", code))]
-    RegionServer { code: Code, source: BoxedError },
+    RegionServer {
+        code: Code,
+        source: BoxedError,
+        #[snafu(implicit)]
+        location: Location,
+    },
 
     // Server error carried in Tonic Status's metadata.
     #[snafu(display("{}", msg))]
-    Server { code: StatusCode, msg: String },
+    Server {
+        code: StatusCode,
+        msg: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 
     #[snafu(display("Illegal Database response: {err_msg}"))]
-    IllegalDatabaseResponse { err_msg: String },
+    IllegalDatabaseResponse {
+        err_msg: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 
     #[snafu(display("Failed to send request with streaming: {}", err_msg))]
     ClientStreaming {
@@ -148,7 +162,11 @@ impl From<Status> for Error {
         let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG)
             .unwrap_or_else(|| e.message().to_string());
 
-        Self::Server { code, msg }
+        Self::Server {
+            code,
+            msg,
+            location: location!(),
+        }
     }
 }
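These variants now follow the snafu 0.8 implicit-location pattern: `#[snafu(implicit)]` lets the generated context selector capture a `Location` automatically, while a plain struct literal (as in the `From<Status>` impl above) has to supply one via `location!()`. A minimal self-contained sketch of that pattern, built around a hypothetical `DemoError` type that is not part of this diff:

```rust
use snafu::{location, Location, Snafu};

#[derive(Debug, Snafu)]
enum DemoError {
    #[snafu(display("{msg}"))]
    Server {
        msg: String,
        // Filled in automatically whenever the context selector is used.
        #[snafu(implicit)]
        location: Location,
    },
}

fn via_selector() -> Result<(), DemoError> {
    // `location` is captured implicitly here.
    ServerSnafu { msg: "boom" }.fail()
}

fn via_struct_literal() -> DemoError {
    // A plain struct literal must provide every field, hence `location!()`.
    DemoError::Server {
        msg: "boom".to_string(),
        location: location!(),
    }
}
```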
@@ -189,6 +189,7 @@ impl RegionRequester {
                 error::Error::RegionServer {
                     code,
                     source: BoxedError::new(err),
+                    location: location!(),
                 }
             })?
             .into_inner();
@@ -272,7 +273,7 @@ mod test {
                 err_msg: "blabla".to_string(),
             }),
         }));
-        let Server { code, msg } = result.unwrap_err() else {
+        let Server { code, msg, .. } = result.unwrap_err() else {
             unreachable!()
         };
         assert_eq!(code, StatusCode::Internal);
@@ -176,8 +176,12 @@ impl Export {
     }
 
     /// Return a list of [`TableReference`] to be exported.
-    /// Includes all tables under the given `catalog` and `schema`
-    async fn get_table_list(&self, catalog: &str, schema: &str) -> Result<Vec<TableReference>> {
+    /// Includes all tables under the given `catalog` and `schema`.
+    async fn get_table_list(
+        &self,
+        catalog: &str,
+        schema: &str,
+    ) -> Result<(Vec<TableReference>, Vec<TableReference>)> {
         // Puts all metric table first
         let sql = format!(
             "select table_catalog, table_schema, table_name from \
@@ -214,7 +218,7 @@ impl Export {
|
||||
debug!("Fetched table list: {:?}", records);
|
||||
|
||||
if records.is_empty() {
|
||||
return Ok(vec![]);
|
||||
return Ok((vec![], vec![]));
|
||||
}
|
||||
|
||||
let mut remaining_tables = Vec::with_capacity(records.len());
|
||||
@@ -232,11 +236,11 @@ impl Export {
|
||||
remaining_tables.push(table);
|
||||
}
|
||||
}
|
||||
let mut tables = Vec::with_capacity(metric_physical_tables.len() + remaining_tables.len());
|
||||
tables.extend(metric_physical_tables.into_iter());
|
||||
tables.extend(remaining_tables);
|
||||
|
||||
Ok(tables)
|
||||
Ok((
|
||||
metric_physical_tables.into_iter().collect(),
|
||||
remaining_tables,
|
||||
))
|
||||
}
|
||||
|
||||
async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result<String> {
|
||||
@@ -265,15 +269,16 @@ impl Export {
|
||||
let semaphore_moved = semaphore.clone();
|
||||
tasks.push(async move {
|
||||
let _permit = semaphore_moved.acquire().await.unwrap();
|
||||
let table_list = self.get_table_list(&catalog, &schema).await?;
|
||||
let table_count = table_list.len();
|
||||
let (metric_physical_tables, remaining_tables) =
|
||||
self.get_table_list(&catalog, &schema).await?;
|
||||
let table_count = metric_physical_tables.len() + remaining_tables.len();
|
||||
tokio::fs::create_dir_all(&self.output_dir)
|
||||
.await
|
||||
.context(FileIoSnafu)?;
|
||||
let output_file =
|
||||
Path::new(&self.output_dir).join(format!("{catalog}-{schema}.sql"));
|
||||
let mut file = File::create(output_file).await.context(FileIoSnafu)?;
|
||||
for (c, s, t) in table_list {
|
||||
for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
|
||||
match self.show_create_table(&c, &s, &t).await {
|
||||
Err(e) => {
|
||||
error!(e; r#"Failed to export table "{}"."{}"."{}""#, c, s, t)
|
||||
@@ -322,15 +327,25 @@ impl Export {
|
||||
.await
|
||||
.context(FileIoSnafu)?;
|
||||
let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/"));
|
||||
|
||||
// copy database to
|
||||
let sql = format!(
|
||||
"copy database {} to '{}' with (format='parquet');",
|
||||
schema,
|
||||
output_dir.to_str().unwrap()
|
||||
);
|
||||
self.sql(&sql).await?;
|
||||
info!("finished exporting {catalog}.{schema} data");
|
||||
// Ignores metric physical tables
|
||||
let (metrics_tables, table_list) = self.get_table_list(&catalog, &schema).await?;
|
||||
for (_, _, table_name) in metrics_tables {
|
||||
warn!("Ignores metric physical table: {table_name}");
|
||||
}
|
||||
for (catalog_name, schema_name, table_name) in table_list {
|
||||
// copy table to
|
||||
let sql = format!(
|
||||
r#"Copy "{}"."{}"."{}" TO '{}{}.parquet' WITH (format='parquet');"#,
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
output_dir.to_str().unwrap(),
|
||||
table_name,
|
||||
);
|
||||
info!("Executing sql: {sql}");
|
||||
self.sql(&sql).await?;
|
||||
}
|
||||
info!("Finished exporting {catalog}.{schema} data");
|
||||
|
||||
// export copy from sql
|
||||
let dir_filenames = match output_dir.read_dir() {
|
||||
|
||||
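To make the per-table export concrete, here is a small sketch of the SQL string the loop above builds, using hypothetical catalog, schema, and table names rather than anything from this diff:

```rust
// Build the per-table COPY statement the new export path issues in place of
// the old single `copy database` statement.
fn copy_table_sql(catalog: &str, schema: &str, table: &str, output_dir: &str) -> String {
    format!(
        r#"Copy "{}"."{}"."{}" TO '{}{}.parquet' WITH (format='parquet');"#,
        catalog, schema, table, output_dir, table,
    )
}

fn main() {
    assert_eq!(
        copy_table_sql("greptime", "public", "cpu", "/tmp/greptime-public/"),
        r#"Copy "greptime"."public"."cpu" TO '/tmp/greptime-public/cpu.parquet' WITH (format='parquet');"#
    );
}
```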
@@ -119,12 +119,11 @@ impl CreateFlowProcedure {
                 &sink_table_name.table_name,
             ))
             .await?;
-        ensure!(
-            !exists,
-            error::TableAlreadyExistsSnafu {
-                table_name: sink_table_name.to_string(),
-            }
-        );
+        // TODO(discord9): due to undefined behavior in flow's plan in how to transform types in mfp, sometime flow can't deduce correct schema
+        // and require manually create sink table
+        if exists {
+            common_telemetry::warn!("Table already exists, table: {}", sink_table_name);
+        }
 
         self.collect_source_tables().await?;
         self.allocate_flow_id().await?;
@@ -516,6 +516,7 @@ mod tests {
|
||||
use common_meta::key::datanode_table::DatanodeTableManager;
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use mito2::engine::MITO_ENGINE_NAME;
|
||||
use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
@@ -528,7 +529,7 @@ mod tests {
|
||||
let txn = mgr
|
||||
.build_create_txn(
|
||||
1028,
|
||||
"mock",
|
||||
MITO_ENGINE_NAME,
|
||||
"foo/bar/weny",
|
||||
HashMap::from([("foo".to_string(), "bar".to_string())]),
|
||||
HashMap::default(),
|
||||
@@ -542,8 +543,9 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_initialize_region_server() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut mock_region_server = mock_region_server();
|
||||
let (mock_region, mut mock_region_handler) = MockRegionEngine::new();
|
||||
let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);
|
||||
|
||||
mock_region_server.register_engine(mock_region.clone());
|
||||
|
||||
|
||||
@@ -121,6 +121,7 @@ mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::instruction::{InstructionReply, UpgradeRegion};
|
||||
use mito2::engine::MITO_ENGINE_NAME;
|
||||
use store_api::region_engine::RegionRole;
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::time::Instant;
|
||||
@@ -133,7 +134,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_region_not_exist() {
|
||||
let mut mock_region_server = mock_region_server();
|
||||
let (mock_engine, _) = MockRegionEngine::new();
|
||||
let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
|
||||
mock_region_server.register_engine(mock_engine);
|
||||
|
||||
let handler_context = HandlerContext {
|
||||
@@ -167,13 +168,14 @@ mod tests {
|
||||
let mock_region_server = mock_region_server();
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
|
||||
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
|
||||
region_engine.mock_role = Some(Some(RegionRole::Leader));
|
||||
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
|
||||
// Should be unreachable.
|
||||
unreachable!();
|
||||
}));
|
||||
});
|
||||
let (mock_engine, _) =
|
||||
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
|
||||
region_engine.mock_role = Some(Some(RegionRole::Leader));
|
||||
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
|
||||
// Should be unreachable.
|
||||
unreachable!();
|
||||
}));
|
||||
});
|
||||
mock_region_server.register_test_region(region_id, mock_engine);
|
||||
|
||||
let handler_context = HandlerContext {
|
||||
@@ -207,13 +209,14 @@ mod tests {
|
||||
let mock_region_server = mock_region_server();
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
|
||||
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
|
||||
// Region is not ready.
|
||||
region_engine.mock_role = Some(Some(RegionRole::Follower));
|
||||
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
|
||||
// Note: Don't change.
|
||||
region_engine.handle_request_delay = Some(Duration::from_secs(100));
|
||||
});
|
||||
let (mock_engine, _) =
|
||||
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
|
||||
// Region is not ready.
|
||||
region_engine.mock_role = Some(Some(RegionRole::Follower));
|
||||
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
|
||||
// Note: Don't change.
|
||||
region_engine.handle_request_delay = Some(Duration::from_secs(100));
|
||||
});
|
||||
mock_region_server.register_test_region(region_id, mock_engine);
|
||||
|
||||
let handler_context = HandlerContext {
|
||||
@@ -247,13 +250,14 @@ mod tests {
|
||||
let mock_region_server = mock_region_server();
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
|
||||
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
|
||||
// Region is not ready.
|
||||
region_engine.mock_role = Some(Some(RegionRole::Follower));
|
||||
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
|
||||
// Note: Don't change.
|
||||
region_engine.handle_request_delay = Some(Duration::from_millis(300));
|
||||
});
|
||||
let (mock_engine, _) =
|
||||
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
|
||||
// Region is not ready.
|
||||
region_engine.mock_role = Some(Some(RegionRole::Follower));
|
||||
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
|
||||
// Note: Don't change.
|
||||
region_engine.handle_request_delay = Some(Duration::from_millis(300));
|
||||
});
|
||||
mock_region_server.register_test_region(region_id, mock_engine);
|
||||
|
||||
let waits = vec![
|
||||
@@ -308,18 +312,19 @@ mod tests {
|
||||
let mock_region_server = mock_region_server();
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
|
||||
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
|
||||
// Region is not ready.
|
||||
region_engine.mock_role = Some(Some(RegionRole::Follower));
|
||||
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
|
||||
error::UnexpectedSnafu {
|
||||
violated: "mock_error".to_string(),
|
||||
}
|
||||
.fail()
|
||||
}));
|
||||
// Note: Don't change.
|
||||
region_engine.handle_request_delay = Some(Duration::from_millis(100));
|
||||
});
|
||||
let (mock_engine, _) =
|
||||
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
|
||||
// Region is not ready.
|
||||
region_engine.mock_role = Some(Some(RegionRole::Follower));
|
||||
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
|
||||
error::UnexpectedSnafu {
|
||||
violated: "mock_error".to_string(),
|
||||
}
|
||||
.fail()
|
||||
}));
|
||||
// Note: Don't change.
|
||||
region_engine.handle_request_delay = Some(Duration::from_millis(100));
|
||||
});
|
||||
mock_region_server.register_test_region(region_id, mock_engine);
|
||||
|
||||
let handler_context = HandlerContext {
|
||||
|
||||
@@ -34,6 +34,7 @@ use common_telemetry::{info, warn};
|
||||
use dashmap::DashMap;
|
||||
use futures_util::future::try_join_all;
|
||||
use metric_engine::engine::MetricEngine;
|
||||
use mito2::engine::MITO_ENGINE_NAME;
|
||||
use prost::Message;
|
||||
pub use query::dummy_catalog::{
|
||||
DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
|
||||
@@ -44,7 +45,9 @@ use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
|
||||
use servers::grpc::region_server::RegionServerHandler;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
|
||||
use store_api::metric_engine_consts::{
|
||||
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
|
||||
};
|
||||
use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse};
|
||||
use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest};
|
||||
use store_api::storage::RegionId;
|
||||
@@ -403,7 +406,7 @@ impl RegionServerInner {
|
||||
let current_region_status = self.region_map.get(®ion_id);
|
||||
|
||||
let engine = match region_change {
|
||||
RegionChange::Register(ref engine_type, _) => match current_region_status {
|
||||
RegionChange::Register(attribute) => match current_region_status {
|
||||
Some(status) => match status.clone() {
|
||||
RegionEngineWithStatus::Registering(_) => {
|
||||
return Ok(CurrentEngine::EarlyReturn(0))
|
||||
@@ -417,8 +420,10 @@ impl RegionServerInner {
|
||||
.engines
|
||||
.read()
|
||||
.unwrap()
|
||||
.get(engine_type)
|
||||
.with_context(|| RegionEngineNotFoundSnafu { name: engine_type })?
|
||||
.get(attribute.engine())
|
||||
.with_context(|| RegionEngineNotFoundSnafu {
|
||||
name: attribute.engine(),
|
||||
})?
|
||||
.clone(),
|
||||
},
|
||||
RegionChange::Deregisters => match current_region_status {
|
||||
@@ -461,11 +466,13 @@ impl RegionServerInner {
|
||||
.start_timer();
|
||||
|
||||
let region_change = match &request {
|
||||
RegionRequest::Create(create) => RegionChange::Register(create.engine.clone(), false),
|
||||
RegionRequest::Create(create) => {
|
||||
let attribute = parse_region_attribute(&create.engine, &create.options)?;
|
||||
RegionChange::Register(attribute)
|
||||
}
|
||||
RegionRequest::Open(open) => {
|
||||
let is_opening_physical_region =
|
||||
open.options.contains_key(PHYSICAL_TABLE_METADATA_KEY);
|
||||
RegionChange::Register(open.engine.clone(), is_opening_physical_region)
|
||||
let attribute = parse_region_attribute(&open.engine, &open.options)?;
|
||||
RegionChange::Register(attribute)
|
||||
}
|
||||
RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
|
||||
RegionRequest::Put(_)
|
||||
@@ -514,7 +521,7 @@ impl RegionServerInner {
|
||||
region_change: &RegionChange,
|
||||
) {
|
||||
match region_change {
|
||||
RegionChange::Register(_, _) => {
|
||||
RegionChange::Register(_) => {
|
||||
self.region_map.insert(
|
||||
region_id,
|
||||
RegionEngineWithStatus::Registering(engine.clone()),
|
||||
@@ -533,7 +540,7 @@ impl RegionServerInner {
|
||||
fn unset_region_status(&self, region_id: RegionId, region_change: RegionChange) {
|
||||
match region_change {
|
||||
RegionChange::None => {}
|
||||
RegionChange::Register(_, _) | RegionChange::Deregisters => {
|
||||
RegionChange::Register(_) | RegionChange::Deregisters => {
|
||||
self.region_map.remove(®ion_id);
|
||||
}
|
||||
}
|
||||
@@ -548,15 +555,28 @@ impl RegionServerInner {
|
||||
let engine_type = engine.name();
|
||||
match region_change {
|
||||
RegionChange::None => {}
|
||||
RegionChange::Register(_, is_opening_physical_region) => {
|
||||
if is_opening_physical_region {
|
||||
self.register_logical_regions(&engine, region_id).await?;
|
||||
}
|
||||
|
||||
info!("Region {region_id} is registered to engine {engine_type}");
|
||||
RegionChange::Register(attribute) => {
|
||||
info!(
|
||||
"Region {region_id} is registered to engine {}",
|
||||
attribute.engine()
|
||||
);
|
||||
self.region_map
|
||||
.insert(region_id, RegionEngineWithStatus::Ready(engine));
|
||||
self.event_listener.on_region_registered(region_id);
|
||||
.insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
|
||||
|
||||
match attribute {
|
||||
RegionAttribute::Metric { physical } => {
|
||||
if physical {
|
||||
// Registers the logical regions belong to the physical region (`region_id`).
|
||||
self.register_logical_regions(&engine, region_id).await?;
|
||||
// We only send the `on_region_registered` event of the physical region.
|
||||
self.event_listener.on_region_registered(region_id);
|
||||
}
|
||||
}
|
||||
RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
|
||||
RegionAttribute::File => {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
RegionChange::Deregisters => {
|
||||
info!("Region {region_id} is deregistered from engine {engine_type}");
|
||||
@@ -699,10 +719,45 @@ impl RegionServerInner {
|
||||
|
||||
enum RegionChange {
|
||||
None,
|
||||
Register(String, bool),
|
||||
Register(RegionAttribute),
|
||||
Deregisters,
|
||||
}
|
||||
|
||||
fn parse_region_attribute(
|
||||
engine: &str,
|
||||
options: &HashMap<String, String>,
|
||||
) -> Result<RegionAttribute> {
|
||||
match engine {
|
||||
MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
|
||||
METRIC_ENGINE_NAME => {
|
||||
let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
|
||||
|
||||
Ok(RegionAttribute::Metric { physical })
|
||||
}
|
||||
FILE_ENGINE_NAME => Ok(RegionAttribute::File),
|
||||
_ => error::UnexpectedSnafu {
|
||||
violated: format!("Unknown engine: {}", engine),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
enum RegionAttribute {
|
||||
Mito,
|
||||
Metric { physical: bool },
|
||||
File,
|
||||
}
|
||||
|
||||
impl RegionAttribute {
|
||||
fn engine(&self) -> &'static str {
|
||||
match self {
|
||||
RegionAttribute::Mito => MITO_ENGINE_NAME,
|
||||
RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
|
||||
RegionAttribute::File => FILE_ENGINE_NAME,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
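A self-contained sketch of how the new `parse_region_attribute` dispatch behaves. The constant string values are placeholders and the error handling is simplified to `Option`; only the mito/metric/file mapping mirrors the code above:

```rust
use std::collections::HashMap;

// Placeholder values; the real constants come from mito2 and store_api.
const MITO_ENGINE_NAME: &str = "mito";
const METRIC_ENGINE_NAME: &str = "metric";
const FILE_ENGINE_NAME: &str = "file";
const LOGICAL_TABLE_METADATA_KEY: &str = "logical_table_metadata";

#[derive(Debug, PartialEq)]
enum RegionAttribute {
    Mito,
    Metric { physical: bool },
    File,
}

fn parse_region_attribute(engine: &str, options: &HashMap<String, String>) -> Option<RegionAttribute> {
    match engine {
        MITO_ENGINE_NAME => Some(RegionAttribute::Mito),
        // A metric region is "physical" unless the request carries the logical-table marker.
        METRIC_ENGINE_NAME => Some(RegionAttribute::Metric {
            physical: !options.contains_key(LOGICAL_TABLE_METADATA_KEY),
        }),
        FILE_ENGINE_NAME => Some(RegionAttribute::File),
        _ => None,
    }
}

fn main() {
    let no_opts = HashMap::new();
    assert_eq!(parse_region_attribute("mito", &no_opts), Some(RegionAttribute::Mito));
    assert_eq!(
        parse_region_attribute("metric", &no_opts),
        Some(RegionAttribute::Metric { physical: true })
    );
}
```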
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
@@ -723,7 +778,7 @@ mod tests {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut mock_region_server = mock_region_server();
|
||||
let (engine, _receiver) = MockRegionEngine::new();
|
||||
let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
|
||||
let engine_name = engine.name();
|
||||
|
||||
mock_region_server.register_engine(engine.clone());
|
||||
@@ -781,7 +836,7 @@ mod tests {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut mock_region_server = mock_region_server();
|
||||
let (engine, _receiver) = MockRegionEngine::new();
|
||||
let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
|
||||
|
||||
mock_region_server.register_engine(engine.clone());
|
||||
|
||||
@@ -832,7 +887,7 @@ mod tests {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut mock_region_server = mock_region_server();
|
||||
let (engine, _receiver) = MockRegionEngine::new();
|
||||
let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
|
||||
|
||||
mock_region_server.register_engine(engine.clone());
|
||||
|
||||
@@ -857,13 +912,15 @@ mod tests {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut mock_region_server = mock_region_server();
|
||||
let (engine, _receiver) =
|
||||
MockRegionEngine::with_mock_fn(Box::new(|_region_id, _request| {
|
||||
let (engine, _receiver) = MockRegionEngine::with_mock_fn(
|
||||
MITO_ENGINE_NAME,
|
||||
Box::new(|_region_id, _request| {
|
||||
error::UnexpectedSnafu {
|
||||
violated: "test".to_string(),
|
||||
}
|
||||
.fail()
|
||||
}));
|
||||
}),
|
||||
);
|
||||
|
||||
mock_region_server.register_engine(engine.clone());
|
||||
|
||||
@@ -904,7 +961,7 @@ mod tests {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut mock_region_server = mock_region_server();
|
||||
let (engine, _) = MockRegionEngine::new();
|
||||
let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
|
||||
mock_region_server.register_engine(engine.clone());
|
||||
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
@@ -950,7 +1007,7 @@ mod tests {
|
||||
CurrentEngineTest {
|
||||
region_id,
|
||||
current_region_status: None,
|
||||
region_change: RegionChange::Register(engine.name().to_string(), false),
|
||||
region_change: RegionChange::Register(RegionAttribute::Mito),
|
||||
assert: Box::new(|result| {
|
||||
let current_engine = result.unwrap();
|
||||
assert_matches!(current_engine, CurrentEngine::Engine(_));
|
||||
@@ -959,7 +1016,7 @@ mod tests {
|
||||
CurrentEngineTest {
|
||||
region_id,
|
||||
current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
|
||||
region_change: RegionChange::Register(engine.name().to_string(), false),
|
||||
region_change: RegionChange::Register(RegionAttribute::Mito),
|
||||
assert: Box::new(|result| {
|
||||
let current_engine = result.unwrap();
|
||||
assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
|
||||
@@ -968,7 +1025,7 @@ mod tests {
|
||||
CurrentEngineTest {
|
||||
region_id,
|
||||
current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
|
||||
region_change: RegionChange::Register(engine.name().to_string(), false),
|
||||
region_change: RegionChange::Register(RegionAttribute::Mito),
|
||||
assert: Box::new(|result| {
|
||||
let err = result.unwrap_err();
|
||||
assert_eq!(err.status_code(), StatusCode::RegionBusy);
|
||||
@@ -977,7 +1034,7 @@ mod tests {
|
||||
CurrentEngineTest {
|
||||
region_id,
|
||||
current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
|
||||
region_change: RegionChange::Register(engine.name().to_string(), false),
|
||||
region_change: RegionChange::Register(RegionAttribute::Mito),
|
||||
assert: Box::new(|result| {
|
||||
let current_engine = result.unwrap();
|
||||
assert_matches!(current_engine, CurrentEngine::Engine(_));
|
||||
|
||||
@@ -106,10 +106,11 @@ pub struct MockRegionEngine {
|
||||
pub(crate) handle_request_delay: Option<Duration>,
|
||||
pub(crate) handle_request_mock_fn: Option<MockRequestHandler>,
|
||||
pub(crate) mock_role: Option<Option<RegionRole>>,
|
||||
engine: String,
|
||||
}
|
||||
|
||||
impl MockRegionEngine {
|
||||
pub fn new() -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>) {
|
||||
pub fn new(engine: &str) -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>) {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(8);
|
||||
|
||||
(
|
||||
@@ -118,12 +119,14 @@ impl MockRegionEngine {
|
||||
sender: tx,
|
||||
handle_request_mock_fn: None,
|
||||
mock_role: None,
|
||||
engine: engine.to_string(),
|
||||
}),
|
||||
rx,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn with_mock_fn(
|
||||
engine: &str,
|
||||
mock_fn: MockRequestHandler,
|
||||
) -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>) {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(8);
|
||||
@@ -134,12 +137,16 @@ impl MockRegionEngine {
|
||||
sender: tx,
|
||||
handle_request_mock_fn: Some(mock_fn),
|
||||
mock_role: None,
|
||||
engine: engine.to_string(),
|
||||
}),
|
||||
rx,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn with_custom_apply_fn<F>(apply: F) -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>)
|
||||
pub fn with_custom_apply_fn<F>(
|
||||
engine: &str,
|
||||
apply: F,
|
||||
) -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>)
|
||||
where
|
||||
F: FnOnce(&mut MockRegionEngine),
|
||||
{
|
||||
@@ -149,6 +156,7 @@ impl MockRegionEngine {
|
||||
sender: tx,
|
||||
handle_request_mock_fn: None,
|
||||
mock_role: None,
|
||||
engine: engine.to_string(),
|
||||
};
|
||||
|
||||
apply(&mut region_engine);
|
||||
@@ -160,7 +168,7 @@ impl MockRegionEngine {
|
||||
#[async_trait::async_trait]
|
||||
impl RegionEngine for MockRegionEngine {
|
||||
fn name(&self) -> &str {
|
||||
"mock"
|
||||
&self.engine
|
||||
}
|
||||
|
||||
async fn handle_request(
|
||||
|
||||
@@ -26,7 +26,10 @@ futures = "0.3"
|
||||
# This fork is simply for keeping our dependency in our org, and pin the version
|
||||
# it is the same with upstream repo
|
||||
async-trait.workspace = true
|
||||
common-function.workspace = true
|
||||
common-meta.workspace = true
|
||||
common-query.workspace = true
|
||||
common-recordbatch.workspace = true
|
||||
enum-as-inner = "0.6.0"
|
||||
greptime-proto.workspace = true
|
||||
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use std::time::{Instant, SystemTime};
|
||||
|
||||
use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
|
||||
use catalog::CatalogManagerRef;
|
||||
@@ -49,7 +49,7 @@ use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
|
||||
use crate::compute::ErrCollector;
|
||||
use crate::expr::GlobalId;
|
||||
use crate::repr::{self, DiffRow, Row};
|
||||
use crate::transform::sql_to_flow_plan;
|
||||
use crate::transform::{register_function_to_query_engine, sql_to_flow_plan};
|
||||
|
||||
pub(crate) mod error;
|
||||
mod flownode_impl;
|
||||
@@ -120,6 +120,8 @@ impl FlownodeBuilder {
|
||||
);
|
||||
let query_engine = query_engine_factory.query_engine();
|
||||
|
||||
register_function_to_query_engine(&query_engine);
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let node_id = Some(self.flow_node_id);
|
||||
@@ -261,7 +263,7 @@ impl FlownodeManager {
|
||||
let ctx = Arc::new(QueryContext::with(&catalog, &schema));
|
||||
// TODO(discord9): instead of auto build table from request schema, actually build table
|
||||
// before `create flow` to be able to assign pk and ts etc.
|
||||
let (primary_keys, schema) = if let Some(table_id) = self
|
||||
let (primary_keys, schema, is_auto_create) = if let Some(table_id) = self
|
||||
.table_info_source
|
||||
.get_table_id_from_name(&table_name)
|
||||
.await?
|
||||
@@ -278,54 +280,65 @@ impl FlownodeManager {
|
||||
.map(|i| meta.schema.column_schemas[i].name.clone())
|
||||
.collect_vec();
|
||||
let schema = meta.schema.column_schemas;
|
||||
(primary_keys, schema)
|
||||
let is_auto_create = schema
|
||||
.last()
|
||||
.map(|s| s.name == "__ts_placeholder")
|
||||
.unwrap_or(false);
|
||||
(primary_keys, schema, is_auto_create)
|
||||
} else {
|
||||
// TODO(discord9): get ts column from `RelationType` once we are done rewriting flow plan to attach ts
|
||||
let (primary_keys, schema) = {
|
||||
let node_ctx = self.node_context.lock().await;
|
||||
let gid: GlobalId = node_ctx
|
||||
.table_repr
|
||||
.get_by_name(&table_name)
|
||||
.map(|x| x.1)
|
||||
.unwrap();
|
||||
let schema = node_ctx
|
||||
.schema
|
||||
.get(&gid)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: format!("Table name = {:?}", table_name),
|
||||
})?
|
||||
.clone();
|
||||
// TODO(discord9): use default key from schema
|
||||
let primary_keys = schema
|
||||
.keys
|
||||
.first()
|
||||
.map(|v| {
|
||||
v.column_indices
|
||||
.iter()
|
||||
.map(|i| format!("Col_{i}"))
|
||||
.collect_vec()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
let ts_col = ColumnSchema::new(
|
||||
"ts",
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
true,
|
||||
)
|
||||
.with_time_index(true);
|
||||
// TODO(discord9): condiser remove buggy auto create by schema
|
||||
|
||||
let wout_ts = schema
|
||||
.column_types
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(idx, typ)| {
|
||||
ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable)
|
||||
})
|
||||
.collect_vec();
|
||||
let mut with_ts = wout_ts.clone();
|
||||
with_ts.push(ts_col);
|
||||
(primary_keys, with_ts)
|
||||
};
|
||||
(primary_keys, schema)
|
||||
let node_ctx = self.node_context.lock().await;
|
||||
let gid: GlobalId = node_ctx
|
||||
.table_repr
|
||||
.get_by_name(&table_name)
|
||||
.map(|x| x.1)
|
||||
.unwrap();
|
||||
let schema = node_ctx
|
||||
.schema
|
||||
.get(&gid)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: format!("Table name = {:?}", table_name),
|
||||
})?
|
||||
.clone();
|
||||
// TODO(discord9): use default key from schema
|
||||
let primary_keys = schema
|
||||
.keys
|
||||
.first()
|
||||
.map(|v| {
|
||||
v.column_indices
|
||||
.iter()
|
||||
.map(|i| format!("Col_{i}"))
|
||||
.collect_vec()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
let update_at = ColumnSchema::new(
|
||||
"update_at",
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
true,
|
||||
);
|
||||
// TODO(discord9): bugged so we can't infer time index from flow plan, so we have to manually set one
|
||||
let ts_col = ColumnSchema::new(
|
||||
"__ts_placeholder",
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
true,
|
||||
)
|
||||
.with_time_index(true);
|
||||
|
||||
let wout_ts = schema
|
||||
.column_types
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(idx, typ)| {
|
||||
ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable)
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
let mut with_ts = wout_ts.clone();
|
||||
with_ts.push(update_at);
|
||||
with_ts.push(ts_col);
|
||||
|
||||
(primary_keys, with_ts, true)
|
||||
};
|
||||
|
||||
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
|
||||
@@ -336,16 +349,32 @@ impl FlownodeManager {
|
||||
table_name.join("."),
|
||||
reqs
|
||||
);
|
||||
|
||||
let now = SystemTime::now();
|
||||
let now = now
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.map(|s| s.as_millis() as repr::Timestamp)
|
||||
.unwrap_or_else(|_| {
|
||||
-(SystemTime::UNIX_EPOCH
|
||||
.duration_since(now)
|
||||
.unwrap()
|
||||
.as_millis() as repr::Timestamp)
|
||||
});
|
||||
for req in reqs {
|
||||
match req {
|
||||
DiffRequest::Insert(insert) => {
|
||||
let rows_proto: Vec<v1::Row> = insert
|
||||
.into_iter()
|
||||
.map(|(mut row, _ts)| {
|
||||
row.extend(Some(Value::from(
|
||||
common_time::Timestamp::new_millisecond(0),
|
||||
)));
|
||||
// `update_at` col
|
||||
row.extend([Value::from(common_time::Timestamp::new_millisecond(
|
||||
now,
|
||||
))]);
|
||||
// ts col, if auto create
|
||||
if is_auto_create {
|
||||
row.extend([Value::from(
|
||||
common_time::Timestamp::new_millisecond(0),
|
||||
)]);
|
||||
}
|
||||
row.into()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
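The inserted rows now carry an `update_at` value derived from wall-clock time. A standalone sketch of that conversion (plain `i64` milliseconds instead of the crate's `repr::Timestamp`), including the fallback for clocks that read before the Unix epoch:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Signed millisecond timestamp of "now"; negative if the system clock is
// somehow set before 1970, which is what the unwrap_or_else branch handles.
fn now_millis() -> i64 {
    let now = SystemTime::now();
    now.duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as i64)
        .unwrap_or_else(|_| -(UNIX_EPOCH.duration_since(now).unwrap().as_millis() as i64))
}

fn main() {
    // For any clock after 1970 this is simply the positive epoch offset.
    assert!(now_millis() > 0);
}
```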
@@ -30,7 +30,7 @@ use crate::expr::GlobalId;
|
||||
use crate::repr::{DiffRow, RelationType, BROADCAST_CAP};
|
||||
|
||||
/// A context that holds the information of the dataflow
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
pub struct FlownodeContext {
|
||||
/// mapping from source table to tasks, useful for schedule which task to run when a source table is updated
|
||||
pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
|
||||
@@ -64,6 +64,7 @@ pub struct FlownodeContext {
|
||||
///
|
||||
/// receiver still use tokio broadcast channel, since only sender side need to know
|
||||
/// backpressure and adjust dataflow running duration to avoid blocking
|
||||
#[derive(Debug)]
|
||||
pub struct SourceSender {
|
||||
sender: broadcast::Sender<DiffRow>,
|
||||
send_buf: VecDeque<DiffRow>,
|
||||
|
||||
@@ -223,11 +223,11 @@ mod test {
|
||||
use hydroflow::scheduled::graph::Hydroflow;
|
||||
use hydroflow::scheduled::graph_ext::GraphExt;
|
||||
use hydroflow::scheduled::handoff::VecHandoff;
|
||||
use pretty_assertions::{assert_eq, assert_ne};
|
||||
|
||||
use super::*;
|
||||
use crate::expr::BinaryFunc;
|
||||
use crate::repr::Row;
|
||||
|
||||
pub fn run_and_check(
|
||||
state: &mut DataflowState,
|
||||
df: &mut Hydroflow,
|
||||
|
||||
@@ -739,6 +739,7 @@ mod test {
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
|
||||
use common_time::{DateTime, Interval, Timestamp};
|
||||
use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
|
||||
use hydroflow::scheduled::graph::Hydroflow;
|
||||
|
||||
@@ -748,6 +749,165 @@ mod test {
|
||||
use crate::expr::{self, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, UnaryFunc};
|
||||
use crate::repr::{ColumnType, RelationType};
|
||||
|
||||
/// SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00')
|
||||
/// input table columns: number, ts
|
||||
/// expected: sum(number), window_start, window_end
|
||||
#[test]
|
||||
fn test_tumble_group_by() {
|
||||
let mut df = Hydroflow::new();
|
||||
let mut state = DataflowState::default();
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
const START: i64 = 1625097600000;
|
||||
let rows = vec![
|
||||
(1u32, START + 1000),
|
||||
(2u32, START + 1500),
|
||||
(3u32, START + 2000),
|
||||
(1u32, START + 2500),
|
||||
(2u32, START + 3000),
|
||||
(3u32, START + 3500),
|
||||
];
|
||||
let rows = rows
|
||||
.into_iter()
|
||||
.map(|(number, ts)| {
|
||||
(
|
||||
Row::new(vec![number.into(), Timestamp::new_millisecond(ts).into()]),
|
||||
1,
|
||||
1,
|
||||
)
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
let collection = ctx.render_constant(rows.clone());
|
||||
ctx.insert_global(GlobalId::User(1), collection);
|
||||
|
||||
let aggr_expr = AggregateExpr {
|
||||
func: AggregateFunc::SumUInt32,
|
||||
expr: ScalarExpr::Column(0),
|
||||
distinct: false,
|
||||
};
|
||||
let expected = TypedPlan {
|
||||
typ: RelationType::new(vec![
|
||||
ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
|
||||
ColumnType::new(CDT::datetime_datatype(), false), // window start
|
||||
ColumnType::new(CDT::datetime_datatype(), false), // window end
|
||||
]),
|
||||
// TODO(discord9): mfp indirectly ref to key columns
|
||||
/*
|
||||
.with_key(vec![1])
|
||||
.with_time_index(Some(0)),*/
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(
|
||||
Plan::Reduce {
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(1)),
|
||||
}
|
||||
.with_types(RelationType::new(vec![
|
||||
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
|
||||
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
|
||||
])),
|
||||
),
|
||||
key_val_plan: KeyValPlan {
|
||||
key_plan: MapFilterProject::new(2)
|
||||
.map(vec![
|
||||
ScalarExpr::Column(1).call_unary(
|
||||
UnaryFunc::TumbleWindowFloor {
|
||||
window_size: Interval::from_month_day_nano(
|
||||
0,
|
||||
0,
|
||||
1_000_000_000,
|
||||
),
|
||||
start_time: Some(DateTime::new(1625097600000)),
|
||||
},
|
||||
),
|
||||
ScalarExpr::Column(1).call_unary(
|
||||
UnaryFunc::TumbleWindowCeiling {
|
||||
window_size: Interval::from_month_day_nano(
|
||||
0,
|
||||
0,
|
||||
1_000_000_000,
|
||||
),
|
||||
start_time: Some(DateTime::new(1625097600000)),
|
||||
},
|
||||
),
|
||||
])
|
||||
.unwrap()
|
||||
.project(vec![2, 3])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
val_plan: MapFilterProject::new(2)
|
||||
.project(vec![0, 1])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
},
|
||||
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
|
||||
full_aggrs: vec![aggr_expr.clone()],
|
||||
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
|
||||
distinct_aggrs: vec![],
|
||||
}),
|
||||
}
|
||||
.with_types(
|
||||
RelationType::new(vec![
|
||||
ColumnType::new(CDT::datetime_datatype(), false), // window start
|
||||
ColumnType::new(CDT::datetime_datatype(), false), // window end
|
||||
ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
|
||||
])
|
||||
.with_key(vec![1])
|
||||
.with_time_index(Some(0)),
|
||||
),
|
||||
),
|
||||
mfp: MapFilterProject::new(3)
|
||||
.map(vec![
|
||||
ScalarExpr::Column(2),
|
||||
ScalarExpr::Column(3),
|
||||
ScalarExpr::Column(0),
|
||||
ScalarExpr::Column(1),
|
||||
])
|
||||
.unwrap()
|
||||
.project(vec![4, 5, 6])
|
||||
.unwrap(),
|
||||
},
|
||||
};
|
||||
|
||||
let bundle = ctx.render_plan(expected).unwrap();
|
||||
|
||||
let output = get_output_handle(&mut ctx, bundle);
|
||||
drop(ctx);
|
||||
let expected = BTreeMap::from([(
|
||||
1,
|
||||
vec![
|
||||
(
|
||||
Row::new(vec![
|
||||
3u64.into(),
|
||||
Timestamp::new_millisecond(START + 1000).into(),
|
||||
Timestamp::new_millisecond(START + 2000).into(),
|
||||
]),
|
||||
1,
|
||||
1,
|
||||
),
|
||||
(
|
||||
Row::new(vec![
|
||||
4u64.into(),
|
||||
Timestamp::new_millisecond(START + 2000).into(),
|
||||
Timestamp::new_millisecond(START + 3000).into(),
|
||||
]),
|
||||
1,
|
||||
1,
|
||||
),
|
||||
(
|
||||
Row::new(vec![
|
||||
5u64.into(),
|
||||
Timestamp::new_millisecond(START + 3000).into(),
|
||||
Timestamp::new_millisecond(START + 4000).into(),
|
||||
]),
|
||||
1,
|
||||
1,
|
||||
),
|
||||
],
|
||||
)]);
|
||||
run_and_check(&mut state, &mut df, 1..2, expected, output);
|
||||
}
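For readers tracing the expected buckets above: the window bounds follow from plain integer arithmetic over millisecond timestamps. The following standalone sketch is illustrative only (it is not part of the diff and assumes nothing beyond the numbers used in the test):

// Illustrative sketch only (not part of this diff): reproduces the tumble window
// bounds the test asserts, using the same floor/ceiling arithmetic on millisecond
// timestamps anchored at the window start time.
fn tumble_bounds(ts: i64, window_size_ms: i64, start_time_ms: i64) -> (i64, i64) {
    let window_start = start_time_ms + (ts - start_time_ms) / window_size_ms * window_size_ms;
    (window_start, window_start + window_size_ms)
}

fn main() {
    const START: i64 = 1625097600000; // 2021-07-01 00:00:00 in milliseconds
    // rows at START+1000 and START+1500 share the window [START+1000, START+2000),
    // so sum(number) = 1 + 2 = 3, matching the first expected output row
    assert_eq!(tumble_bounds(START + 1000, 1000, START), (START + 1000, START + 2000));
    assert_eq!(tumble_bounds(START + 1500, 1000, START), (START + 1000, START + 2000));
    // rows at START+2000 and START+2500 give 3 + 1 = 4; rows at START+3000 and START+3500 give 2 + 3 = 5
    assert_eq!(tumble_bounds(START + 2500, 1000, START), (START + 2000, START + 3000));
}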
|
||||
|
||||
/// select avg(number) from number;
|
||||
#[test]
|
||||
fn test_avg_eval() {
|
||||
|
||||
@@ -17,8 +17,10 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_telemetry::debug;
|
||||
use common_time::DateTime;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::{DateTime, Timestamp};
|
||||
use datafusion_expr::Operator;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::types::cast;
|
||||
@@ -30,14 +32,14 @@ use snafu::{ensure, OptionExt, ResultExt};
|
||||
use strum::{EnumIter, IntoEnumIterator};
|
||||
use substrait::df_logical_plan::consumer::name_to_op;
|
||||
|
||||
use crate::adapter::error::{Error, InvalidQuerySnafu, PlanSnafu};
|
||||
use crate::adapter::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu};
|
||||
use crate::expr::error::{
|
||||
CastValueSnafu, DivisionByZeroSnafu, EvalError, InternalSnafu, TryFromValueSnafu,
|
||||
TypeMismatchSnafu,
|
||||
CastValueSnafu, DivisionByZeroSnafu, EvalError, InternalSnafu, OverflowSnafu,
|
||||
TryFromValueSnafu, TypeMismatchSnafu,
|
||||
};
|
||||
use crate::expr::signature::{GenericFn, Signature};
|
||||
use crate::expr::{InvalidArgumentSnafu, ScalarExpr};
|
||||
use crate::repr::{value_to_internal_ts, Row};
|
||||
use crate::expr::{InvalidArgumentSnafu, ScalarExpr, TypedExpr};
|
||||
use crate::repr::{self, value_to_internal_ts, Row};
|
||||
|
||||
/// UnmaterializableFunc is a function that can't be evaluated independently,
|
||||
/// and requires special handling
|
||||
@@ -45,6 +47,11 @@ use crate::repr::{value_to_internal_ts, Row};
|
||||
pub enum UnmaterializableFunc {
|
||||
Now,
|
||||
CurrentSchema,
|
||||
TumbleWindow {
|
||||
ts: Box<TypedExpr>,
|
||||
window_size: common_time::Interval,
|
||||
start_time: Option<DateTime>,
|
||||
},
|
||||
}
|
||||
|
||||
impl UnmaterializableFunc {
|
||||
@@ -61,14 +68,51 @@ impl UnmaterializableFunc {
|
||||
output: ConcreteDataType::string_datatype(),
|
||||
generic_fn: GenericFn::CurrentSchema,
|
||||
},
|
||||
Self::TumbleWindow { .. } => Signature {
|
||||
input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
|
||||
output: ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
generic_fn: GenericFn::TumbleWindow,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Create an UnmaterializableFunc from the function name and its arguments
|
||||
pub fn from_str(name: &str) -> Result<Self, Error> {
|
||||
match name {
|
||||
pub fn from_str_args(name: &str, args: Vec<TypedExpr>) -> Result<Self, Error> {
|
||||
match name.to_lowercase().as_str() {
|
||||
"now" => Ok(Self::Now),
|
||||
"current_schema" => Ok(Self::CurrentSchema),
|
||||
"tumble" => {
|
||||
let ts = args.first().context(InvalidQuerySnafu {
|
||||
reason: "Tumble window function requires a timestamp argument",
|
||||
})?;
|
||||
let window_size = args
|
||||
.get(1)
|
||||
.and_then(|expr| expr.expr.as_literal())
|
||||
.context(InvalidQuerySnafu {
|
||||
reason: "Tumble window function requires a window size argument"
|
||||
})?.as_string() // TODO(discord9): since the df-to-substrait converter does not support the interval type yet, we need to take a string and cast it to an interval instead
|
||||
.map(|s|cast(Value::from(s), &ConcreteDataType::interval_month_day_nano_datatype())).transpose().map_err(BoxedError::new).context(
|
||||
ExternalSnafu
|
||||
)?.and_then(|v|v.as_interval())
|
||||
.with_context(||InvalidQuerySnafu {
|
||||
reason: format!("Tumble window function requires window size argument to be a string describe a interval, found {:?}", args.get(1))
|
||||
})?;
|
||||
let start_time = match args.get(2) {
|
||||
Some(start_time) => start_time.expr.as_literal(),
|
||||
None => None,
|
||||
}
|
||||
.map(|s| cast(s.clone(), &ConcreteDataType::datetime_datatype())).transpose().map_err(BoxedError::new).context(ExternalSnafu)?.map(|v|v.as_datetime().with_context(
|
||||
||InvalidQuerySnafu {
|
||||
reason: format!("Tumble window function requires start time argument to be a datetime describe in string, found {:?}", args.get(2))
|
||||
}
|
||||
)).transpose()?;
|
||||
|
||||
Ok(Self::TumbleWindow {
|
||||
ts: Box::new(ts.clone()),
|
||||
window_size,
|
||||
start_time,
|
||||
})
|
||||
}
|
||||
_ => InvalidQuerySnafu {
|
||||
reason: format!("Unknown unmaterializable function: {}", name),
|
||||
}
|
||||
@@ -87,6 +131,14 @@ pub enum UnaryFunc {
|
||||
IsFalse,
|
||||
StepTimestamp,
|
||||
Cast(ConcreteDataType),
|
||||
TumbleWindowFloor {
|
||||
window_size: common_time::Interval,
|
||||
start_time: Option<DateTime>,
|
||||
},
|
||||
TumbleWindowCeiling {
|
||||
window_size: common_time::Interval,
|
||||
start_time: Option<DateTime>,
|
||||
},
|
||||
}
|
||||
|
||||
impl UnaryFunc {
|
||||
@@ -118,6 +170,16 @@ impl UnaryFunc {
|
||||
output: to.clone(),
|
||||
generic_fn: GenericFn::Cast,
|
||||
},
|
||||
Self::TumbleWindowFloor { .. } => Signature {
|
||||
input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
|
||||
output: ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
generic_fn: GenericFn::TumbleWindow,
|
||||
},
|
||||
Self::TumbleWindowCeiling { .. } => Signature {
|
||||
input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
|
||||
output: ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
generic_fn: GenericFn::TumbleWindow,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -211,10 +273,51 @@ impl UnaryFunc {
|
||||
debug!("Cast to type: {to:?}, result: {:?}", res);
|
||||
res
|
||||
}
|
||||
Self::TumbleWindowFloor {
|
||||
window_size,
|
||||
start_time,
|
||||
} => {
|
||||
let ts = get_ts_as_millisecond(arg)?;
|
||||
let start_time = start_time.map(|t| t.val()).unwrap_or(0);
|
||||
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
|
||||
let window_start = start_time + (ts - start_time) / window_size * window_size;
|
||||
|
||||
let ret = Timestamp::new_millisecond(window_start);
|
||||
Ok(Value::from(ret))
|
||||
}
|
||||
Self::TumbleWindowCeiling {
|
||||
window_size,
|
||||
start_time,
|
||||
} => {
|
||||
let ts = get_ts_as_millisecond(arg)?;
|
||||
let start_time = start_time.map(|t| t.val()).unwrap_or(0);
|
||||
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
|
||||
let window_start = start_time + (ts - start_time) / window_size * window_size;
|
||||
|
||||
let window_end = window_start + window_size;
|
||||
let ret = Timestamp::new_millisecond(window_end);
|
||||
Ok(Value::from(ret))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_ts_as_millisecond(arg: Value) -> Result<repr::Timestamp, EvalError> {
|
||||
let ts = if let Some(ts) = arg.as_timestamp() {
|
||||
ts.convert_to(TimeUnit::Millisecond)
|
||||
.context(OverflowSnafu)?
|
||||
.value()
|
||||
} else if let Some(ts) = arg.as_datetime() {
|
||||
ts.val()
|
||||
} else {
|
||||
InvalidArgumentSnafu {
|
||||
reason: "Expect input to be timestamp or datetime type",
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
Ok(ts)
|
||||
}
|
||||
|
||||
/// BinaryFunc is a function that takes two arguments.
|
||||
/// Also notice this enum doesn't contain function arguments, since the arguments are stored in the expression.
|
||||
///
|
||||
|
||||
@@ -26,10 +26,10 @@ use crate::adapter::error::{
|
||||
};
|
||||
use crate::expr::error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu};
|
||||
use crate::expr::func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
|
||||
use crate::repr::ColumnType;
|
||||
use crate::repr::{ColumnType, RelationType};
|
||||
|
||||
/// A scalar expression with a known type.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
|
||||
pub struct TypedExpr {
|
||||
/// The expression.
|
||||
pub expr: ScalarExpr,
|
||||
@@ -43,7 +43,73 @@ impl TypedExpr {
|
||||
}
|
||||
}
|
||||
|
||||
/// TODO(discord9): add tumble function here
|
||||
impl TypedExpr {
|
||||
/// expand multi-value expression to multiple expressions with new indices
|
||||
pub fn expand_multi_value(
|
||||
input_typ: &RelationType,
|
||||
exprs: &[TypedExpr],
|
||||
) -> Result<Vec<TypedExpr>, Error> {
|
||||
// old indices in mfp, expanded expr
|
||||
let mut ret = vec![];
|
||||
let input_arity = input_typ.column_types.len();
|
||||
for (old_idx, expr) in exprs.iter().enumerate() {
|
||||
if let ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow {
|
||||
ts,
|
||||
window_size,
|
||||
start_time,
|
||||
}) = &expr.expr
|
||||
{
|
||||
let floor = UnaryFunc::TumbleWindowFloor {
|
||||
window_size: *window_size,
|
||||
start_time: *start_time,
|
||||
};
|
||||
let ceil = UnaryFunc::TumbleWindowCeiling {
|
||||
window_size: *window_size,
|
||||
start_time: *start_time,
|
||||
};
|
||||
let floor = ScalarExpr::CallUnary {
|
||||
func: floor,
|
||||
expr: Box::new(ts.expr.clone()),
|
||||
}
|
||||
.with_type(ts.typ.clone());
|
||||
ret.push((None, floor));
|
||||
|
||||
let ceil = ScalarExpr::CallUnary {
|
||||
func: ceil,
|
||||
expr: Box::new(ts.expr.clone()),
|
||||
}
|
||||
.with_type(ts.typ.clone());
|
||||
ret.push((None, ceil));
|
||||
} else {
|
||||
ret.push((Some(input_arity + old_idx), expr.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
// get the shuffled indices (old_idx -> new_idx)
|
||||
// note: the index is offset by input_arity because mfp is designed to include input columns first, then intermediate columns
|
||||
let shuffle = ret
|
||||
.iter()
|
||||
.map(|(old_idx, _)| *old_idx) // [Option<opt_idx>]
|
||||
.enumerate()
|
||||
.map(|(new, old)| (old, new + input_arity))
|
||||
.flat_map(|(old, new)| old.map(|o| (o, new)))
|
||||
.chain((0..input_arity).map(|i| (i, i))) // also remember to chain the input columns as not changed
|
||||
.collect::<BTreeMap<_, _>>();
|
||||
|
||||
// shuffle expr's index
|
||||
let exprs = ret
|
||||
.into_iter()
|
||||
.map(|(_, mut expr)| {
|
||||
// invariant: it is expected that no expr will refer to the column being expanded
|
||||
expr.expr.permute_map(&shuffle)?;
|
||||
Ok(expr)
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
Ok(dbg!(exprs))
|
||||
}
|
||||
}
|
||||
|
||||
/// A scalar expression, which can be evaluated to a value.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub enum ScalarExpr {
|
||||
@@ -84,6 +150,10 @@ pub enum ScalarExpr {
|
||||
}
|
||||
|
||||
impl ScalarExpr {
|
||||
pub fn with_type(self, typ: ColumnType) -> TypedExpr {
|
||||
TypedExpr::new(self, typ)
|
||||
}
|
||||
|
||||
/// try to determine the type of the expression
|
||||
pub fn typ(&self, context: &[ColumnType]) -> Result<ColumnType, Error> {
|
||||
match self {
|
||||
|
||||
@@ -64,4 +64,5 @@ pub enum GenericFn {
|
||||
// unmaterialized func
|
||||
Now,
|
||||
CurrentSchema,
|
||||
TumbleWindow,
|
||||
}
|
||||
|
||||
@@ -206,6 +206,15 @@ impl RelationType {
|
||||
self
|
||||
}
|
||||
|
||||
/// will also remove the time index from keys if it's present in keys
|
||||
pub fn with_time_index(mut self, time_index: Option<usize>) -> Self {
|
||||
self.time_index = time_index;
|
||||
for key in &mut self.keys {
|
||||
key.remove_col(time_index.unwrap_or(usize::MAX));
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Computes the number of columns in the relation.
|
||||
pub fn arity(&self) -> usize {
|
||||
self.column_types.len()
|
||||
|
||||
@@ -130,12 +130,60 @@ pub async fn sql_to_flow_plan(
|
||||
Ok(flow_plan)
|
||||
}
|
||||
|
||||
/// register flow-specific functions to the query engine
|
||||
pub fn register_function_to_query_engine(engine: &Arc<dyn QueryEngine>) {
|
||||
engine.register_function(Arc::new(TumbleFunction {}));
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TumbleFunction {}
|
||||
|
||||
const TUMBLE_NAME: &str = "tumble";
|
||||
|
||||
impl std::fmt::Display for TumbleFunction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "{}", TUMBLE_NAME.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
impl common_function::function::Function for TumbleFunction {
|
||||
fn name(&self) -> &str {
|
||||
TUMBLE_NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[CDT]) -> common_query::error::Result<CDT> {
|
||||
Ok(CDT::datetime_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> common_query::prelude::Signature {
|
||||
common_query::prelude::Signature::variadic_any(common_query::prelude::Volatility::Immutable)
|
||||
}
|
||||
|
||||
fn eval(
|
||||
&self,
|
||||
_func_ctx: common_function::function::FunctionContext,
|
||||
_columns: &[datatypes::prelude::VectorRef],
|
||||
) -> common_query::error::Result<datatypes::prelude::VectorRef> {
|
||||
UnexpectedSnafu {
|
||||
reason: "Tumbler function is not implemented for datafusion executor",
|
||||
}
|
||||
.fail()
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::RegisterTableRequest;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
|
||||
use common_time::{Date, DateTime};
|
||||
use datatypes::prelude::*;
|
||||
use datatypes::schema::Schema;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use itertools::Itertools;
|
||||
use prost::Message;
|
||||
use query::parser::QueryLanguageParser;
|
||||
use query::plan::LogicalPlan;
|
||||
@@ -144,23 +192,45 @@ mod test {
|
||||
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
||||
use substrait_proto::proto;
|
||||
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
|
||||
use table::test_util::MemTable;
|
||||
|
||||
use super::*;
|
||||
use crate::adapter::node_context::IdToNameMap;
|
||||
use crate::repr::ColumnType;
|
||||
|
||||
pub fn create_test_ctx() -> FlownodeContext {
|
||||
let gid = GlobalId::User(0);
|
||||
let name = [
|
||||
"greptime".to_string(),
|
||||
"public".to_string(),
|
||||
"numbers".to_string(),
|
||||
];
|
||||
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
|
||||
let mut schemas = HashMap::new();
|
||||
let mut tri_map = IdToNameMap::new();
|
||||
tri_map.insert(Some(name.clone()), Some(0), gid);
|
||||
{
|
||||
let gid = GlobalId::User(0);
|
||||
let name = [
|
||||
"greptime".to_string(),
|
||||
"public".to_string(),
|
||||
"numbers".to_string(),
|
||||
];
|
||||
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
|
||||
|
||||
tri_map.insert(Some(name.clone()), Some(1024), gid);
|
||||
schemas.insert(gid, schema);
|
||||
}
|
||||
|
||||
{
|
||||
let gid = GlobalId::User(1);
|
||||
let name = [
|
||||
"greptime".to_string(),
|
||||
"public".to_string(),
|
||||
"numbers_with_ts".to_string(),
|
||||
];
|
||||
let schema = RelationType::new(vec![
|
||||
ColumnType::new(CDT::uint32_datatype(), false),
|
||||
ColumnType::new(CDT::datetime_datatype(), false),
|
||||
]);
|
||||
schemas.insert(gid, schema);
|
||||
tri_map.insert(Some(name.clone()), Some(1025), gid);
|
||||
}
|
||||
|
||||
FlownodeContext {
|
||||
schema: HashMap::from([(gid, schema)]),
|
||||
schema: schemas,
|
||||
table_repr: tri_map,
|
||||
query_context: Some(Arc::new(QueryContext::with("greptime", "public"))),
|
||||
..Default::default()
|
||||
@@ -177,9 +247,37 @@ mod test {
|
||||
table: NumbersTable::table(NUMBERS_TABLE_ID),
|
||||
};
|
||||
catalog_list.register_table_sync(req).unwrap();
|
||||
|
||||
let schema = vec![
|
||||
datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false),
|
||||
datatypes::schema::ColumnSchema::new("ts", CDT::datetime_datatype(), false),
|
||||
];
|
||||
let mut columns = vec![];
|
||||
let numbers = (1..=10).collect_vec();
|
||||
let column: VectorRef = Arc::new(<u32 as Scalar>::VectorType::from_vec(numbers));
|
||||
columns.push(column);
|
||||
|
||||
let ts = (1..=10).collect_vec();
|
||||
let column: VectorRef = Arc::new(<DateTime as Scalar>::VectorType::from_vec(ts));
|
||||
columns.push(column);
|
||||
|
||||
let schema = Arc::new(Schema::new(schema));
|
||||
let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap();
|
||||
let table = MemTable::table("numbers_with_ts", recordbatch);
|
||||
|
||||
let req_with_ts = RegisterTableRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "numbers_with_ts".to_string(),
|
||||
table_id: 1024,
|
||||
table,
|
||||
};
|
||||
catalog_list.register_table_sync(req_with_ts).unwrap();
|
||||
|
||||
let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, false);
|
||||
|
||||
let engine = factory.query_engine();
|
||||
engine.register_function(Arc::new(TumbleFunction {}));
|
||||
|
||||
assert_eq!("datafusion", engine.name());
|
||||
engine
|
||||
|
||||
@@ -302,8 +302,26 @@ impl TypedPlan {
|
||||
return not_impl_err!("Aggregate without an input is not supported");
|
||||
};
|
||||
|
||||
let group_exprs =
|
||||
TypedExpr::from_substrait_agg_grouping(ctx, &agg.groupings, &input.typ, extensions)?;
|
||||
let group_exprs = {
|
||||
let group_exprs = TypedExpr::from_substrait_agg_grouping(
|
||||
ctx,
|
||||
&agg.groupings,
|
||||
&input.typ,
|
||||
extensions,
|
||||
)?;
|
||||
|
||||
TypedExpr::expand_multi_value(&input.typ, &group_exprs)?
|
||||
};
|
||||
|
||||
let time_index = group_exprs.iter().position(|expr| {
|
||||
matches!(
|
||||
&expr.expr,
|
||||
ScalarExpr::CallUnary {
|
||||
func: UnaryFunc::TumbleWindowFloor { .. },
|
||||
expr: _
|
||||
}
|
||||
)
|
||||
});
|
||||
|
||||
let (mut aggr_exprs, post_mfp) =
|
||||
AggregateExpr::from_substrait_agg_measures(ctx, &agg.measures, &input.typ, extensions)?;
|
||||
@@ -314,6 +332,7 @@ impl TypedPlan {
|
||||
input.typ.column_types.len(),
|
||||
)?;
|
||||
|
||||
// output type is group_exprs + aggr_exprs
|
||||
let output_type = {
|
||||
let mut output_types = Vec::new();
|
||||
// first append group_expr as key, then aggr_expr as value
|
||||
@@ -332,7 +351,8 @@ impl TypedPlan {
|
||||
} else {
|
||||
RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec())
|
||||
}
|
||||
};
|
||||
}
|
||||
.with_time_index(time_index);
|
||||
|
||||
// copy aggr_exprs to full_aggrs, and split them into simple_aggrs and distinct_aggrs
|
||||
// also set them input/output column
|
||||
@@ -406,6 +426,7 @@ impl TypedPlan {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use common_time::{DateTime, Interval};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use pretty_assertions::{assert_eq, assert_ne};
|
||||
|
||||
@@ -414,6 +435,106 @@ mod test {
|
||||
use crate::repr::{self, ColumnType, RelationType};
|
||||
use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tumble_parse() {
|
||||
let engine = create_test_query_engine();
|
||||
let sql = "SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour', '2021-07-01 00:00:00')";
|
||||
let plan = sql_to_substrait(engine.clone(), sql).await;
|
||||
|
||||
let mut ctx = create_test_ctx();
|
||||
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap();
|
||||
|
||||
let aggr_expr = AggregateExpr {
|
||||
func: AggregateFunc::SumUInt32,
|
||||
expr: ScalarExpr::Column(0),
|
||||
distinct: false,
|
||||
};
|
||||
let expected = TypedPlan {
|
||||
typ: RelationType::new(vec![
|
||||
ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
|
||||
ColumnType::new(CDT::datetime_datatype(), false), // window start
|
||||
ColumnType::new(CDT::datetime_datatype(), false), // window end
|
||||
]),
|
||||
// TODO(discord9): mfp indirectly ref to key columns
|
||||
/*
|
||||
.with_key(vec![1])
|
||||
.with_time_index(Some(0)),*/
|
||||
plan: Plan::Mfp {
|
||||
input: Box::new(
|
||||
Plan::Reduce {
|
||||
input: Box::new(
|
||||
Plan::Get {
|
||||
id: crate::expr::Id::Global(GlobalId::User(1)),
|
||||
}
|
||||
.with_types(RelationType::new(vec![
|
||||
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
|
||||
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
|
||||
])),
|
||||
),
|
||||
key_val_plan: KeyValPlan {
|
||||
key_plan: MapFilterProject::new(2)
|
||||
.map(vec![
|
||||
ScalarExpr::Column(1).call_unary(
|
||||
UnaryFunc::TumbleWindowFloor {
|
||||
window_size: Interval::from_month_day_nano(
|
||||
0,
|
||||
0,
|
||||
3_600_000_000_000,
|
||||
),
|
||||
start_time: Some(DateTime::new(1625097600000)),
|
||||
},
|
||||
),
|
||||
ScalarExpr::Column(1).call_unary(
|
||||
UnaryFunc::TumbleWindowCeiling {
|
||||
window_size: Interval::from_month_day_nano(
|
||||
0,
|
||||
0,
|
||||
3_600_000_000_000,
|
||||
),
|
||||
start_time: Some(DateTime::new(1625097600000)),
|
||||
},
|
||||
),
|
||||
])
|
||||
.unwrap()
|
||||
.project(vec![2, 3])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
val_plan: MapFilterProject::new(2)
|
||||
.project(vec![0, 1])
|
||||
.unwrap()
|
||||
.into_safe(),
|
||||
},
|
||||
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
|
||||
full_aggrs: vec![aggr_expr.clone()],
|
||||
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
|
||||
distinct_aggrs: vec![],
|
||||
}),
|
||||
}
|
||||
.with_types(
|
||||
RelationType::new(vec![
|
||||
ColumnType::new(CDT::datetime_datatype(), false), // window start
|
||||
ColumnType::new(CDT::datetime_datatype(), false), // window end
|
||||
ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
|
||||
])
|
||||
.with_key(vec![1])
|
||||
.with_time_index(Some(0)),
|
||||
),
|
||||
),
|
||||
mfp: MapFilterProject::new(3)
|
||||
.map(vec![
|
||||
ScalarExpr::Column(2),
|
||||
ScalarExpr::Column(3),
|
||||
ScalarExpr::Column(0),
|
||||
ScalarExpr::Column(1),
|
||||
])
|
||||
.unwrap()
|
||||
.project(vec![4, 5, 6])
|
||||
.unwrap(),
|
||||
},
|
||||
};
|
||||
assert_eq!(flow_plan, expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_avg_group_by() {
|
||||
let engine = create_test_query_engine();
|
||||
@@ -514,7 +635,8 @@ mod test {
|
||||
let plan = sql_to_substrait(engine.clone(), sql).await;
|
||||
|
||||
let mut ctx = create_test_ctx();
|
||||
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan);
|
||||
|
||||
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap();
|
||||
|
||||
let aggr_exprs = vec![
|
||||
AggregateExpr {
|
||||
@@ -587,7 +709,7 @@ mod test {
|
||||
.unwrap(),
|
||||
},
|
||||
};
|
||||
assert_eq!(flow_plan.unwrap(), expected);
|
||||
assert_eq!(flow_plan, expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -71,7 +71,7 @@ impl TypedExpr {
|
||||
),
|
||||
})?;
|
||||
let arg_len = f.arguments.len();
|
||||
let arg_exprs: Vec<TypedExpr> = f
|
||||
let arg_typed_exprs: Vec<TypedExpr> = f
|
||||
.arguments
|
||||
.iter()
|
||||
.map(|arg| match &arg.arg_type {
|
||||
@@ -83,7 +83,8 @@ impl TypedExpr {
|
||||
.try_collect()?;
|
||||
|
||||
// literal's type is determined by the function and type of other args
|
||||
let (arg_exprs, arg_types): (Vec<_>, Vec<_>) = arg_exprs
|
||||
let (arg_exprs, arg_types): (Vec<_>, Vec<_>) = arg_typed_exprs
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(
|
||||
|TypedExpr {
|
||||
@@ -174,7 +175,9 @@ impl TypedExpr {
|
||||
};
|
||||
expr.optimize();
|
||||
Ok(TypedExpr::new(expr, ret_type))
|
||||
} else if let Ok(func) = UnmaterializableFunc::from_str(fn_name) {
|
||||
} else if let Ok(func) =
|
||||
UnmaterializableFunc::from_str_args(fn_name, arg_typed_exprs)
|
||||
{
|
||||
let ret_type = ColumnType::new_nullable(func.signature().output.clone());
|
||||
Ok(TypedExpr::new(
|
||||
ScalarExpr::CallUnmaterializable(func),
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use itertools::Itertools;
|
||||
use snafu::OptionExt;
|
||||
use substrait_proto::proto::expression::MaskExpression;
|
||||
@@ -22,8 +24,8 @@ use substrait_proto::proto::{plan_rel, Plan as SubPlan, Rel};
|
||||
use crate::adapter::error::{
|
||||
Error, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::expr::{MapFilterProject, TypedExpr};
|
||||
use crate::plan::{Plan, TypedPlan};
|
||||
use crate::expr::{MapFilterProject, ScalarExpr, TypedExpr, UnaryFunc};
|
||||
use crate::plan::{KeyValPlan, Plan, ReducePlan, TypedPlan};
|
||||
use crate::repr::{self, RelationType};
|
||||
use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions};
|
||||
|
||||
@@ -75,6 +77,7 @@ impl TypedPlan {
|
||||
} else {
|
||||
return not_impl_err!("Projection without an input is not supported");
|
||||
};
|
||||
|
||||
let mut exprs: Vec<TypedExpr> = vec![];
|
||||
for e in &p.expressions {
|
||||
let expr = TypedExpr::from_substrait_rex(e, &input.typ, extensions)?;
|
||||
@@ -97,6 +100,127 @@ impl TypedPlan {
|
||||
};
|
||||
Ok(TypedPlan { typ, plan })
|
||||
} else {
|
||||
/// if reduce_plan contains special functions like tumble window floor/ceiling, add them to the proj_exprs
|
||||
fn rewrite_projection_after_reduce(
|
||||
key_val_plan: KeyValPlan,
|
||||
_reduce_plan: ReducePlan,
|
||||
reduce_output_type: &RelationType,
|
||||
proj_exprs: &mut Vec<TypedExpr>,
|
||||
) -> Result<(), Error> {
|
||||
// TODO: get keys correctly
|
||||
let key_exprs = key_val_plan
|
||||
.key_plan
|
||||
.projection
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(|i| {
|
||||
if i < key_val_plan.key_plan.input_arity {
|
||||
ScalarExpr::Column(i)
|
||||
} else {
|
||||
key_val_plan.key_plan.expressions
|
||||
[i - key_val_plan.key_plan.input_arity]
|
||||
.clone()
|
||||
}
|
||||
})
|
||||
.collect_vec();
|
||||
let mut shift_offset = 0;
|
||||
let special_keys = key_exprs
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.filter(|(_idx, p)| {
|
||||
if matches!(
|
||||
p,
|
||||
ScalarExpr::CallUnary {
|
||||
func: UnaryFunc::TumbleWindowFloor { .. },
|
||||
..
|
||||
} | ScalarExpr::CallUnary {
|
||||
func: UnaryFunc::TumbleWindowCeiling { .. },
|
||||
..
|
||||
}
|
||||
) {
|
||||
if matches!(
|
||||
p,
|
||||
ScalarExpr::CallUnary {
|
||||
func: UnaryFunc::TumbleWindowFloor { .. },
|
||||
..
|
||||
}
|
||||
) {
|
||||
shift_offset += 1;
|
||||
}
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
.collect_vec();
|
||||
let spec_key_arity = special_keys.len();
|
||||
if spec_key_arity == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
{
|
||||
// shift proj_exprs to the right by spec_key_arity
|
||||
let max_used_col_in_proj = proj_exprs
|
||||
.iter()
|
||||
.map(|expr| {
|
||||
expr.expr
|
||||
.get_all_ref_columns()
|
||||
.into_iter()
|
||||
.max()
|
||||
.unwrap_or_default()
|
||||
})
|
||||
.max()
|
||||
.unwrap_or_default();
|
||||
|
||||
let shuffle = (0..=max_used_col_in_proj)
|
||||
.map(|col| (col, col + shift_offset))
|
||||
.collect::<BTreeMap<_, _>>();
|
||||
for proj_expr in proj_exprs.iter_mut() {
|
||||
proj_expr.expr.permute_map(&shuffle)?;
|
||||
} // add key to the end
|
||||
for (key_idx, _key_expr) in special_keys {
|
||||
// here we assume the output type of the reduce operator is the key columns first, followed by the value columns
|
||||
proj_exprs.push(
|
||||
ScalarExpr::Column(key_idx).with_type(
|
||||
reduce_output_type.column_types[key_idx].clone(),
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
match input.plan.clone() {
|
||||
Plan::Reduce {
|
||||
key_val_plan,
|
||||
reduce_plan,
|
||||
..
|
||||
} => {
|
||||
rewrite_projection_after_reduce(
|
||||
key_val_plan,
|
||||
reduce_plan,
|
||||
&input.typ,
|
||||
&mut exprs,
|
||||
)?;
|
||||
}
|
||||
Plan::Mfp { input, mfp: _ } => {
|
||||
if let Plan::Reduce {
|
||||
key_val_plan,
|
||||
reduce_plan,
|
||||
..
|
||||
} = input.plan
|
||||
{
|
||||
rewrite_projection_after_reduce(
|
||||
key_val_plan,
|
||||
reduce_plan,
|
||||
&input.typ,
|
||||
&mut exprs,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
input.projection(exprs)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,9 @@ use object_store::Entry;
|
||||
use regex::Regex;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
|
||||
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::error;
|
||||
use crate::error::{CatalogSnafu, InvalidCopyDatabasePathSnafu};
|
||||
@@ -65,11 +67,29 @@ impl StatementExecutor {
|
||||
|
||||
let mut exported_rows = 0;
|
||||
for table_name in table_names {
|
||||
// TODO(hl): also handles tables with metric engine.
|
||||
// TODO(hl): remove this hardcode once we've removed numbers table.
|
||||
if table_name == "numbers" {
|
||||
continue;
|
||||
}
|
||||
|
||||
let table = self
|
||||
.get_table(&TableReference {
|
||||
catalog: &req.catalog_name,
|
||||
schema: &req.schema_name,
|
||||
table: &table_name,
|
||||
})
|
||||
.await?;
|
||||
// Ignores physical tables of metric engine.
|
||||
if table.table_info().meta.engine == METRIC_ENGINE_NAME
|
||||
&& !table
|
||||
.table_info()
|
||||
.meta
|
||||
.options
|
||||
.extra_options
|
||||
.contains_key(LOGICAL_TABLE_METADATA_KEY)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let mut table_file = req.location.clone();
|
||||
table_file.push_str(&table_name);
|
||||
table_file.push_str(suffix);
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::time::Duration;
|
||||
use arrow_schema::DataType;
|
||||
use async_recursion::async_recursion;
|
||||
use catalog::table_source::DfTableSourceProvider;
|
||||
use chrono::Utc;
|
||||
use common_time::interval::NANOS_PER_MILLI;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::{Interval, Timestamp, Timezone};
|
||||
@@ -27,10 +28,13 @@ use datafusion::prelude::Column;
|
||||
use datafusion::scalar::ScalarValue;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
|
||||
use datafusion_common::{DFSchema, DataFusionError, Result as DFResult};
|
||||
use datafusion_expr::execution_props::ExecutionProps;
|
||||
use datafusion_expr::simplify::SimplifyContext;
|
||||
use datafusion_expr::{
|
||||
Aggregate, Analyze, Explain, Expr, ExprSchemable, Extension, LogicalPlan, LogicalPlanBuilder,
|
||||
Projection,
|
||||
};
|
||||
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use promql_parser::util::parse_duration;
|
||||
use session::context::QueryContextRef;
|
||||
@@ -108,34 +112,84 @@ fn parse_expr_to_string(args: &[Expr], i: usize) -> DFResult<String> {
|
||||
/// Parse a duration expr:
|
||||
/// 1. duration string (e.g. `'1h'`)
|
||||
/// 2. Interval expr (e.g. `INTERVAL '1 year 3 hours 20 minutes'`)
|
||||
/// 3. An interval expr can be evaluated at the logical plan stage (e.g. `INTERVAL '2' day - INTERVAL '1' day`)
|
||||
fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult<Duration> {
|
||||
let interval_to_duration = |interval: Interval| -> Duration {
|
||||
Duration::from_millis((interval.to_nanosecond() / NANOS_PER_MILLI as i128) as u64)
|
||||
};
|
||||
match args.get(i) {
|
||||
Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => {
|
||||
parse_duration(str).map_err(DataFusionError::Plan)
|
||||
}
|
||||
Some(Expr::Literal(ScalarValue::IntervalYearMonth(Some(i)))) => {
|
||||
Ok(interval_to_duration(Interval::from_i32(*i)))
|
||||
Some(expr) => {
|
||||
let ms = evaluate_expr_to_millisecond(args, i, true)?;
|
||||
if ms <= 0 {
|
||||
return Err(dispose_parse_error(Some(expr)));
|
||||
}
|
||||
Ok(Duration::from_millis(ms as u64))
|
||||
}
|
||||
Some(Expr::Literal(ScalarValue::IntervalDayTime(Some(i)))) => {
|
||||
Ok(interval_to_duration(Interval::from_i64(*i)))
|
||||
}
|
||||
Some(Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(i)))) => {
|
||||
Ok(interval_to_duration(Interval::from_i128(*i)))
|
||||
}
|
||||
other => Err(dispose_parse_error(other)),
|
||||
None => Err(dispose_parse_error(None)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Evaluate a time calculation expr, for cases like:
|
||||
/// 1. `INTERVAL '1' day + INTERVAL '1 year 2 hours 3 minutes'`
|
||||
/// 2. `now() - INTERVAL '1' day` (when `interval_only==false`)
|
||||
///
|
||||
/// Output a millisecond timestamp
|
||||
///
|
||||
/// if `interval_only==true`, only accept exprs built entirely from interval types (case 2 will return an error)
|
||||
fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult<i64> {
|
||||
let Some(expr) = args.get(i) else {
|
||||
return Err(dispose_parse_error(None));
|
||||
};
|
||||
if interval_only && !interval_only_in_expr(expr) {
|
||||
return Err(dispose_parse_error(Some(expr)));
|
||||
}
|
||||
let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
|
||||
let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(DFSchema::empty()));
|
||||
let interval_to_ms =
|
||||
|interval: Interval| -> i64 { (interval.to_nanosecond() / NANOS_PER_MILLI as i128) as i64 };
|
||||
let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
|
||||
match simplify_expr {
|
||||
Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _))
|
||||
| Expr::Literal(ScalarValue::DurationNanosecond(ts_nanos)) => {
|
||||
ts_nanos.map(|v| v / 1_000_000)
|
||||
}
|
||||
Expr::Literal(ScalarValue::TimestampMicrosecond(ts_micros, _))
|
||||
| Expr::Literal(ScalarValue::DurationMicrosecond(ts_micros)) => {
|
||||
ts_micros.map(|v| v / 1_000)
|
||||
}
|
||||
Expr::Literal(ScalarValue::TimestampMillisecond(ts_millis, _))
|
||||
| Expr::Literal(ScalarValue::DurationMillisecond(ts_millis)) => ts_millis,
|
||||
Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _))
|
||||
| Expr::Literal(ScalarValue::DurationSecond(ts_secs)) => ts_secs.map(|v| v * 1_000),
|
||||
Expr::Literal(ScalarValue::IntervalYearMonth(interval)) => {
|
||||
interval.map(|v| interval_to_ms(Interval::from_i32(v)))
|
||||
}
|
||||
Expr::Literal(ScalarValue::IntervalDayTime(interval)) => {
|
||||
interval.map(|v| interval_to_ms(Interval::from_i64(v)))
|
||||
}
|
||||
Expr::Literal(ScalarValue::IntervalMonthDayNano(interval)) => {
|
||||
interval.map(|v| interval_to_ms(Interval::from_i128(v)))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
.ok_or_else(|| {
|
||||
DataFusionError::Plan(format!(
|
||||
"{} is not a expr can be evaluate and use in range query",
|
||||
expr.display_name().unwrap_or_default()
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
/// Parse the `align to` clause and return a UTC timestamp with unit of millisecond,
|
||||
/// which is used as the basis for dividing time slots during the align operation.
|
||||
/// 1. NOW: align to the current execution time
|
||||
/// 2. Timestamp string: align to specific timestamp
|
||||
/// 3. leave empty (as Default Option): align to unix epoch 0 (timezone aware)
|
||||
/// 3. An expr can be evaluated at the logical plan stage (e.g. `now() - INTERVAL '1' day`)
|
||||
/// 4. leave empty (as Default Option): align to unix epoch 0 (timezone aware)
|
||||
fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult<i64> {
|
||||
let s = parse_str_expr(args, i)?;
|
||||
let Ok(s) = parse_str_expr(args, i) else {
|
||||
return evaluate_expr_to_millisecond(args, i, false);
|
||||
};
|
||||
let upper = s.to_uppercase();
|
||||
match upper.as_str() {
|
||||
"NOW" => return Ok(Timestamp::current_millis().value()),
|
||||
@@ -469,6 +523,25 @@ fn have_range_in_exprs(exprs: &[Expr]) -> bool {
|
||||
})
|
||||
}
|
||||
|
||||
fn interval_only_in_expr(expr: &Expr) -> bool {
|
||||
let mut all_interval = true;
|
||||
let _ = expr.apply(&mut |expr| {
|
||||
if !matches!(
|
||||
expr,
|
||||
Expr::Literal(ScalarValue::IntervalDayTime(_))
|
||||
| Expr::Literal(ScalarValue::IntervalMonthDayNano(_))
|
||||
| Expr::Literal(ScalarValue::IntervalYearMonth(_))
|
||||
| Expr::BinaryExpr(_)
|
||||
) {
|
||||
all_interval = false;
|
||||
Ok(TreeNodeRecursion::Stop)
|
||||
} else {
|
||||
Ok(TreeNodeRecursion::Continue)
|
||||
}
|
||||
});
|
||||
all_interval
|
||||
}
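As a rough cross-check of the interval handling above, the millisecond arithmetic that the range/align evaluation reduces to for interval literals can be sketched independently (an illustrative sketch, not part of the diff):

// Illustrative sketch only (not part of this diff): interval literals are normalized
// to milliseconds, and non-positive results are rejected, mirroring what the sqlness
// tests below expect for `INTERVAL '2' day - INTERVAL '1' day` and `'1' day - '2' day`.
const MS_PER_DAY: i64 = 24 * 60 * 60 * 1_000;

fn main() {
    // INTERVAL '2' day - INTERVAL '1' day evaluates to one day (86_400_000 ms): accepted
    let ok = 2 * MS_PER_DAY - MS_PER_DAY;
    assert_eq!(ok, 86_400_000);
    assert!(ok > 0);

    // INTERVAL '1' day - INTERVAL '2' day is non-positive: rejected as a duration
    let bad = MS_PER_DAY - 2 * MS_PER_DAY;
    assert!(bad <= 0);
}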
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
@@ -477,6 +550,7 @@ mod test {
|
||||
use catalog::memory::MemoryCatalogManager;
|
||||
use catalog::RegisterTableRequest;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use datafusion_expr::{BinaryExpr, Operator};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use session::context::QueryContext;
|
||||
@@ -754,8 +828,42 @@ mod test {
|
||||
parse_duration_expr(&args, 0).unwrap(),
|
||||
parse_duration("1y4w").unwrap()
|
||||
);
|
||||
// test err
|
||||
// test index err
|
||||
assert!(parse_duration_expr(&args, 10).is_err());
|
||||
// test evaluate expr
|
||||
let args = vec![Expr::BinaryExpr(BinaryExpr {
|
||||
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
|
||||
Interval::from_year_month(10).to_i32(),
|
||||
)))),
|
||||
op: Operator::Plus,
|
||||
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
|
||||
Interval::from_year_month(10).to_i32(),
|
||||
)))),
|
||||
})];
|
||||
assert_eq!(
|
||||
parse_duration_expr(&args, 0).unwrap().as_millis(),
|
||||
interval_to_ms(Interval::from_year_month(20))
|
||||
);
|
||||
let args = vec![Expr::BinaryExpr(BinaryExpr {
|
||||
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
|
||||
Interval::from_year_month(10).to_i32(),
|
||||
)))),
|
||||
op: Operator::Minus,
|
||||
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
|
||||
Interval::from_year_month(10).to_i32(),
|
||||
)))),
|
||||
})];
|
||||
// test zero interval error
|
||||
assert!(parse_duration_expr(&args, 0).is_err());
|
||||
// test must all be interval
|
||||
let args = vec![Expr::BinaryExpr(BinaryExpr {
|
||||
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
|
||||
Interval::from_year_month(10).to_i32(),
|
||||
)))),
|
||||
op: Operator::Minus,
|
||||
right: Box::new(Expr::Literal(ScalarValue::Time64Microsecond(Some(0)))),
|
||||
})];
|
||||
assert!(parse_duration_expr(&args, 0).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -787,19 +895,56 @@ mod test {
|
||||
let args = vec![Expr::Literal(ScalarValue::Utf8(Some(
|
||||
"1970-01-01T00:00:00+08:00".into(),
|
||||
)))];
|
||||
assert!(parse_align_to(&args, 0, None).unwrap() == -8 * 60 * 60 * 1000);
|
||||
assert_eq!(parse_align_to(&args, 0, None).unwrap(), -8 * 60 * 60 * 1000);
|
||||
// timezone
|
||||
let args = vec![Expr::Literal(ScalarValue::Utf8(Some(
|
||||
"1970-01-01T00:00:00".into(),
|
||||
)))];
|
||||
assert!(
|
||||
assert_eq!(
|
||||
parse_align_to(
|
||||
&args,
|
||||
0,
|
||||
Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
|
||||
)
|
||||
.unwrap()
|
||||
== -8 * 60 * 60 * 1000
|
||||
.unwrap(),
|
||||
-8 * 60 * 60 * 1000
|
||||
);
|
||||
// test evaluate expr
|
||||
let args = vec![Expr::BinaryExpr(BinaryExpr {
|
||||
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
|
||||
Interval::from_year_month(10).to_i32(),
|
||||
)))),
|
||||
op: Operator::Plus,
|
||||
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
|
||||
Interval::from_year_month(10).to_i32(),
|
||||
)))),
|
||||
})];
|
||||
assert_eq!(
|
||||
parse_align_to(&args, 0, None).unwrap(),
|
||||
// 20 month
|
||||
20 * 30 * 24 * 60 * 60 * 1000
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_interval_only() {
|
||||
let expr = Expr::BinaryExpr(BinaryExpr {
|
||||
left: Box::new(Expr::Literal(ScalarValue::DurationMillisecond(Some(20)))),
|
||||
op: Operator::Minus,
|
||||
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
|
||||
Interval::from_year_month(10).to_i32(),
|
||||
)))),
|
||||
});
|
||||
assert!(!interval_only_in_expr(&expr));
|
||||
let expr = Expr::BinaryExpr(BinaryExpr {
|
||||
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
|
||||
Interval::from_year_month(10).to_i32(),
|
||||
)))),
|
||||
op: Operator::Minus,
|
||||
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
|
||||
Interval::from_year_month(10).to_i32(),
|
||||
)))),
|
||||
});
|
||||
assert!(interval_only_in_expr(&expr));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -308,6 +308,71 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_alter_change_column_alias_type() {
|
||||
let sql_1 = "ALTER TABLE my_metric_1 MODIFY COLUMN a MediumText";
|
||||
let mut result_1 = ParserContext::create_with_dialect(
|
||||
sql_1,
|
||||
&GreptimeDbDialect {},
|
||||
ParseOptions::default(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
match result_1.remove(0) {
|
||||
Statement::Alter(alter_table) => {
|
||||
assert_eq!("my_metric_1", alter_table.table_name().0[0].value);
|
||||
|
||||
let alter_operation = alter_table.alter_operation();
|
||||
assert_matches!(
|
||||
alter_operation,
|
||||
AlterTableOperation::ChangeColumnType { .. }
|
||||
);
|
||||
match alter_operation {
|
||||
AlterTableOperation::ChangeColumnType {
|
||||
column_name,
|
||||
target_type,
|
||||
} => {
|
||||
assert_eq!("a", column_name.value);
|
||||
assert_eq!(DataType::Text, *target_type);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
||||
let sql_2 = "ALTER TABLE my_metric_1 MODIFY COLUMN a TIMESTAMP_US";
|
||||
let mut result_2 = ParserContext::create_with_dialect(
|
||||
sql_2,
|
||||
&GreptimeDbDialect {},
|
||||
ParseOptions::default(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
match result_2.remove(0) {
|
||||
Statement::Alter(alter_table) => {
|
||||
assert_eq!("my_metric_1", alter_table.table_name().0[0].value);
|
||||
|
||||
let alter_operation = alter_table.alter_operation();
|
||||
assert_matches!(
|
||||
alter_operation,
|
||||
AlterTableOperation::ChangeColumnType { .. }
|
||||
);
|
||||
match alter_operation {
|
||||
AlterTableOperation::ChangeColumnType {
|
||||
column_name,
|
||||
target_type,
|
||||
} => {
|
||||
assert_eq!("a", column_name.value);
|
||||
assert!(matches!(target_type, DataType::Timestamp(Some(6), _)));
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_alter_rename_table() {
|
||||
let sql = "ALTER TABLE test_table table_t";
|
||||
|
||||
@@ -104,17 +104,19 @@ impl<'a> ParserContext<'a> {
|
||||
let (start, end, step, lookback) = match parser.peek_token().token {
|
||||
Token::LParen => {
|
||||
let _consume_lparen_token = parser.next_token();
|
||||
let start = Self::parse_string_or_number_or_word(parser, Token::Comma)?;
|
||||
let end = Self::parse_string_or_number_or_word(parser, Token::Comma)?;
|
||||
let delimiter_token = Self::find_next_delimiter_token(parser);
|
||||
let (step, lookback) = if Self::is_comma(&delimiter_token) {
|
||||
let step = Self::parse_string_or_number_or_word(parser, Token::Comma)?;
|
||||
let lookback = Self::parse_string_or_number_or_word(parser, Token::RParen).ok();
|
||||
(step, lookback)
|
||||
let start = Self::parse_string_or_number_or_word(parser, &[Token::Comma])?.0;
|
||||
let end = Self::parse_string_or_number_or_word(parser, &[Token::Comma])?.0;
|
||||
|
||||
let (step, delimiter) =
|
||||
Self::parse_string_or_number_or_word(parser, &[Token::Comma, Token::RParen])?;
|
||||
let lookback = if delimiter == Token::Comma {
|
||||
Self::parse_string_or_number_or_word(parser, &[Token::RParen])
|
||||
.ok()
|
||||
.map(|t| t.0)
|
||||
} else {
|
||||
let step = Self::parse_string_or_number_or_word(parser, Token::RParen)?;
|
||||
(step, None)
|
||||
None
|
||||
};
|
||||
|
||||
(start, end, step, lookback)
|
||||
}
|
||||
_ => ("0".to_string(), "0".to_string(), "5m".to_string(), None),
|
||||
@@ -123,22 +125,8 @@ impl<'a> ParserContext<'a> {
|
||||
Ok(TqlParameters::new(start, end, step, lookback, query))
|
||||
}
|
||||
|
||||
fn find_next_delimiter_token(parser: &mut Parser) -> Token {
|
||||
let mut n: usize = 0;
|
||||
while !(Self::is_comma(&parser.peek_nth_token(n).token)
|
||||
|| Self::is_rparen(&parser.peek_nth_token(n).token))
|
||||
{
|
||||
n += 1;
|
||||
}
|
||||
parser.peek_nth_token(n).token
|
||||
}
|
||||
|
||||
pub fn is_delimiter_token(token: &Token, delimiter_token: &Token) -> bool {
|
||||
match token {
|
||||
Token::Comma => Self::is_comma(delimiter_token),
|
||||
Token::RParen => Self::is_rparen(delimiter_token),
|
||||
_ => false,
|
||||
}
|
||||
pub fn comma_or_rparen(token: &Token) -> bool {
|
||||
Self::is_comma(token) || Self::is_rparen(token)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -155,15 +143,21 @@ impl<'a> ParserContext<'a> {
|
||||
self.peek_token_as_string().eq_ignore_ascii_case(VERBOSE)
|
||||
}
|
||||
|
||||
/// Try to parse and consume a string, number or word token.
|
||||
/// Return `Ok` if it's parsed and one of the given delimiter tokens is consumed.
|
||||
/// The string and matched delimiter will be returned as a tuple.
|
||||
fn parse_string_or_number_or_word(
|
||||
parser: &mut Parser,
|
||||
delimiter_token: Token,
|
||||
) -> std::result::Result<String, TQLError> {
|
||||
delimiter_tokens: &[Token],
|
||||
) -> std::result::Result<(String, Token), TQLError> {
|
||||
let mut tokens = vec![];
|
||||
|
||||
while !Self::is_delimiter_token(&parser.peek_token().token, &delimiter_token) {
|
||||
let token = parser.next_token();
|
||||
tokens.push(token.token);
|
||||
while !delimiter_tokens.contains(&parser.peek_token().token) {
|
||||
let token = parser.next_token().token;
|
||||
if matches!(token, Token::EOF) {
|
||||
break;
|
||||
}
|
||||
tokens.push(token);
|
||||
}
|
||||
let result = match tokens.len() {
|
||||
0 => Err(ParserError::ParserError(
|
||||
@@ -186,8 +180,15 @@ impl<'a> ParserContext<'a> {
|
||||
}
|
||||
_ => Self::parse_tokens(tokens),
|
||||
};
|
||||
parser.expect_token(&delimiter_token).context(ParserSnafu)?;
|
||||
result
|
||||
for token in delimiter_tokens {
|
||||
if parser.consume_token(token) {
|
||||
return result.map(|v| (v, token.clone()));
|
||||
}
|
||||
}
|
||||
Err(ParserError::ParserError(format!(
|
||||
"Delimiters not match {delimiter_tokens:?}"
|
||||
)))
|
||||
.context(ParserSnafu)
|
||||
}
|
||||
|
||||
fn parse_tokens(tokens: Vec<Token>) -> std::result::Result<String, TQLError> {
|
||||
@@ -733,5 +734,11 @@ mod tests {
|
||||
let result =
|
||||
ParserContext::create_with_dialect(sql, dialect, parse_options.clone()).unwrap_err();
|
||||
assert!(result.output_msg().contains("empty TQL query"));
|
||||
|
||||
// invalid token
|
||||
let sql = "tql eval (0, 0, '1s) t;;';";
|
||||
let result =
|
||||
ParserContext::create_with_dialect(sql, dialect, parse_options.clone()).unwrap_err();
|
||||
assert!(result.output_msg().contains("Delimiters not match"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,6 +39,10 @@ impl AlterTable {
|
||||
pub fn alter_operation(&self) -> &AlterTableOperation {
|
||||
&self.alter_operation
|
||||
}
|
||||
|
||||
pub fn alter_operation_mut(&mut self) -> &mut AlterTableOperation {
|
||||
&mut self.alter_operation
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for AlterTable {
|
||||
|
||||
@@ -20,6 +20,7 @@ use sqlparser::ast::{
|
||||
};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::statements::alter::AlterTableOperation;
|
||||
use crate::statements::create::{CreateExternalTable, CreateTable};
|
||||
use crate::statements::statement::Statement;
|
||||
use crate::statements::transform::TransformRule;
|
||||
@@ -51,6 +52,13 @@ impl TransformRule for TypeAliasTransformRule {
|
||||
.iter_mut()
|
||||
.for_each(|ColumnDef { data_type, .. }| replace_type_alias(data_type));
|
||||
}
|
||||
Statement::Alter(alter_table) => {
|
||||
if let AlterTableOperation::ChangeColumnType { target_type, .. } =
|
||||
alter_table.alter_operation_mut()
|
||||
{
|
||||
replace_type_alias(target_type)
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
|
||||
@@ -39,6 +39,8 @@ pub const DATA_REGION_SUBDIR: &str = "data";
|
||||
|
||||
pub const METRIC_ENGINE_NAME: &str = "metric";
|
||||
|
||||
pub const FILE_ENGINE_NAME: &str = "file";
|
||||
|
||||
/// Metadata key present in the `CREATE TABLE ... WITH ()` clause. This key is
|
||||
/// used to identify the table is a physical metric table. E.g.:
|
||||
/// ```sql
|
||||
|
||||
@@ -98,11 +98,11 @@ Error: 3000(PlanQuery), DataFusion error: Error during planning: duration must b
|
||||
|
||||
SELECT min(val) RANGE '5s' FROM host ALIGN (INTERVAL '0' day);
|
||||
|
||||
Error: 2000(InvalidSyntax), Range Query: Can't use 0 as align in Range Query
|
||||
Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("0")` in range select query
|
||||
|
||||
SELECT min(val) RANGE (INTERVAL '0' day) FROM host ALIGN '5s';
|
||||
|
||||
Error: 2000(InvalidSyntax), Range Query: Invalid Range expr `MIN(host.val) RANGE IntervalMonthDayNano("0")`, Can't use 0 as range in Range Query
|
||||
Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("0")` in range select query
|
||||
|
||||
DROP TABLE host;
|
||||
|
||||
|
||||
@@ -82,6 +82,30 @@ SELECT ts, min(val) RANGE (INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day)
|
||||
| 2024-01-24T23:00:00 | 3 |
|
||||
+---------------------+------------------------------------------------------------------+
|
||||
|
||||
SELECT ts, min(val) RANGE (INTERVAL '2' day - INTERVAL '1' day) FROM host ALIGN (INTERVAL '2' day - INTERVAL '1' day) TO (now() - (now() + INTERVAL '1' hour)) by (1) ORDER BY ts;
|
||||
|
||||
+---------------------+-----------------------------------------------------------------------------------------------------------------+
|
||||
| ts | MIN(host.val) RANGE IntervalMonthDayNano("36893488147419103232") - IntervalMonthDayNano("18446744073709551616") |
|
||||
+---------------------+-----------------------------------------------------------------------------------------------------------------+
|
||||
| 2024-01-22T23:00:00 | 0 |
|
||||
| 2024-01-23T23:00:00 | 1 |
|
||||
| 2024-01-24T23:00:00 | 3 |
|
||||
+---------------------+-----------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- non-positive duration
|
||||
SELECT ts, min(val) RANGE (INTERVAL '1' day - INTERVAL '2' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts;
|
||||
|
||||
Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("18446744073709551616") - IntervalMonthDayNano("36893488147419103232")` in range select query
|
||||
|
||||
SELECT ts, min(val) RANGE (INTERVAL '1' day - INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts;
|
||||
|
||||
Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("18446744073709551616") - IntervalMonthDayNano("18446744073709551616")` in range select query
|
||||
|
||||
-- duration not all interval
|
||||
SELECT ts, min(val) RANGE (now() - INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts;
|
||||
|
||||
Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `now() - IntervalMonthDayNano("18446744073709551616")` in range select query
|
||||
|
||||
--- ALIGN TO with time zone ---
|
||||
set time_zone='Asia/Shanghai';
|
||||
|
||||
|
||||
@@ -26,6 +26,18 @@ SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' TO '2023-01-01T00:00:0
|
||||
|
||||
SELECT ts, min(val) RANGE (INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts;
|
||||
|
||||
SELECT ts, min(val) RANGE (INTERVAL '2' day - INTERVAL '1' day) FROM host ALIGN (INTERVAL '2' day - INTERVAL '1' day) TO (now() - (now() + INTERVAL '1' hour)) by (1) ORDER BY ts;
|
||||
|
||||
-- non-positive duration
|
||||
|
||||
SELECT ts, min(val) RANGE (INTERVAL '1' day - INTERVAL '2' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts;
|
||||
|
||||
SELECT ts, min(val) RANGE (INTERVAL '1' day - INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts;
|
||||
|
||||
-- duration not all interval
|
||||
|
||||
SELECT ts, min(val) RANGE (now() - INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts;
|
||||
|
||||
--- ALIGN TO with time zone ---
|
||||
set time_zone='Asia/Shanghai';
|
||||
|
||||
|
||||