Compare commits

..

15 Commits

Author SHA1 Message Date
evenyag
941906dc74 chore: bump version to v0.15.2
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-11 00:24:21 +08:00
Ruihang Xia
cbf251d0f0 fix: expand on conditional commutative as well (#6484)
* fix: expand on conditional commutative as well

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>

* add logging to figure test failure

Signed-off-by: discord9 <discord9@163.com>

* revert

Signed-off-by: discord9 <discord9@163.com>

* feat: stream drop record metrics

Signed-off-by: discord9 <discord9@163.com>

* Revert "feat: stream drop record metrics"

This reverts commit 6a16946a5b8ea37557bbb1b600847d24274d6500.

Signed-off-by: discord9 <discord9@163.com>

* feat: stream drop record metrics

Signed-off-by: discord9 <discord9@163.com>

refactor: move logging to drop too

Signed-off-by: discord9 <discord9@163.com>

fix: drop input stream before collect metrics

Signed-off-by: discord9 <discord9@163.com>

* fix: expand differently

Signed-off-by: discord9 <discord9@163.com>

* test: update sqlness

Signed-off-by: discord9 <discord9@163.com>

* chore: more dbg

Signed-off-by: discord9 <discord9@163.com>

* Revert "feat: stream drop record metrics"

This reverts commit 3eda4a2257928d95cf9c1328ae44fae84cfbb017.

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness redacted

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
Co-authored-by: discord9 <discord9@163.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-11 00:24:21 +08:00
shuiyisong
1519379262 chore: skip calc ts in doc 2 with transform (#6509)
Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
localhost
4bfe02ec7f chore: remove region id to reduce time series (#6506)
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
Weny Xu
ecacf1333e fix: correctly update partition key indices during alter table operations (#6494)
* fix: correctly update partition key indices in alter table operations

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add sqlness tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
Yingwen
92fa33c250 fix: range query returns range selector error when table not found (#6481)
* test: add sqlness test for range vector with non-existence metric

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: handle empty metric for matrix selector

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update sqlness result

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: add newline

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
shuiyisong
8b2d1a3753 fix: skip nan in prom remote write pipeline (#6489)
Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
Ning Sun
13401c94e0 feat: allow alternative version string (#6472)
* feat: allow alternative version string

* refactor: rename original version function to verbose_version

Signed-off-by: Ning Sun <sunning@greptime.com>

---------

Signed-off-by: Ning Sun <sunning@greptime.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
shuiyisong
fd637dae47 chore: sort range query return values (#6474)
* chore: sort range query return values

* chore: add comments

* chore: add is_sorted check

* fix: test

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
dennis zhuang
69fac19770 fix: empty statements hang (#6480)
* fix: empty statements hang

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* tests: add cases

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
discord9
6435b97314 fix: stricter win sort condition (#6477)
test: sqlness

test: fix sqlness redacted

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
Weny Xu
726e3909fe fix(metric-engine): handle stale metadata region recovery failures (#6395)
* fix(metric-engine): handle stale metadata region recovery failures

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
evenyag
00d759e828 chore: bump version to v0.15.1
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-04 22:53:46 +08:00
Lei, HUANG
0042ea6462 fix: filter empty batch in bulk insert api (#6459)
* fix/filter-empty-batch-in-bulk-insert-api:
 **Add Early Return for Empty Record Batches in `bulk_insert.rs`**

 - Implemented an early return in the `Inserter` implementation to handle cases where `record_batch.num_rows()` is zero, improving efficiency by avoiding unnecessary processing.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/filter-empty-batch-in-bulk-insert-api:
 **Improve Bulk Insert Handling**

 - **`handle_bulk_insert.rs`**: Added a check to handle cases where the batch has zero rows, immediately returning and sending a success response with zero rows processed.
 - **`bulk_insert.rs`**: Enhanced logic to skip processing for masks that select none, optimizing the bulk insert operation by avoiding unnecessary iterations.

 These changes improve the efficiency and robustness of the bulk insert process by handling edge cases more effectively.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/filter-empty-batch-in-bulk-insert-api:
 ### Refactor and Error Handling Enhancements

 - **Refactored Timestamp Handling**: Introduced `timestamp_array_to_primitive` function in `timestamp.rs` to streamline conversion of timestamp arrays to primitive arrays, reducing redundancy in `handle_bulk_insert.rs` and `bulk_insert.rs`.
 - **Error Handling**: Added `InconsistentTimestampLength` error in `error.rs` to handle mismatched timestamp column lengths in bulk insert operations.
 - **Bulk Insert Logic**: Updated `handle_bulk_insert.rs` to utilize the new timestamp conversion function and added checks for timestamp length consistency.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/filter-empty-batch-in-bulk-insert-api:
 **Refactor `bulk_insert.rs` to streamline imports**

 - Simplified import statements by removing unused timestamp-related arrays and data types from the `arrow` crate in `bulk_insert.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-04 22:53:46 +08:00
Zhenchi
d06450715f fix: add backward compatibility for SkippingIndexOptions deserialization (#6458)
* fix: add backward compatibility for `SkippingIndexOptions` deserialization

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-04 22:53:46 +08:00
54 changed files with 1284 additions and 240 deletions

View File

@@ -12,3 +12,6 @@ fetch = true
checkout = true
list_files = true
internal_use_git2 = false
[env]
CARGO_WORKSPACE_DIR = { value = "", relative = true }

Cargo.lock (generated) · 166 changed lines
View File

@@ -211,7 +211,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"common-base",
"common-decimal",
@@ -944,7 +944,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"async-trait",
@@ -1586,7 +1586,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"catalog",
"common-error",
@@ -1602,6 +1602,17 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbc26382d871df4b7442e3df10a9402bf3cf5e55cbd66f12be38861425f0564"
[[package]]
name = "cargo-manifest"
version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d8af896b707212cd0e99c112a78c9497dd32994192a463ed2f7419d29bd8c6"
dependencies = [
"serde",
"thiserror 2.0.12",
"toml 0.8.19",
]
[[package]]
name = "cast"
version = "0.3.0"
@@ -1610,7 +1621,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"arrow 54.2.1",
@@ -1948,7 +1959,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"async-stream",
"async-trait",
@@ -1993,7 +2004,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.15.0",
"substrait 0.15.2",
"table",
"tempfile",
"tokio",
@@ -2002,7 +2013,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"arc-swap",
@@ -2032,7 +2043,7 @@ dependencies = [
"rand 0.9.0",
"serde_json",
"snafu 0.8.5",
"substrait 0.15.0",
"substrait 0.15.2",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -2073,7 +2084,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"async-trait",
"auth",
@@ -2134,7 +2145,7 @@ dependencies = [
"snafu 0.8.5",
"stat",
"store-api",
"substrait 0.15.0",
"substrait 0.15.2",
"table",
"temp-env",
"tempfile",
@@ -2181,7 +2192,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"anymap2",
"async-trait",
@@ -2203,11 +2214,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.15.0"
version = "0.15.2"
[[package]]
name = "common-config"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"common-base",
"common-error",
@@ -2232,7 +2243,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -2269,7 +2280,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2282,7 +2293,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"common-macro",
"http 1.1.0",
@@ -2293,7 +2304,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"async-trait",
"common-error",
@@ -2309,7 +2320,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -2362,7 +2373,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"async-trait",
"common-runtime",
@@ -2379,7 +2390,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"arrow-flight",
@@ -2411,7 +2422,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"common-base",
@@ -2430,7 +2441,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"arc-swap",
"common-query",
@@ -2444,7 +2455,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"anyhow",
"common-error",
@@ -2460,7 +2471,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"anymap2",
"api",
@@ -2525,7 +2536,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2534,11 +2545,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.15.0"
version = "0.15.2"
[[package]]
name = "common-pprof"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"common-error",
"common-macro",
@@ -2550,7 +2561,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"async-stream",
"async-trait",
@@ -2577,7 +2588,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"async-trait",
"common-procedure",
@@ -2586,7 +2597,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"async-trait",
@@ -2612,7 +2623,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"arc-swap",
"common-error",
@@ -2632,7 +2643,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2662,17 +2673,18 @@ dependencies = [
[[package]]
name = "common-session"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"strum 0.27.1",
]
[[package]]
name = "common-telemetry"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"backtrace",
"common-error",
"common-version",
"console-subscriber",
"greptime-proto",
"humantime-serde",
@@ -2696,7 +2708,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"client",
"common-grpc",
@@ -2709,7 +2721,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -2727,9 +2739,10 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"build-data",
"cargo-manifest",
"const_format",
"serde",
"shadow-rs",
@@ -2737,7 +2750,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"common-base",
"common-error",
@@ -2760,7 +2773,7 @@ dependencies = [
[[package]]
name = "common-workload"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"common-telemetry",
@@ -3716,7 +3729,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"arrow-flight",
@@ -3769,7 +3782,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.15.0",
"substrait 0.15.2",
"table",
"tokio",
"toml 0.8.19",
@@ -3778,7 +3791,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -4438,7 +4451,7 @@ checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]]
name = "file-engine"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"async-trait",
@@ -4575,7 +4588,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"arrow 54.2.1",
@@ -4640,7 +4653,7 @@ dependencies = [
"sql",
"store-api",
"strum 0.27.1",
"substrait 0.15.0",
"substrait 0.15.2",
"table",
"tokio",
"tonic 0.12.3",
@@ -4695,7 +4708,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"arc-swap",
@@ -4755,7 +4768,7 @@ dependencies = [
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"strfmt",
"substrait 0.15.0",
"substrait 0.15.2",
"table",
"tokio",
"tokio-util",
@@ -5916,7 +5929,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6801,7 +6814,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"chrono",
"common-error",
@@ -6813,7 +6826,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"async-stream",
"async-trait",
@@ -7111,7 +7124,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"async-trait",
@@ -7139,7 +7152,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"async-trait",
@@ -7230,7 +7243,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"aquamarine",
@@ -7320,7 +7333,7 @@ dependencies = [
[[package]]
name = "mito-codec"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"bytes",
@@ -7343,7 +7356,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"aquamarine",
@@ -8093,7 +8106,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"anyhow",
"bytes",
@@ -8407,7 +8420,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8462,7 +8475,7 @@ dependencies = [
"sql",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"substrait 0.15.0",
"substrait 0.15.2",
"table",
"tokio",
"tokio-util",
@@ -8729,7 +8742,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"async-trait",
@@ -9017,7 +9030,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9160,7 +9173,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"auth",
"clap 4.5.19",
@@ -9473,7 +9486,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -9755,7 +9768,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9797,7 +9810,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9863,7 +9876,7 @@ dependencies = [
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"statrs",
"store-api",
"substrait 0.15.0",
"substrait 0.15.2",
"table",
"tokio",
"tokio-stream",
@@ -11149,7 +11162,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -11270,7 +11283,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"arc-swap",
@@ -11609,7 +11622,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"chrono",
@@ -11664,7 +11677,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11964,7 +11977,7 @@ dependencies = [
[[package]]
name = "stat"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"nix 0.30.1",
]
@@ -11990,7 +12003,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"aquamarine",
@@ -12151,7 +12164,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"async-trait",
"bytes",
@@ -12331,7 +12344,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"async-trait",
@@ -12592,7 +12605,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"arbitrary",
"async-trait",
@@ -12636,7 +12649,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.15.0"
version = "0.15.2"
dependencies = [
"api",
"arrow-flight",
@@ -12703,7 +12716,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.15.0",
"substrait 0.15.2",
"table",
"tempfile",
"time",
@@ -13073,6 +13086,7 @@ version = "0.8.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e"
dependencies = [
"indexmap 2.9.0",
"serde",
"serde_spanned",
"toml_datetime",

View File

@@ -71,7 +71,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.15.0"
version = "0.15.2"
edition = "2021"
license = "Apache-2.0"

View File

@@ -20,11 +20,11 @@ use cmd::error::{InitTlsProviderSnafu, Result};
use cmd::options::GlobalOptions;
use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
use common_base::Plugins;
use common_version::version;
use common_version::{verbose_version, version};
use servers::install_ring_crypto_provider;
#[derive(Parser)]
#[command(name = "greptime", author, version, long_version = version(), about)]
#[command(name = "greptime", author, version, long_version = verbose_version(), about)]
#[command(propagate_version = true)]
pub(crate) struct Command {
#[clap(subcommand)]
@@ -143,10 +143,8 @@ async fn start(cli: Command) -> Result<()> {
}
fn setup_human_panic() {
human_panic::setup_panic!(
human_panic::Metadata::new("GreptimeDB", env!("CARGO_PKG_VERSION"))
.homepage("https://github.com/GreptimeTeam/greptimedb/discussions")
);
human_panic::setup_panic!(human_panic::Metadata::new("GreptimeDB", version())
.homepage("https://github.com/GreptimeTeam/greptimedb/discussions"));
common_telemetry::set_panic_hook();
}

View File

@@ -19,7 +19,7 @@ use catalog::kvbackend::MetaKvBackend;
use common_base::Plugins;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_telemetry::info;
use common_version::{short_version, version};
use common_version::{short_version, verbose_version};
use datanode::datanode::DatanodeBuilder;
use datanode::service::DatanodeServiceBuilder;
use meta_client::MetaClientType;
@@ -67,7 +67,7 @@ impl InstanceBuilder {
None,
);
log_versions(version(), short_version(), APP_NAME);
log_versions(verbose_version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)

View File

@@ -32,7 +32,7 @@ use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, version};
use common_version::{short_version, verbose_version};
use flow::{
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
FrontendClient, FrontendInvoker,
@@ -279,7 +279,7 @@ impl StartCommand {
None,
);
log_versions(version(), short_version(), APP_NAME);
log_versions(verbose_version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Flownode start command: {:#?}", self);

View File

@@ -33,7 +33,7 @@ use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use common_version::{short_version, verbose_version};
use frontend::frontend::Frontend;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
@@ -282,7 +282,7 @@ impl StartCommand {
opts.component.slow_query.as_ref(),
);
log_versions(version(), short_version(), APP_NAME);
log_versions(verbose_version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Frontend start command: {:#?}", self);

View File

@@ -112,7 +112,7 @@ pub trait App: Send {
pub fn log_versions(version: &str, short_version: &str, app: &str) {
// Report app version as gauge.
APP_VERSION
.with_label_values(&[env!("CARGO_PKG_VERSION"), short_version, app])
.with_label_values(&[common_version::version(), short_version, app])
.inc();
// Log version and argument flags.

View File

@@ -22,7 +22,7 @@ use common_base::Plugins;
use common_config::Configurable;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, version};
use common_version::{short_version, verbose_version};
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::BackendImpl;
use snafu::ResultExt;
@@ -320,7 +320,7 @@ impl StartCommand {
None,
);
log_versions(version(), short_version(), APP_NAME);
log_versions(verbose_version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Metasrv start command: {:#?}", self);

View File

@@ -51,7 +51,7 @@ use common_telemetry::logging::{
LoggingOptions, SlowQueryOptions, TracingOptions, DEFAULT_LOGGING_DIR,
};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use common_version::{short_version, verbose_version};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
@@ -466,7 +466,7 @@ impl StartCommand {
opts.component.slow_query.as_ref(),
);
log_versions(version(), short_version(), APP_NAME);
log_versions(verbose_version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Standalone start command: {:#?}", self);

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::sync::Arc;
use std::{env, fmt};
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
@@ -47,7 +47,7 @@ impl Function for PGVersionFunction {
fn eval(&self, _func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let result = StringVector::from(vec![format!(
"PostgreSQL 16.3 GreptimeDB {}",
env!("CARGO_PKG_VERSION")
common_version::version()
)]);
Ok(Arc::new(result))
}

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::sync::Arc;
use std::{env, fmt};
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
@@ -52,13 +52,13 @@ impl Function for VersionFunction {
"{}-greptimedb-{}",
std::env::var("GREPTIMEDB_MYSQL_SERVER_VERSION")
.unwrap_or_else(|_| "8.4.2".to_string()),
env!("CARGO_PKG_VERSION")
common_version::version()
)
}
Channel::Postgres => {
format!("16.3-greptimedb-{}", env!("CARGO_PKG_VERSION"))
format!("16.3-greptimedb-{}", common_version::version())
}
_ => env!("CARGO_PKG_VERSION").to_string(),
_ => common_version::version().to_string(),
};
let result = StringVector::from(vec![version]);
Ok(Arc::new(result))

View File

@@ -14,6 +14,7 @@ workspace = true
[dependencies]
backtrace = "0.3"
common-error.workspace = true
common-version.workspace = true
console-subscriber = { version = "0.1", optional = true }
greptime-proto.workspace = true
humantime-serde.workspace = true

View File

@@ -384,7 +384,7 @@ pub fn init_global_logging(
resource::SERVICE_INSTANCE_ID,
node_id.unwrap_or("none".to_string()),
),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::SERVICE_VERSION, common_version::version()),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
]));

View File

@@ -17,4 +17,5 @@ shadow-rs.workspace = true
[build-dependencies]
build-data = "0.2"
cargo-manifest = "0.19"
shadow-rs.workspace = true

View File

@@ -14,8 +14,10 @@
use std::collections::BTreeSet;
use std::env;
use std::path::PathBuf;
use build_data::{format_timestamp, get_source_time};
use cargo_manifest::Manifest;
use shadow_rs::{BuildPattern, ShadowBuilder, CARGO_METADATA, CARGO_TREE};
fn main() -> shadow_rs::SdResult<()> {
@@ -33,6 +35,24 @@ fn main() -> shadow_rs::SdResult<()> {
// solve the problem where the "CARGO_MANIFEST_DIR" is not what we want when this repo is
// made as a submodule in another repo.
let src_path = env::var("CARGO_WORKSPACE_DIR").or_else(|_| env::var("CARGO_MANIFEST_DIR"))?;
let manifest = Manifest::from_path(PathBuf::from(&src_path).join("Cargo.toml"))
.expect("Failed to parse Cargo.toml");
if let Some(product_version) = manifest.workspace.as_ref().and_then(|w| {
w.metadata.as_ref().and_then(|m| {
m.get("greptime")
.and_then(|g| g.get("product_version").and_then(|v| v.as_str()))
})
}) {
println!(
"cargo:rustc-env=GREPTIME_PRODUCT_VERSION={}",
product_version
);
} else {
let version = env::var("CARGO_PKG_VERSION").unwrap();
println!("cargo:rustc-env=GREPTIME_PRODUCT_VERSION={}", version,);
}
let out_path = env::var("OUT_DIR")?;
let _ = ShadowBuilder::builder()
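
For reference, this build script looks for a `product_version` key under `[workspace.metadata.greptime]` in the workspace `Cargo.toml`, falling back to `CARGO_PKG_VERSION` when it is absent. Below is a minimal standalone sketch of that lookup, assuming the `cargo-manifest` crate's `Manifest::from_slice` constructor; the version string is only illustrative.

use cargo_manifest::Manifest;

fn main() {
    // The workspace stanza the build script reads; the version value here is illustrative.
    let cargo_toml = br#"
        [workspace]
        members = []

        [workspace.metadata.greptime]
        product_version = "0.15.2-custom"
    "#;
    let manifest = Manifest::from_slice(cargo_toml).expect("failed to parse Cargo.toml");
    let product_version = manifest
        .workspace
        .as_ref()
        .and_then(|w| w.metadata.as_ref())
        .and_then(|m| m.get("greptime"))
        .and_then(|g| g.get("product_version"))
        .and_then(|v| v.as_str());
    // Mirrors the fallback in build.rs: emit the metadata value if present,
    // otherwise the crate's own CARGO_PKG_VERSION.
    println!(
        "cargo:rustc-env=GREPTIME_PRODUCT_VERSION={}",
        product_version.unwrap_or(env!("CARGO_PKG_VERSION"))
    );
}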

View File

@@ -105,13 +105,17 @@ pub const fn build_info() -> BuildInfo {
build_time: env!("BUILD_TIMESTAMP"),
rustc: build::RUST_VERSION,
target: build::BUILD_TARGET,
version: build::PKG_VERSION,
version: env!("GREPTIME_PRODUCT_VERSION"),
}
}
const BUILD_INFO: BuildInfo = build_info();
pub const fn version() -> &'static str {
BUILD_INFO.version
}
pub const fn verbose_version() -> &'static str {
const_format::formatcp!(
"\nbranch: {}\ncommit: {}\nclean: {}\nversion: {}",
BUILD_INFO.branch,

View File

@@ -27,14 +27,14 @@ lazy_static! {
pub static ref HANDLE_REGION_REQUEST_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_datanode_handle_region_request_elapsed",
"datanode handle region request elapsed",
&[REGION_ID, REGION_REQUEST_TYPE]
&[REGION_REQUEST_TYPE]
)
.unwrap();
/// The number of rows in region request received by region server, labeled with request type.
pub static ref REGION_CHANGED_ROW_COUNT: IntCounterVec = register_int_counter_vec!(
"greptime_datanode_region_changed_row_count",
"datanode region changed row count",
&[REGION_ID, REGION_REQUEST_TYPE]
&[REGION_REQUEST_TYPE]
)
.unwrap();
/// The elapsed time since the last received heartbeat.

View File

@@ -915,9 +915,8 @@ impl RegionServerInner {
request: RegionRequest,
) -> Result<RegionResponse> {
let request_type = request.request_type();
let region_id_str = region_id.to_string();
let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
.with_label_values(&[&region_id_str, request_type])
.with_label_values(&[request_type])
.start_timer();
let region_change = match &request {
@@ -957,7 +956,7 @@ impl RegionServerInner {
// Update metrics
if matches!(region_change, RegionChange::Ingest) {
crate::metrics::REGION_CHANGED_ROW_COUNT
.with_label_values(&[&region_id_str, request_type])
.with_label_values(&[request_type])
.inc_by(result.affected_rows as u64);
}
// Sets corresponding region status to ready.

View File

@@ -527,7 +527,7 @@ pub struct FulltextOptions {
#[serde(default = "fulltext_options_default_granularity")]
pub granularity: u32,
/// The false positive rate of the fulltext index (for bloom backend only)
#[serde(default = "fulltext_options_default_false_positive_rate_in_10000")]
#[serde(default = "index_options_default_false_positive_rate_in_10000")]
pub false_positive_rate_in_10000: u32,
}
@@ -535,7 +535,7 @@ fn fulltext_options_default_granularity() -> u32 {
DEFAULT_GRANULARITY
}
fn fulltext_options_default_false_positive_rate_in_10000() -> u32 {
fn index_options_default_false_positive_rate_in_10000() -> u32 {
(DEFAULT_FALSE_POSITIVE_RATE * 10000.0) as u32
}
@@ -773,6 +773,7 @@ pub struct SkippingIndexOptions {
/// The granularity of the skip index.
pub granularity: u32,
/// The false positive rate of the skip index (in ten-thousandths, e.g., 100 = 1%).
#[serde(default = "index_options_default_false_positive_rate_in_10000")]
pub false_positive_rate_in_10000: u32,
/// The type of the skip index.
#[serde(default)]
@@ -1179,4 +1180,59 @@ mod tests {
assert!(column_schema.default_constraint.is_none());
assert!(column_schema.metadata.is_empty());
}
#[test]
fn test_skipping_index_options_deserialization() {
let original_options = "{\"granularity\":1024,\"false-positive-rate-in-10000\":10,\"index-type\":\"BloomFilter\"}";
let options = serde_json::from_str::<SkippingIndexOptions>(original_options).unwrap();
assert_eq!(1024, options.granularity);
assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
assert_eq!(0.001, options.false_positive_rate());
let options_str = serde_json::to_string(&options).unwrap();
assert_eq!(options_str, original_options);
}
#[test]
fn test_skipping_index_options_deserialization_v0_14_to_v0_15() {
let options = "{\"granularity\":10240,\"index-type\":\"BloomFilter\"}";
let options = serde_json::from_str::<SkippingIndexOptions>(options).unwrap();
assert_eq!(10240, options.granularity);
assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());
let options_str = serde_json::to_string(&options).unwrap();
assert_eq!(options_str, "{\"granularity\":10240,\"false-positive-rate-in-10000\":100,\"index-type\":\"BloomFilter\"}");
}
#[test]
fn test_fulltext_options_deserialization() {
let original_options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":1024,\"false-positive-rate-in-10000\":10}";
let options = serde_json::from_str::<FulltextOptions>(original_options).unwrap();
assert!(!options.case_sensitive);
assert!(options.enable);
assert_eq!(FulltextBackend::Bloom, options.backend);
assert_eq!(FulltextAnalyzer::default(), options.analyzer);
assert_eq!(1024, options.granularity);
assert_eq!(0.001, options.false_positive_rate());
let options_str = serde_json::to_string(&options).unwrap();
assert_eq!(options_str, original_options);
}
#[test]
fn test_fulltext_options_deserialization_v0_14_to_v0_15() {
// 0.14 to 0.15
let options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}";
let options = serde_json::from_str::<FulltextOptions>(options).unwrap();
assert!(!options.case_sensitive);
assert!(options.enable);
assert_eq!(FulltextBackend::Bloom, options.backend);
assert_eq!(FulltextAnalyzer::default(), options.analyzer);
assert_eq!(DEFAULT_GRANULARITY, options.granularity);
assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());
let options_str = serde_json::to_string(&options).unwrap();
assert_eq!(options_str, "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":10240,\"false-positive-rate-in-10000\":100}");
}
}

View File

@@ -12,6 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow_array::{
ArrayRef, PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray,
};
use arrow_schema::DataType;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use paste::paste;
@@ -138,6 +143,41 @@ define_timestamp_with_unit!(Millisecond);
define_timestamp_with_unit!(Microsecond);
define_timestamp_with_unit!(Nanosecond);
pub fn timestamp_array_to_primitive(
ts_array: &ArrayRef,
) -> Option<(
PrimitiveArray<arrow_array::types::Int64Type>,
arrow::datatypes::TimeUnit,
)> {
let DataType::Timestamp(unit, _) = ts_array.data_type() else {
return None;
};
let ts_primitive = match unit {
arrow_schema::TimeUnit::Second => ts_array
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.reinterpret_cast::<arrow_array::types::Int64Type>(),
arrow_schema::TimeUnit::Millisecond => ts_array
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.reinterpret_cast::<arrow_array::types::Int64Type>(),
arrow_schema::TimeUnit::Microsecond => ts_array
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.reinterpret_cast::<arrow_array::types::Int64Type>(),
arrow_schema::TimeUnit::Nanosecond => ts_array
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.reinterpret_cast::<arrow_array::types::Int64Type>(),
};
Some((ts_primitive, *unit))
}
#[cfg(test)]
mod tests {
use common_time::timezone::set_default_timezone;
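
A minimal usage sketch of the new `timestamp_array_to_primitive` helper, mirroring how the bulk-insert paths later in this compare derive min/max timestamps with `arrow::compute`; the `timestamp_bounds` wrapper is a hypothetical name used only for illustration.

use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::timestamp::timestamp_array_to_primitive;

/// Returns (min, max) of a timestamp array as i64 values in the array's own
/// time unit, or None if the array is not a timestamp array or has no
/// non-null values.
fn timestamp_bounds(ts: &ArrayRef) -> Option<(i64, i64)> {
    let (primitive, _unit) = timestamp_array_to_primitive(ts)?;
    Some((
        arrow::compute::min(&primitive)?,
        arrow::compute::max(&primitive)?,
    ))
}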

View File

@@ -380,6 +380,13 @@ impl SqlQueryHandler for Instance {
.and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
{
Ok(stmts) => {
if stmts.is_empty() {
return vec![InvalidSqlSnafu {
err_msg: "empty statements",
}
.fail()];
}
let mut results = Vec::with_capacity(stmts.len());
for stmt in stmts {
if let Err(e) = checker

View File

@@ -473,8 +473,9 @@ struct MetricEngineInner {
mod test {
use std::collections::HashMap;
use common_telemetry::info;
use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest};
use store_api::region_request::{RegionCloseRequest, RegionFlushRequest, RegionOpenRequest};
use super::*;
use crate::test_util::TestEnv;
@@ -559,4 +560,90 @@ mod test {
assert!(env.metric().region_statistic(logical_region_id).is_none());
assert!(env.metric().region_statistic(physical_region_id).is_some());
}
#[tokio::test]
async fn test_open_region_failure() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let physical_region_id = env.default_physical_region_id();
let metric_engine = env.metric();
metric_engine
.handle_request(
physical_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
let path = format!("{}/metadata/", env.default_region_dir());
let object_store = env.get_object_store().unwrap();
let list = object_store.list(&path).await.unwrap();
// Delete parquet files in metadata region
for entry in list {
if entry.metadata().is_dir() {
continue;
}
if entry.name().ends_with("parquet") {
info!("deleting {}", entry.path());
object_store.delete(entry.path()).await.unwrap();
}
}
let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect();
let open_request = RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
region_dir: env.default_region_dir(),
options: physical_region_option,
skip_wal_replay: false,
};
// Opening an already opened region should succeed.
// Since the region is already open, no metadata recovery operations will be performed.
metric_engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
.await
.unwrap();
// Close the region
metric_engine
.handle_request(
physical_region_id,
RegionRequest::Close(RegionCloseRequest {}),
)
.await
.unwrap();
// Try to reopen region.
let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect();
let open_request = RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
region_dir: env.default_region_dir(),
options: physical_region_option,
skip_wal_replay: false,
};
let err = metric_engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
.await
.unwrap_err();
// Failed to open region because of missing parquet files.
assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
let mito_engine = metric_engine.mito();
let data_region_id = utils::to_data_region_id(physical_region_id);
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
// The metadata/data region should be closed.
let err = mito_engine.get_metadata(data_region_id).await.unwrap_err();
assert_eq!(err.status_code(), StatusCode::RegionNotFound);
let err = mito_engine
.get_metadata(metadata_region_id)
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::RegionNotFound);
}
}

View File

@@ -59,7 +59,7 @@ impl MetricEngineInner {
}
}
async fn close_physical_region(&self, region_id: RegionId) -> Result<AffectedRows> {
pub(crate) async fn close_physical_region(&self, region_id: RegionId) -> Result<AffectedRows> {
let data_region_id = utils::to_data_region_id(region_id);
let metadata_region_id = utils::to_metadata_region_id(region_id);

View File

@@ -17,7 +17,7 @@
use api::region::RegionResponse;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_telemetry::info;
use common_telemetry::{error, info, warn};
use datafusion::common::HashMap;
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
@@ -94,6 +94,21 @@ impl MetricEngineInner {
Ok(responses)
}
// If the metadata region is opened with a stale manifest,
// the metric engine may fail to recover logical tables from the metadata region,
// as the manifest could reference files that have already been deleted
// due to compaction operations performed by the region leader.
async fn close_physical_region_on_recovery_failure(&self, physical_region_id: RegionId) {
info!(
"Closing metadata region {} and data region {} on metadata recovery failure",
utils::to_metadata_region_id(physical_region_id),
utils::to_data_region_id(physical_region_id)
);
if let Err(err) = self.close_physical_region(physical_region_id).await {
error!(err; "Failed to close physical region {}", physical_region_id);
}
}
async fn open_physical_region_with_results(
&self,
metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
@@ -119,8 +134,14 @@ impl MetricEngineInner {
region_type: "data",
})?;
self.recover_states(physical_region_id, physical_region_options)
.await?;
if let Err(err) = self
.recover_states(physical_region_id, physical_region_options)
.await
{
self.close_physical_region_on_recovery_failure(physical_region_id)
.await;
return Err(err);
}
Ok(data_region_response)
}
@@ -139,11 +160,31 @@ impl MetricEngineInner {
request: RegionOpenRequest,
) -> Result<AffectedRows> {
if request.is_physical_table() {
if self
.state
.read()
.unwrap()
.physical_region_states()
.get(&region_id)
.is_some()
{
warn!(
"The physical region {} is already open, ignore the open request",
region_id
);
return Ok(0);
}
// open physical region and recover states
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
self.open_physical_region(region_id, request).await?;
self.recover_states(region_id, physical_region_options)
.await?;
if let Err(err) = self
.recover_states(region_id, physical_region_options)
.await
{
self.close_physical_region_on_recovery_failure(region_id)
.await;
return Err(err);
}
Ok(0)
} else {

View File

@@ -23,6 +23,7 @@ use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use mito2::test_util::TestEnv as MitoTestEnv;
use object_store::util::join_dir;
use object_store::ObjectStore;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
@@ -74,6 +75,10 @@ impl TestEnv {
join_dir(&env_root, "data")
}
pub fn get_object_store(&self) -> Option<ObjectStore> {
self.mito_env.get_object_store()
}
/// Returns a reference to the engine.
pub fn mito(&self) -> MitoEngine {
self.mito.clone()

View File

@@ -1020,6 +1020,18 @@ pub enum Error {
location: Location,
source: mito_codec::error::Error,
},
#[snafu(display(
"Inconsistent timestamp column length, expect: {}, actual: {}",
expected,
actual
))]
InconsistentTimestampLength {
expected: usize,
actual: usize,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1175,6 +1187,8 @@ impl ErrorExt for Error {
ConvertBulkWalEntry { source, .. } => source.status_code(),
Encode { source, .. } | Decode { source, .. } => source.status_code(),
InconsistentTimestampLength { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -15,15 +15,11 @@
//! Handles bulk insert requests.
use datatypes::arrow;
use datatypes::arrow::array::{
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use datatypes::arrow::datatypes::{DataType, TimeUnit};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::RegionBulkInsertsRequest;
use crate::error::InconsistentTimestampLengthSnafu;
use crate::memtable::bulk::part::BulkPart;
use crate::request::{OptionOutputTx, SenderBulkRequest};
use crate::worker::RegionWorkerLoop;
@@ -41,6 +37,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.with_label_values(&["process_bulk_req"])
.start_timer();
let batch = request.payload;
if batch.num_rows() == 0 {
sender.send(Ok(0));
return;
}
let Some((ts_index, ts)) = batch
.schema()
@@ -60,55 +60,23 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
};
let DataType::Timestamp(unit, _) = ts.data_type() else {
// safety: ts data type must be a timestamp type.
unreachable!()
};
if batch.num_rows() != ts.len() {
sender.send(
InconsistentTimestampLengthSnafu {
expected: batch.num_rows(),
actual: ts.len(),
}
.fail(),
);
return;
}
let (min_ts, max_ts) = match unit {
TimeUnit::Second => {
let ts = ts.as_any().downcast_ref::<TimestampSecondArray>().unwrap();
(
//safety: ts array must contain at least one row so this won't return None.
arrow::compute::min(ts).unwrap(),
arrow::compute::max(ts).unwrap(),
)
}
// safety: ts data type must be a timestamp type.
let (ts_primitive, _) = datatypes::timestamp::timestamp_array_to_primitive(ts).unwrap();
TimeUnit::Millisecond => {
let ts = ts
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
(
//safety: ts array must contain at least one row so this won't return None.
arrow::compute::min(ts).unwrap(),
arrow::compute::max(ts).unwrap(),
)
}
TimeUnit::Microsecond => {
let ts = ts
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap();
(
//safety: ts array must contain at least one row so this won't return None.
arrow::compute::min(ts).unwrap(),
arrow::compute::max(ts).unwrap(),
)
}
TimeUnit::Nanosecond => {
let ts = ts
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
(
//safety: ts array must contain at least one row so this won't return None.
arrow::compute::min(ts).unwrap(),
arrow::compute::max(ts).unwrap(),
)
}
};
// safety: we've checked ts.len() == batch.num_rows() and batch is not empty
let min_ts = arrow::compute::min(&ts_primitive).unwrap();
let max_ts = arrow::compute::max(&ts_primitive).unwrap();
let part = BulkPart {
batch,

View File

@@ -20,11 +20,7 @@ use api::v1::region::{
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
};
use api::v1::ArrowIpc;
use arrow::array::{
Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use arrow::datatypes::{DataType, Int64Type, TimeUnit};
use arrow::array::Array;
use arrow::record_batch::RecordBatch;
use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
@@ -62,6 +58,10 @@ impl Inserter {
};
decode_timer.observe_duration();
if record_batch.num_rows() == 0 {
return Ok(0);
}
// notify flownode to update dirty timestamps if flow is configured.
self.maybe_update_flow_dirty_window(table_info, record_batch.clone());
@@ -155,6 +155,9 @@ impl Inserter {
let mut raw_data_bytes = None;
for (peer, masks) in mask_per_datanode {
for (region_id, mask) in masks {
if mask.select_none() {
continue;
}
let rb = record_batch.clone();
let schema_bytes = schema_bytes.clone();
let node_manager = self.node_manager.clone();
@@ -304,32 +307,11 @@ fn extract_timestamps(rb: &RecordBatch, timestamp_index_name: &str) -> error::Re
if rb.num_rows() == 0 {
return Ok(vec![]);
}
let primitive = match ts_col.data_type() {
DataType::Timestamp(unit, _) => match unit {
TimeUnit::Second => ts_col
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Millisecond => ts_col
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Microsecond => ts_col
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Nanosecond => ts_col
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
},
t => {
return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
}
};
let (primitive, _) =
datatypes::timestamp::timestamp_array_to_primitive(ts_col).with_context(|| {
error::InvalidTimeIndexTypeSnafu {
ty: ts_col.data_type().clone(),
}
})?;
Ok(primitive.iter().flatten().collect())
}

View File

@@ -333,9 +333,9 @@ impl Pipeline {
table_suffix,
}));
}
// continue v2 process, check ts column and set the rest fields with auto-transform
// continue v2 process, and set the rest fields with auto-transform
// if a transformer is present, the ts has already been set
values_to_row(schema_info, val, pipeline_ctx, Some(values))?
values_to_row(schema_info, val, pipeline_ctx, Some(values), false)?
}
TransformerMode::AutoTransform(ts_name, time_unit) => {
// infer ts from the context
@@ -347,7 +347,7 @@ impl Pipeline {
));
let n_ctx =
PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
values_to_row(schema_info, val, &n_ctx, None)?
values_to_row(schema_info, val, &n_ctx, None, true)?
}
};

View File

@@ -420,15 +420,17 @@ pub(crate) fn values_to_row(
values: Value,
pipeline_ctx: &PipelineContext<'_>,
row: Option<Vec<GreptimeValue>>,
need_calc_ts: bool,
) -> Result<Row> {
let mut row: Vec<GreptimeValue> =
row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len()));
let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
// calculate timestamp value based on the channel
let ts = calc_ts(pipeline_ctx, &values)?;
row.push(GreptimeValue { value_data: ts });
if need_calc_ts {
// calculate timestamp value based on the channel
let ts = calc_ts(pipeline_ctx, &values)?;
row.push(GreptimeValue { value_data: ts });
}
row.resize(schema_info.schema.len(), GreptimeValue { value_data: None });
@@ -608,7 +610,7 @@ fn identity_pipeline_inner(
skip_error
);
let row = unwrap_or_continue_if_err!(
values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None),
values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true),
skip_error
);

View File

@@ -340,7 +340,14 @@ impl ExecutionPlan for RangeManipulateExec {
}
fn required_input_distribution(&self) -> Vec<Distribution> {
self.input.required_input_distribution()
let input_requirement = self.input.required_input_distribution();
if input_requirement.is_empty() {
// if the input is EmptyMetric, its required_input_distribution() is empty so we can't
// use its input distribution.
vec![Distribution::UnspecifiedDistribution]
} else {
input_requirement
}
}
fn with_new_children(

View File

@@ -155,7 +155,23 @@ struct PlanRewriter {
/// Partition columns of the table in current pass
partition_cols: Option<Vec<String>>,
column_requirements: HashSet<Column>,
/// Whether to expand on next call
/// This is used to handle the case where a plan is transformed but needs to be expanded from its
/// parent node. For example, an Aggregate plan is split into two parts between the frontend and the
/// datanode, and needs to be expanded from the parent node of the Aggregate plan.
expand_on_next_call: bool,
/// Expanding on next partial/conditional/transformed commutative plan
/// This is used to handle the case where a plan is transformed but we still
/// need to push down as many nodes as possible before the next partial/conditional/transformed commutative
/// plan. E.g.
/// ```
/// Limit:
/// Sort:
/// ```
/// where `Limit` is partial commutative, and `Sort` is conditional commutative.
/// In this case, we need to expand the `Limit` plan,
/// so that we can push down the `Sort` plan as much as possible.
expand_on_next_part_cond_trans_commutative: bool,
new_child_plan: Option<LogicalPlan>,
}
@@ -177,15 +193,38 @@ impl PlanRewriter {
{
return true;
}
if self.expand_on_next_call {
self.expand_on_next_call = false;
return true;
}
if self.expand_on_next_part_cond_trans_commutative {
let comm = Categorizer::check_plan(plan, self.partition_cols.clone());
match comm {
Commutativity::PartialCommutative => {
// a small difference: for partial commutative plans, we still need to
// expand on the next call (so `Limit` can be pushed down)
self.expand_on_next_part_cond_trans_commutative = false;
self.expand_on_next_call = true;
}
Commutativity::ConditionalCommutative(_)
| Commutativity::TransformedCommutative { .. } => {
// for conditional commutative and transformed commutative, we can
// expand now
self.expand_on_next_part_cond_trans_commutative = false;
return true;
}
_ => (),
}
}
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
Commutativity::Commutative => {}
Commutativity::PartialCommutative => {
if let Some(plan) = partial_commutative_transformer(plan) {
self.update_column_requirements(&plan);
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
}
@@ -194,6 +233,7 @@ impl PlanRewriter {
&& let Some(plan) = transformer(plan)
{
self.update_column_requirements(&plan);
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
}
@@ -202,7 +242,7 @@ impl PlanRewriter {
&& let Some(transformer_actions) = transformer(plan)
{
debug!(
"PlanRewriter: transformed plan: {:#?}\n from {plan}",
"PlanRewriter: transformed plan: {:?}\n from {plan}",
transformer_actions.extra_parent_plans
);
if let Some(last_stage) = transformer_actions.extra_parent_plans.last() {
@@ -226,6 +266,10 @@ impl PlanRewriter {
}
fn update_column_requirements(&mut self, plan: &LogicalPlan) {
debug!(
"PlanRewriter: update column requirements for plan: {plan}\n withcolumn_requirements: {:?}",
self.column_requirements
);
let mut container = HashSet::new();
for expr in plan.expressions() {
// this method won't fail
@@ -235,6 +279,10 @@ impl PlanRewriter {
for col in container {
self.column_requirements.insert(col);
}
debug!(
"PlanRewriter: updated column requirements: {:?}",
self.column_requirements
);
}
fn is_expanded(&self) -> bool {

View File

@@ -181,6 +181,15 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
is_batch_coalesced = true;
}
// Only a very limited set of plans can exist between the region scan and the sort exec.
// Other plans might make this optimization incorrect, so be safe and bail out here.
if !(plan.as_any().is::<ProjectionExec>()
|| plan.as_any().is::<FilterExec>()
|| plan.as_any().is::<CoalesceBatchesExec>())
{
partition_ranges = None;
}
// TODO(discord9): do this in the logical plan instead, as it's less buggy there
// Collects alias of the time index column.
if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
@@ -194,6 +203,14 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
}
if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
// `PerSeries` distribution is not supported in windowed sort.
if region_scan_exec.distribution()
== Some(store_api::storage::TimeSeriesDistribution::PerSeries)
{
partition_ranges = None;
return Ok(Transformed::no(plan));
}
partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
// Reset time index column.
time_index = HashSet::from([region_scan_exec.time_index()]);

View File

@@ -96,9 +96,10 @@ impl PartSortExec {
if partition >= self.partition_ranges.len() {
internal_err!(
"Partition index out of range: {} >= {}",
"Partition index out of range: {} >= {} at {}",
partition,
self.partition_ranges.len()
self.partition_ranges.len(),
snafu::location!()
)?;
}
@@ -322,9 +323,10 @@ impl PartSortStream {
) -> datafusion_common::Result<()> {
if self.cur_part_idx >= self.partition_ranges.len() {
internal_err!(
"Partition index out of range: {} >= {}",
"Partition index out of range: {} >= {} at {}",
self.cur_part_idx,
self.partition_ranges.len()
self.partition_ranges.len(),
snafu::location!()
)?;
}
let cur_range = self.partition_ranges[self.cur_part_idx];
@@ -355,9 +357,10 @@ impl PartSortStream {
// check if the current partition index is out of range
if self.cur_part_idx >= self.partition_ranges.len() {
internal_err!(
"Partition index out of range: {} >= {}",
"Partition index out of range: {} >= {} at {}",
self.cur_part_idx,
self.partition_ranges.len()
self.partition_ranges.len(),
snafu::location!()
)?;
}
let cur_range = self.partition_ranges[self.cur_part_idx];

View File

@@ -716,17 +716,19 @@ impl PromPlanner {
..
} = vs;
let matchers = self.preprocess_label_matchers(matchers, name)?;
if let Some(empty_plan) = self.setup_context().await? {
return Ok(empty_plan);
}
ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
let range_ms = range.as_millis() as _;
self.ctx.range = Some(range_ms);
let normalize = self
.selector_to_series_normalize_plan(offset, matchers, true)
.await?;
// Some functions like rate may require special fields in the RangeManipulate plan
// so we can't skip RangeManipulate.
let normalize = match self.setup_context().await? {
Some(empty_plan) => empty_plan,
None => {
self.selector_to_series_normalize_plan(offset, matchers, true)
.await?
}
};
let manipulate = RangeManipulate::new(
self.ctx.start,
self.ctx.end,
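
Previously the planner returned early whenever setup_context() produced an empty plan, which skipped RangeManipulate entirely; after this change the empty plan becomes the input of the normalize step, so functions like rate() still get the fields RangeManipulate adds. A rough sketch of that control-flow change, with illustrative types rather than the PromPlanner API:

#[derive(Debug)]
enum Plan {
    Empty,
    Normalized,
    RangeManipulate(Box<Plan>),
}

fn build(have_table: bool) -> Plan {
    // Before: `if !have_table { return Plan::Empty; }`, losing the wrapper below.
    let normalize = if have_table {
        Plan::Normalized
    } else {
        Plan::Empty
    };
    // RangeManipulate must wrap even an empty input.
    Plan::RangeManipulate(Box::new(normalize))
}

fn main() {
    println!("{:?}", build(false));
}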


@@ -13,7 +13,7 @@
// limitations under the License.
//! prom supplies the Prometheus HTTP API server compliance
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use axum::extract::{Path, Query, State};
@@ -62,7 +62,7 @@ use crate::prometheus_handler::PrometheusHandlerRef;
/// For [ValueType::Vector] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
pub metric: HashMap<String, String>,
pub metric: BTreeMap<String, String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<(f64, String)>,
}
@@ -70,7 +70,7 @@ pub struct PromSeriesVector {
/// For [ValueType::Matrix] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
pub metric: HashMap<String, String>,
pub metric: BTreeMap<String, String>,
pub values: Vec<(f64, String)>,
}
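
Changing the metric field from HashMap to BTreeMap makes the label keys serialize in sorted order, so responses are deterministic and easy to compare in tests. A small sketch of the effect, assuming serde_json for the example:

use std::collections::BTreeMap;

fn main() {
    let mut metric = BTreeMap::new();
    metric.insert("job".to_string(), "job1".to_string());
    metric.insert("instance".to_string(), "node1".to_string());
    // Keys always come out sorted: {"instance":"node1","job":"job1"}
    println!("{}", serde_json::to_string(&metric).unwrap());
}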


@@ -13,7 +13,8 @@
// limitations under the License.
//! prom supplies the Prometheus HTTP API server compliance
use std::collections::HashMap;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
@@ -311,7 +312,7 @@ impl PrometheusJsonResponse {
let metric = tags
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect::<HashMap<_, _>>();
.collect::<BTreeMap<_, _>>();
match result {
PromQueryResult::Vector(ref mut v) => {
v.push(PromSeriesVector {
@@ -320,6 +321,11 @@ impl PrometheusJsonResponse {
});
}
PromQueryResult::Matrix(ref mut v) => {
// sort values by timestamp
if !values.is_sorted_by(|a, b| a.0 <= b.0) {
values.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
}
v.push(PromSeriesMatrix { metric, values });
}
PromQueryResult::Scalar(ref mut v) => {
@@ -331,6 +337,12 @@ impl PrometheusJsonResponse {
}
});
// sort matrix by metric
// see: https://prometheus.io/docs/prometheus/3.5/querying/api/#range-vectors
if let PromQueryResult::Matrix(ref mut v) = result {
v.sort_by(|a, b| a.metric.cmp(&b.metric));
}
let result_type_string = result_type.to_string();
let data = PrometheusResponse::PromData(PromData {
result_type: result_type_string,
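
The additions above sort each series' samples by timestamp and then sort the whole matrix by its label set, following the ordering documented for Prometheus range vectors. A simplified sketch with stand-in types:

use std::cmp::Ordering;
use std::collections::BTreeMap;

struct Series {
    metric: BTreeMap<String, String>,
    values: Vec<(f64, String)>,
}

fn sort_matrix(matrix: &mut [Series]) {
    for series in matrix.iter_mut() {
        // Sort samples by timestamp only when they are not already ordered.
        if !series.values.is_sorted_by(|a, b| a.0 <= b.0) {
            series
                .values
                .sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
        }
    }
    // BTreeMap implements Ord, so the label sets compare directly.
    matrix.sort_by(|a, b| a.metric.cmp(&b.metric));
}

fn main() {
    let mut matrix = vec![
        Series {
            metric: BTreeMap::from([("host".to_string(), "b".to_string())]),
            values: vec![(2.0, "2".to_string()), (1.0, "1".to_string())],
        },
        Series {
            metric: BTreeMap::from([("host".to_string(), "a".to_string())]),
            values: vec![(1.0, "1".to_string())],
        },
    ];
    sort_matrix(&mut matrix);
    assert_eq!(matrix[0].metric["host"], "a");
    assert_eq!(matrix[1].values[0].0, 1.0);
}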


@@ -49,7 +49,7 @@ pub(crate) struct GreptimeDBStartupParameters {
impl GreptimeDBStartupParameters {
fn new() -> GreptimeDBStartupParameters {
GreptimeDBStartupParameters {
version: format!("16.3-greptimedb-{}", env!("CARGO_PKG_VERSION")),
version: format!("16.3-greptimedb-{}", common_version::version()),
}
}
}
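
The reported version string now comes from common_version::version() rather than env!("CARGO_PKG_VERSION"), so an alternative version string can be reported. The sketch below only illustrates the idea and is not the common_version crate; the override variable name is hypothetical.

// Falls back to the Cargo package version unless an override is provided.
// GREPTIMEDB_VERSION_OVERRIDE is a made-up name for illustration only.
fn version() -> String {
    std::env::var("GREPTIMEDB_VERSION_OVERRIDE")
        .unwrap_or_else(|_| env!("CARGO_PKG_VERSION").to_string())
}

fn main() {
    // The PostgreSQL frontend advertises e.g. "16.3-greptimedb-<version>".
    println!("16.3-greptimedb-{}", version());
}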


@@ -412,6 +412,10 @@ impl PromSeriesProcessor {
let one_sample = series.samples.len() == 1;
for s in series.samples.iter() {
// skip NaN value
if s.value.is_nan() {
continue;
}
let timestamp = s.timestamp;
pipeline_map.insert(GREPTIME_TIMESTAMP.to_string(), Value::Int64(timestamp));
pipeline_map.insert(GREPTIME_VALUE.to_string(), Value::Float64(s.value));
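
The added check skips NaN samples (Prometheus encodes stale markers as NaN values) before they are converted into pipeline values. A minimal sketch of the same filter with a simplified sample type:

#[derive(Debug)]
struct Sample {
    timestamp: i64,
    value: f64,
}

// Keeps only samples whose value is a real number.
fn keep(samples: &[Sample]) -> Vec<&Sample> {
    samples.iter().filter(|s| !s.value.is_nan()).collect()
}

fn main() {
    let samples = vec![
        Sample { timestamp: 1, value: 1.0 },
        Sample { timestamp: 2, value: f64::NAN },
    ];
    // Only the finite sample at timestamp 1 survives.
    println!("{:?}", keep(&samples));
}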


@@ -95,6 +95,18 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Not allowed to remove partition column {} from table {}",
column_name,
table_name
))]
RemovePartitionColumn {
column_name: String,
table_name: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to build column descriptor for table: {}, column: {}",
table_name,
@@ -193,6 +205,7 @@ impl ErrorExt for Error {
StatusCode::EngineExecuteQuery
}
Error::RemoveColumnInIndex { .. }
| Error::RemovePartitionColumn { .. }
| Error::BuildColumnDescriptor { .. }
| Error::InvalidAlterRequest { .. } => StatusCode::InvalidArguments,
Error::CastDefaultValue { source, .. } => source.status_code(),
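
The new RemovePartitionColumn variant follows the crate's existing snafu pattern: a display template plus a mapping to StatusCode::InvalidArguments. A minimal sketch of declaring such a variant with snafu; the implicit Location field and the status-code mapping are left out:

use snafu::Snafu;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display(
        "Not allowed to remove partition column {} from table {}",
        column_name,
        table_name
    ))]
    RemovePartitionColumn {
        column_name: String,
        table_name: String,
    },
}

fn main() {
    // The derive generates a context selector for building the error.
    let err = RemovePartitionColumnSnafu {
        column_name: "a",
        table_name: "my_table",
    }
    .build();
    println!("{err}");
}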


@@ -645,10 +645,19 @@ impl TableMeta {
msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
})?;
let partition_key_indices = self
.partition_key_indices
.iter()
.map(|idx| table_schema.column_name_by_index(*idx))
// This unwrap is safe since we only add new columns.
.map(|name| new_schema.column_index_by_name(name).unwrap())
.collect();
// value_indices would be generated automatically.
let _ = meta_builder
.schema(Arc::new(new_schema))
.primary_key_indices(primary_key_indices);
.primary_key_indices(primary_key_indices)
.partition_key_indices(partition_key_indices);
Ok(meta_builder)
}
@@ -676,6 +685,14 @@ impl TableMeta {
}
);
ensure!(
!self.partition_key_indices.contains(&index),
error::RemovePartitionColumnSnafu {
column_name: *column_name,
table_name,
}
);
if let Some(ts_index) = timestamp_index {
// Not allowed to remove column in timestamp index.
ensure!(
@@ -725,9 +742,18 @@ impl TableMeta {
.map(|name| new_schema.column_index_by_name(name).unwrap())
.collect();
let partition_key_indices = self
.partition_key_indices
.iter()
.map(|idx| table_schema.column_name_by_index(*idx))
// This unwrap is safe since we don't allow removing a partition key column.
.map(|name| new_schema.column_index_by_name(name).unwrap())
.collect();
let _ = meta_builder
.schema(Arc::new(new_schema))
.primary_key_indices(primary_key_indices);
.primary_key_indices(primary_key_indices)
.partition_key_indices(partition_key_indices);
Ok(meta_builder)
}
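
Both alter paths above now carry partition_key_indices through the rebuilt metadata by translating old positions into column names and resolving those names against the new schema. A small sketch of that remapping, with plain string slices standing in for the schemas:

// Maps positional indices from the old schema to the new one by column name.
fn remap_indices(old: &[&str], new: &[&str], indices: &[usize]) -> Vec<usize> {
    indices
        .iter()
        .map(|&idx| old[idx])
        // Safe only because partition columns can no longer be dropped.
        .map(|name| new.iter().position(|c| *c == name).unwrap())
        .collect()
}

fn main() {
    let old = ["col1", "ts", "col2"];
    // A new column was added before col2.
    let new = ["col1", "ts", "new_col", "col2"];
    assert_eq!(remap_indices(&old, &new, &[0, 2]), vec![0, 3]);
}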
@@ -1300,6 +1326,8 @@ fn unset_column_skipping_index_options(
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use datatypes::data_type::ConcreteDataType;
@@ -1308,6 +1336,7 @@ mod tests {
};
use super::*;
use crate::Error;
/// Create a test schema with 3 columns: `[col1 int32, ts timestamp millis, col2 int32]`.
fn new_test_schema() -> Schema {
@@ -1385,6 +1414,11 @@ mod tests {
ConcreteDataType::string_datatype(),
true,
);
let yet_another_field = ColumnSchema::new(
"yet_another_field_after_ts",
ConcreteDataType::int64_datatype(),
true,
);
let alter_kind = AlterKind::AddColumns {
columns: vec![
AddColumnRequest {
@@ -1401,6 +1435,14 @@ mod tests {
}),
add_if_not_exists: false,
},
AddColumnRequest {
column_schema: yet_another_field,
is_key: true,
location: Some(AddColumnLocation::After {
column_name: "ts".to_string(),
}),
add_if_not_exists: false,
},
],
};
@@ -1756,6 +1798,29 @@ mod tests {
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
#[test]
fn test_remove_partition_column() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::empty()
.schema(schema)
.primary_key_indices(vec![])
.partition_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
// Remove a partition key column.
let alter_kind = AlterKind::DropColumns {
names: vec![String::from("col1")],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_matches!(err, Error::RemovePartitionColumn { .. });
}
#[test]
fn test_change_key_column_data_type() {
let schema = Arc::new(new_test_schema());
@@ -1821,6 +1886,8 @@ mod tests {
let meta = TableMetaBuilder::empty()
.schema(schema)
.primary_key_indices(vec![0])
// partition col: col1, col2
.partition_key_indices(vec![0, 2])
.engine("engine")
.next_column_id(3)
.build()
@@ -1836,11 +1903,19 @@ mod tests {
.map(|column_schema| column_schema.name.clone())
.collect();
assert_eq!(
&["my_tag_first", "col1", "ts", "my_field_after_ts", "col2"],
&[
"my_tag_first", // primary key column
"col1", // partition column
"ts", // timestamp column
"yet_another_field_after_ts", // primary key column
"my_field_after_ts", // value column
"col2", // partition column
],
&names[..]
);
assert_eq!(&[0, 1], &new_meta.primary_key_indices[..]);
assert_eq!(&[2, 3, 4], &new_meta.value_indices[..]);
assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
}
#[test]


@@ -2405,14 +2405,19 @@ processors:
ignore_missing: true
- vrl:
source: |
.log_id = .id
del(.id)
.from_source = "channel_2"
cond, err = .id1 > .id2
if (cond) {
.from_source = "channel_1"
}
del(.id1)
del(.id2)
.
transform:
- fields:
- log_id
type: int32
- from_source
type: string
- field: time
type: time
index: timestamp
@@ -2432,7 +2437,8 @@ transform:
let data_body = r#"
[
{
"id": "2436",
"id1": 2436,
"id2": 123,
"time": "2024-05-25 20:16:37.217"
}
]
@@ -2449,7 +2455,7 @@ transform:
"test_pipeline_with_vrl",
&client,
"select * from d_table",
"[[2436,1716668197217000000]]",
"[[\"channel_1\",1716668197217000000]]",
)
.await;
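
For reference, the VRL source in the pipeline above boils down to a single comparison: with the test payload (id1 = 2436, id2 = 123) the condition holds, so from_source becomes "channel_1", which is exactly what the expected result row checks. A tiny Rust rendering of that logic:

fn from_source(id1: i64, id2: i64) -> &'static str {
    // Mirrors `cond, err = .id1 > .id2; if (cond) { .from_source = "channel_1" }`.
    if id1 > id2 { "channel_1" } else { "channel_2" }
}

fn main() {
    assert_eq!(from_source(2436, 123), "channel_1");
}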


@@ -152,6 +152,16 @@ pub async fn test_mysql_stmts(store_type: StorageType) {
conn.execute("SET TRANSACTION READ ONLY").await.unwrap();
// empty statements
let err = conn.execute(" ------- ;").await.unwrap_err();
assert!(err.to_string().contains("empty statements"));
let err = conn.execute("----------\n;").await.unwrap_err();
assert!(err.to_string().contains("empty statements"));
let err = conn.execute(" ;").await.unwrap_err();
assert!(err.to_string().contains("empty statements"));
let err = conn.execute(" \n ;").await.unwrap_err();
assert!(err.to_string().contains("empty statements"));
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}


@@ -174,3 +174,80 @@ DROP TABLE t;
Affected Rows: 0
CREATE TABLE my_table (
a INT PRIMARY KEY,
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (a) (
a < 1000,
a >= 1000 AND a < 2000,
a >= 2000
);
Affected Rows: 0
INSERT INTO my_table VALUES
(100, 'a', 1),
(200, 'b', 2),
(1100, 'c', 3),
(1200, 'd', 4),
(2000, 'e', 5),
(2100, 'f', 6),
(2200, 'g', 7),
(2400, 'h', 8);
Affected Rows: 8
SELECT * FROM my_table WHERE a > 100 order by a;
+------+---+-------------------------+
| a | b | ts |
+------+---+-------------------------+
| 200 | b | 1970-01-01T00:00:00.002 |
| 1100 | c | 1970-01-01T00:00:00.003 |
| 1200 | d | 1970-01-01T00:00:00.004 |
| 2000 | e | 1970-01-01T00:00:00.005 |
| 2100 | f | 1970-01-01T00:00:00.006 |
| 2200 | g | 1970-01-01T00:00:00.007 |
| 2400 | h | 1970-01-01T00:00:00.008 |
+------+---+-------------------------+
SELECT count(*) FROM my_table WHERE a > 100;
+----------+
| count(*) |
+----------+
| 7 |
+----------+
ALTER TABLE my_table ADD COLUMN c STRING FIRST;
Affected Rows: 0
SELECT * FROM my_table WHERE a > 100 order by a;
+---+------+---+-------------------------+
| c | a | b | ts |
+---+------+---+-------------------------+
| | 200 | b | 1970-01-01T00:00:00.002 |
| | 1100 | c | 1970-01-01T00:00:00.003 |
| | 1200 | d | 1970-01-01T00:00:00.004 |
| | 2000 | e | 1970-01-01T00:00:00.005 |
| | 2100 | f | 1970-01-01T00:00:00.006 |
| | 2200 | g | 1970-01-01T00:00:00.007 |
| | 2400 | h | 1970-01-01T00:00:00.008 |
+---+------+---+-------------------------+
SELECT count(*) FROM my_table WHERE a > 100;
+----------+
| count(*) |
+----------+
| 7 |
+----------+
DROP TABLE my_table;
Affected Rows: 0


@@ -47,3 +47,36 @@ SELECT * FROM t;
ALTER TABLE t ADD COLUMN x int xxx;
DROP TABLE t;
CREATE TABLE my_table (
a INT PRIMARY KEY,
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (a) (
a < 1000,
a >= 1000 AND a < 2000,
a >= 2000
);
INSERT INTO my_table VALUES
(100, 'a', 1),
(200, 'b', 2),
(1100, 'c', 3),
(1200, 'd', 4),
(2000, 'e', 5),
(2100, 'f', 6),
(2200, 'g', 7),
(2400, 'h', 8);
SELECT * FROM my_table WHERE a > 100 order by a;
SELECT count(*) FROM my_table WHERE a > 100;
ALTER TABLE my_table ADD COLUMN c STRING FIRST;
SELECT * FROM my_table WHERE a > 100 order by a;
SELECT count(*) FROM my_table WHERE a > 100;
DROP TABLE my_table;


@@ -31,3 +31,24 @@ DROP TABLE test;
Affected Rows: 0
CREATE TABLE my_table (
a INT PRIMARY KEY,
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (a) (
a < 1000,
a >= 1000 AND a < 2000,
a >= 2000
);
Affected Rows: 0
ALTER TABLE my_table DROP COLUMN a;
Error: 1004(InvalidArguments), Not allowed to remove index column a from table my_table
DROP TABLE my_table;
Affected Rows: 0


@@ -11,3 +11,18 @@ SELECT * FROM test;
ALTER TABLE test DROP COLUMN j;
DROP TABLE test;
CREATE TABLE my_table (
a INT PRIMARY KEY,
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (a) (
a < 1000,
a >= 1000 AND a < 2000,
a >= 2000
);
ALTER TABLE my_table DROP COLUMN a;
DROP TABLE my_table;


@@ -0,0 +1,201 @@
-- Test sum(rate()) function combinations
CREATE TABLE metrics (
ts TIMESTAMP TIME INDEX,
val DOUBLE,
host STRING,
service STRING,
PRIMARY KEY (host, service)
);
Affected Rows: 0
-- Insert test data with multiple time series
INSERT INTO metrics VALUES
-- host1, service1
(0, 10, 'host1', 'service1'),
(60000, 20, 'host1', 'service1'),
(120000, 30, 'host1', 'service1'),
(180000, 40, 'host1', 'service1'),
-- host1, service2
(0, 5, 'host1', 'service2'),
(60000, 15, 'host1', 'service2'),
(120000, 25, 'host1', 'service2'),
(180000, 35, 'host1', 'service2'),
-- host2, service1
(0, 8, 'host2', 'service1'),
(60000, 18, 'host2', 'service1'),
(120000, 28, 'host2', 'service1'),
(180000, 38, 'host2', 'service1');
Affected Rows: 12
-- Test basic sum(rate()) - sum rate across all series
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum(rate(metrics[1m]));
+---------------------+----------------------------------------------+
| ts | sum(prom_rate(ts_range,val,ts,Int64(60000))) |
+---------------------+----------------------------------------------+
| 1970-01-01T00:01:00 | 0.5 |
| 1970-01-01T00:02:00 | 0.5 |
| 1970-01-01T00:03:00 | 0.5 |
+---------------------+----------------------------------------------+
-- Test sum(rate()) with grouping by host
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum by(host) (rate(metrics[1m]));
+-------+---------------------+----------------------------------------------+
| host | ts | sum(prom_rate(ts_range,val,ts,Int64(60000))) |
+-------+---------------------+----------------------------------------------+
| host1 | 1970-01-01T00:01:00 | 0.3333333333333333 |
| host1 | 1970-01-01T00:02:00 | 0.3333333333333333 |
| host1 | 1970-01-01T00:03:00 | 0.3333333333333333 |
| host2 | 1970-01-01T00:01:00 | 0.16666666666666666 |
| host2 | 1970-01-01T00:02:00 | 0.16666666666666666 |
| host2 | 1970-01-01T00:03:00 | 0.16666666666666666 |
+-------+---------------------+----------------------------------------------+
-- Test sum(rate()) with grouping by service
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum by(service) (rate(metrics[1m]));
+----------+---------------------+----------------------------------------------+
| service | ts | sum(prom_rate(ts_range,val,ts,Int64(60000))) |
+----------+---------------------+----------------------------------------------+
| service1 | 1970-01-01T00:01:00 | 0.3333333333333333 |
| service1 | 1970-01-01T00:02:00 | 0.3333333333333333 |
| service1 | 1970-01-01T00:03:00 | 0.3333333333333333 |
| service2 | 1970-01-01T00:01:00 | 0.16666666666666666 |
| service2 | 1970-01-01T00:02:00 | 0.16666666666666666 |
| service2 | 1970-01-01T00:03:00 | 0.16666666666666666 |
+----------+---------------------+----------------------------------------------+
-- Test sum(rate()) with label filtering
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum(rate(metrics{host="host1"}[1m]));
+---------------------+----------------------------------------------+
| ts | sum(prom_rate(ts_range,val,ts,Int64(60000))) |
+---------------------+----------------------------------------------+
| 1970-01-01T00:01:00 | 0.3333333333333333 |
| 1970-01-01T00:02:00 | 0.3333333333333333 |
| 1970-01-01T00:03:00 | 0.3333333333333333 |
+---------------------+----------------------------------------------+
-- Test sum(rate()) with multiple label filters
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum(rate(metrics{host="host1", service="service1"}[1m]));
+---------------------+----------------------------------------------+
| ts | sum(prom_rate(ts_range,val,ts,Int64(60000))) |
+---------------------+----------------------------------------------+
| 1970-01-01T00:01:00 | 0.16666666666666666 |
| 1970-01-01T00:02:00 | 0.16666666666666666 |
| 1970-01-01T00:03:00 | 0.16666666666666666 |
+---------------------+----------------------------------------------+
-- Test sum(rate()) with regex label matching
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum(rate(metrics{host=~"host.*"}[1m]));
+---------------------+----------------------------------------------+
| ts | sum(prom_rate(ts_range,val,ts,Int64(60000))) |
+---------------------+----------------------------------------------+
| 1970-01-01T00:01:00 | 0.5 |
| 1970-01-01T00:02:00 | 0.5 |
| 1970-01-01T00:03:00 | 0.5 |
+---------------------+----------------------------------------------+
-- Test sum(rate()) with different time ranges
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum(rate(metrics[30s]));
++
++
-- Test sum(rate()) with longer evaluation window
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 240, '60s') sum(rate(metrics[1m]));
+---------------------+----------------------------------------------+
| ts | sum(prom_rate(ts_range,val,ts,Int64(60000))) |
+---------------------+----------------------------------------------+
| 1970-01-01T00:01:00 | 0.5 |
| 1970-01-01T00:02:00 | 0.5 |
| 1970-01-01T00:03:00 | 0.5 |
+---------------------+----------------------------------------------+
-- Test sum(rate()) combined with arithmetic operations
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum(rate(metrics[1m])) * 100;
+---------------------+-------------------------------------------------------------+
| ts | sum(prom_rate(ts_range,val,ts,Int64(60000))) * Float64(100) |
+---------------------+-------------------------------------------------------------+
| 1970-01-01T00:01:00 | 50.0 |
| 1970-01-01T00:02:00 | 50.0 |
| 1970-01-01T00:03:00 | 50.0 |
+---------------------+-------------------------------------------------------------+
-- Test sum(rate()) with grouping and arithmetic
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum by(host) (rate(metrics[1m])) * 60;
+-------+---------------------+------------------------------------------------------------+
| host | ts | sum(prom_rate(ts_range,val,ts,Int64(60000))) * Float64(60) |
+-------+---------------------+------------------------------------------------------------+
| host1 | 1970-01-01T00:01:00 | 20.0 |
| host1 | 1970-01-01T00:02:00 | 20.0 |
| host1 | 1970-01-01T00:03:00 | 20.0 |
| host2 | 1970-01-01T00:01:00 | 10.0 |
| host2 | 1970-01-01T00:02:00 | 10.0 |
| host2 | 1970-01-01T00:03:00 | 10.0 |
+-------+---------------------+------------------------------------------------------------+
-- Test querying non-existent table
TQL EVAL (0, 180, '60s') sum(rate(non_existent_table[1m]));
++
++
-- Test querying non-existent label
TQL EVAL (0, 180, '60s') sum(rate(metrics{non_existent_label="value"}[1m]));
++
++
-- Test querying non-existent label value
TQL EVAL (0, 180, '60s') sum(rate(metrics{host="non_existent_host"}[1m]));
++
++
-- Test querying multiple non-existent labels
TQL EVAL (0, 180, '60s') sum(rate(metrics{non_existent_label1="value1", non_existent_label2="value2"}[1m]));
++
++
-- Test querying mix of existing and non-existent labels
TQL EVAL (0, 180, '60s') sum(rate(metrics{host="host1", non_existent_label="value"}[1m]));
++
++
-- Test querying non-existent table with non-existent labels
TQL EVAL (0, 180, '60s') sum(rate(non_existent_table{non_existent_label="value"}[1m]));
++
++
-- Test querying non-existent table with multiple non-existent labels
TQL EVAL (0, 180, '60s') sum(rate(non_existent_table{label1="value1", label2="value2"}[1m]));
++
++
DROP TABLE metrics;
Affected Rows: 0


@@ -0,0 +1,89 @@
-- Test sum(rate()) function combinations
CREATE TABLE metrics (
ts TIMESTAMP TIME INDEX,
val DOUBLE,
host STRING,
service STRING,
PRIMARY KEY (host, service)
);
-- Insert test data with multiple time series
INSERT INTO metrics VALUES
-- host1, service1
(0, 10, 'host1', 'service1'),
(60000, 20, 'host1', 'service1'),
(120000, 30, 'host1', 'service1'),
(180000, 40, 'host1', 'service1'),
-- host1, service2
(0, 5, 'host1', 'service2'),
(60000, 15, 'host1', 'service2'),
(120000, 25, 'host1', 'service2'),
(180000, 35, 'host1', 'service2'),
-- host2, service1
(0, 8, 'host2', 'service1'),
(60000, 18, 'host2', 'service1'),
(120000, 28, 'host2', 'service1'),
(180000, 38, 'host2', 'service1');
-- Test basic sum(rate()) - sum rate across all series
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum(rate(metrics[1m]));
-- Test sum(rate()) with grouping by host
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum by(host) (rate(metrics[1m]));
-- Test sum(rate()) with grouping by service
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum by(service) (rate(metrics[1m]));
-- Test sum(rate()) with label filtering
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum(rate(metrics{host="host1"}[1m]));
-- Test sum(rate()) with multiple label filters
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum(rate(metrics{host="host1", service="service1"}[1m]));
-- Test sum(rate()) with regex label matching
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum(rate(metrics{host=~"host.*"}[1m]));
-- Test sum(rate()) with different time ranges
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum(rate(metrics[30s]));
-- Test sum(rate()) with longer evaluation window
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 240, '60s') sum(rate(metrics[1m]));
-- Test sum(rate()) combined with arithmetic operations
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum(rate(metrics[1m])) * 100;
-- Test sum(rate()) with grouping and arithmetic
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 180, '60s') sum by(host) (rate(metrics[1m])) * 60;
-- Test querying non-existent table
TQL EVAL (0, 180, '60s') sum(rate(non_existent_table[1m]));
-- Test querying non-existent label
TQL EVAL (0, 180, '60s') sum(rate(metrics{non_existent_label="value"}[1m]));
-- Test querying non-existent label value
TQL EVAL (0, 180, '60s') sum(rate(metrics{host="non_existent_host"}[1m]));
-- Test querying multiple non-existent labels
TQL EVAL (0, 180, '60s') sum(rate(metrics{non_existent_label1="value1", non_existent_label2="value2"}[1m]));
-- Test querying mix of existing and non-existent labels
TQL EVAL (0, 180, '60s') sum(rate(metrics{host="host1", non_existent_label="value"}[1m]));
-- Test querying non-existent table with non-existent labels
TQL EVAL (0, 180, '60s') sum(rate(non_existent_table{non_existent_label="value"}[1m]));
-- Test querying non-existent table with multiple non-existent labels
TQL EVAL (0, 180, '60s') sum(rate(non_existent_table{label1="value1", label2="value2"}[1m]));
DROP TABLE metrics;


@@ -233,3 +233,65 @@ DROP TABLE lightning;
Affected Rows: 0
CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
`greptime_timestamp` TIMESTAMP(3) NOT NULL,
`greptime_value` DOUBLE NULL,
`instance` STRING NULL,
`job` STRING NULL,
TIME INDEX (`greptime_timestamp`),
PRIMARY KEY (`instance`, `job`)
);
Affected Rows: 0
INSERT INTO `instance_job_metrics` VALUES
('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');
Affected Rows: 3
TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+---------------------+------------------------------------------+
| greptime_timestamp | sum(instance_job_metrics.greptime_value) |
+---------------------+------------------------------------------+
| 2023-10-01T00:00:01 | 1696118400.0 |
| 2023-10-01T00:00:02 | 3392236800.0 |
| 2023-10-01T00:00:03 | 5088355200.0 |
| 2023-10-01T00:00:04 | 5088355200.0 |
| 2023-10-01T00:00:05 | 5088355200.0 |
+---------------------+------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
|_|_|_PromInstantManipulateExec: range=[1696118400000..1696118405000], lookback=[300000], interval=[1000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["instance", "job"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+
DROP TABLE IF EXISTS `instance_job_metrics`;
Affected Rows: 0


@@ -120,3 +120,30 @@ ORDER BY
true_collect_time DESC;
DROP TABLE lightning;
CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
`greptime_timestamp` TIMESTAMP(3) NOT NULL,
`greptime_value` DOUBLE NULL,
`instance` STRING NULL,
`job` STRING NULL,
TIME INDEX (`greptime_timestamp`),
PRIMARY KEY (`instance`, `job`)
);
INSERT INTO `instance_job_metrics` VALUES
('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');
TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
DROP TABLE IF EXISTS `instance_job_metrics`;


@@ -234,3 +234,58 @@ drop table test;
Affected Rows: 0
CREATE TABLE test2 (
"greptime_timestamp" TIMESTAMP(3) NOT NULL,
"greptime_value" DOUBLE NULL,
"shard" STRING NULL INVERTED INDEX,
TIME INDEX ("greptime_timestamp"),
PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
shard <= '2',
shard > '2'
);
Affected Rows: 0
TQL EVAL sum(test2);
++
++
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
DROP TABLE test2;
Affected Rows: 0


@@ -95,3 +95,28 @@ TQL ANALYZE VERBOSE FORMAT JSON (0, 10, '5s') test;
TQL ANALYZE FORMAT TEXT (0, 10, '5s') test;
drop table test;
CREATE TABLE test2 (
"greptime_timestamp" TIMESTAMP(3) NOT NULL,
"greptime_value" DOUBLE NULL,
"shard" STRING NULL INVERTED INDEX,
TIME INDEX ("greptime_timestamp"),
PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
shard <= '2',
shard > '2'
);
TQL EVAL sum(test2);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);
DROP TABLE test2;