mirror of https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 16:32:54 +00:00

Compare commits

6 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 5fc0c5706c | |
| | 4d768b2c31 | |
| | b62f219810 | |
| | 5d330fad17 | |
| | dfdfae1a7b | |
| | 822f0caf4b | |
Cargo.lock (generated, 154 lines changed)
@@ -211,7 +211,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
|
||||
|
||||
[[package]]
|
||||
name = "api"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-decimal",
|
||||
@@ -944,7 +944,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "auth"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -1586,7 +1586,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cache"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"catalog",
|
||||
"common-error",
|
||||
@@ -1621,7 +1621,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
|
||||
|
||||
[[package]]
|
||||
name = "catalog"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow 54.2.1",
|
||||
@@ -1959,7 +1959,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
|
||||
|
||||
[[package]]
|
||||
name = "cli"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -2004,7 +2004,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.15.3",
|
||||
"substrait 0.15.4",
|
||||
"table",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
@@ -2013,7 +2013,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "client"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -2043,7 +2043,7 @@ dependencies = [
|
||||
"rand 0.9.0",
|
||||
"serde_json",
|
||||
"snafu 0.8.5",
|
||||
"substrait 0.15.3",
|
||||
"substrait 0.15.4",
|
||||
"substrait 0.37.3",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -2084,7 +2084,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cmd"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"auth",
|
||||
@@ -2145,7 +2145,7 @@ dependencies = [
|
||||
"snafu 0.8.5",
|
||||
"stat",
|
||||
"store-api",
|
||||
"substrait 0.15.3",
|
||||
"substrait 0.15.4",
|
||||
"table",
|
||||
"temp-env",
|
||||
"tempfile",
|
||||
@@ -2192,7 +2192,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
|
||||
|
||||
[[package]]
|
||||
name = "common-base"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"anymap2",
|
||||
"async-trait",
|
||||
@@ -2214,11 +2214,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-catalog"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
|
||||
[[package]]
|
||||
name = "common-config"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
@@ -2243,7 +2243,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-datasource"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-schema 54.3.1",
|
||||
@@ -2280,7 +2280,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-decimal"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"bigdecimal 0.4.8",
|
||||
"common-error",
|
||||
@@ -2293,7 +2293,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-error"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"common-macro",
|
||||
"http 1.1.0",
|
||||
@@ -2304,7 +2304,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-frontend"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-error",
|
||||
@@ -2320,7 +2320,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-function"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -2373,7 +2373,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-greptimedb-telemetry"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-runtime",
|
||||
@@ -2390,7 +2390,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -2422,7 +2422,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc-expr"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"common-base",
|
||||
@@ -2441,7 +2441,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-macro"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-query",
|
||||
@@ -2455,7 +2455,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-mem-prof"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"common-error",
|
||||
@@ -2471,7 +2471,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-meta"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"anymap2",
|
||||
"api",
|
||||
@@ -2536,7 +2536,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-options"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"common-grpc",
|
||||
"humantime-serde",
|
||||
@@ -2545,11 +2545,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-plugins"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
|
||||
[[package]]
|
||||
name = "common-pprof"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"common-error",
|
||||
"common-macro",
|
||||
@@ -2561,7 +2561,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -2588,7 +2588,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure-test"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-procedure",
|
||||
@@ -2597,7 +2597,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-query"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -2623,7 +2623,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-recordbatch"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-error",
|
||||
@@ -2643,7 +2643,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-runtime"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"clap 4.5.19",
|
||||
@@ -2673,14 +2673,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-session"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"strum 0.27.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "common-telemetry"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"common-error",
|
||||
@@ -2708,7 +2708,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-test-util"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"client",
|
||||
"common-grpc",
|
||||
@@ -2721,7 +2721,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-time"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"chrono",
|
||||
@@ -2739,7 +2739,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-version"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"build-data",
|
||||
"cargo-manifest",
|
||||
@@ -2750,7 +2750,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-wal"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
@@ -2773,7 +2773,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-workload"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"common-telemetry",
|
||||
@@ -3729,7 +3729,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datanode"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -3782,7 +3782,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.15.3",
|
||||
"substrait 0.15.4",
|
||||
"table",
|
||||
"tokio",
|
||||
"toml 0.8.19",
|
||||
@@ -3791,7 +3791,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datatypes"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"arrow 54.2.1",
|
||||
"arrow-array 54.2.1",
|
||||
@@ -4451,7 +4451,7 @@ checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
|
||||
|
||||
[[package]]
|
||||
name = "file-engine"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -4588,7 +4588,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
|
||||
|
||||
[[package]]
|
||||
name = "flow"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow 54.2.1",
|
||||
@@ -4653,7 +4653,7 @@ dependencies = [
|
||||
"sql",
|
||||
"store-api",
|
||||
"strum 0.27.1",
|
||||
"substrait 0.15.3",
|
||||
"substrait 0.15.4",
|
||||
"table",
|
||||
"tokio",
|
||||
"tonic 0.12.3",
|
||||
@@ -4708,7 +4708,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
|
||||
|
||||
[[package]]
|
||||
name = "frontend"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -4768,7 +4768,7 @@ dependencies = [
|
||||
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
|
||||
"store-api",
|
||||
"strfmt",
|
||||
"substrait 0.15.3",
|
||||
"substrait 0.15.4",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
@@ -5158,7 +5158,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=96c733f8472284d3c83a4c011dc6de9cf830c353#96c733f8472284d3c83a4c011dc6de9cf830c353"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a5d256ba4abb7393e0859ffbf7fca1e38f3433dc#a5d256ba4abb7393e0859ffbf7fca1e38f3433dc"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"serde",
|
||||
@@ -5929,7 +5929,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "index"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"asynchronous-codec",
|
||||
@@ -6814,7 +6814,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
|
||||
|
||||
[[package]]
|
||||
name = "log-query"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"common-error",
|
||||
@@ -6826,7 +6826,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "log-store"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -7124,7 +7124,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-client"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -7152,7 +7152,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-srv"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -7243,7 +7243,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "metric-engine"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -7333,7 +7333,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "mito-codec"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"bytes",
|
||||
@@ -7356,7 +7356,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "mito2"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -8106,7 +8106,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "object-store"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
@@ -8420,7 +8420,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "operator"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -8475,7 +8475,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
|
||||
"store-api",
|
||||
"substrait 0.15.3",
|
||||
"substrait 0.15.4",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
@@ -8742,7 +8742,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "partition"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -9030,7 +9030,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "pipeline"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -9173,7 +9173,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "plugins"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"auth",
|
||||
"clap 4.5.19",
|
||||
@@ -9486,7 +9486,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "promql"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"async-trait",
|
||||
@@ -9768,7 +9768,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "puffin"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"async-compression 0.4.13",
|
||||
"async-trait",
|
||||
@@ -9810,7 +9810,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "query"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -9876,7 +9876,7 @@ dependencies = [
|
||||
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
|
||||
"statrs",
|
||||
"store-api",
|
||||
"substrait 0.15.3",
|
||||
"substrait 0.15.4",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -11162,7 +11162,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "servers"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -11283,7 +11283,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "session"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -11622,7 +11622,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sql"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"chrono",
|
||||
@@ -11677,7 +11677,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sqlness-runner"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"clap 4.5.19",
|
||||
@@ -11977,7 +11977,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "stat"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"nix 0.30.1",
|
||||
]
|
||||
@@ -12003,7 +12003,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "store-api"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -12164,7 +12164,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "substrait"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
@@ -12344,7 +12344,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "table"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -12605,7 +12605,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
|
||||
|
||||
[[package]]
|
||||
name = "tests-fuzz"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"arbitrary",
|
||||
"async-trait",
|
||||
@@ -12649,7 +12649,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tests-integration"
|
||||
version = "0.15.3"
|
||||
version = "0.15.4"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -12716,7 +12716,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlx",
|
||||
"store-api",
|
||||
"substrait 0.15.3",
|
||||
"substrait 0.15.4",
|
||||
"table",
|
||||
"tempfile",
|
||||
"time",
|
||||
|
||||
@@ -71,7 +71,7 @@ members = [
 resolver = "2"
 
 [workspace.package]
-version = "0.15.3"
+version = "0.15.4"
 edition = "2021"
 license = "Apache-2.0"

@@ -134,7 +134,7 @@ etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "96c733f8472284d3c83a4c011dc6de9cf830c353" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a5d256ba4abb7393e0859ffbf7fca1e38f3433dc" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
@@ -147,6 +147,7 @@
 | `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
 | `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
 | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
+| `region_engine.mito.max_concurrent_scan_files` | Integer | `128` | Maximum number of SST files to scan concurrently. |
 | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
 | `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
 | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |

@@ -496,6 +497,7 @@
 | `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
 | `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
 | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
+| `region_engine.mito.max_concurrent_scan_files` | Integer | `128` | Maximum number of SST files to scan concurrently. |
 | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
 | `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
 | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -474,6 +474,9 @@ sst_write_buffer_size = "8MB"
 ## Capacity of the channel to send data from parallel scan tasks to the main task.
 parallel_scan_channel_size = 32
 
+## Maximum number of SST files to scan concurrently.
+max_concurrent_scan_files = 128
+
 ## Whether to allow stale WAL entries read during replay.
 allow_stale_entries = false

@@ -565,6 +565,9 @@ sst_write_buffer_size = "8MB"
 ## Capacity of the channel to send data from parallel scan tasks to the main task.
 parallel_scan_channel_size = 32
 
+## Maximum number of SST files to scan concurrently.
+max_concurrent_scan_files = 128
+
 ## Whether to allow stale WAL entries read during replay.
 allow_stale_entries = false
@@ -19,7 +19,8 @@ use datafusion::execution::registry::SerializerRegistry;
 use datafusion_common::DataFusionError;
 use datafusion_expr::UserDefinedLogicalNode;
 use promql::extension_plan::{
-    EmptyMetric, InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize,
+    Absent, EmptyMetric, InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide,
+    SeriesNormalize,
 };
 
 #[derive(Debug)]

@@ -65,6 +66,13 @@ impl SerializerRegistry for ExtensionSerializer {
                     .expect("Failed to downcast to SeriesDivide");
                 Ok(series_divide.serialize())
             }
+            name if name == Absent::name() => {
+                let absent = node
+                    .as_any()
+                    .downcast_ref::<Absent>()
+                    .expect("Failed to downcast to Absent");
+                Ok(absent.serialize())
+            }
             name if name == EmptyMetric::name() => Err(DataFusionError::Substrait(
                 "EmptyMetric should not be serialized".to_string(),
             )),

@@ -103,6 +111,10 @@ impl SerializerRegistry for ExtensionSerializer {
                 let scalar_calculate = ScalarCalculate::deserialize(bytes)?;
                 Ok(Arc::new(scalar_calculate))
             }
+            name if name == Absent::name() => {
+                let absent = Absent::deserialize(bytes)?;
+                Ok(Arc::new(absent))
+            }
             name if name == EmptyMetric::name() => Err(DataFusionError::Substrait(
                 "EmptyMetric should not be deserialized".to_string(),
             )),
@@ -21,9 +21,10 @@ use common_catalog::format_full_table_name;
 use common_recordbatch::util;
 use common_telemetry::tracing;
 use datatypes::prelude::Value;
-use promql_parser::label::{Matcher, Matchers};
+use promql_parser::label::{MatchOp, Matcher, Matchers};
 use query::promql;
 use query::promql::planner::PromPlanner;
+use servers::prom_store::{DATABASE_LABEL, SCHEMA_LABEL};
 use servers::prometheus;
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};

@@ -114,7 +115,17 @@ impl Instance {
         end: SystemTime,
         ctx: &QueryContextRef,
     ) -> Result<Vec<String>> {
-        let table_schema = ctx.current_schema();
+        let table_schema = matchers
+            .iter()
+            .find_map(|m| {
+                if (m.name == SCHEMA_LABEL || m.name == DATABASE_LABEL) && m.op == MatchOp::Equal {
+                    Some(m.value.clone())
+                } else {
+                    None
+                }
+            })
+            .unwrap_or_else(|| ctx.current_schema());
 
         let table = self
             .catalog_manager
             .table(ctx.current_catalog(), &table_schema, &metric, Some(ctx))
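The hunk above lets the Prometheus label/metadata path honor an equality matcher on the schema or database label instead of always using the session schema. Below is a minimal standalone sketch of that lookup; the `Matcher`/`MatchOp` types here are simplified stand-ins for promql_parser's types, and the `__schema__`/`__database__` literals are assumed values for the `SCHEMA_LABEL`/`DATABASE_LABEL` constants from `servers::prom_store`.

```rust
// Simplified stand-ins; the real code uses promql_parser's Matcher/MatchOp and
// the SCHEMA_LABEL/DATABASE_LABEL constants from servers::prom_store.
#[derive(PartialEq)]
#[allow(dead_code)]
enum MatchOp {
    Equal,
    NotEqual,
}

struct Matcher {
    name: String,
    op: MatchOp,
    value: String,
}

// Assumed label names; the actual values live in servers::prom_store.
const SCHEMA_LABEL: &str = "__schema__";
const DATABASE_LABEL: &str = "__database__";

/// Picks the schema from an equality matcher on the schema/database label,
/// falling back to the session's current schema otherwise.
fn schema_from_matchers(matchers: &[Matcher], current_schema: &str) -> String {
    matchers
        .iter()
        .find_map(|m| {
            if (m.name == SCHEMA_LABEL || m.name == DATABASE_LABEL) && m.op == MatchOp::Equal {
                Some(m.value.clone())
            } else {
                None
            }
        })
        .unwrap_or_else(|| current_schema.to_string())
}

fn main() {
    let matchers = vec![Matcher {
        name: DATABASE_LABEL.to_string(),
        op: MatchOp::Equal,
        value: "metrics_db".to_string(),
    }];
    assert_eq!(schema_from_matchers(&matchers, "public"), "metrics_db");
    assert_eq!(schema_from_matchers(&[], "public"), "public");
}
```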
@@ -30,6 +30,8 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
 const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
 /// Default channel size for parallel scan task.
 pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
+/// Default maximum number of SST files to scan concurrently.
+pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 128;
 
 // Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
 const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;

@@ -107,6 +109,8 @@ pub struct MitoConfig {
     pub sst_write_buffer_size: ReadableSize,
     /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
     pub parallel_scan_channel_size: usize,
+    /// Maximum number of SST files to scan concurrently (default 128).
+    pub max_concurrent_scan_files: usize,
     /// Whether to allow stale entries read during replay.
     pub allow_stale_entries: bool,

@@ -152,6 +156,7 @@ impl Default for MitoConfig {
             write_cache_ttl: None,
             sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
             parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
+            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
             allow_stale_entries: false,
             index: IndexConfig::default(),
             inverted_index: InvertedIndexConfig::default(),
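As a usage sketch of the new knob: the field name and the 128-file default come from the hunks above, while the `mito2::config::MitoConfig` import path and the use of struct-update syntax here are assumptions about how the crate exposes its config, not part of this diff.

```rust
// Sketch only: assumes the mito2 crate is a dependency and exposes MitoConfig
// at this path with public fields, as the hunks above suggest.
use mito2::config::MitoConfig;

fn tuned_mito_config() -> MitoConfig {
    MitoConfig {
        // Raise the per-scan SST file limit above the 128-file default.
        max_concurrent_scan_files: 256,
        ..Default::default()
    }
}
```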
@@ -506,6 +506,7 @@ impl EngineInner {
             CacheStrategy::EnableAll(cache_manager),
         )
         .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
+        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
         .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
         .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
         .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
@@ -13,6 +13,8 @@
 // limitations under the License.
 
 use api::v1::Rows;
+use common_error::ext::ErrorExt;
+use common_error::status_code::StatusCode;
 use common_recordbatch::RecordBatches;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::TryStreamExt;

@@ -151,6 +153,58 @@ async fn test_scan_with_min_sst_sequence() {
         .await;
 }
 
+#[tokio::test]
+async fn test_max_concurrent_scan_files() {
+    let mut env = TestEnv::with_prefix("test_max_concurrent_scan_files").await;
+    let config = MitoConfig {
+        max_concurrent_scan_files: 2,
+        ..Default::default()
+    };
+    let engine = env.create_engine(config).await;
+
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new().build();
+    let column_schemas = test_util::rows_schema(&request);
+
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    let put_and_flush = async |start, end| {
+        let rows = Rows {
+            schema: column_schemas.clone(),
+            rows: test_util::build_rows(start, end),
+        };
+        test_util::put_rows(&engine, region_id, rows).await;
+        test_util::flush_region(&engine, region_id, None).await;
+    };
+
+    // Write overlapping files.
+    put_and_flush(0, 4).await;
+    put_and_flush(3, 7).await;
+    put_and_flush(6, 9).await;
+
+    let request = ScanRequest::default();
+    let scanner = engine.scanner(region_id, request).await.unwrap();
+    let Scanner::Seq(scanner) = scanner else {
+        panic!("Scanner should be seq scan");
+    };
+    let error = scanner.check_scan_limit().unwrap_err();
+    assert_eq!(StatusCode::RateLimited, error.status_code());
+
+    let request = ScanRequest {
+        distribution: Some(TimeSeriesDistribution::PerSeries),
+        ..Default::default()
+    };
+    let scanner = engine.scanner(region_id, request).await.unwrap();
+    let Scanner::Series(scanner) = scanner else {
+        panic!("Scanner should be series scan");
+    };
+    let error = scanner.check_scan_limit().unwrap_err();
+    assert_eq!(StatusCode::RateLimited, error.status_code());
+}
+
 #[tokio::test]
 async fn test_series_scan() {
     let mut env = TestEnv::with_prefix("test_series_scan").await;
@@ -1032,6 +1032,18 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display(
+        "Too many files to read concurrently: {}, max allowed: {}",
+        actual,
+        max
+    ))]
+    TooManyFilesToRead {
+        actual: usize,
+        max: usize,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;

@@ -1189,6 +1201,8 @@ impl ErrorExt for Error {
             Encode { source, .. } | Decode { source, .. } => source.status_code(),
 
             InconsistentTimestampLength { .. } => StatusCode::InvalidArguments,
+
+            TooManyFilesToRead { .. } => StatusCode::RateLimited,
         }
     }
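To make the error pattern concrete, here is a small self-contained sketch of a snafu context selector driving the same kind of limit check. The `TooManyFilesToRead` shape and the rate-limited intent mirror the hunks above, but the standalone `ScanError` enum and `check_scan_limit` helper are simplified stand-ins, not the repository's actual types.

```rust
// Requires the `snafu` crate; a simplified stand-in for mito2's Error enum.
use snafu::Snafu;

#[derive(Debug, Snafu)]
enum ScanError {
    #[snafu(display("Too many files to read concurrently: {}, max allowed: {}", actual, max))]
    TooManyFilesToRead { actual: usize, max: usize },
}

/// Rejects scans that would open more SST files than the configured limit,
/// mirroring the checks added to SeqScan/SeriesScan later in this diff.
fn check_scan_limit(total_files: usize, max_concurrent_files: usize) -> Result<(), ScanError> {
    if total_files > max_concurrent_files {
        return TooManyFilesToReadSnafu {
            actual: total_files,
            max: max_concurrent_files,
        }
        .fail();
    }
    Ok(())
}

fn main() {
    // With the default limit of 128, a 200-file scan is rejected up front.
    assert!(check_scan_limit(200, 128).is_err());
    assert!(check_scan_limit(64, 128).is_ok());
}
```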
@@ -39,7 +39,7 @@ use tokio_stream::wrappers::ReceiverStream;
 
 use crate::access_layer::AccessLayerRef;
 use crate::cache::CacheStrategy;
-use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
+use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
 use crate::error::Result;
 use crate::memtable::MemtableRange;
 use crate::metrics::READ_SST_COUNT;

@@ -187,6 +187,8 @@ pub(crate) struct ScanRegion {
     cache_strategy: CacheStrategy,
     /// Capacity of the channel to send data from parallel scan tasks to the main task.
     parallel_scan_channel_size: usize,
+    /// Maximum number of SST files to scan concurrently.
+    max_concurrent_scan_files: usize,
     /// Whether to ignore inverted index.
     ignore_inverted_index: bool,
     /// Whether to ignore fulltext index.

@@ -214,6 +216,7 @@ impl ScanRegion {
             request,
             cache_strategy,
             parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
+            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
             ignore_inverted_index: false,
             ignore_fulltext_index: false,
             ignore_bloom_filter: false,

@@ -232,6 +235,16 @@ impl ScanRegion {
         self
     }
 
+    /// Sets maximum number of SST files to scan concurrently.
+    #[must_use]
+    pub(crate) fn with_max_concurrent_scan_files(
+        mut self,
+        max_concurrent_scan_files: usize,
+    ) -> Self {
+        self.max_concurrent_scan_files = max_concurrent_scan_files;
+        self
+    }
+
     /// Sets whether to ignore inverted index.
     #[must_use]
     pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {

@@ -421,6 +434,7 @@
             .with_bloom_filter_index_applier(bloom_filter_applier)
             .with_fulltext_index_applier(fulltext_index_applier)
             .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
+            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
             .with_start_time(self.start_time)
             .with_append_mode(self.version.options.append_mode)
             .with_filter_deleted(self.filter_deleted)

@@ -597,6 +611,8 @@ pub struct ScanInput {
     ignore_file_not_found: bool,
     /// Capacity of the channel to send data from parallel scan tasks to the main task.
     pub(crate) parallel_scan_channel_size: usize,
+    /// Maximum number of SST files to scan concurrently.
+    pub(crate) max_concurrent_scan_files: usize,
     /// Index appliers.
     inverted_index_applier: Option<InvertedIndexApplierRef>,
     bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,

@@ -629,6 +645,7 @@ impl ScanInput {
             cache_strategy: CacheStrategy::Disabled,
             ignore_file_not_found: false,
             parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
+            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
             inverted_index_applier: None,
             bloom_filter_index_applier: None,
             fulltext_index_applier: None,

@@ -693,6 +710,16 @@ impl ScanInput {
         self
     }
 
+    /// Sets maximum number of SST files to scan concurrently.
+    #[must_use]
+    pub(crate) fn with_max_concurrent_scan_files(
+        mut self,
+        max_concurrent_scan_files: usize,
+    ) -> Self {
+        self.max_concurrent_scan_files = max_concurrent_scan_files;
+        self
+    }
+
     /// Sets invereted index applier.
     #[must_use]
     pub(crate) fn with_inverted_index_applier(
@@ -33,11 +33,11 @@ use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, Sc
 use store_api::storage::TimeSeriesRowSelector;
 use tokio::sync::Semaphore;
 
-use crate::error::{PartitionOutOfRangeSnafu, Result};
+use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu};
 use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
 use crate::read::last_row::LastRowReader;
 use crate::read::merge::MergeReaderBuilder;
-use crate::read::range::RangeBuilderList;
+use crate::read::range::{RangeBuilderList, RangeMeta};
 use crate::read::scan_region::{ScanInput, StreamContext};
 use crate::read::scan_util::{
     scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,

@@ -347,6 +347,40 @@ impl SeqScan {
 
         metrics
     }
+
+    /// Finds the maximum number of files to read in a single partition range.
+    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
+        partition_ranges
+            .iter()
+            .map(|part_range| {
+                let range_meta = &ranges[part_range.identifier];
+                range_meta.indices.len()
+            })
+            .max()
+            .unwrap_or(0)
+    }
+
+    /// Checks resource limit for the scanner.
+    pub(crate) fn check_scan_limit(&self) -> Result<()> {
+        // Check max file count limit for all partitions since we scan them in parallel.
+        let total_max_files: usize = self
+            .properties
+            .partitions
+            .iter()
+            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
+            .sum();
+
+        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
+        if total_max_files > max_concurrent_files {
+            return TooManyFilesToReadSnafu {
+                actual: total_max_files,
+                max: max_concurrent_files,
+            }
+            .fail();
+        }
+
+        Ok(())
+    }
 }
 
 impl RegionScanner for SeqScan {

@@ -372,6 +406,9 @@ impl RegionScanner for SeqScan {
 
     fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
         self.properties.prepare(request);
+
+        self.check_scan_limit().map_err(BoxedError::new)?;
+
         Ok(())
     }
@@ -37,7 +37,7 @@ use tokio::sync::Semaphore;
 
 use crate::error::{
     ComputeArrowSnafu, Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result,
-    ScanMultiTimesSnafu, ScanSeriesSnafu,
+    ScanMultiTimesSnafu, ScanSeriesSnafu, TooManyFilesToReadSnafu,
 };
 use crate::read::range::RangeBuilderList;
 use crate::read::scan_region::{ScanInput, StreamContext};

@@ -201,6 +201,32 @@ impl SeriesScan {
         let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
         Ok(Box::pin(chained_stream))
     }
+
+    /// Checks resource limit for the scanner.
+    pub(crate) fn check_scan_limit(&self) -> Result<()> {
+        // Sum the total number of files across all partitions
+        let total_files: usize = self
+            .properties
+            .partitions
+            .iter()
+            .flat_map(|partition| partition.iter())
+            .map(|part_range| {
+                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
+                range_meta.indices.len()
+            })
+            .sum();
+
+        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
+        if total_files > max_concurrent_files {
+            return TooManyFilesToReadSnafu {
+                actual: total_files,
+                max: max_concurrent_files,
+            }
+            .fail();
+        }
+
+        Ok(())
+    }
 }
 
 fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {

@@ -236,6 +262,9 @@ impl RegionScanner for SeriesScan {
 
     fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
         self.properties.prepare(request);
+
+        self.check_scan_limit().map_err(BoxedError::new)?;
+
         Ok(())
     }
@@ -242,6 +242,7 @@ impl RegionScanner for UnorderedScan {
 
     fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
         self.properties.prepare(request);
+        // UnorderedScan only scans one row group per partition so the resource requirement won't be too high.
         Ok(())
     }
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod absent;
 mod empty_metric;
 mod histogram_fold;
 mod instant_manipulate;

@@ -24,6 +25,7 @@ mod series_divide;
 mod test_util;
 mod union_distinct_on;
 
+pub use absent::{Absent, AbsentExec, AbsentStream};
 use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
 pub use empty_metric::{build_special_time_expr, EmptyMetric, EmptyMetricExec, EmptyMetricStream};
 pub use histogram_fold::{HistogramFold, HistogramFoldExec, HistogramFoldStream};
src/promql/src/extension_plan/absent.rs (new normal file, 654 lines)
@@ -0,0 +1,654 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::HashMap;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use datafusion::arrow::array::Array;
|
||||
use datafusion::common::{DFSchemaRef, Result as DataFusionResult};
|
||||
use datafusion::execution::context::TaskContext;
|
||||
use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
|
||||
use datafusion::physical_expr::{EquivalenceProperties, LexRequirement, PhysicalSortRequirement};
|
||||
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
|
||||
use datafusion::physical_plan::expressions::Column as ColumnExpr;
|
||||
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
|
||||
use datafusion::physical_plan::{
|
||||
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
|
||||
RecordBatchStream, SendableRecordBatchStream,
|
||||
};
|
||||
use datafusion_common::DFSchema;
|
||||
use datafusion_expr::EmptyRelation;
|
||||
use datatypes::arrow;
|
||||
use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
|
||||
use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::arrow_array::StringArray;
|
||||
use datatypes::compute::SortOptions;
|
||||
use futures::{ready, Stream, StreamExt};
|
||||
use greptime_proto::substrait_extension as pb;
|
||||
use prost::Message;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::DeserializeSnafu;
|
||||
use crate::extension_plan::Millisecond;
|
||||
|
||||
/// Maximum number of rows per output batch
|
||||
const ABSENT_BATCH_SIZE: usize = 8192;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash)]
|
||||
pub struct Absent {
|
||||
start: Millisecond,
|
||||
end: Millisecond,
|
||||
step: Millisecond,
|
||||
time_index_column: String,
|
||||
value_column: String,
|
||||
fake_labels: Vec<(String, String)>,
|
||||
input: LogicalPlan,
|
||||
output_schema: DFSchemaRef,
|
||||
}
|
||||
|
||||
impl PartialOrd for Absent {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
// compare on fields except schema and input
|
||||
(
|
||||
self.start,
|
||||
self.end,
|
||||
self.step,
|
||||
&self.time_index_column,
|
||||
&self.value_column,
|
||||
&self.fake_labels,
|
||||
)
|
||||
.partial_cmp(&(
|
||||
other.start,
|
||||
other.end,
|
||||
other.step,
|
||||
&other.time_index_column,
|
||||
&other.value_column,
|
||||
&other.fake_labels,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl UserDefinedLogicalNodeCore for Absent {
|
||||
fn name(&self) -> &str {
|
||||
Self::name()
|
||||
}
|
||||
|
||||
fn inputs(&self) -> Vec<&LogicalPlan> {
|
||||
vec![&self.input]
|
||||
}
|
||||
|
||||
fn schema(&self) -> &DFSchemaRef {
|
||||
&self.output_schema
|
||||
}
|
||||
|
||||
fn expressions(&self) -> Vec<Expr> {
|
||||
vec![]
|
||||
}
|
||||
|
||||
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"PromAbsent: start={}, end={}, step={}",
|
||||
self.start, self.end, self.step
|
||||
)
|
||||
}
|
||||
|
||||
fn with_exprs_and_inputs(
|
||||
&self,
|
||||
_exprs: Vec<Expr>,
|
||||
inputs: Vec<LogicalPlan>,
|
||||
) -> DataFusionResult<Self> {
|
||||
if inputs.is_empty() {
|
||||
return Err(datafusion::error::DataFusionError::Internal(
|
||||
"Absent must have at least one input".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
start: self.start,
|
||||
end: self.end,
|
||||
step: self.step,
|
||||
time_index_column: self.time_index_column.clone(),
|
||||
value_column: self.value_column.clone(),
|
||||
fake_labels: self.fake_labels.clone(),
|
||||
input: inputs[0].clone(),
|
||||
output_schema: self.output_schema.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Absent {
|
||||
pub fn try_new(
|
||||
start: Millisecond,
|
||||
end: Millisecond,
|
||||
step: Millisecond,
|
||||
time_index_column: String,
|
||||
value_column: String,
|
||||
fake_labels: Vec<(String, String)>,
|
||||
input: LogicalPlan,
|
||||
) -> DataFusionResult<Self> {
|
||||
let mut fields = vec![
|
||||
Field::new(
|
||||
&time_index_column,
|
||||
DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
true,
|
||||
),
|
||||
Field::new(&value_column, DataType::Float64, true),
|
||||
];
|
||||
|
||||
// remove duplicate fake labels
|
||||
let mut fake_labels = fake_labels
|
||||
.into_iter()
|
||||
.collect::<HashMap<String, String>>()
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>();
|
||||
fake_labels.sort_unstable_by(|a, b| a.0.cmp(&b.0));
|
||||
for (name, _) in fake_labels.iter() {
|
||||
fields.push(Field::new(name, DataType::Utf8, true));
|
||||
}
|
||||
|
||||
let output_schema = Arc::new(DFSchema::from_unqualified_fields(
|
||||
fields.into(),
|
||||
HashMap::new(),
|
||||
)?);
|
||||
|
||||
Ok(Self {
|
||||
start,
|
||||
end,
|
||||
step,
|
||||
time_index_column,
|
||||
value_column,
|
||||
fake_labels,
|
||||
input,
|
||||
output_schema,
|
||||
})
|
||||
}
|
||||
|
||||
pub const fn name() -> &'static str {
|
||||
"prom_absent"
|
||||
}
|
||||
|
||||
pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
|
||||
let output_schema = Arc::new(self.output_schema.as_arrow().clone());
|
||||
let properties = PlanProperties::new(
|
||||
EquivalenceProperties::new(output_schema.clone()),
|
||||
Partitioning::UnknownPartitioning(1),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
);
|
||||
Arc::new(AbsentExec {
|
||||
start: self.start,
|
||||
end: self.end,
|
||||
step: self.step,
|
||||
time_index_column: self.time_index_column.clone(),
|
||||
value_column: self.value_column.clone(),
|
||||
fake_labels: self.fake_labels.clone(),
|
||||
output_schema: output_schema.clone(),
|
||||
input: exec_input,
|
||||
properties,
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn serialize(&self) -> Vec<u8> {
|
||||
pb::Absent {
|
||||
start: self.start,
|
||||
end: self.end,
|
||||
step: self.step,
|
||||
time_index_column: self.time_index_column.clone(),
|
||||
value_column: self.value_column.clone(),
|
||||
fake_labels: self
|
||||
.fake_labels
|
||||
.iter()
|
||||
.map(|(name, value)| pb::LabelPair {
|
||||
key: name.clone(),
|
||||
value: value.clone(),
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
.encode_to_vec()
|
||||
}
|
||||
|
||||
pub fn deserialize(bytes: &[u8]) -> DataFusionResult<Self> {
|
||||
let pb_absent = pb::Absent::decode(bytes).context(DeserializeSnafu)?;
|
||||
let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
|
||||
produce_one_row: false,
|
||||
schema: Arc::new(DFSchema::empty()),
|
||||
});
|
||||
Self::try_new(
|
||||
pb_absent.start,
|
||||
pb_absent.end,
|
||||
pb_absent.step,
|
||||
pb_absent.time_index_column,
|
||||
pb_absent.value_column,
|
||||
pb_absent
|
||||
.fake_labels
|
||||
.iter()
|
||||
.map(|label| (label.key.clone(), label.value.clone()))
|
||||
.collect(),
|
||||
placeholder_plan,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AbsentExec {
|
||||
start: Millisecond,
|
||||
end: Millisecond,
|
||||
step: Millisecond,
|
||||
time_index_column: String,
|
||||
value_column: String,
|
||||
fake_labels: Vec<(String, String)>,
|
||||
output_schema: SchemaRef,
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
properties: PlanProperties,
|
||||
metric: ExecutionPlanMetricsSet,
|
||||
}
|
||||
|
||||
impl ExecutionPlan for AbsentExec {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.output_schema.clone()
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
&self.properties
|
||||
}
|
||||
|
||||
fn required_input_distribution(&self) -> Vec<Distribution> {
|
||||
vec![Distribution::SinglePartition]
|
||||
}
|
||||
|
||||
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
|
||||
vec![Some(LexRequirement::new(vec![PhysicalSortRequirement {
|
||||
expr: Arc::new(
|
||||
ColumnExpr::new_with_schema(&self.time_index_column, &self.input.schema()).unwrap(),
|
||||
),
|
||||
options: Some(SortOptions {
|
||||
descending: false,
|
||||
nulls_first: false,
|
||||
}),
|
||||
}]))]
|
||||
}
|
||||
|
||||
fn maintains_input_order(&self) -> Vec<bool> {
|
||||
vec![false]
|
||||
}
|
||||
|
||||
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
|
||||
vec![&self.input]
|
||||
}
|
||||
|
||||
fn with_new_children(
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
|
||||
assert!(!children.is_empty());
|
||||
Ok(Arc::new(Self {
|
||||
start: self.start,
|
||||
end: self.end,
|
||||
step: self.step,
|
||||
time_index_column: self.time_index_column.clone(),
|
||||
value_column: self.value_column.clone(),
|
||||
fake_labels: self.fake_labels.clone(),
|
||||
output_schema: self.output_schema.clone(),
|
||||
input: children[0].clone(),
|
||||
properties: self.properties.clone(),
|
||||
metric: self.metric.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
fn execute(
|
||||
&self,
|
||||
partition: usize,
|
||||
context: Arc<TaskContext>,
|
||||
) -> DataFusionResult<SendableRecordBatchStream> {
|
||||
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
|
||||
let input = self.input.execute(partition, context)?;
|
||||
|
||||
Ok(Box::pin(AbsentStream {
|
||||
end: self.end,
|
||||
step: self.step,
|
||||
time_index_column_index: self
|
||||
.input
|
||||
.schema()
|
||||
.column_with_name(&self.time_index_column)
|
||||
.unwrap() // Safety: we have checked the column name in `try_new`
|
||||
.0,
|
||||
output_schema: self.output_schema.clone(),
|
||||
fake_labels: self.fake_labels.clone(),
|
||||
input,
|
||||
metric: baseline_metric,
|
||||
// Buffer for streaming output timestamps
|
||||
output_timestamps: Vec::new(),
|
||||
// Current timestamp in the output range
|
||||
output_ts_cursor: self.start,
|
||||
input_finished: false,
|
||||
}))
|
||||
}
|
||||
|
||||
fn metrics(&self) -> Option<MetricsSet> {
|
||||
Some(self.metric.clone_inner())
|
||||
}
|
||||
|
||||
fn name(&self) -> &str {
|
||||
"AbsentExec"
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for AbsentExec {
|
||||
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match t {
|
||||
DisplayFormatType::Default | DisplayFormatType::Verbose => {
|
||||
write!(
|
||||
f,
|
||||
"PromAbsentExec: start={}, end={}, step={}",
|
||||
self.start, self.end, self.step
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AbsentStream {
|
||||
end: Millisecond,
|
||||
step: Millisecond,
|
||||
time_index_column_index: usize,
|
||||
output_schema: SchemaRef,
|
||||
fake_labels: Vec<(String, String)>,
|
||||
input: SendableRecordBatchStream,
|
||||
metric: BaselineMetrics,
|
||||
// Buffer for streaming output timestamps
|
||||
output_timestamps: Vec<Millisecond>,
|
||||
// Current timestamp in the output range
|
||||
output_ts_cursor: Millisecond,
|
||||
input_finished: bool,
|
||||
}
|
||||
|
||||
impl RecordBatchStream for AbsentStream {
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.output_schema.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for AbsentStream {
|
||||
type Item = DataFusionResult<RecordBatch>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
if !self.input_finished {
|
||||
match ready!(self.input.poll_next_unpin(cx)) {
|
||||
Some(Ok(batch)) => {
|
||||
let timer = std::time::Instant::now();
|
||||
if let Err(e) = self.process_input_batch(&batch) {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
}
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
|
||||
// If we have enough data for a batch, output it
|
||||
if self.output_timestamps.len() >= ABSENT_BATCH_SIZE {
|
||||
let timer = std::time::Instant::now();
|
||||
let result = self.flush_output_batch();
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
|
||||
match result {
|
||||
Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
|
||||
Ok(None) => continue,
|
||||
Err(e) => return Poll::Ready(Some(Err(e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
|
||||
None => {
|
||||
self.input_finished = true;
|
||||
|
||||
let timer = std::time::Instant::now();
|
||||
// Process any remaining absent timestamps
|
||||
if let Err(e) = self.process_remaining_absent_timestamps() {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
}
|
||||
let result = self.flush_output_batch();
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
return Poll::Ready(result.transpose());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AbsentStream {
|
||||
fn process_input_batch(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
|
||||
// Extract timestamps from this batch
|
||||
let timestamp_array = batch.column(self.time_index_column_index);
|
||||
let milli_ts_array = arrow::compute::cast(
|
||||
timestamp_array,
|
||||
&DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
)?;
|
||||
let timestamp_array = milli_ts_array
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap();
|
||||
|
||||
// Process against current output cursor position
|
||||
for &input_ts in timestamp_array.values() {
|
||||
// Generate absent timestamps up to this input timestamp
|
||||
while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
|
||||
self.output_timestamps.push(self.output_ts_cursor);
|
||||
self.output_ts_cursor += self.step;
|
||||
}
|
||||
|
||||
// Skip the input timestamp if it matches our cursor
|
||||
if self.output_ts_cursor == input_ts {
|
||||
self.output_ts_cursor += self.step;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
|
||||
// Generate all remaining absent timestamps (input is finished)
|
||||
while self.output_ts_cursor <= self.end {
|
||||
self.output_timestamps.push(self.output_ts_cursor);
|
||||
self.output_ts_cursor += self.step;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn flush_output_batch(&mut self) -> DataFusionResult<Option<RecordBatch>> {
|
||||
if self.output_timestamps.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
        let num_rows = self.output_timestamps.len();
        columns.push(Arc::new(TimestampMillisecondArray::from(
            self.output_timestamps.clone(),
        )) as _);
        columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);

        for (_, value) in self.fake_labels.iter() {
            columns.push(Arc::new(StringArray::from_iter(std::iter::repeat_n(
                Some(value.clone()),
                num_rows,
            ))) as _);
        }

        let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;

        self.output_timestamps.clear();
        Ok(Some(batch))
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};

    use super::*;

    #[tokio::test]
    async fn test_absent_basic() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]));

        // Input has timestamps: 0, 2000, 4000
        let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![0, 2000, 4000]));
        let value_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
        let batch =
            RecordBatch::try_new(schema.clone(), vec![timestamp_array, value_array]).unwrap();

        let memory_exec = MemoryExec::try_new(&[vec![batch]], schema, None).unwrap();

        let output_schema = Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]));

        let absent_exec = AbsentExec {
            start: 0,
            end: 5000,
            step: 1000,
            time_index_column: "timestamp".to_string(),
            value_column: "value".to_string(),
            fake_labels: vec![],
            output_schema: output_schema.clone(),
            input: Arc::new(memory_exec),
            properties: PlanProperties::new(
                EquivalenceProperties::new(output_schema.clone()),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
            metric: ExecutionPlanMetricsSet::new(),
        };

        let session_ctx = SessionContext::new();
        let task_ctx = session_ctx.task_ctx();
        let mut stream = absent_exec.execute(0, task_ctx).unwrap();

        // Collect all output batches
        let mut output_timestamps = Vec::new();
        while let Some(batch_result) = stream.next().await {
            let batch = batch_result.unwrap();
            let ts_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            for i in 0..ts_array.len() {
                if !ts_array.is_null(i) {
                    let ts = ts_array.value(i);
                    output_timestamps.push(ts);
                }
            }
        }

        // Should output absent timestamps: 1000, 3000, 5000
        // (0, 2000, 4000 exist in input, so 1000, 3000, 5000 are absent)
        assert_eq!(output_timestamps, vec![1000, 3000, 5000]);
    }

    #[tokio::test]
    async fn test_absent_empty_input() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]));

        // Empty input
        let memory_exec = MemoryExec::try_new(&[vec![]], schema, None).unwrap();

        let output_schema = Arc::new(Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("value", DataType::Float64, true),
        ]));
        let absent_exec = AbsentExec {
            start: 0,
            end: 2000,
            step: 1000,
            time_index_column: "timestamp".to_string(),
            value_column: "value".to_string(),
            fake_labels: vec![],
            output_schema: output_schema.clone(),
            input: Arc::new(memory_exec),
            properties: PlanProperties::new(
                EquivalenceProperties::new(output_schema.clone()),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
            metric: ExecutionPlanMetricsSet::new(),
        };

        let session_ctx = SessionContext::new();
        let task_ctx = session_ctx.task_ctx();
        let mut stream = absent_exec.execute(0, task_ctx).unwrap();

        // Collect all output timestamps
        let mut output_timestamps = Vec::new();
        while let Some(batch_result) = stream.next().await {
            let batch = batch_result.unwrap();
            let ts_array = batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            for i in 0..ts_array.len() {
                if !ts_array.is_null(i) {
                    let ts = ts_array.value(i);
                    output_timestamps.push(ts);
                }
            }
        }

        // Should output all timestamps in range: 0, 1000, 2000
        assert_eq!(output_timestamps, vec![0, 1000, 2000]);
    }
}
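// Illustrative sketch, not part of the change above: the behaviour the two tests
// assert can be stated independently of the execution plan. Given the query range
// and the step-aligned timestamps observed in the input, absent() emits exactly the
// steps that have no data. The free-standing helper below is hypothetical.
fn absent_timestamps(start: i64, end: i64, step: i64, present: &[i64]) -> Vec<i64> {
    let present: std::collections::HashSet<i64> = present.iter().copied().collect();
    (start..=end)
        .step_by(step as usize)
        .filter(|ts| !present.contains(ts))
        .collect()
}
// Matches the expectations in the tests above:
// absent_timestamps(0, 5000, 1000, &[0, 2000, 4000]) == [1000, 3000, 5000]
// absent_timestamps(0, 2000, 1000, &[])              == [0, 1000, 2000]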
@@ -22,8 +22,8 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};

use crate::extension_plan::{
    EmptyMetric, HistogramFold, InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide,
    SeriesNormalize, UnionDistinctOn,
    Absent, EmptyMetric, HistogramFold, InstantManipulate, RangeManipulate, ScalarCalculate,
    SeriesDivide, SeriesNormalize, UnionDistinctOn,
};

pub struct PromExtensionPlanner;
@@ -57,6 +57,8 @@ impl ExtensionPlanner for PromExtensionPlanner {
                physical_inputs[0].clone(),
                physical_inputs[1].clone(),
            )))
        } else if let Some(node) = node.as_any().downcast_ref::<Absent>() {
            Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
        } else {
            Ok(None)
        }

@@ -27,6 +27,7 @@ use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
@@ -50,7 +51,7 @@ use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTi
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use promql::extension_plan::{
    build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
    build_special_time_expr, Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
    RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use promql::functions::{
@@ -86,6 +87,8 @@ use crate::promql::error::{
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `absent()` function in PromQL
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// `histogram_quantile` function in PromQL
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector` function in PromQL
@@ -124,7 +127,10 @@ struct PromPlannerContext {
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    /// The matcher for field columns `__field__`.
    field_column_matcher: Option<Vec<Matcher>>,
    /// The matcher for selectors (normal matchers).
    selector_matcher: Vec<Matcher>,
    schema_name: Option<String>,
    /// The range in millisecond of range selector. None if there is no range selector.
    range: Option<Millisecond>,
@@ -148,6 +154,7 @@ impl PromPlannerContext {
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.selector_matcher.clear();
        self.schema_name = None;
        self.range = None;
    }
@@ -830,6 +837,7 @@ impl PromPlanner {
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await,
            SPECIAL_ABSENT_FUNCTION => return self.create_absent_plan(args, session_state).await,
            _ => {}
        }

@@ -1001,6 +1009,7 @@ impl PromPlanner {
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                self.ctx.selector_matcher.push(matcher.clone());
                let _ = matchers.insert(matcher.clone());
            }
        }
@@ -1246,6 +1255,13 @@ impl PromPlanner {
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            if matcher.name == SCHEMA_COLUMN_MATCHER
                || matcher.name == DB_COLUMN_MATCHER
                || matcher.name == FIELD_COLUMN_MATCHER
            {
                continue;
            }

            let col = if table_schema
                .field_with_unqualified_name(&matcher.name)
                .is_err()
@@ -2449,6 +2465,69 @@ impl PromPlanner {
        Ok(scalar_plan)
    }

    /// Create a [SPECIAL_ABSENT_FUNCTION] plan
    async fn create_absent_plan(
        &mut self,
        args: &PromFunctionArgs,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        if args.args.len() != 1 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
            }
            .fail();
        }
        let input = self.prom_expr_to_plan(&args.args[0], session_state).await?;

        let time_index_expr = self.create_time_index_column_expr()?;
        let first_field_expr =
            self.create_field_column_exprs()?
                .pop()
                .with_context(|| ValueNotFoundSnafu {
                    table: self.ctx.table_name.clone().unwrap_or_default(),
                })?;
        let first_value_expr = first_value(first_field_expr, None);

        let ordered_aggregated_input = LogicalPlanBuilder::from(input)
            .aggregate(
                vec![time_index_expr.clone()],
                vec![first_value_expr.clone()],
            )
            .context(DataFusionPlanningSnafu)?
            .sort(vec![time_index_expr.sort(true, false)])
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        let fake_labels = self
            .ctx
            .selector_matcher
            .iter()
            .filter_map(|matcher| match matcher.op {
                MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
                _ => None,
            })
            .collect::<Vec<_>>();

        // Create the absent plan
        let absent_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                Absent::try_new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    self.ctx.time_index_column.as_ref().unwrap().clone(),
                    self.ctx.field_columns[0].clone(),
                    fake_labels,
                    ordered_aggregated_input,
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });

        Ok(absent_plan)
    }

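    // Illustrative sketch, not part of the change above: only the equality matchers
    // of the selector survive as output labels of absent(), mirroring the filter_map
    // over `self.ctx.selector_matcher`. This is why the SQLNESS case further below,
    // absent(t{job=~"...", job!="..."}), produces no `job` column, while
    // absent(t{job="nonexistent_job"}) keeps it. Matcher/MatchOp are assumed to be
    // the promql_parser types used in this file; the helper is hypothetical.
    fn equality_labels(matchers: &[promql_parser::label::Matcher]) -> Vec<(String, String)> {
        matchers
            .iter()
            .filter_map(|m| match m.op {
                promql_parser::label::MatchOp::Equal => Some((m.name.clone(), m.value.clone())),
                _ => None,
            })
            .collect()
    }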
    /// Try to build a DataFusion Literal Expression from PromQL Expr, return
    /// `None` if the input is not a literal expression.
    fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {

@@ -121,7 +121,7 @@ impl PrometheusGatewayService {
        let result = self.handler.do_query(&query, ctx).await;
        let (metric_name, mut result_type) =
            match retrieve_metric_name_and_result_type(&query.query) {
                Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
                Ok((metric_name, result_type)) => (metric_name, result_type),
                Err(err) => {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg())
                }

@@ -38,7 +38,7 @@ use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
use crate::http::extractor::PipelineInfo;
use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
use crate::http::PromValidationMode;
use crate::prom_store::{snappy_decompress, zstd_decompress};
use crate::prom_store::{extract_schema_from_read_request, snappy_decompress, zstd_decompress};
use crate::proto::{PromSeriesProcessor, PromWriteRequest};
use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};

@@ -117,6 +117,7 @@ pub async fn remote_write(
    let is_zstd = content_encoding.contains(VM_ENCODING);

    let mut processor = PromSeriesProcessor::default_processor();

    if let Some(pipeline_name) = pipeline_info.pipeline_name {
        let pipeline_def = PipelineDefinition::from_name(
            &pipeline_name,
@@ -184,13 +185,19 @@ pub async fn remote_read(
) -> Result<PromStoreResponse> {
    let db = params.db.clone().unwrap_or_default();
    query_ctx.set_channel(Channel::Prometheus);

    let request = decode_remote_read_request(body).await?;

    // Extract schema from special labels and set it in query context
    if let Some(schema) = extract_schema_from_read_request(&request) {
        query_ctx.set_current_schema(&schema);
    }

    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED
        .with_label_values(&[db.as_str()])
        .start_timer();

    let request = decode_remote_read_request(body).await?;

    state.prom_store_handler.read(request, query_ctx).await
}

@@ -56,7 +56,7 @@ use crate::error::{
    TableNotFoundSnafu, UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL};
use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
use crate::prometheus_handler::PrometheusHandlerRef;

/// For [ValueType::Vector] result type
@@ -318,7 +318,7 @@ async fn do_instant_query(
) -> PrometheusJsonResponse {
    let result = handler.do_query(prom_query, query_ctx).await;
    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
        Ok((metric_name, result_type)) => (metric_name, result_type),
        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
    };
    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
@@ -428,7 +428,7 @@ async fn do_range_query(
    let result = handler.do_query(prom_query, query_ctx).await;
    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
        Ok((metric_name, _)) => metric_name.unwrap_or_default(),
        Ok((metric_name, _)) => metric_name,
    };
    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
}
@@ -824,13 +824,52 @@ pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, s
}

fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
    find_metric_name_and_matchers(expr, |name, matchers| {
        name.clone().or(matchers
            .find_matchers(METRIC_NAME)
            .into_iter()
            .next()
            .map(|m| m.value))
    })
    let mut metric_names = HashSet::new();
    collect_metric_names(expr, &mut metric_names);

    // Return the metric name only if there's exactly one unique metric name
    if metric_names.len() == 1 {
        metric_names.into_iter().next()
    } else {
        None
    }
}

/// Recursively collect all metric names from a PromQL expression
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
            collect_metric_names(expr, metric_names)
        }
        PromqlExpr::Unary(UnaryExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            collect_metric_names(lhs, metric_names);
            collect_metric_names(rhs, metric_names);
        }
        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args
                .iter()
                .for_each(|e| collect_metric_names(e, metric_names));
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}

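// Illustrative usage, not part of the change above (promql_parser::parser::parse is
// the same entry point used by retrieve_metric_name_from_promql further below):
// let expr = promql_parser::parser::parse(r#"sum(rate(cpu_usage[5m])) + 100"#).unwrap();
// assert_eq!(promql_expr_to_metric_name(&expr), Some("cpu_usage".to_string()));
// let expr = promql_parser::parser::parse("cpu_usage + memory_usage").unwrap();
// assert_eq!(promql_expr_to_metric_name(&expr), None); // two distinct metrics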
fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
@@ -995,6 +1034,19 @@ pub async fn label_values_query(
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
        let catalog_manager = handler.catalog_manager();

        match retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await {
            Ok(schema_names) => {
                return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(
                    schema_names,
                ));
            }
            Err(e) => {
                return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
            }
        }
    }

    let queries = params.matches.0;
@@ -1112,53 +1164,51 @@ async fn retrieve_field_names(
    Ok(field_columns)
}

/// Try to parse and extract the name of referenced metric from the promql query.
///
/// Returns the metric name if a single metric is referenced, otherwise None.
fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
    let promql_expr = promql_parser::parser::parse(query).ok()?;
async fn retrieve_schema_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let mut schemas = Vec::new();
    let catalog = query_ctx.current_catalog();

    struct MetricNameVisitor {
        metric_name: Option<String>,
    }
    let candidate_schemas = catalog_manager
        .schema_names(catalog, Some(query_ctx))
        .await
        .context(CatalogSnafu)?;

    impl promql_parser::util::ExprVisitor for MetricNameVisitor {
        type Error = ();

        fn pre_visit(&mut self, plan: &PromqlExpr) -> std::result::Result<bool, Self::Error> {
            let query_metric_name = match plan {
                PromqlExpr::VectorSelector(vs) => vs
                    .matchers
                    .find_matchers(METRIC_NAME)
                    .into_iter()
                    .next()
                    .map(|m| m.value)
                    .or_else(|| vs.name.clone()),
                PromqlExpr::MatrixSelector(ms) => ms
                    .vs
                    .matchers
                    .find_matchers(METRIC_NAME)
                    .into_iter()
                    .next()
                    .map(|m| m.value)
                    .or_else(|| ms.vs.name.clone()),
                _ => return Ok(true),
            };

            // set it to empty string if multiple metrics are referenced.
            if self.metric_name.is_some() && query_metric_name.is_some() {
                self.metric_name = Some(String::new());
            } else {
                self.metric_name = query_metric_name.or_else(|| self.metric_name.clone());
    for schema in candidate_schemas {
        let mut found = true;
        for match_item in &matches {
            if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
                let exists = catalog_manager
                    .table_exists(catalog, &schema, &table_name, Some(query_ctx))
                    .await
                    .context(CatalogSnafu)?;
                if !exists {
                    found = false;
                    break;
                }
            }
        }

            Ok(true)
        if found {
            schemas.push(schema);
        }
    }

    let mut visitor = MetricNameVisitor { metric_name: None };
    promql_parser::util::walk_expr(&mut visitor, &promql_expr).ok()?;
    visitor.metric_name
    schemas.sort_unstable();

    Ok(schemas)
}

/// Try to parse and extract the name of referenced metric from the promql query.
///
/// Returns the metric name if exactly one unique metric is referenced, otherwise None.
/// Multiple references to the same metric are allowed.
fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
    let promql_expr = promql_parser::parser::parse(query).ok()?;
    promql_expr_to_metric_name(&promql_expr)
}

#[derive(Debug, Default, Serialize, Deserialize)]
@@ -1275,3 +1325,205 @@ pub async fn parse_query(
        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
    }
}

#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    struct TestCase {
        name: &'static str,
        promql: &'static str,
        expected_metric: Option<&'static str>,
        expected_type: ValueType,
        should_error: bool,
    }

    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        let test_cases = &[
            // Single metric cases
            TestCase {
                name: "simple metric",
                promql: "cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with selector",
                promql: r#"cpu_usage{instance="localhost"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with range selector",
                promql: "cpu_usage[5m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "metric with __name__ matcher",
                promql: r#"{__name__="cpu_usage"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with unary operator",
                promql: "-cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Aggregation and function cases
            TestCase {
                name: "metric with aggregation",
                promql: "sum(cpu_usage)",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Same metric binary operations
            TestCase {
                name: "same metric addition",
                promql: "cpu_usage + cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with scalar addition",
                promql: r#"sum(rate(cpu_usage{job="node"}[5m])) by (instance) + 100"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Multiple metrics cases
            TestCase {
                name: "different metrics addition",
                promql: "cpu_usage + memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics subtraction",
                promql: "network_in - network_out",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Unless operator cases
            TestCase {
                name: "unless with different metrics",
                promql: "cpu_usage unless memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with same metric",
                promql: "cpu_usage unless cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Subquery cases
            TestCase {
                name: "basic subquery",
                promql: "cpu_usage[5m:1m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "subquery with multiple metrics",
                promql: "(cpu_usage + memory_usage)[5m:1m]",
                expected_metric: None,
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            // Literal values
            TestCase {
                name: "scalar value",
                promql: "42",
                expected_metric: None,
                expected_type: ValueType::Scalar,
                should_error: false,
            },
            TestCase {
                name: "string literal",
                promql: r#""hello world""#,
                expected_metric: None,
                expected_type: ValueType::String,
                should_error: false,
            },
            // Error cases
            TestCase {
                name: "invalid syntax",
                promql: "cpu_usage{invalid=",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "empty query",
                promql: "",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "malformed brackets",
                promql: "cpu_usage[5m",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
        ];

        for test_case in test_cases {
            let result = retrieve_metric_name_and_result_type(test_case.promql);

            if test_case.should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    test_case.name,
                    result
                );
            } else {
                let (metric_name, value_type) = result.unwrap_or_else(|e| {
                    panic!(
                        "Test '{}' should have succeeded but failed with error: {}",
                        test_case.name, e
                    )
                });

                let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
                assert_eq!(
                    metric_name, expected_metric_name,
                    "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, expected_metric_name, metric_name
                );

                assert_eq!(
                    value_type, test_case.expected_type,
                    "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, test_case.expected_type, value_type
                );
            }
        }
    }
}

@@ -118,7 +118,7 @@ impl PrometheusJsonResponse {
    /// Convert from `Result<Output>`
    pub async fn from_query_result(
        result: Result<Output>,
        metric_name: String,
        metric_name: Option<String>,
        result_type: ValueType,
    ) -> Self {
        let response: Result<Self> = try {
@@ -182,7 +182,7 @@ impl PrometheusJsonResponse {
    /// Convert [RecordBatches] to [PromData]
    fn record_batches_to_data(
        batches: RecordBatches,
        metric_name: String,
        metric_name: Option<String>,
        result_type: ValueType,
    ) -> Result<PrometheusResponse> {
        // infer semantic type of each column from schema.
@@ -230,7 +230,6 @@ impl PrometheusJsonResponse {
                reason: "no value column found".to_string(),
            })?;

        let metric_name = (METRIC_NAME, metric_name.as_str());
        // Preserves the order of output tags.
        // Tag order matters, e.g., after sort and sort_desc, the output order must be kept.
        let mut buffer = IndexMap::<Vec<(&str, &str)>, Vec<(f64, String)>>::new();
@@ -276,9 +275,10 @@
            }

            // retrieve tags
            // TODO(ruihang): push table name `__metric__`
            let mut tags = Vec::with_capacity(num_label_columns + 1);
            tags.push(metric_name);
            if let Some(metric_name) = &metric_name {
                tags.push((METRIC_NAME, metric_name.as_str()));
            }
            for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
                // TODO(ruihang): add test for NULL tag
                if let Some(tag_value) = tag_column.get_data(row_index) {

@@ -19,7 +19,7 @@ use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
use api::v1::RowInsertRequests;
use common_grpc::precision::Precision;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
@@ -44,6 +44,9 @@ pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";
pub const DATABASE_LABEL: &str = "__database__";
pub const DATABASE_LABEL_BYTES: &[u8] = b"__database__";

pub const SCHEMA_LABEL: &str = "__schema__";
pub const SCHEMA_LABEL_BYTES: &[u8] = b"__schema__";

pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"__physical_table__";

@@ -73,6 +76,29 @@ pub fn table_name(q: &Query) -> Result<String> {
    })
}

/// Extract schema from remote read request. Returns the first schema found from any query's matchers.
/// Prioritizes __schema__ over __database__ labels.
pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
    for query in &request.queries {
        for matcher in &query.matchers {
            if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 {
                return Some(matcher.value.clone());
            }
        }
    }

    // If no __schema__ found, look for __database__
    for query in &request.queries {
        for matcher in &query.matchers {
            if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 {
                return Some(matcher.value.clone());
            }
        }
    }

    None
}

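// Illustrative sketch, not part of the change above: a minimal check of the
// precedence documented on extract_schema_from_read_request, using the same
// protobuf types (Query, ReadRequest, LabelMatcher) referenced elsewhere in this
// change. The module and test names are hypothetical.
#[cfg(test)]
mod schema_label_sketch {
    use api::prom_store::remote::label_matcher::Type as MatcherType;
    use api::prom_store::remote::{LabelMatcher, Query, ReadRequest};

    use super::*;

    #[test]
    fn schema_label_wins_over_database_label() {
        let query = Query {
            matchers: vec![
                LabelMatcher {
                    name: DATABASE_LABEL.to_string(),
                    value: "db_from_database".to_string(),
                    r#type: MatcherType::Eq as i32,
                },
                LabelMatcher {
                    name: SCHEMA_LABEL.to_string(),
                    value: "db_from_schema".to_string(),
                    r#type: MatcherType::Eq as i32,
                },
            ],
            ..Default::default()
        };
        let request = ReadRequest {
            queries: vec![query],
            ..Default::default()
        };
        // __schema__ takes priority even though __database__ appears first.
        assert_eq!(
            extract_schema_from_read_request(&request),
            Some("db_from_schema".to_string())
        );
    }
}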
/// Create a DataFrame from a remote Query
#[tracing::instrument(skip_all)]
pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
@@ -91,7 +117,7 @@ pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
    for m in label_matches {
        let name = &m.name;

        if name == METRIC_NAME_LABEL {
        if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL {
            continue;
        }

@@ -34,7 +34,7 @@ use crate::http::PromValidationMode;
use crate::pipeline::run_pipeline;
use crate::prom_row_builder::{PromCtx, TablesBuilder};
use crate::prom_store::{
    DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES,
    DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES, SCHEMA_LABEL_BYTES,
};
use crate::query_handler::PipelineHandlerRef;
use crate::repeated_field::{Clear, RepeatedField};
@@ -199,10 +199,17 @@ impl PromTimeSeries {
                    self.table_name = decode_string(&label.value, prom_validation_mode)?;
                    self.labels.truncate(self.labels.len() - 1); // remove last label
                }
                DATABASE_LABEL_BYTES => {
                SCHEMA_LABEL_BYTES => {
                    self.schema = Some(decode_string(&label.value, prom_validation_mode)?);
                    self.labels.truncate(self.labels.len() - 1); // remove last label
                }
                DATABASE_LABEL_BYTES => {
                    // Only set schema from __database__ if __schema__ hasn't been set yet
                    if self.schema.is_none() {
                        self.schema = Some(decode_string(&label.value, prom_validation_mode)?);
                    }
                    self.labels.truncate(self.labels.len() - 1); // remove last label
                }
                PHYSICAL_TABLE_LABEL_BYTES => {
                    self.physical_table =
                        Some(decode_string(&label.value, prom_validation_mode)?);

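                // Illustrative sketch, not part of the change above: the net effect of
                // the two arms is that __schema__ always wins over __database__ for the
                // target schema, regardless of label order in the incoming series. The
                // helper below is hypothetical and only restates that precedence.
                // fn effective_schema(
                //     schema_label: Option<String>,
                //     database_label: Option<String>,
                // ) -> Option<String> {
                //     schema_label.or(database_label)
                // }
                // effective_schema(Some("s1".into()), Some("db1".into())) -> Some("s1")
                // effective_schema(None, Some("db1".into()))              -> Some("db1")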
@@ -16,7 +16,10 @@ use std::collections::BTreeMap;
use std::io::Write;
use std::str::FromStr;

use api::prom_store::remote::WriteRequest;
use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{
    Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, TimeSeries, WriteRequest,
};
use auth::user_provider_from_option;
use axum::http::{HeaderName, HeaderValue, StatusCode};
use chrono::Utc;
@@ -94,6 +97,7 @@ macro_rules! http_tests {
        test_dashboard_path,
        test_prometheus_remote_write,
        test_prometheus_remote_special_labels,
        test_prometheus_remote_schema_labels,
        test_prometheus_remote_write_with_pipeline,
        test_vm_proto_remote_write,

@@ -781,6 +785,89 @@ pub async fn test_prom_http_api(store_type: StorageType) {
        serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
    );

    // special labels
    let res = client
        .get("/v1/prometheus/api/v1/label/__schema__/values?start=0&end=600")
        .send()
        .await;
    assert_eq!(res.status(), StatusCode::OK);
    let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
    assert_eq!(body.status, "success");
    assert_eq!(
        body.data,
        serde_json::from_value::<PrometheusResponse>(json!([
            "greptime_private",
            "information_schema",
            "public"
        ]))
        .unwrap()
    );

    // special labels
    let res = client
        .get("/v1/prometheus/api/v1/label/__schema__/values?match[]=demo&start=0&end=600")
        .send()
        .await;
    assert_eq!(res.status(), StatusCode::OK);
    let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
    assert_eq!(body.status, "success");
    assert_eq!(
        body.data,
        serde_json::from_value::<PrometheusResponse>(json!(["public"])).unwrap()
    );

    // special labels
    let res = client
        .get("/v1/prometheus/api/v1/label/__database__/values?match[]=demo&start=0&end=600")
        .send()
        .await;
    assert_eq!(res.status(), StatusCode::OK);
    let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
    assert_eq!(body.status, "success");
    assert_eq!(
        body.data,
        serde_json::from_value::<PrometheusResponse>(json!(["public"])).unwrap()
    );

    // special labels
    let res = client
        .get("/v1/prometheus/api/v1/label/__database__/values?match[]=multi_labels{idc=\"idc1\", env=\"dev\"}&start=0&end=600")
        .send()
        .await;
    assert_eq!(res.status(), StatusCode::OK);
    let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
    assert_eq!(body.status, "success");
    assert_eq!(
        body.data,
        serde_json::from_value::<PrometheusResponse>(json!(["public"])).unwrap()
    );

    // match special labels.
    let res = client
        .get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{__schema__=\"public\", idc=\"idc1\", env=\"dev\"}&start=0&end=600")
        .send()
        .await;
    assert_eq!(res.status(), StatusCode::OK);
    let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
    assert_eq!(body.status, "success");
    assert_eq!(
        body.data,
        serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
    );

    // match special labels.
    let res = client
        .get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{__schema__=\"information_schema\", idc=\"idc1\", env=\"dev\"}&start=0&end=600")
        .send()
        .await;
    assert_eq!(res.status(), StatusCode::OK);
    let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
    assert_eq!(body.status, "success");
    assert_eq!(
        body.data,
        serde_json::from_value::<PrometheusResponse>(json!([])).unwrap()
    );

    // search field name
    let res = client
        .get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo")
@@ -1138,6 +1225,7 @@ write_cache_path = ""
write_cache_size = "5GiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
max_concurrent_scan_files = 128
allow_stale_entries = false
min_compaction_interval = "0s"

@@ -1465,6 +1553,188 @@ pub async fn test_prometheus_remote_write_with_pipeline(store_type: StorageType)
    guard.remove_all().await;
}

pub async fn test_prometheus_remote_schema_labels(store_type: StorageType) {
    common_telemetry::init_default_ut_logging();
    let (app, mut guard) =
        setup_test_prom_app_with_frontend(store_type, "test_prometheus_remote_schema_labels").await;
    let client = TestClient::new(app).await;

    // Create test schemas
    let res = client
        .post("/v1/sql?sql=create database test_schema_1")
        .header("Content-Type", "application/x-www-form-urlencoded")
        .send()
        .await;
    assert_eq!(res.status(), StatusCode::OK);

    let res = client
        .post("/v1/sql?sql=create database test_schema_2")
        .header("Content-Type", "application/x-www-form-urlencoded")
        .send()
        .await;
    assert_eq!(res.status(), StatusCode::OK);

    // Write data with __schema__ label
    let schema_series = TimeSeries {
        labels: vec![
            Label {
                name: "__name__".to_string(),
                value: "metric_with_schema".to_string(),
            },
            Label {
                name: "__schema__".to_string(),
                value: "test_schema_1".to_string(),
            },
            Label {
                name: "instance".to_string(),
                value: "host1".to_string(),
            },
        ],
        samples: vec![Sample {
            value: 100.0,
            timestamp: 1000,
        }],
        ..Default::default()
    };

    let write_request = WriteRequest {
        timeseries: vec![schema_series],
        ..Default::default()
    };
    let serialized_request = write_request.encode_to_vec();
    let compressed_request =
        prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy");

    let res = client
        .post("/v1/prometheus/write")
        .header("Content-Encoding", "snappy")
        .body(compressed_request)
        .send()
        .await;
    assert_eq!(res.status(), StatusCode::NO_CONTENT);

    // Read data from test_schema_1 using __schema__ matcher
    let read_request = ReadRequest {
        queries: vec![Query {
            start_timestamp_ms: 500,
            end_timestamp_ms: 1500,
            matchers: vec![
                LabelMatcher {
                    name: "__name__".to_string(),
                    value: "metric_with_schema".to_string(),
                    r#type: MatcherType::Eq as i32,
                },
                LabelMatcher {
                    name: "__schema__".to_string(),
                    value: "test_schema_1".to_string(),
                    r#type: MatcherType::Eq as i32,
                },
            ],
            ..Default::default()
        }],
        ..Default::default()
    };

    let serialized_read_request = read_request.encode_to_vec();
    let compressed_read_request =
        prom_store::snappy_compress(&serialized_read_request).expect("failed to encode snappy");

    let mut result = client
        .post("/v1/prometheus/read")
        .body(compressed_read_request)
        .send()
        .await;
    assert_eq!(result.status(), StatusCode::OK);

    let response_body = result.chunk().await.unwrap();
    let decompressed_response = prom_store::snappy_decompress(&response_body).unwrap();
    let read_response = ReadResponse::decode(&decompressed_response[..]).unwrap();

    assert_eq!(read_response.results.len(), 1);
    assert_eq!(read_response.results[0].timeseries.len(), 1);

    let timeseries = &read_response.results[0].timeseries[0];
    assert_eq!(timeseries.samples.len(), 1);
    assert_eq!(timeseries.samples[0].value, 100.0);
    assert_eq!(timeseries.samples[0].timestamp, 1000);

    // write data to unknown schema
    let unknown_schema_series = TimeSeries {
        labels: vec![
            Label {
                name: "__name__".to_string(),
                value: "metric_unknown_schema".to_string(),
            },
            Label {
                name: "__schema__".to_string(),
                value: "unknown_schema".to_string(),
            },
            Label {
                name: "instance".to_string(),
                value: "host2".to_string(),
            },
        ],
        samples: vec![Sample {
            value: 200.0,
            timestamp: 2000,
        }],
        ..Default::default()
    };

    let unknown_write_request = WriteRequest {
        timeseries: vec![unknown_schema_series],
        ..Default::default()
    };
    let serialized_unknown_request = unknown_write_request.encode_to_vec();
    let compressed_unknown_request =
        prom_store::snappy_compress(&serialized_unknown_request).expect("failed to encode snappy");

    // Write data to unknown schema
    let res = client
        .post("/v1/prometheus/write")
        .header("Content-Encoding", "snappy")
        .body(compressed_unknown_request)
        .send()
        .await;
    assert_eq!(res.status(), StatusCode::BAD_REQUEST);

    // Read data from unknown schema
    let unknown_read_request = ReadRequest {
        queries: vec![Query {
            start_timestamp_ms: 1500,
            end_timestamp_ms: 2500,
            matchers: vec![
                LabelMatcher {
                    name: "__name__".to_string(),
                    value: "metric_unknown_schema".to_string(),
                    r#type: MatcherType::Eq as i32,
                },
                LabelMatcher {
                    name: "__schema__".to_string(),
                    value: "unknown_schema".to_string(),
                    r#type: MatcherType::Eq as i32,
                },
            ],
            ..Default::default()
        }],
        ..Default::default()
    };

    let serialized_unknown_read_request = unknown_read_request.encode_to_vec();
    let compressed_unknown_read_request =
        prom_store::snappy_compress(&serialized_unknown_read_request)
            .expect("failed to encode snappy");

    let unknown_result = client
        .post("/v1/prometheus/read")
        .body(compressed_unknown_read_request)
        .send()
        .await;
    assert_eq!(unknown_result.status(), StatusCode::BAD_REQUEST);

    guard.remove_all().await;
}

pub async fn test_vm_proto_remote_write(store_type: StorageType) {
    common_telemetry::init_default_ut_logging();
    let (app, mut guard) =
122
tests/cases/standalone/common/promql/absent.result
Normal file
@@ -0,0 +1,122 @@
create table t (
    ts timestamp(3) time index,
    job STRING,
    instance STRING,
    val DOUBLE,
    PRIMARY KEY(job, instance),
);

Affected Rows: 0

insert into t values
    (0, 'job1', 'instance1', 1),
    (0, 'job2', 'instance2', 2),
    (5000, 'job1', 'instance3',3),
    (5000, 'job2', 'instance4',4),
    (10000, 'job1', 'instance5',5),
    (10000, 'job2', 'instance6',6),
    (15000, 'job1', 'instance7',7),
    (15000, 'job2', 'instance8',8);

Affected Rows: 8

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="job1"});

++
++

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="job2"});

++
++

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="job3"});

+---------------------+-----+------+
| ts                  | val | job  |
+---------------------+-----+------+
| 1970-01-01T00:00:00 | 1.0 | job3 |
| 1970-01-01T00:00:05 | 1.0 | job3 |
| 1970-01-01T00:00:10 | 1.0 | job3 |
| 1970-01-01T00:00:15 | 1.0 | job3 |
+---------------------+-----+------+

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(nonexistent_table);

+---------------------+-------+
| time                | value |
+---------------------+-------+
| 1970-01-01T00:00:00 | 1.0   |
| 1970-01-01T00:00:05 | 1.0   |
| 1970-01-01T00:00:10 | 1.0   |
| 1970-01-01T00:00:15 | 1.0   |
+---------------------+-------+

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="nonexistent_job"});

+---------------------+-----+-----------------+
| ts                  | val | job             |
+---------------------+-----+-----------------+
| 1970-01-01T00:00:00 | 1.0 | nonexistent_job |
| 1970-01-01T00:00:05 | 1.0 | nonexistent_job |
| 1970-01-01T00:00:10 | 1.0 | nonexistent_job |
| 1970-01-01T00:00:15 | 1.0 | nonexistent_job |
+---------------------+-----+-----------------+

-- SQLNESS SORT_RESULT 3 1
tql eval (1000, 1000, '1s') absent(t{job="job1"});

+---------------------+-----+------+
| ts                  | val | job  |
+---------------------+-----+------+
| 1970-01-01T00:16:40 | 1.0 | job1 |
+---------------------+-----+------+

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="nonexistent_job1", job="nonexistent_job2"});

+---------------------+-----+------------------+
| ts                  | val | job              |
+---------------------+-----+------------------+
| 1970-01-01T00:00:00 | 1.0 | nonexistent_job2 |
| 1970-01-01T00:00:05 | 1.0 | nonexistent_job2 |
| 1970-01-01T00:00:10 | 1.0 | nonexistent_job2 |
| 1970-01-01T00:00:15 | 1.0 | nonexistent_job2 |
+---------------------+-----+------------------+

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job=~"nonexistent_job1", job!="nonexistent_job2"});

+---------------------+-----+
| ts                  | val |
+---------------------+-----+
| 1970-01-01T00:00:00 | 1.0 |
| 1970-01-01T00:00:05 | 1.0 |
| 1970-01-01T00:00:10 | 1.0 |
| 1970-01-01T00:00:15 | 1.0 |
+---------------------+-----+

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') sum(t{job="job2"});

+---------------------+------------+
| ts                  | sum(t.val) |
+---------------------+------------+
| 1970-01-01T00:00:00 | 2.0        |
| 1970-01-01T00:00:05 | 6.0        |
| 1970-01-01T00:00:10 | 12.0       |
| 1970-01-01T00:00:15 | 20.0       |
+---------------------+------------+

-- ABSENT is not supported for aggregation functions for now
-- tql eval (0, 15, '5s') absent(sum(t{job="job2"}));
-- tql eval (0, 15, '5s') absent(sum(t{job="job3"}));
drop table t;

Affected Rows: 0

50
tests/cases/standalone/common/promql/absent.sql
Normal file
@@ -0,0 +1,50 @@
create table t (
    ts timestamp(3) time index,
    job STRING,
    instance STRING,
    val DOUBLE,
    PRIMARY KEY(job, instance),
);

insert into t values
    (0, 'job1', 'instance1', 1),
    (0, 'job2', 'instance2', 2),
    (5000, 'job1', 'instance3',3),
    (5000, 'job2', 'instance4',4),
    (10000, 'job1', 'instance5',5),
    (10000, 'job2', 'instance6',6),
    (15000, 'job1', 'instance7',7),
    (15000, 'job2', 'instance8',8);

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="job1"});

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="job2"});

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="job3"});

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(nonexistent_table);

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="nonexistent_job"});

-- SQLNESS SORT_RESULT 3 1
tql eval (1000, 1000, '1s') absent(t{job="job1"});

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job="nonexistent_job1", job="nonexistent_job2"});

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') absent(t{job=~"nonexistent_job1", job!="nonexistent_job2"});

-- SQLNESS SORT_RESULT 3 1
tql eval (0, 15, '5s') sum(t{job="job2"});

-- ABSENT is not supported for aggregation functions for now
-- tql eval (0, 15, '5s') absent(sum(t{job="job2"}));
-- tql eval (0, 15, '5s') absent(sum(t{job="job3"}));

drop table t;