Compare commits

..

6 Commits

Author SHA1 Message Date
evenyag
5fc0c5706c chore: bump version to v0.15.4
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-04 22:19:40 +08:00
Ning Sun
4d768b2c31 feat: schema/database support for label_values (#6631)
* feat: initial support for __schema__ in label values

* feat: filter database with matches

* refactor: skip unnecessary check

* fix: resolve schema matcher in label values

* test: add a test case for table not exists

* refactor: add matchop check on db label

* chore: merge main

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-04 22:19:40 +08:00
Yingwen
b62f219810 feat: Add option to limit the files reading simultaneously (#6635)
* feat: limits the max number of files to scan at the same time

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: make max_concurrent_scan_files configurable

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: reduce concurrent scan files to 128

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: update config example

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add test for max_concurrent_scan_files

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update config test

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-04 22:19:40 +08:00
Ruihang Xia
5d330fad17 feat: absent function in PromQL (#6618)
* feat: absent function in PromQL

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl serde

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* ai suggests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve PR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* comment out some tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-04 22:19:40 +08:00
Ruihang Xia
dfdfae1a7b feat: support __schema__ and __database__ in Prom Remote Read (#6610)
* feat: support __schema__ and __database__ in Prom remote R/W

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert remote write changes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* check matcher type

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-04 22:19:40 +08:00
Ruihang Xia
822f0caf4b fix: only return the __name__ label when there is one (#6629)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-04 22:19:40 +08:00
28 changed files with 1823 additions and 153 deletions

154
Cargo.lock generated
View File

@@ -211,7 +211,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"common-base",
"common-decimal",
@@ -944,7 +944,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -1586,7 +1586,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"catalog",
"common-error",
@@ -1621,7 +1621,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"arrow 54.2.1",
@@ -1959,7 +1959,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"async-stream",
"async-trait",
@@ -2004,7 +2004,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.15.3",
"substrait 0.15.4",
"table",
"tempfile",
"tokio",
@@ -2013,7 +2013,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"arc-swap",
@@ -2043,7 +2043,7 @@ dependencies = [
"rand 0.9.0",
"serde_json",
"snafu 0.8.5",
"substrait 0.15.3",
"substrait 0.15.4",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -2084,7 +2084,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"async-trait",
"auth",
@@ -2145,7 +2145,7 @@ dependencies = [
"snafu 0.8.5",
"stat",
"store-api",
"substrait 0.15.3",
"substrait 0.15.4",
"table",
"temp-env",
"tempfile",
@@ -2192,7 +2192,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"anymap2",
"async-trait",
@@ -2214,11 +2214,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.15.3"
version = "0.15.4"
[[package]]
name = "common-config"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"common-base",
"common-error",
@@ -2243,7 +2243,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -2280,7 +2280,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2293,7 +2293,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"common-macro",
"http 1.1.0",
@@ -2304,7 +2304,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"async-trait",
"common-error",
@@ -2320,7 +2320,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"ahash 0.8.11",
"api",
@@ -2373,7 +2373,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"async-trait",
"common-runtime",
@@ -2390,7 +2390,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"arrow-flight",
@@ -2422,7 +2422,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"common-base",
@@ -2441,7 +2441,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"arc-swap",
"common-query",
@@ -2455,7 +2455,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"anyhow",
"common-error",
@@ -2471,7 +2471,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"anymap2",
"api",
@@ -2536,7 +2536,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2545,11 +2545,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.15.3"
version = "0.15.4"
[[package]]
name = "common-pprof"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"common-error",
"common-macro",
@@ -2561,7 +2561,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"async-stream",
"async-trait",
@@ -2588,7 +2588,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"async-trait",
"common-procedure",
@@ -2597,7 +2597,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -2623,7 +2623,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"arc-swap",
"common-error",
@@ -2643,7 +2643,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2673,14 +2673,14 @@ dependencies = [
[[package]]
name = "common-session"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"strum 0.27.1",
]
[[package]]
name = "common-telemetry"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"backtrace",
"common-error",
@@ -2708,7 +2708,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"client",
"common-grpc",
@@ -2721,7 +2721,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -2739,7 +2739,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"build-data",
"cargo-manifest",
@@ -2750,7 +2750,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"common-base",
"common-error",
@@ -2773,7 +2773,7 @@ dependencies = [
[[package]]
name = "common-workload"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"common-telemetry",
@@ -3729,7 +3729,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"arrow-flight",
@@ -3782,7 +3782,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.15.3",
"substrait 0.15.4",
"table",
"tokio",
"toml 0.8.19",
@@ -3791,7 +3791,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -4451,7 +4451,7 @@ checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]]
name = "file-engine"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -4588,7 +4588,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"arrow 54.2.1",
@@ -4653,7 +4653,7 @@ dependencies = [
"sql",
"store-api",
"strum 0.27.1",
"substrait 0.15.3",
"substrait 0.15.4",
"table",
"tokio",
"tonic 0.12.3",
@@ -4708,7 +4708,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"arc-swap",
@@ -4768,7 +4768,7 @@ dependencies = [
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"strfmt",
"substrait 0.15.3",
"substrait 0.15.4",
"table",
"tokio",
"tokio-util",
@@ -5158,7 +5158,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=96c733f8472284d3c83a4c011dc6de9cf830c353#96c733f8472284d3c83a4c011dc6de9cf830c353"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a5d256ba4abb7393e0859ffbf7fca1e38f3433dc#a5d256ba4abb7393e0859ffbf7fca1e38f3433dc"
dependencies = [
"prost 0.13.5",
"serde",
@@ -5929,7 +5929,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6814,7 +6814,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"chrono",
"common-error",
@@ -6826,7 +6826,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"async-stream",
"async-trait",
@@ -7124,7 +7124,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -7152,7 +7152,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -7243,7 +7243,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"aquamarine",
@@ -7333,7 +7333,7 @@ dependencies = [
[[package]]
name = "mito-codec"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"bytes",
@@ -7356,7 +7356,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"aquamarine",
@@ -8106,7 +8106,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"anyhow",
"bytes",
@@ -8420,7 +8420,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8475,7 +8475,7 @@ dependencies = [
"sql",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"substrait 0.15.3",
"substrait 0.15.4",
"table",
"tokio",
"tokio-util",
@@ -8742,7 +8742,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -9030,7 +9030,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9173,7 +9173,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"auth",
"clap 4.5.19",
@@ -9486,7 +9486,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -9768,7 +9768,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9810,7 +9810,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9876,7 +9876,7 @@ dependencies = [
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"statrs",
"store-api",
"substrait 0.15.3",
"substrait 0.15.4",
"table",
"tokio",
"tokio-stream",
@@ -11162,7 +11162,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"ahash 0.8.11",
"api",
@@ -11283,7 +11283,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"arc-swap",
@@ -11622,7 +11622,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"chrono",
@@ -11677,7 +11677,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11977,7 +11977,7 @@ dependencies = [
[[package]]
name = "stat"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"nix 0.30.1",
]
@@ -12003,7 +12003,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"aquamarine",
@@ -12164,7 +12164,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"async-trait",
"bytes",
@@ -12344,7 +12344,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -12605,7 +12605,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"arbitrary",
"async-trait",
@@ -12649,7 +12649,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.15.3"
version = "0.15.4"
dependencies = [
"api",
"arrow-flight",
@@ -12716,7 +12716,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.15.3",
"substrait 0.15.4",
"table",
"tempfile",
"time",

View File

@@ -71,7 +71,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.15.3"
version = "0.15.4"
edition = "2021"
license = "Apache-2.0"
@@ -134,7 +134,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "96c733f8472284d3c83a4c011dc6de9cf830c353" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a5d256ba4abb7393e0859ffbf7fca1e38f3433dc" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -147,6 +147,7 @@
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `128` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -496,6 +497,7 @@
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `128` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |

View File

@@ -474,6 +474,9 @@ sst_write_buffer_size = "8MB"
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32
## Maximum number of SST files to scan concurrently.
max_concurrent_scan_files = 128
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

View File

@@ -565,6 +565,9 @@ sst_write_buffer_size = "8MB"
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32
## Maximum number of SST files to scan concurrently.
max_concurrent_scan_files = 128
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

View File

@@ -19,7 +19,8 @@ use datafusion::execution::registry::SerializerRegistry;
use datafusion_common::DataFusionError;
use datafusion_expr::UserDefinedLogicalNode;
use promql::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize,
Absent, EmptyMetric, InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide,
SeriesNormalize,
};
#[derive(Debug)]
@@ -65,6 +66,13 @@ impl SerializerRegistry for ExtensionSerializer {
.expect("Failed to downcast to SeriesDivide");
Ok(series_divide.serialize())
}
name if name == Absent::name() => {
let absent = node
.as_any()
.downcast_ref::<Absent>()
.expect("Failed to downcast to Absent");
Ok(absent.serialize())
}
name if name == EmptyMetric::name() => Err(DataFusionError::Substrait(
"EmptyMetric should not be serialized".to_string(),
)),
@@ -103,6 +111,10 @@ impl SerializerRegistry for ExtensionSerializer {
let scalar_calculate = ScalarCalculate::deserialize(bytes)?;
Ok(Arc::new(scalar_calculate))
}
name if name == Absent::name() => {
let absent = Absent::deserialize(bytes)?;
Ok(Arc::new(absent))
}
name if name == EmptyMetric::name() => Err(DataFusionError::Substrait(
"EmptyMetric should not be deserialized".to_string(),
)),

View File

@@ -21,9 +21,10 @@ use common_catalog::format_full_table_name;
use common_recordbatch::util;
use common_telemetry::tracing;
use datatypes::prelude::Value;
use promql_parser::label::{Matcher, Matchers};
use promql_parser::label::{MatchOp, Matcher, Matchers};
use query::promql;
use query::promql::planner::PromPlanner;
use servers::prom_store::{DATABASE_LABEL, SCHEMA_LABEL};
use servers::prometheus;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
@@ -114,7 +115,17 @@ impl Instance {
end: SystemTime,
ctx: &QueryContextRef,
) -> Result<Vec<String>> {
let table_schema = ctx.current_schema();
let table_schema = matchers
.iter()
.find_map(|m| {
if (m.name == SCHEMA_LABEL || m.name == DATABASE_LABEL) && m.op == MatchOp::Equal {
Some(m.value.clone())
} else {
None
}
})
.unwrap_or_else(|| ctx.current_schema());
let table = self
.catalog_manager
.table(ctx.current_catalog(), &table_schema, &metric, Some(ctx))

View File

@@ -30,6 +30,8 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
/// Default maximum number of SST files to scan concurrently.
pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 128;
// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
@@ -107,6 +109,8 @@ pub struct MitoConfig {
pub sst_write_buffer_size: ReadableSize,
/// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
pub parallel_scan_channel_size: usize,
/// Maximum number of SST files to scan concurrently (default 128).
pub max_concurrent_scan_files: usize,
/// Whether to allow stale entries read during replay.
pub allow_stale_entries: bool,
@@ -152,6 +156,7 @@ impl Default for MitoConfig {
write_cache_ttl: None,
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
allow_stale_entries: false,
index: IndexConfig::default(),
inverted_index: InvertedIndexConfig::default(),

View File

@@ -506,6 +506,7 @@ impl EngineInner {
CacheStrategy::EnableAll(cache_manager),
)
.with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
.with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())

View File

@@ -13,6 +13,8 @@
// limitations under the License.
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::TryStreamExt;
@@ -151,6 +153,58 @@ async fn test_scan_with_min_sst_sequence() {
.await;
}
#[tokio::test]
async fn test_max_concurrent_scan_files() {
let mut env = TestEnv::with_prefix("test_max_concurrent_scan_files").await;
let config = MitoConfig {
max_concurrent_scan_files: 2,
..Default::default()
};
let engine = env.create_engine(config).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let put_and_flush = async |start, end| {
let rows = Rows {
schema: column_schemas.clone(),
rows: test_util::build_rows(start, end),
};
test_util::put_rows(&engine, region_id, rows).await;
test_util::flush_region(&engine, region_id, None).await;
};
// Write overlapping files.
put_and_flush(0, 4).await;
put_and_flush(3, 7).await;
put_and_flush(6, 9).await;
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).await.unwrap();
let Scanner::Seq(scanner) = scanner else {
panic!("Scanner should be seq scan");
};
let error = scanner.check_scan_limit().unwrap_err();
assert_eq!(StatusCode::RateLimited, error.status_code());
let request = ScanRequest {
distribution: Some(TimeSeriesDistribution::PerSeries),
..Default::default()
};
let scanner = engine.scanner(region_id, request).await.unwrap();
let Scanner::Series(scanner) = scanner else {
panic!("Scanner should be series scan");
};
let error = scanner.check_scan_limit().unwrap_err();
assert_eq!(StatusCode::RateLimited, error.status_code());
}
#[tokio::test]
async fn test_series_scan() {
let mut env = TestEnv::with_prefix("test_series_scan").await;

View File

@@ -1032,6 +1032,18 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Too many files to read concurrently: {}, max allowed: {}",
actual,
max
))]
TooManyFilesToRead {
actual: usize,
max: usize,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1189,6 +1201,8 @@ impl ErrorExt for Error {
Encode { source, .. } | Decode { source, .. } => source.status_code(),
InconsistentTimestampLength { .. } => StatusCode::InvalidArguments,
TooManyFilesToRead { .. } => StatusCode::RateLimited,
}
}

View File

@@ -39,7 +39,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
use crate::error::Result;
use crate::memtable::MemtableRange;
use crate::metrics::READ_SST_COUNT;
@@ -187,6 +187,8 @@ pub(crate) struct ScanRegion {
cache_strategy: CacheStrategy,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size: usize,
/// Maximum number of SST files to scan concurrently.
max_concurrent_scan_files: usize,
/// Whether to ignore inverted index.
ignore_inverted_index: bool,
/// Whether to ignore fulltext index.
@@ -214,6 +216,7 @@ impl ScanRegion {
request,
cache_strategy,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
ignore_inverted_index: false,
ignore_fulltext_index: false,
ignore_bloom_filter: false,
@@ -232,6 +235,16 @@ impl ScanRegion {
self
}
/// Sets maximum number of SST files to scan concurrently.
#[must_use]
pub(crate) fn with_max_concurrent_scan_files(
mut self,
max_concurrent_scan_files: usize,
) -> Self {
self.max_concurrent_scan_files = max_concurrent_scan_files;
self
}
/// Sets whether to ignore inverted index.
#[must_use]
pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
@@ -421,6 +434,7 @@ impl ScanRegion {
.with_bloom_filter_index_applier(bloom_filter_applier)
.with_fulltext_index_applier(fulltext_index_applier)
.with_parallel_scan_channel_size(self.parallel_scan_channel_size)
.with_max_concurrent_scan_files(self.max_concurrent_scan_files)
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
.with_filter_deleted(self.filter_deleted)
@@ -597,6 +611,8 @@ pub struct ScanInput {
ignore_file_not_found: bool,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
pub(crate) parallel_scan_channel_size: usize,
/// Maximum number of SST files to scan concurrently.
pub(crate) max_concurrent_scan_files: usize,
/// Index appliers.
inverted_index_applier: Option<InvertedIndexApplierRef>,
bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
@@ -629,6 +645,7 @@ impl ScanInput {
cache_strategy: CacheStrategy::Disabled,
ignore_file_not_found: false,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
inverted_index_applier: None,
bloom_filter_index_applier: None,
fulltext_index_applier: None,
@@ -693,6 +710,16 @@ impl ScanInput {
self
}
/// Sets maximum number of SST files to scan concurrently.
#[must_use]
pub(crate) fn with_max_concurrent_scan_files(
mut self,
max_concurrent_scan_files: usize,
) -> Self {
self.max_concurrent_scan_files = max_concurrent_scan_files;
self
}
/// Sets invereted index applier.
#[must_use]
pub(crate) fn with_inverted_index_applier(

View File

@@ -33,11 +33,11 @@ use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, Sc
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
use crate::read::range::{RangeBuilderList, RangeMeta};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
@@ -347,6 +347,40 @@ impl SeqScan {
metrics
}
/// Finds the maximum number of files to read in a single partition range.
fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
partition_ranges
.iter()
.map(|part_range| {
let range_meta = &ranges[part_range.identifier];
range_meta.indices.len()
})
.max()
.unwrap_or(0)
}
/// Checks resource limit for the scanner.
pub(crate) fn check_scan_limit(&self) -> Result<()> {
// Check max file count limit for all partitions since we scan them in parallel.
let total_max_files: usize = self
.properties
.partitions
.iter()
.map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
.sum();
let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
if total_max_files > max_concurrent_files {
return TooManyFilesToReadSnafu {
actual: total_max_files,
max: max_concurrent_files,
}
.fail();
}
Ok(())
}
}
impl RegionScanner for SeqScan {
@@ -372,6 +406,9 @@ impl RegionScanner for SeqScan {
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
self.check_scan_limit().map_err(BoxedError::new)?;
Ok(())
}

View File

@@ -37,7 +37,7 @@ use tokio::sync::Semaphore;
use crate::error::{
ComputeArrowSnafu, Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result,
ScanMultiTimesSnafu, ScanSeriesSnafu,
ScanMultiTimesSnafu, ScanSeriesSnafu, TooManyFilesToReadSnafu,
};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
@@ -201,6 +201,32 @@ impl SeriesScan {
let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
Ok(Box::pin(chained_stream))
}
/// Checks resource limit for the scanner.
pub(crate) fn check_scan_limit(&self) -> Result<()> {
// Sum the total number of files across all partitions
let total_files: usize = self
.properties
.partitions
.iter()
.flat_map(|partition| partition.iter())
.map(|part_range| {
let range_meta = &self.stream_ctx.ranges[part_range.identifier];
range_meta.indices.len()
})
.sum();
let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
if total_files > max_concurrent_files {
return TooManyFilesToReadSnafu {
actual: total_files,
max: max_concurrent_files,
}
.fail();
}
Ok(())
}
}
fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
@@ -236,6 +262,9 @@ impl RegionScanner for SeriesScan {
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
self.check_scan_limit().map_err(BoxedError::new)?;
Ok(())
}

View File

@@ -242,6 +242,7 @@ impl RegionScanner for UnorderedScan {
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
// UnorderedScan only scans one row group per partition so the resource requirement won't be too high.
Ok(())
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod absent;
mod empty_metric;
mod histogram_fold;
mod instant_manipulate;
@@ -24,6 +25,7 @@ mod series_divide;
mod test_util;
mod union_distinct_on;
pub use absent::{Absent, AbsentExec, AbsentStream};
use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
pub use empty_metric::{build_special_time_expr, EmptyMetric, EmptyMetricExec, EmptyMetricStream};
pub use histogram_fold::{HistogramFold, HistogramFoldExec, HistogramFoldStream};

View File

@@ -0,0 +1,654 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use datafusion::arrow::array::Array;
use datafusion::common::{DFSchemaRef, Result as DataFusionResult};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::{EquivalenceProperties, LexRequirement, PhysicalSortRequirement};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::expressions::Column as ColumnExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
RecordBatchStream, SendableRecordBatchStream,
};
use datafusion_common::DFSchema;
use datafusion_expr::EmptyRelation;
use datatypes::arrow;
use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow_array::StringArray;
use datatypes::compute::SortOptions;
use futures::{ready, Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;
use crate::error::DeserializeSnafu;
use crate::extension_plan::Millisecond;
/// Maximum number of rows per output batch
const ABSENT_BATCH_SIZE: usize = 8192;
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Absent {
start: Millisecond,
end: Millisecond,
step: Millisecond,
time_index_column: String,
value_column: String,
fake_labels: Vec<(String, String)>,
input: LogicalPlan,
output_schema: DFSchemaRef,
}
impl PartialOrd for Absent {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
// compare on fields except schema and input
(
self.start,
self.end,
self.step,
&self.time_index_column,
&self.value_column,
&self.fake_labels,
)
.partial_cmp(&(
other.start,
other.end,
other.step,
&other.time_index_column,
&other.value_column,
&other.fake_labels,
))
}
}
impl UserDefinedLogicalNodeCore for Absent {
fn name(&self) -> &str {
Self::name()
}
fn inputs(&self) -> Vec<&LogicalPlan> {
vec![&self.input]
}
fn schema(&self) -> &DFSchemaRef {
&self.output_schema
}
fn expressions(&self) -> Vec<Expr> {
vec![]
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"PromAbsent: start={}, end={}, step={}",
self.start, self.end, self.step
)
}
fn with_exprs_and_inputs(
&self,
_exprs: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> DataFusionResult<Self> {
if inputs.is_empty() {
return Err(datafusion::error::DataFusionError::Internal(
"Absent must have at least one input".to_string(),
));
}
Ok(Self {
start: self.start,
end: self.end,
step: self.step,
time_index_column: self.time_index_column.clone(),
value_column: self.value_column.clone(),
fake_labels: self.fake_labels.clone(),
input: inputs[0].clone(),
output_schema: self.output_schema.clone(),
})
}
}
impl Absent {
pub fn try_new(
start: Millisecond,
end: Millisecond,
step: Millisecond,
time_index_column: String,
value_column: String,
fake_labels: Vec<(String, String)>,
input: LogicalPlan,
) -> DataFusionResult<Self> {
let mut fields = vec![
Field::new(
&time_index_column,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
Field::new(&value_column, DataType::Float64, true),
];
// remove duplicate fake labels
let mut fake_labels = fake_labels
.into_iter()
.collect::<HashMap<String, String>>()
.into_iter()
.collect::<Vec<_>>();
fake_labels.sort_unstable_by(|a, b| a.0.cmp(&b.0));
for (name, _) in fake_labels.iter() {
fields.push(Field::new(name, DataType::Utf8, true));
}
let output_schema = Arc::new(DFSchema::from_unqualified_fields(
fields.into(),
HashMap::new(),
)?);
Ok(Self {
start,
end,
step,
time_index_column,
value_column,
fake_labels,
input,
output_schema,
})
}
pub const fn name() -> &'static str {
"prom_absent"
}
pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let output_schema = Arc::new(self.output_schema.as_arrow().clone());
let properties = PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
Arc::new(AbsentExec {
start: self.start,
end: self.end,
step: self.step,
time_index_column: self.time_index_column.clone(),
value_column: self.value_column.clone(),
fake_labels: self.fake_labels.clone(),
output_schema: output_schema.clone(),
input: exec_input,
properties,
metric: ExecutionPlanMetricsSet::new(),
})
}
pub fn serialize(&self) -> Vec<u8> {
pb::Absent {
start: self.start,
end: self.end,
step: self.step,
time_index_column: self.time_index_column.clone(),
value_column: self.value_column.clone(),
fake_labels: self
.fake_labels
.iter()
.map(|(name, value)| pb::LabelPair {
key: name.clone(),
value: value.clone(),
})
.collect(),
}
.encode_to_vec()
}
pub fn deserialize(bytes: &[u8]) -> DataFusionResult<Self> {
let pb_absent = pb::Absent::decode(bytes).context(DeserializeSnafu)?;
let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
});
Self::try_new(
pb_absent.start,
pb_absent.end,
pb_absent.step,
pb_absent.time_index_column,
pb_absent.value_column,
pb_absent
.fake_labels
.iter()
.map(|label| (label.key.clone(), label.value.clone()))
.collect(),
placeholder_plan,
)
}
}
#[derive(Debug)]
pub struct AbsentExec {
start: Millisecond,
end: Millisecond,
step: Millisecond,
time_index_column: String,
value_column: String,
fake_labels: Vec<(String, String)>,
output_schema: SchemaRef,
input: Arc<dyn ExecutionPlan>,
properties: PlanProperties,
metric: ExecutionPlanMetricsSet,
}
impl ExecutionPlan for AbsentExec {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.output_schema.clone()
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition]
}
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
vec![Some(LexRequirement::new(vec![PhysicalSortRequirement {
expr: Arc::new(
ColumnExpr::new_with_schema(&self.time_index_column, &self.input.schema()).unwrap(),
),
options: Some(SortOptions {
descending: false,
nulls_first: false,
}),
}]))]
}
fn maintains_input_order(&self) -> Vec<bool> {
vec![false]
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
assert!(!children.is_empty());
Ok(Arc::new(Self {
start: self.start,
end: self.end,
step: self.step,
time_index_column: self.time_index_column.clone(),
value_column: self.value_column.clone(),
fake_labels: self.fake_labels.clone(),
output_schema: self.output_schema.clone(),
input: children[0].clone(),
properties: self.properties.clone(),
metric: self.metric.clone(),
}))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> DataFusionResult<SendableRecordBatchStream> {
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
let input = self.input.execute(partition, context)?;
Ok(Box::pin(AbsentStream {
end: self.end,
step: self.step,
time_index_column_index: self
.input
.schema()
.column_with_name(&self.time_index_column)
.unwrap() // Safety: we have checked the column name in `try_new`
.0,
output_schema: self.output_schema.clone(),
fake_labels: self.fake_labels.clone(),
input,
metric: baseline_metric,
// Buffer for streaming output timestamps
output_timestamps: Vec::new(),
// Current timestamp in the output range
output_ts_cursor: self.start,
input_finished: false,
}))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
fn name(&self) -> &str {
"AbsentExec"
}
}
impl DisplayAs for AbsentExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"PromAbsentExec: start={}, end={}, step={}",
self.start, self.end, self.step
)
}
}
}
}
pub struct AbsentStream {
end: Millisecond,
step: Millisecond,
time_index_column_index: usize,
output_schema: SchemaRef,
fake_labels: Vec<(String, String)>,
input: SendableRecordBatchStream,
metric: BaselineMetrics,
// Buffer for streaming output timestamps
output_timestamps: Vec<Millisecond>,
// Current timestamp in the output range
output_ts_cursor: Millisecond,
input_finished: bool,
}
impl RecordBatchStream for AbsentStream {
fn schema(&self) -> SchemaRef {
self.output_schema.clone()
}
}
impl Stream for AbsentStream {
type Item = DataFusionResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if !self.input_finished {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = std::time::Instant::now();
if let Err(e) = self.process_input_batch(&batch) {
return Poll::Ready(Some(Err(e)));
}
self.metric.elapsed_compute().add_elapsed(timer);
// If we have enough data for a batch, output it
if self.output_timestamps.len() >= ABSENT_BATCH_SIZE {
let timer = std::time::Instant::now();
let result = self.flush_output_batch();
self.metric.elapsed_compute().add_elapsed(timer);
match result {
Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
Ok(None) => continue,
Err(e) => return Poll::Ready(Some(Err(e))),
}
}
}
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => {
self.input_finished = true;
let timer = std::time::Instant::now();
// Process any remaining absent timestamps
if let Err(e) = self.process_remaining_absent_timestamps() {
return Poll::Ready(Some(Err(e)));
}
let result = self.flush_output_batch();
self.metric.elapsed_compute().add_elapsed(timer);
return Poll::Ready(result.transpose());
}
}
} else {
return Poll::Ready(None);
}
}
}
}
impl AbsentStream {
fn process_input_batch(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
// Extract timestamps from this batch
let timestamp_array = batch.column(self.time_index_column_index);
let milli_ts_array = arrow::compute::cast(
timestamp_array,
&DataType::Timestamp(TimeUnit::Millisecond, None),
)?;
let timestamp_array = milli_ts_array
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
// Process against current output cursor position
for &input_ts in timestamp_array.values() {
// Generate absent timestamps up to this input timestamp
while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
self.output_timestamps.push(self.output_ts_cursor);
self.output_ts_cursor += self.step;
}
// Skip the input timestamp if it matches our cursor
if self.output_ts_cursor == input_ts {
self.output_ts_cursor += self.step;
}
}
Ok(())
}
fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
// Generate all remaining absent timestamps (input is finished)
while self.output_ts_cursor <= self.end {
self.output_timestamps.push(self.output_ts_cursor);
self.output_ts_cursor += self.step;
}
Ok(())
}
fn flush_output_batch(&mut self) -> DataFusionResult<Option<RecordBatch>> {
if self.output_timestamps.is_empty() {
return Ok(None);
}
let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
let num_rows = self.output_timestamps.len();
columns.push(Arc::new(TimestampMillisecondArray::from(
self.output_timestamps.clone(),
)) as _);
columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);
for (_, value) in self.fake_labels.iter() {
columns.push(Arc::new(StringArray::from_iter(std::iter::repeat_n(
Some(value.clone()),
num_rows,
))) as _);
}
let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;
self.output_timestamps.clear();
Ok(Some(batch))
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::prelude::SessionContext;
use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};
use super::*;
#[tokio::test]
async fn test_absent_basic() {
let schema = Arc::new(Schema::new(vec![
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
Field::new("value", DataType::Float64, true),
]));
// Input has timestamps: 0, 2000, 4000
let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![0, 2000, 4000]));
let value_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
let batch =
RecordBatch::try_new(schema.clone(), vec![timestamp_array, value_array]).unwrap();
let memory_exec = MemoryExec::try_new(&[vec![batch]], schema, None).unwrap();
let output_schema = Arc::new(Schema::new(vec![
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
Field::new("value", DataType::Float64, true),
]));
let absent_exec = AbsentExec {
start: 0,
end: 5000,
step: 1000,
time_index_column: "timestamp".to_string(),
value_column: "value".to_string(),
fake_labels: vec![],
output_schema: output_schema.clone(),
input: Arc::new(memory_exec),
properties: PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
),
metric: ExecutionPlanMetricsSet::new(),
};
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let mut stream = absent_exec.execute(0, task_ctx).unwrap();
// Collect all output batches
let mut output_timestamps = Vec::new();
while let Some(batch_result) = stream.next().await {
let batch = batch_result.unwrap();
let ts_array = batch
.column(0)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
for i in 0..ts_array.len() {
if !ts_array.is_null(i) {
let ts = ts_array.value(i);
output_timestamps.push(ts);
}
}
}
// Should output absent timestamps: 1000, 3000, 5000
// (0, 2000, 4000 exist in input, so 1000, 3000, 5000 are absent)
assert_eq!(output_timestamps, vec![1000, 3000, 5000]);
}
#[tokio::test]
async fn test_absent_empty_input() {
let schema = Arc::new(Schema::new(vec![
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
Field::new("value", DataType::Float64, true),
]));
// Empty input
let memory_exec = MemoryExec::try_new(&[vec![]], schema, None).unwrap();
let output_schema = Arc::new(Schema::new(vec![
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
Field::new("value", DataType::Float64, true),
]));
let absent_exec = AbsentExec {
start: 0,
end: 2000,
step: 1000,
time_index_column: "timestamp".to_string(),
value_column: "value".to_string(),
fake_labels: vec![],
output_schema: output_schema.clone(),
input: Arc::new(memory_exec),
properties: PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
),
metric: ExecutionPlanMetricsSet::new(),
};
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let mut stream = absent_exec.execute(0, task_ctx).unwrap();
// Collect all output timestamps
let mut output_timestamps = Vec::new();
while let Some(batch_result) = stream.next().await {
let batch = batch_result.unwrap();
let ts_array = batch
.column(0)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
for i in 0..ts_array.len() {
if !ts_array.is_null(i) {
let ts = ts_array.value(i);
output_timestamps.push(ts);
}
}
}
// Should output all timestamps in range: 0, 1000, 2000
assert_eq!(output_timestamps, vec![0, 1000, 2000]);
}
}

View File

@@ -22,8 +22,8 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use crate::extension_plan::{
EmptyMetric, HistogramFold, InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide,
SeriesNormalize, UnionDistinctOn,
Absent, EmptyMetric, HistogramFold, InstantManipulate, RangeManipulate, ScalarCalculate,
SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
pub struct PromExtensionPlanner;
@@ -57,6 +57,8 @@ impl ExtensionPlanner for PromExtensionPlanner {
physical_inputs[0].clone(),
physical_inputs[1].clone(),
)))
} else if let Some(node) = node.as_any().downcast_ref::<Absent>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else {
Ok(None)
}

View File

@@ -27,6 +27,7 @@ use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
@@ -50,7 +51,7 @@ use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTi
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use promql::extension_plan::{
build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
build_special_time_expr, Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use promql::functions::{
@@ -86,6 +87,8 @@ use crate::promql::error::{
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `absent()` function in PromQL
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// `histogram_quantile` function in PromQL
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector` function in PromQL
@@ -124,7 +127,10 @@ struct PromPlannerContext {
time_index_column: Option<String>,
field_columns: Vec<String>,
tag_columns: Vec<String>,
/// The matcher for field columns `__field__`.
field_column_matcher: Option<Vec<Matcher>>,
/// The matcher for selectors (normal matchers).
selector_matcher: Vec<Matcher>,
schema_name: Option<String>,
/// The range in millisecond of range selector. None if there is no range selector.
range: Option<Millisecond>,
@@ -148,6 +154,7 @@ impl PromPlannerContext {
self.field_columns = vec![];
self.tag_columns = vec![];
self.field_column_matcher = None;
self.selector_matcher.clear();
self.schema_name = None;
self.range = None;
}
@@ -830,6 +837,7 @@ impl PromPlanner {
}
SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await,
SPECIAL_ABSENT_FUNCTION => return self.create_absent_plan(args, session_state).await,
_ => {}
}
@@ -1001,6 +1009,7 @@ impl PromPlanner {
);
self.ctx.schema_name = Some(matcher.value.clone());
} else if matcher.name != METRIC_NAME {
self.ctx.selector_matcher.push(matcher.clone());
let _ = matchers.insert(matcher.clone());
}
}
@@ -1246,6 +1255,13 @@ impl PromPlanner {
) -> Result<Vec<DfExpr>> {
let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
for matcher in label_matchers.matchers {
if matcher.name == SCHEMA_COLUMN_MATCHER
|| matcher.name == DB_COLUMN_MATCHER
|| matcher.name == FIELD_COLUMN_MATCHER
{
continue;
}
let col = if table_schema
.field_with_unqualified_name(&matcher.name)
.is_err()
@@ -2449,6 +2465,69 @@ impl PromPlanner {
Ok(scalar_plan)
}
/// Create a [SPECIAL_ABSENT_FUNCTION] plan
async fn create_absent_plan(
&mut self,
args: &PromFunctionArgs,
session_state: &SessionState,
) -> Result<LogicalPlan> {
if args.args.len() != 1 {
return FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
}
.fail();
}
let input = self.prom_expr_to_plan(&args.args[0], session_state).await?;
let time_index_expr = self.create_time_index_column_expr()?;
let first_field_expr =
self.create_field_column_exprs()?
.pop()
.with_context(|| ValueNotFoundSnafu {
table: self.ctx.table_name.clone().unwrap_or_default(),
})?;
let first_value_expr = first_value(first_field_expr, None);
let ordered_aggregated_input = LogicalPlanBuilder::from(input)
.aggregate(
vec![time_index_expr.clone()],
vec![first_value_expr.clone()],
)
.context(DataFusionPlanningSnafu)?
.sort(vec![time_index_expr.sort(true, false)])
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
let fake_labels = self
.ctx
.selector_matcher
.iter()
.filter_map(|matcher| match matcher.op {
MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
_ => None,
})
.collect::<Vec<_>>();
// Create the absent plan
let absent_plan = LogicalPlan::Extension(Extension {
node: Arc::new(
Absent::try_new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
self.ctx.time_index_column.as_ref().unwrap().clone(),
self.ctx.field_columns[0].clone(),
fake_labels,
ordered_aggregated_input,
)
.context(DataFusionPlanningSnafu)?,
),
});
Ok(absent_plan)
}
/// Try to build a DataFusion Literal Expression from PromQL Expr, return
/// `None` if the input is not a literal expression.
fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {

View File

@@ -121,7 +121,7 @@ impl PrometheusGatewayService {
let result = self.handler.do_query(&query, ctx).await;
let (metric_name, mut result_type) =
match retrieve_metric_name_and_result_type(&query.query) {
Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
Ok((metric_name, result_type)) => (metric_name, result_type),
Err(err) => {
return PrometheusJsonResponse::error(err.status_code(), err.output_msg())
}

View File

@@ -38,7 +38,7 @@ use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
use crate::http::extractor::PipelineInfo;
use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
use crate::http::PromValidationMode;
use crate::prom_store::{snappy_decompress, zstd_decompress};
use crate::prom_store::{extract_schema_from_read_request, snappy_decompress, zstd_decompress};
use crate::proto::{PromSeriesProcessor, PromWriteRequest};
use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
@@ -117,6 +117,7 @@ pub async fn remote_write(
let is_zstd = content_encoding.contains(VM_ENCODING);
let mut processor = PromSeriesProcessor::default_processor();
if let Some(pipeline_name) = pipeline_info.pipeline_name {
let pipeline_def = PipelineDefinition::from_name(
&pipeline_name,
@@ -184,13 +185,19 @@ pub async fn remote_read(
) -> Result<PromStoreResponse> {
let db = params.db.clone().unwrap_or_default();
query_ctx.set_channel(Channel::Prometheus);
let request = decode_remote_read_request(body).await?;
// Extract schema from special labels and set it in query context
if let Some(schema) = extract_schema_from_read_request(&request) {
query_ctx.set_current_schema(&schema);
}
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
let request = decode_remote_read_request(body).await?;
state.prom_store_handler.read(request, query_ctx).await
}

View File

@@ -56,7 +56,7 @@ use crate::error::{
TableNotFoundSnafu, UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL};
use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
use crate::prometheus_handler::PrometheusHandlerRef;
/// For [ValueType::Vector] result type
@@ -318,7 +318,7 @@ async fn do_instant_query(
) -> PrometheusJsonResponse {
let result = handler.do_query(prom_query, query_ctx).await;
let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
Ok((metric_name, result_type)) => (metric_name, result_type),
Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
};
PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
@@ -428,7 +428,7 @@ async fn do_range_query(
let result = handler.do_query(prom_query, query_ctx).await;
let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
Ok((metric_name, _)) => metric_name.unwrap_or_default(),
Ok((metric_name, _)) => metric_name,
};
PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
}
@@ -824,13 +824,52 @@ pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, s
}
fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
find_metric_name_and_matchers(expr, |name, matchers| {
name.clone().or(matchers
.find_matchers(METRIC_NAME)
.into_iter()
.next()
.map(|m| m.value))
})
let mut metric_names = HashSet::new();
collect_metric_names(expr, &mut metric_names);
// Return the metric name only if there's exactly one unique metric name
if metric_names.len() == 1 {
metric_names.into_iter().next()
} else {
None
}
}
/// Recursively collect all metric names from a PromQL expression
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
match expr {
PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
collect_metric_names(expr, metric_names)
}
PromqlExpr::Unary(UnaryExpr { expr }) => collect_metric_names(expr, metric_names),
PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
collect_metric_names(lhs, metric_names);
collect_metric_names(rhs, metric_names);
}
PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
if let Some(name) = name {
metric_names.insert(name.clone());
} else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
metric_names.insert(matcher.value);
}
}
PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
let VectorSelector { name, matchers, .. } = vs;
if let Some(name) = name {
metric_names.insert(name.clone());
} else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
metric_names.insert(matcher.value);
}
}
PromqlExpr::Call(Call { args, .. }) => {
args.args
.iter()
.for_each(|e| collect_metric_names(e, metric_names));
}
PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
}
}
fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
@@ -995,6 +1034,19 @@ pub async fn label_values_query(
let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
field_columns.sort_unstable();
return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
} else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
let catalog_manager = handler.catalog_manager();
match retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await {
Ok(schema_names) => {
return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(
schema_names,
));
}
Err(e) => {
return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
}
}
}
let queries = params.matches.0;
@@ -1112,53 +1164,51 @@ async fn retrieve_field_names(
Ok(field_columns)
}
/// Try to parse and extract the name of referenced metric from the promql query.
///
/// Returns the metric name if a single metric is referenced, otherwise None.
fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
let promql_expr = promql_parser::parser::parse(query).ok()?;
async fn retrieve_schema_names(
query_ctx: &QueryContext,
catalog_manager: CatalogManagerRef,
matches: Vec<String>,
) -> Result<Vec<String>> {
let mut schemas = Vec::new();
let catalog = query_ctx.current_catalog();
struct MetricNameVisitor {
metric_name: Option<String>,
}
let candidate_schemas = catalog_manager
.schema_names(catalog, Some(query_ctx))
.await
.context(CatalogSnafu)?;
impl promql_parser::util::ExprVisitor for MetricNameVisitor {
type Error = ();
fn pre_visit(&mut self, plan: &PromqlExpr) -> std::result::Result<bool, Self::Error> {
let query_metric_name = match plan {
PromqlExpr::VectorSelector(vs) => vs
.matchers
.find_matchers(METRIC_NAME)
.into_iter()
.next()
.map(|m| m.value)
.or_else(|| vs.name.clone()),
PromqlExpr::MatrixSelector(ms) => ms
.vs
.matchers
.find_matchers(METRIC_NAME)
.into_iter()
.next()
.map(|m| m.value)
.or_else(|| ms.vs.name.clone()),
_ => return Ok(true),
};
// set it to empty string if multiple metrics are referenced.
if self.metric_name.is_some() && query_metric_name.is_some() {
self.metric_name = Some(String::new());
} else {
self.metric_name = query_metric_name.or_else(|| self.metric_name.clone());
for schema in candidate_schemas {
let mut found = true;
for match_item in &matches {
if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
let exists = catalog_manager
.table_exists(catalog, &schema, &table_name, Some(query_ctx))
.await
.context(CatalogSnafu)?;
if !exists {
found = false;
break;
}
}
}
Ok(true)
if found {
schemas.push(schema);
}
}
let mut visitor = MetricNameVisitor { metric_name: None };
promql_parser::util::walk_expr(&mut visitor, &promql_expr).ok()?;
visitor.metric_name
schemas.sort_unstable();
Ok(schemas)
}
/// Try to parse and extract the name of referenced metric from the promql query.
///
/// Returns the metric name if exactly one unique metric is referenced, otherwise None.
/// Multiple references to the same metric are allowed.
fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
let promql_expr = promql_parser::parser::parse(query).ok()?;
promql_expr_to_metric_name(&promql_expr)
}
#[derive(Debug, Default, Serialize, Deserialize)]
@@ -1275,3 +1325,205 @@ pub async fn parse_query(
PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
}
}
#[cfg(test)]
mod tests {
use promql_parser::parser::value::ValueType;
use super::*;
struct TestCase {
name: &'static str,
promql: &'static str,
expected_metric: Option<&'static str>,
expected_type: ValueType,
should_error: bool,
}
#[test]
fn test_retrieve_metric_name_and_result_type() {
let test_cases = &[
// Single metric cases
TestCase {
name: "simple metric",
promql: "cpu_usage",
expected_metric: Some("cpu_usage"),
expected_type: ValueType::Vector,
should_error: false,
},
TestCase {
name: "metric with selector",
promql: r#"cpu_usage{instance="localhost"}"#,
expected_metric: Some("cpu_usage"),
expected_type: ValueType::Vector,
should_error: false,
},
TestCase {
name: "metric with range selector",
promql: "cpu_usage[5m]",
expected_metric: Some("cpu_usage"),
expected_type: ValueType::Matrix,
should_error: false,
},
TestCase {
name: "metric with __name__ matcher",
promql: r#"{__name__="cpu_usage"}"#,
expected_metric: Some("cpu_usage"),
expected_type: ValueType::Vector,
should_error: false,
},
TestCase {
name: "metric with unary operator",
promql: "-cpu_usage",
expected_metric: Some("cpu_usage"),
expected_type: ValueType::Vector,
should_error: false,
},
// Aggregation and function cases
TestCase {
name: "metric with aggregation",
promql: "sum(cpu_usage)",
expected_metric: Some("cpu_usage"),
expected_type: ValueType::Vector,
should_error: false,
},
TestCase {
name: "complex aggregation",
promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
expected_metric: Some("cpu_usage"),
expected_type: ValueType::Vector,
should_error: false,
},
// Same metric binary operations
TestCase {
name: "same metric addition",
promql: "cpu_usage + cpu_usage",
expected_metric: Some("cpu_usage"),
expected_type: ValueType::Vector,
should_error: false,
},
TestCase {
name: "metric with scalar addition",
promql: r#"sum(rate(cpu_usage{job="node"}[5m])) by (instance) + 100"#,
expected_metric: Some("cpu_usage"),
expected_type: ValueType::Vector,
should_error: false,
},
// Multiple metrics cases
TestCase {
name: "different metrics addition",
promql: "cpu_usage + memory_usage",
expected_metric: None,
expected_type: ValueType::Vector,
should_error: false,
},
TestCase {
name: "different metrics subtraction",
promql: "network_in - network_out",
expected_metric: None,
expected_type: ValueType::Vector,
should_error: false,
},
// Unless operator cases
TestCase {
name: "unless with different metrics",
promql: "cpu_usage unless memory_usage",
expected_metric: None,
expected_type: ValueType::Vector,
should_error: false,
},
TestCase {
name: "unless with same metric",
promql: "cpu_usage unless cpu_usage",
expected_metric: Some("cpu_usage"),
expected_type: ValueType::Vector,
should_error: false,
},
// Subquery cases
TestCase {
name: "basic subquery",
promql: "cpu_usage[5m:1m]",
expected_metric: Some("cpu_usage"),
expected_type: ValueType::Matrix,
should_error: false,
},
TestCase {
name: "subquery with multiple metrics",
promql: "(cpu_usage + memory_usage)[5m:1m]",
expected_metric: None,
expected_type: ValueType::Matrix,
should_error: false,
},
// Literal values
TestCase {
name: "scalar value",
promql: "42",
expected_metric: None,
expected_type: ValueType::Scalar,
should_error: false,
},
TestCase {
name: "string literal",
promql: r#""hello world""#,
expected_metric: None,
expected_type: ValueType::String,
should_error: false,
},
// Error cases
TestCase {
name: "invalid syntax",
promql: "cpu_usage{invalid=",
expected_metric: None,
expected_type: ValueType::Vector,
should_error: true,
},
TestCase {
name: "empty query",
promql: "",
expected_metric: None,
expected_type: ValueType::Vector,
should_error: true,
},
TestCase {
name: "malformed brackets",
promql: "cpu_usage[5m",
expected_metric: None,
expected_type: ValueType::Vector,
should_error: true,
},
];
for test_case in test_cases {
let result = retrieve_metric_name_and_result_type(test_case.promql);
if test_case.should_error {
assert!(
result.is_err(),
"Test '{}' should have failed but succeeded with: {:?}",
test_case.name,
result
);
} else {
let (metric_name, value_type) = result.unwrap_or_else(|e| {
panic!(
"Test '{}' should have succeeded but failed with error: {}",
test_case.name, e
)
});
let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
assert_eq!(
metric_name, expected_metric_name,
"Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
test_case.name, expected_metric_name, metric_name
);
assert_eq!(
value_type, test_case.expected_type,
"Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
test_case.name, test_case.expected_type, value_type
);
}
}
}
}

View File

@@ -118,7 +118,7 @@ impl PrometheusJsonResponse {
/// Convert from `Result<Output>`
pub async fn from_query_result(
result: Result<Output>,
metric_name: String,
metric_name: Option<String>,
result_type: ValueType,
) -> Self {
let response: Result<Self> = try {
@@ -182,7 +182,7 @@ impl PrometheusJsonResponse {
/// Convert [RecordBatches] to [PromData]
fn record_batches_to_data(
batches: RecordBatches,
metric_name: String,
metric_name: Option<String>,
result_type: ValueType,
) -> Result<PrometheusResponse> {
// infer semantic type of each column from schema.
@@ -230,7 +230,6 @@ impl PrometheusJsonResponse {
reason: "no value column found".to_string(),
})?;
let metric_name = (METRIC_NAME, metric_name.as_str());
// Preserves the order of output tags.
// Tag order matters, e.g., after sorc and sort_desc, the output order must be kept.
let mut buffer = IndexMap::<Vec<(&str, &str)>, Vec<(f64, String)>>::new();
@@ -276,9 +275,10 @@ impl PrometheusJsonResponse {
}
// retrieve tags
// TODO(ruihang): push table name `__metric__`
let mut tags = Vec::with_capacity(num_label_columns + 1);
tags.push(metric_name);
if let Some(metric_name) = &metric_name {
tags.push((METRIC_NAME, metric_name.as_str()));
}
for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
// TODO(ruihang): add test for NULL tag
if let Some(tag_value) = tag_column.get_data(row_index) {

View File

@@ -19,7 +19,7 @@ use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};
use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
use api::v1::RowInsertRequests;
use common_grpc::precision::Precision;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
@@ -44,6 +44,9 @@ pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";
pub const DATABASE_LABEL: &str = "__database__";
pub const DATABASE_LABEL_BYTES: &[u8] = b"__database__";
pub const SCHEMA_LABEL: &str = "__schema__";
pub const SCHEMA_LABEL_BYTES: &[u8] = b"__schema__";
pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"__physical_table__";
@@ -73,6 +76,29 @@ pub fn table_name(q: &Query) -> Result<String> {
})
}
/// Extract schema from remote read request. Returns the first schema found from any query's matchers.
/// Prioritizes __schema__ over __database__ labels.
pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
for query in &request.queries {
for matcher in &query.matchers {
if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 {
return Some(matcher.value.clone());
}
}
}
// If no __schema__ found, look for __database__
for query in &request.queries {
for matcher in &query.matchers {
if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 {
return Some(matcher.value.clone());
}
}
}
None
}
/// Create a DataFrame from a remote Query
#[tracing::instrument(skip_all)]
pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
@@ -91,7 +117,7 @@ pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
for m in label_matches {
let name = &m.name;
if name == METRIC_NAME_LABEL {
if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL {
continue;
}

View File

@@ -34,7 +34,7 @@ use crate::http::PromValidationMode;
use crate::pipeline::run_pipeline;
use crate::prom_row_builder::{PromCtx, TablesBuilder};
use crate::prom_store::{
DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES,
DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES, SCHEMA_LABEL_BYTES,
};
use crate::query_handler::PipelineHandlerRef;
use crate::repeated_field::{Clear, RepeatedField};
@@ -199,10 +199,17 @@ impl PromTimeSeries {
self.table_name = decode_string(&label.value, prom_validation_mode)?;
self.labels.truncate(self.labels.len() - 1); // remove last label
}
DATABASE_LABEL_BYTES => {
SCHEMA_LABEL_BYTES => {
self.schema = Some(decode_string(&label.value, prom_validation_mode)?);
self.labels.truncate(self.labels.len() - 1); // remove last label
}
DATABASE_LABEL_BYTES => {
// Only set schema from __database__ if __schema__ hasn't been set yet
if self.schema.is_none() {
self.schema = Some(decode_string(&label.value, prom_validation_mode)?);
}
self.labels.truncate(self.labels.len() - 1); // remove last label
}
PHYSICAL_TABLE_LABEL_BYTES => {
self.physical_table =
Some(decode_string(&label.value, prom_validation_mode)?);

View File

@@ -16,7 +16,10 @@ use std::collections::BTreeMap;
use std::io::Write;
use std::str::FromStr;
use api::prom_store::remote::WriteRequest;
use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{
Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, TimeSeries, WriteRequest,
};
use auth::user_provider_from_option;
use axum::http::{HeaderName, HeaderValue, StatusCode};
use chrono::Utc;
@@ -94,6 +97,7 @@ macro_rules! http_tests {
test_dashboard_path,
test_prometheus_remote_write,
test_prometheus_remote_special_labels,
test_prometheus_remote_schema_labels,
test_prometheus_remote_write_with_pipeline,
test_vm_proto_remote_write,
@@ -781,6 +785,89 @@ pub async fn test_prom_http_api(store_type: StorageType) {
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
);
// special labels
let res = client
.get("/v1/prometheus/api/v1/label/__schema__/values?start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!([
"greptime_private",
"information_schema",
"public"
]))
.unwrap()
);
// special labels
let res = client
.get("/v1/prometheus/api/v1/label/__schema__/values?match[]=demo&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["public"])).unwrap()
);
// special labels
let res = client
.get("/v1/prometheus/api/v1/label/__database__/values?match[]=demo&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["public"])).unwrap()
);
// special labels
let res = client
.get("/v1/prometheus/api/v1/label/__database__/values?match[]=multi_labels{idc=\"idc1\", env=\"dev\"}&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["public"])).unwrap()
);
// match special labels.
let res = client
.get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{__schema__=\"public\", idc=\"idc1\", env=\"dev\"}&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
);
// match special labels.
let res = client
.get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{__schema__=\"information_schema\", idc=\"idc1\", env=\"dev\"}&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!([])).unwrap()
);
// search field name
let res = client
.get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo")
@@ -1138,6 +1225,7 @@ write_cache_path = ""
write_cache_size = "5GiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
max_concurrent_scan_files = 128
allow_stale_entries = false
min_compaction_interval = "0s"
@@ -1465,6 +1553,188 @@ pub async fn test_prometheus_remote_write_with_pipeline(store_type: StorageType)
guard.remove_all().await;
}
pub async fn test_prometheus_remote_schema_labels(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_prom_app_with_frontend(store_type, "test_prometheus_remote_schema_labels").await;
let client = TestClient::new(app).await;
// Create test schemas
let res = client
.post("/v1/sql?sql=create database test_schema_1")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.post("/v1/sql?sql=create database test_schema_2")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// Write data with __schema__ label
let schema_series = TimeSeries {
labels: vec![
Label {
name: "__name__".to_string(),
value: "metric_with_schema".to_string(),
},
Label {
name: "__schema__".to_string(),
value: "test_schema_1".to_string(),
},
Label {
name: "instance".to_string(),
value: "host1".to_string(),
},
],
samples: vec![Sample {
value: 100.0,
timestamp: 1000,
}],
..Default::default()
};
let write_request = WriteRequest {
timeseries: vec![schema_series],
..Default::default()
};
let serialized_request = write_request.encode_to_vec();
let compressed_request =
prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy");
let res = client
.post("/v1/prometheus/write")
.header("Content-Encoding", "snappy")
.body(compressed_request)
.send()
.await;
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Read data from test_schema_1 using __schema__ matcher
let read_request = ReadRequest {
queries: vec![Query {
start_timestamp_ms: 500,
end_timestamp_ms: 1500,
matchers: vec![
LabelMatcher {
name: "__name__".to_string(),
value: "metric_with_schema".to_string(),
r#type: MatcherType::Eq as i32,
},
LabelMatcher {
name: "__schema__".to_string(),
value: "test_schema_1".to_string(),
r#type: MatcherType::Eq as i32,
},
],
..Default::default()
}],
..Default::default()
};
let serialized_read_request = read_request.encode_to_vec();
let compressed_read_request =
prom_store::snappy_compress(&serialized_read_request).expect("failed to encode snappy");
let mut result = client
.post("/v1/prometheus/read")
.body(compressed_read_request)
.send()
.await;
assert_eq!(result.status(), StatusCode::OK);
let response_body = result.chunk().await.unwrap();
let decompressed_response = prom_store::snappy_decompress(&response_body).unwrap();
let read_response = ReadResponse::decode(&decompressed_response[..]).unwrap();
assert_eq!(read_response.results.len(), 1);
assert_eq!(read_response.results[0].timeseries.len(), 1);
let timeseries = &read_response.results[0].timeseries[0];
assert_eq!(timeseries.samples.len(), 1);
assert_eq!(timeseries.samples[0].value, 100.0);
assert_eq!(timeseries.samples[0].timestamp, 1000);
// write data to unknown schema
let unknown_schema_series = TimeSeries {
labels: vec![
Label {
name: "__name__".to_string(),
value: "metric_unknown_schema".to_string(),
},
Label {
name: "__schema__".to_string(),
value: "unknown_schema".to_string(),
},
Label {
name: "instance".to_string(),
value: "host2".to_string(),
},
],
samples: vec![Sample {
value: 200.0,
timestamp: 2000,
}],
..Default::default()
};
let unknown_write_request = WriteRequest {
timeseries: vec![unknown_schema_series],
..Default::default()
};
let serialized_unknown_request = unknown_write_request.encode_to_vec();
let compressed_unknown_request =
prom_store::snappy_compress(&serialized_unknown_request).expect("failed to encode snappy");
// Write data to unknown schema
let res = client
.post("/v1/prometheus/write")
.header("Content-Encoding", "snappy")
.body(compressed_unknown_request)
.send()
.await;
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// Read data from unknown schema
let unknown_read_request = ReadRequest {
queries: vec![Query {
start_timestamp_ms: 1500,
end_timestamp_ms: 2500,
matchers: vec![
LabelMatcher {
name: "__name__".to_string(),
value: "metric_unknown_schema".to_string(),
r#type: MatcherType::Eq as i32,
},
LabelMatcher {
name: "__schema__".to_string(),
value: "unknown_schema".to_string(),
r#type: MatcherType::Eq as i32,
},
],
..Default::default()
}],
..Default::default()
};
let serialized_unknown_read_request = unknown_read_request.encode_to_vec();
let compressed_unknown_read_request =
prom_store::snappy_compress(&serialized_unknown_read_request)
.expect("failed to encode snappy");
let unknown_result = client
.post("/v1/prometheus/read")
.body(compressed_unknown_read_request)
.send()
.await;
assert_eq!(unknown_result.status(), StatusCode::BAD_REQUEST);
guard.remove_all().await;
}
pub async fn test_vm_proto_remote_write(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =

View File

@@ -0,0 +1,122 @@
create table t (
ts timestamp(3) time index,
job STRING,
instance STRING,
val DOUBLE,
PRIMARY KEY(job, instance),
);
Affected Rows: 0
insert into t values
(0, 'job1', 'instance1', 1),
(0, 'job2', 'instance2', 2),
(5000, 'job1', 'instance3',3),
(5000, 'job2', 'instance4',4),
(10000, 'job1', 'instance5',5),
(10000, 'job2', 'instance6',6),
(15000, 'job1', 'instance7',7),
(15000, 'job2', 'instance8',8);
Affected Rows: 8
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="job1"});
++
++
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="job2"});
++
++
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="job3"});
+---------------------+-----+------+
| ts | val | job |
+---------------------+-----+------+
| 1970-01-01T00:00:00 | 1.0 | job3 |
| 1970-01-01T00:00:05 | 1.0 | job3 |
| 1970-01-01T00:00:10 | 1.0 | job3 |
| 1970-01-01T00:00:15 | 1.0 | job3 |
+---------------------+-----+------+
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(nonexistent_table);
+---------------------+-------+
| time | value |
+---------------------+-------+
| 1970-01-01T00:00:00 | 1.0 |
| 1970-01-01T00:00:05 | 1.0 |
| 1970-01-01T00:00:10 | 1.0 |
| 1970-01-01T00:00:15 | 1.0 |
+---------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="nonexistent_job"});
+---------------------+-----+-----------------+
| ts | val | job |
+---------------------+-----+-----------------+
| 1970-01-01T00:00:00 | 1.0 | nonexistent_job |
| 1970-01-01T00:00:05 | 1.0 | nonexistent_job |
| 1970-01-01T00:00:10 | 1.0 | nonexistent_job |
| 1970-01-01T00:00:15 | 1.0 | nonexistent_job |
+---------------------+-----+-----------------+
-- SQLNESS SORT_RESULT 3 1
tql eval (1000, 1000, '1s') absent(t{job="job1"});
+---------------------+-----+------+
| ts | val | job |
+---------------------+-----+------+
| 1970-01-01T00:16:40 | 1.0 | job1 |
+---------------------+-----+------+
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="nonexistent_job1", job="nonexistent_job2"});
+---------------------+-----+------------------+
| ts | val | job |
+---------------------+-----+------------------+
| 1970-01-01T00:00:00 | 1.0 | nonexistent_job2 |
| 1970-01-01T00:00:05 | 1.0 | nonexistent_job2 |
| 1970-01-01T00:00:10 | 1.0 | nonexistent_job2 |
| 1970-01-01T00:00:15 | 1.0 | nonexistent_job2 |
+---------------------+-----+------------------+
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job=~"nonexistent_job1", job!="nonexistent_job2"});
+---------------------+-----+
| ts | val |
+---------------------+-----+
| 1970-01-01T00:00:00 | 1.0 |
| 1970-01-01T00:00:05 | 1.0 |
| 1970-01-01T00:00:10 | 1.0 |
| 1970-01-01T00:00:15 | 1.0 |
+---------------------+-----+
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') sum(t{job="job2"});
+---------------------+------------+
| ts | sum(t.val) |
+---------------------+------------+
| 1970-01-01T00:00:00 | 2.0 |
| 1970-01-01T00:00:05 | 6.0 |
| 1970-01-01T00:00:10 | 12.0 |
| 1970-01-01T00:00:15 | 20.0 |
+---------------------+------------+
-- ABSENT is not supported for aggregation functions for now
-- tql eval (0, 15, '5s') absent(sum(t{job="job2"}));
-- tql eval (0, 15, '5s') absent(sum(t{job="job3"}));
drop table t;
Affected Rows: 0

View File

@@ -0,0 +1,50 @@
create table t (
ts timestamp(3) time index,
job STRING,
instance STRING,
val DOUBLE,
PRIMARY KEY(job, instance),
);
insert into t values
(0, 'job1', 'instance1', 1),
(0, 'job2', 'instance2', 2),
(5000, 'job1', 'instance3',3),
(5000, 'job2', 'instance4',4),
(10000, 'job1', 'instance5',5),
(10000, 'job2', 'instance6',6),
(15000, 'job1', 'instance7',7),
(15000, 'job2', 'instance8',8);
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="job1"});
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="job2"});
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="job3"});
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(nonexistent_table);
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="nonexistent_job"});
-- SQLNESS SORT_RESULT 3 1
tql eval (1000, 1000, '1s') absent(t{job="job1"});
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="nonexistent_job1", job="nonexistent_job2"});
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job=~"nonexistent_job1", job!="nonexistent_job2"});
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') sum(t{job="job2"});
-- ABSENT is not supported for aggregation functions for now
-- tql eval (0, 15, '5s') absent(sum(t{job="job2"}));
-- tql eval (0, 15, '5s') absent(sum(t{job="job3"}));
drop table t;