mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-26 08:00:01 +00:00
Compare commits
6 Commits
feat/objbe
...
v0.13.0-ni
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
11a4f54c49 | ||
|
|
d363c8ee3c | ||
|
|
50b521c526 | ||
|
|
c9d70e0e28 | ||
|
|
c0c87652c3 | ||
|
|
faaa0affd0 |
2
.github/workflows/release.yml
vendored
2
.github/workflows/release.yml
vendored
@@ -91,7 +91,7 @@ env:
|
||||
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
|
||||
NIGHTLY_RELEASE_PREFIX: nightly
|
||||
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
|
||||
NEXT_RELEASE_VERSION: v0.12.0
|
||||
NEXT_RELEASE_VERSION: v0.13.0
|
||||
|
||||
jobs:
|
||||
allocate-runners:
|
||||
|
||||
142
Cargo.lock
generated
142
Cargo.lock
generated
@@ -185,7 +185,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
|
||||
|
||||
[[package]]
|
||||
name = "api"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-decimal",
|
||||
@@ -710,7 +710,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "auth"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -1324,7 +1324,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cache"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"catalog",
|
||||
"common-error",
|
||||
@@ -1348,7 +1348,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
|
||||
|
||||
[[package]]
|
||||
name = "catalog"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow",
|
||||
@@ -1661,7 +1661,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
|
||||
|
||||
[[package]]
|
||||
name = "cli"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"auth",
|
||||
@@ -1703,7 +1703,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.13.0",
|
||||
"table",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
@@ -1712,7 +1712,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "client"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -1739,7 +1739,7 @@ dependencies = [
|
||||
"rand",
|
||||
"serde_json",
|
||||
"snafu 0.8.5",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.13.0",
|
||||
"substrait 0.37.3",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -1780,7 +1780,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cmd"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"auth",
|
||||
@@ -1841,7 +1841,7 @@ dependencies = [
|
||||
"similar-asserts",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.13.0",
|
||||
"table",
|
||||
"temp-env",
|
||||
"tempfile",
|
||||
@@ -1887,7 +1887,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
|
||||
|
||||
[[package]]
|
||||
name = "common-base"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"anymap2",
|
||||
"async-trait",
|
||||
@@ -1909,11 +1909,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-catalog"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
|
||||
[[package]]
|
||||
name = "common-config"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
@@ -1938,7 +1938,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-datasource"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-schema",
|
||||
@@ -1974,7 +1974,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-decimal"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"bigdecimal 0.4.5",
|
||||
"common-error",
|
||||
@@ -1987,7 +1987,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-error"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"http 1.1.0",
|
||||
"snafu 0.8.5",
|
||||
@@ -1997,7 +1997,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-frontend"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-error",
|
||||
@@ -2007,7 +2007,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-function"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -2055,7 +2055,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-greptimedb-telemetry"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-runtime",
|
||||
@@ -2072,7 +2072,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -2100,7 +2100,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc-expr"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"common-base",
|
||||
@@ -2119,7 +2119,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-macro"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-query",
|
||||
@@ -2133,7 +2133,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-mem-prof"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"common-error",
|
||||
"common-macro",
|
||||
@@ -2146,7 +2146,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-meta"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"anymap2",
|
||||
"api",
|
||||
@@ -2206,7 +2206,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-options"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"common-grpc",
|
||||
"humantime-serde",
|
||||
@@ -2215,11 +2215,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-plugins"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
|
||||
[[package]]
|
||||
name = "common-pprof"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"common-error",
|
||||
"common-macro",
|
||||
@@ -2231,7 +2231,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -2258,7 +2258,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure-test"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-procedure",
|
||||
@@ -2266,7 +2266,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-query"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -2292,7 +2292,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-recordbatch"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-error",
|
||||
@@ -2311,7 +2311,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-runtime"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"clap 4.5.19",
|
||||
@@ -2341,7 +2341,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-telemetry"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"backtrace",
|
||||
@@ -2369,7 +2369,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-test-util"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"client",
|
||||
"common-query",
|
||||
@@ -2381,7 +2381,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-time"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"chrono",
|
||||
@@ -2399,7 +2399,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-version"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"build-data",
|
||||
"const_format",
|
||||
@@ -2409,7 +2409,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-wal"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
@@ -3340,7 +3340,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datanode"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -3392,7 +3392,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.13.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"toml 0.8.19",
|
||||
@@ -3401,7 +3401,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datatypes"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4045,7 +4045,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "file-engine"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -4155,7 +4155,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
|
||||
|
||||
[[package]]
|
||||
name = "flow"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow",
|
||||
@@ -4216,7 +4216,7 @@ dependencies = [
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"strum 0.25.0",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.13.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"tonic 0.12.3",
|
||||
@@ -4271,7 +4271,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
|
||||
|
||||
[[package]]
|
||||
name = "frontend"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -5539,7 +5539,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "index"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"asynchronous-codec",
|
||||
@@ -6331,7 +6331,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
|
||||
|
||||
[[package]]
|
||||
name = "log-query"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"common-error",
|
||||
@@ -6343,7 +6343,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "log-store"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -6636,7 +6636,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-client"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -6663,7 +6663,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-srv"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -6749,7 +6749,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "metric-engine"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -6847,7 +6847,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "mito2"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -7544,7 +7544,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "object-store"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
@@ -7793,7 +7793,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "operator"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -7841,7 +7841,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
|
||||
"store-api",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.13.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
@@ -8078,7 +8078,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "partition"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -8346,7 +8346,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "pipeline"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -8486,7 +8486,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "plugins"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"auth",
|
||||
"clap 4.5.19",
|
||||
@@ -8748,7 +8748,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "promql"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"async-trait",
|
||||
@@ -8993,7 +8993,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "puffin"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"async-compression 0.4.13",
|
||||
"async-trait",
|
||||
@@ -9034,7 +9034,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "query"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -9099,7 +9099,7 @@ dependencies = [
|
||||
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
|
||||
"statrs",
|
||||
"store-api",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.13.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -10444,7 +10444,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "servers"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -10561,7 +10561,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "session"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -10870,7 +10870,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sql"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"chrono",
|
||||
@@ -10924,7 +10924,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sqlness-runner"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"clap 4.5.19",
|
||||
@@ -11241,7 +11241,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "store-api"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -11371,7 +11371,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "substrait"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
@@ -11552,7 +11552,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "table"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -11803,7 +11803,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
|
||||
|
||||
[[package]]
|
||||
name = "tests-fuzz"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"arbitrary",
|
||||
"async-trait",
|
||||
@@ -11847,7 +11847,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tests-integration"
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -11913,7 +11913,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlx",
|
||||
"store-api",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.13.0",
|
||||
"table",
|
||||
"tempfile",
|
||||
"time",
|
||||
|
||||
@@ -67,7 +67,7 @@ members = [
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "0.12.0"
|
||||
version = "0.13.0"
|
||||
edition = "2021"
|
||||
license = "Apache-2.0"
|
||||
|
||||
|
||||
40
docs/benchmarks/tsbs/v0.12.0.md
Normal file
40
docs/benchmarks/tsbs/v0.12.0.md
Normal file
@@ -0,0 +1,40 @@
|
||||
# TSBS benchmark - v0.12.0
|
||||
|
||||
## Environment
|
||||
|
||||
### Amazon EC2
|
||||
|
||||
| | |
|
||||
|---------|-------------------------|
|
||||
| Machine | c5d.2xlarge |
|
||||
| CPU | 8 core |
|
||||
| Memory | 16GB |
|
||||
| Disk | 100GB (GP3) |
|
||||
| OS | Ubuntu Server 24.04 LTS |
|
||||
|
||||
## Write performance
|
||||
|
||||
| Environment | Ingest rate (rows/s) |
|
||||
|-----------------|----------------------|
|
||||
| EC2 c5d.2xlarge | 326839.28 |
|
||||
|
||||
## Query performance
|
||||
|
||||
| Query type | EC2 c5d.2xlarge (ms) |
|
||||
|-----------------------|----------------------|
|
||||
| cpu-max-all-1 | 12.46 |
|
||||
| cpu-max-all-8 | 24.20 |
|
||||
| double-groupby-1 | 673.08 |
|
||||
| double-groupby-5 | 963.99 |
|
||||
| double-groupby-all | 1330.05 |
|
||||
| groupby-orderby-limit | 952.46 |
|
||||
| high-cpu-1 | 5.08 |
|
||||
| high-cpu-all | 4638.57 |
|
||||
| lastpoint | 591.02 |
|
||||
| single-groupby-1-1-1 | 4.06 |
|
||||
| single-groupby-1-1-12 | 4.73 |
|
||||
| single-groupby-1-8-1 | 8.23 |
|
||||
| single-groupby-5-1-1 | 4.61 |
|
||||
| single-groupby-5-1-12 | 5.61 |
|
||||
| single-groupby-5-8-1 | 9.74 |
|
||||
|
||||
@@ -22,6 +22,7 @@ mod scalar_add;
|
||||
mod scalar_mul;
|
||||
pub(crate) mod sum;
|
||||
mod vector_add;
|
||||
mod vector_dim;
|
||||
mod vector_div;
|
||||
mod vector_mul;
|
||||
mod vector_norm;
|
||||
@@ -54,6 +55,7 @@ impl VectorFunction {
|
||||
registry.register(Arc::new(vector_mul::VectorMulFunction));
|
||||
registry.register(Arc::new(vector_div::VectorDivFunction));
|
||||
registry.register(Arc::new(vector_norm::VectorNormFunction));
|
||||
registry.register(Arc::new(vector_dim::VectorDimFunction));
|
||||
registry.register(Arc::new(elem_sum::ElemSumFunction));
|
||||
registry.register(Arc::new(elem_product::ElemProductFunction));
|
||||
}
|
||||
|
||||
172
src/common/function/src/scalars/vector/vector_dim.rs
Normal file
172
src/common/function/src/scalars/vector/vector_dim.rs
Normal file
@@ -0,0 +1,172 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::Display;
|
||||
|
||||
use common_query::error::InvalidFuncArgsSnafu;
|
||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::vectors::{MutableVector, UInt64VectorBuilder, VectorRef};
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const};
|
||||
|
||||
const NAME: &str = "vec_dim";
|
||||
|
||||
/// Returns the dimension of the vector.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```sql
|
||||
/// SELECT vec_dim('[7.0, 8.0, 9.0, 10.0]');
|
||||
///
|
||||
/// +---------------------------------------------------------------+
|
||||
/// | vec_dim(Utf8("[7.0, 8.0, 9.0, 10.0]")) |
|
||||
/// +---------------------------------------------------------------+
|
||||
/// | 4 |
|
||||
/// +---------------------------------------------------------------+
|
||||
///
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct VectorDimFunction;
|
||||
|
||||
impl Function for VectorDimFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(
|
||||
&self,
|
||||
_input_types: &[ConcreteDataType],
|
||||
) -> common_query::error::Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::uint64_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::one_of(
|
||||
vec![
|
||||
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
|
||||
TypeSignature::Exact(vec![ConcreteDataType::binary_datatype()]),
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
fn eval(
|
||||
&self,
|
||||
_func_ctx: FunctionContext,
|
||||
columns: &[VectorRef],
|
||||
) -> common_query::error::Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly one, have: {}",
|
||||
columns.len()
|
||||
)
|
||||
}
|
||||
);
|
||||
let arg0 = &columns[0];
|
||||
|
||||
let len = arg0.len();
|
||||
let mut result = UInt64VectorBuilder::with_capacity(len);
|
||||
if len == 0 {
|
||||
return Ok(result.to_vector());
|
||||
}
|
||||
|
||||
let arg0_const = as_veclit_if_const(arg0)?;
|
||||
|
||||
for i in 0..len {
|
||||
let arg0 = match arg0_const.as_ref() {
|
||||
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
|
||||
None => as_veclit(arg0.get_ref(i))?,
|
||||
};
|
||||
let Some(arg0) = arg0 else {
|
||||
result.push_null();
|
||||
continue;
|
||||
};
|
||||
result.push(Some(arg0.len() as u64));
|
||||
}
|
||||
|
||||
Ok(result.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for VectorDimFunction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", NAME.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::error::Error;
|
||||
use datatypes::vectors::StringVector;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_vec_dim() {
|
||||
let func = VectorDimFunction;
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![
|
||||
Some("[0.0,2.0,3.0]".to_string()),
|
||||
Some("[1.0,2.0,3.0,4.0]".to_string()),
|
||||
None,
|
||||
Some("[5.0]".to_string()),
|
||||
]));
|
||||
|
||||
let result = func.eval(FunctionContext::default(), &[input0]).unwrap();
|
||||
|
||||
let result = result.as_ref();
|
||||
assert_eq!(result.len(), 4);
|
||||
assert_eq!(result.get_ref(0).as_u64().unwrap(), Some(3));
|
||||
assert_eq!(result.get_ref(1).as_u64().unwrap(), Some(4));
|
||||
assert!(result.get_ref(2).is_null());
|
||||
assert_eq!(result.get_ref(3).as_u64().unwrap(), Some(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dim_error() {
|
||||
let func = VectorDimFunction;
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,2.0,3.0]".to_string()),
|
||||
Some("[4.0,5.0,6.0]".to_string()),
|
||||
None,
|
||||
Some("[2.0,3.0,3.0]".to_string()),
|
||||
]));
|
||||
let input1 = Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,1.0,1.0]".to_string()),
|
||||
Some("[6.0,5.0,4.0]".to_string()),
|
||||
Some("[3.0,2.0,2.0]".to_string()),
|
||||
]));
|
||||
|
||||
let result = func.eval(FunctionContext::default(), &[input0, input1]);
|
||||
|
||||
match result {
|
||||
Err(Error::InvalidFuncArgs { err_msg, .. }) => {
|
||||
assert_eq!(
|
||||
err_msg,
|
||||
"The length of the args is not correct, expect exactly one, have: 2"
|
||||
)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -36,11 +36,11 @@ use servers::error::{
|
||||
TableNotFoundSnafu,
|
||||
};
|
||||
use servers::http::jaeger::QueryTraceParams;
|
||||
use servers::otlp::trace::{
|
||||
use servers::otlp::trace::v0::{
|
||||
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN,
|
||||
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
|
||||
TRACE_TABLE_NAME,
|
||||
};
|
||||
use servers::otlp::trace::TRACE_TABLE_NAME;
|
||||
use servers::query_handler::JaegerQueryHandler;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
@@ -72,7 +72,10 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn traces(
|
||||
&self,
|
||||
pipeline_handler: PipelineHandlerRef,
|
||||
request: ExportTraceServiceRequest,
|
||||
pipeline: PipelineWay,
|
||||
pipeline_params: GreptimePipelineParams,
|
||||
table_name: String,
|
||||
ctx: QueryContextRef,
|
||||
) -> ServerResult<Output> {
|
||||
@@ -87,9 +90,14 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
|
||||
interceptor_ref.pre_execute(ctx.clone())?;
|
||||
|
||||
let spans = otlp::trace::parse(request);
|
||||
|
||||
let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?;
|
||||
let (requests, rows) = otlp::trace::to_grpc_insert_requests(
|
||||
request,
|
||||
pipeline,
|
||||
pipeline_params,
|
||||
table_name,
|
||||
&ctx,
|
||||
pipeline_handler,
|
||||
)?;
|
||||
|
||||
OTLP_TRACES_ROWS.inc_by(rows as u64);
|
||||
|
||||
|
||||
@@ -164,7 +164,6 @@ where
|
||||
let grpc_server = builder
|
||||
.database_handler(greptime_request_handler.clone())
|
||||
.prometheus_handler(self.instance.clone(), user_provider.clone())
|
||||
.otlp_handler(self.instance.clone(), user_provider)
|
||||
.flight_handler(Arc::new(greptime_request_handler))
|
||||
.build();
|
||||
Ok(grpc_server)
|
||||
|
||||
@@ -162,15 +162,38 @@ impl MetricEngineInner {
|
||||
let physical_region_id = validate_create_logical_regions(&requests)?;
|
||||
let data_region_id = utils::to_data_region_id(physical_region_id);
|
||||
|
||||
ensure!(
|
||||
self.state
|
||||
.read()
|
||||
.unwrap()
|
||||
.exist_physical_region(data_region_id),
|
||||
PhysicalRegionNotFoundSnafu {
|
||||
region_id: data_region_id,
|
||||
}
|
||||
);
|
||||
|
||||
// Filters out the requests that the logical region already exists
|
||||
let requests = {
|
||||
let state = self.state.read().unwrap();
|
||||
let logical_region_exists = state.logical_region_exists_filter(data_region_id);
|
||||
// TODO(weny): log the skipped logical regions
|
||||
requests
|
||||
.into_iter()
|
||||
.filter(|(region_id, _)| !logical_region_exists(region_id))
|
||||
.collect::<Vec<_>>()
|
||||
let mut skipped = Vec::with_capacity(requests.len());
|
||||
let mut kept_requests = Vec::with_capacity(requests.len());
|
||||
|
||||
for (region_id, request) in requests {
|
||||
if state.is_logical_region_exist(region_id) {
|
||||
skipped.push(region_id);
|
||||
} else {
|
||||
kept_requests.push((region_id, request));
|
||||
}
|
||||
}
|
||||
|
||||
// log skipped regions
|
||||
if !skipped.is_empty() {
|
||||
info!(
|
||||
"Skipped creating logical regions {skipped:?} because they already exist",
|
||||
skipped = skipped
|
||||
);
|
||||
}
|
||||
kept_requests
|
||||
};
|
||||
|
||||
// Finds new columns to add to physical region
|
||||
|
||||
@@ -83,18 +83,6 @@ pub(crate) struct MetricEngineState {
|
||||
}
|
||||
|
||||
impl MetricEngineState {
|
||||
pub fn logical_region_exists_filter(
|
||||
&self,
|
||||
physical_region_id: RegionId,
|
||||
) -> impl for<'a> Fn(&'a RegionId) -> bool + use<'_> {
|
||||
let state = self
|
||||
.physical_region_states()
|
||||
.get(&physical_region_id)
|
||||
.unwrap();
|
||||
|
||||
move |logical_region_id| state.logical_regions().contains(logical_region_id)
|
||||
}
|
||||
|
||||
pub fn add_physical_region(
|
||||
&mut self,
|
||||
physical_region_id: RegionId,
|
||||
|
||||
@@ -20,12 +20,9 @@ pub mod processor;
|
||||
pub mod transform;
|
||||
pub mod value;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use error::{
|
||||
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu, YamlParseSnafu,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use processor::{Processor, Processors};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use transform::{Transformer, Transforms};
|
||||
@@ -34,7 +31,6 @@ use yaml_rust::YamlLoader;
|
||||
|
||||
use crate::dispatcher::{Dispatcher, Rule};
|
||||
use crate::etl::error::Result;
|
||||
use crate::{GreptimeTransformer, PipelineVersion};
|
||||
|
||||
const DESCRIPTION: &str = "description";
|
||||
const PROCESSORS: &str = "processors";
|
||||
@@ -214,57 +210,6 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
|
||||
.context(IntermediateKeyIndexSnafu { kind, key })
|
||||
}
|
||||
|
||||
/// SelectInfo is used to store the selected keys from OpenTelemetry record attrs
|
||||
/// The key is used to uplift value from the attributes and serve as column name in the table
|
||||
#[derive(Default)]
|
||||
pub struct SelectInfo {
|
||||
pub keys: Vec<String>,
|
||||
}
|
||||
|
||||
/// Try to convert a string to SelectInfo
|
||||
/// The string should be a comma-separated list of keys
|
||||
/// example: "key1,key2,key3"
|
||||
/// The keys will be sorted and deduplicated
|
||||
impl From<String> for SelectInfo {
|
||||
fn from(value: String) -> Self {
|
||||
let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
|
||||
keys.dedup();
|
||||
|
||||
SelectInfo { keys }
|
||||
}
|
||||
}
|
||||
|
||||
impl SelectInfo {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.keys.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
|
||||
|
||||
/// Enum for holding information of a pipeline, which is either pipeline itself,
|
||||
/// or information that be used to retrieve a pipeline from `PipelineHandler`
|
||||
pub enum PipelineDefinition {
|
||||
Resolved(Arc<Pipeline<GreptimeTransformer>>),
|
||||
ByNameAndValue((String, PipelineVersion)),
|
||||
GreptimeIdentityPipeline,
|
||||
}
|
||||
|
||||
impl PipelineDefinition {
|
||||
pub fn from_name(name: &str, version: PipelineVersion) -> Self {
|
||||
if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
|
||||
Self::GreptimeIdentityPipeline
|
||||
} else {
|
||||
Self::ByNameAndValue((name.to_owned(), version))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum PipelineWay {
|
||||
OtlpLogDirect(Box<SelectInfo>),
|
||||
Pipeline(PipelineDefinition),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::Rows;
|
||||
|
||||
@@ -25,10 +25,10 @@ pub use etl::transform::{GreptimeTransformer, Transformer};
|
||||
pub use etl::value::{Array, Map, Value};
|
||||
pub use etl::{
|
||||
error as etl_error, json_array_to_intermediate_state, json_to_intermediate_state, parse,
|
||||
Content, DispatchedTo, Pipeline, PipelineDefinition, PipelineExecOutput, PipelineMap,
|
||||
PipelineWay, SelectInfo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
|
||||
Content, DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
|
||||
};
|
||||
pub use manager::{
|
||||
error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,
|
||||
PipelineVersion,
|
||||
error, pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef,
|
||||
PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo,
|
||||
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
|
||||
};
|
||||
|
||||
@@ -16,6 +16,8 @@ use std::sync::Arc;
|
||||
|
||||
use common_time::Timestamp;
|
||||
use datatypes::timestamp::TimestampNanosecond;
|
||||
use itertools::Itertools;
|
||||
use util::to_pipeline_version;
|
||||
|
||||
use crate::table::PipelineTable;
|
||||
use crate::{GreptimeTransformer, Pipeline};
|
||||
@@ -37,3 +39,78 @@ pub type PipelineInfo = (Timestamp, PipelineRef);
|
||||
|
||||
pub type PipelineTableRef = Arc<PipelineTable>;
|
||||
pub type PipelineRef = Arc<Pipeline<GreptimeTransformer>>;
|
||||
|
||||
/// SelectInfo is used to store the selected keys from OpenTelemetry record attrs
|
||||
/// The key is used to uplift value from the attributes and serve as column name in the table
|
||||
#[derive(Default)]
|
||||
pub struct SelectInfo {
|
||||
pub keys: Vec<String>,
|
||||
}
|
||||
|
||||
/// Try to convert a string to SelectInfo
|
||||
/// The string should be a comma-separated list of keys
|
||||
/// example: "key1,key2,key3"
|
||||
/// The keys will be sorted and deduplicated
|
||||
impl From<String> for SelectInfo {
|
||||
fn from(value: String) -> Self {
|
||||
let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
|
||||
keys.dedup();
|
||||
|
||||
SelectInfo { keys }
|
||||
}
|
||||
}
|
||||
|
||||
impl SelectInfo {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.keys.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
|
||||
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME: &str = "greptime_trace_v1";
|
||||
|
||||
/// Enum for holding information of a pipeline, which is either pipeline itself,
|
||||
/// or information that be used to retrieve a pipeline from `PipelineHandler`
|
||||
pub enum PipelineDefinition {
|
||||
Resolved(Arc<Pipeline<GreptimeTransformer>>),
|
||||
ByNameAndValue((String, PipelineVersion)),
|
||||
GreptimeIdentityPipeline,
|
||||
}
|
||||
|
||||
impl PipelineDefinition {
|
||||
pub fn from_name(name: &str, version: PipelineVersion) -> Self {
|
||||
if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
|
||||
Self::GreptimeIdentityPipeline
|
||||
} else {
|
||||
Self::ByNameAndValue((name.to_owned(), version))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum PipelineWay {
|
||||
OtlpLogDirect(Box<SelectInfo>),
|
||||
Pipeline(PipelineDefinition),
|
||||
OtlpTraceDirectV0,
|
||||
OtlpTraceDirectV1,
|
||||
}
|
||||
|
||||
impl PipelineWay {
|
||||
pub fn from_name_and_default(
|
||||
name: Option<&str>,
|
||||
version: Option<&str>,
|
||||
default_pipeline: PipelineWay,
|
||||
) -> error::Result<PipelineWay> {
|
||||
if let Some(pipeline_name) = name {
|
||||
if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME {
|
||||
Ok(PipelineWay::OtlpTraceDirectV1)
|
||||
} else {
|
||||
Ok(PipelineWay::Pipeline(PipelineDefinition::from_name(
|
||||
pipeline_name,
|
||||
to_pipeline_version(version)?,
|
||||
)))
|
||||
}
|
||||
} else {
|
||||
Ok(default_pipeline)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,10 +23,10 @@ use crate::table::{
|
||||
};
|
||||
use crate::PipelineVersion;
|
||||
|
||||
pub fn to_pipeline_version(version_str: Option<String>) -> Result<PipelineVersion> {
|
||||
pub fn to_pipeline_version(version_str: Option<&str>) -> Result<PipelineVersion> {
|
||||
match version_str {
|
||||
Some(version) => {
|
||||
let ts = Timestamp::from_str_utc(&version)
|
||||
let ts = Timestamp::from_str_utc(version)
|
||||
.map_err(|_| InvalidPipelineVersionSnafu { version }.build())?;
|
||||
Ok(Some(TimestampNanosecond(ts)))
|
||||
}
|
||||
@@ -73,14 +73,14 @@ mod tests {
|
||||
assert!(none_result.is_ok());
|
||||
assert!(none_result.unwrap().is_none());
|
||||
|
||||
let some_result = to_pipeline_version(Some("2023-01-01 00:00:00Z".to_string()));
|
||||
let some_result = to_pipeline_version(Some("2023-01-01 00:00:00Z"));
|
||||
assert!(some_result.is_ok());
|
||||
assert_eq!(
|
||||
some_result.unwrap(),
|
||||
Some(TimestampNanosecond::new(1672531200000000000))
|
||||
);
|
||||
|
||||
let invalid = to_pipeline_version(Some("invalid".to_string()));
|
||||
let invalid = to_pipeline_version(Some("invalid"));
|
||||
assert!(invalid.is_err());
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ mod cancellation;
|
||||
mod database;
|
||||
pub mod flight;
|
||||
pub mod greptime_handler;
|
||||
mod otlp;
|
||||
pub mod prom_query_gateway;
|
||||
pub mod region_server;
|
||||
|
||||
|
||||
@@ -29,12 +29,6 @@ pub struct AuthMiddlewareLayer {
|
||||
user_provider: Option<UserProviderRef>,
|
||||
}
|
||||
|
||||
impl AuthMiddlewareLayer {
|
||||
pub fn with(user_provider: Option<UserProviderRef>) -> Self {
|
||||
Self { user_provider }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Layer<S> for AuthMiddlewareLayer {
|
||||
type Service = AuthMiddleware<S>;
|
||||
|
||||
|
||||
@@ -19,25 +19,18 @@ use arrow_flight::flight_service_server::FlightServiceServer;
|
||||
use auth::UserProviderRef;
|
||||
use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
|
||||
use common_runtime::Runtime;
|
||||
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::Mutex;
|
||||
use tonic::codec::CompressionEncoding;
|
||||
use tonic::service::RoutesBuilder;
|
||||
use tonic::transport::{Identity, ServerTlsConfig};
|
||||
use tower::ServiceBuilder;
|
||||
|
||||
use super::flight::{FlightCraftRef, FlightCraftWrapper};
|
||||
use super::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
|
||||
use super::{GrpcServer, GrpcServerConfig};
|
||||
use crate::grpc::authorize::AuthMiddlewareLayer;
|
||||
use crate::grpc::database::DatabaseService;
|
||||
use crate::grpc::greptime_handler::GreptimeRequestHandler;
|
||||
use crate::grpc::otlp::OtlpService;
|
||||
use crate::grpc::prom_query_gateway::PrometheusGatewayService;
|
||||
use crate::prometheus_handler::PrometheusHandlerRef;
|
||||
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
|
||||
use crate::tls::TlsOption;
|
||||
|
||||
/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
|
||||
@@ -127,37 +120,6 @@ impl GrpcServerBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Add handler for OpenTelemetry Protocol (OTLP) requests.
|
||||
pub fn otlp_handler(
|
||||
mut self,
|
||||
otlp_handler: OpenTelemetryProtocolHandlerRef,
|
||||
user_provider: Option<UserProviderRef>,
|
||||
) -> Self {
|
||||
let tracing_service = TraceServiceServer::new(OtlpService::new(otlp_handler.clone()))
|
||||
.accept_compressed(CompressionEncoding::Gzip)
|
||||
.accept_compressed(CompressionEncoding::Zstd)
|
||||
.send_compressed(CompressionEncoding::Gzip)
|
||||
.send_compressed(CompressionEncoding::Zstd);
|
||||
|
||||
let trace_server = ServiceBuilder::new()
|
||||
.layer(AuthMiddlewareLayer::with(user_provider.clone()))
|
||||
.service(tracing_service);
|
||||
self.routes_builder.add_service(trace_server);
|
||||
|
||||
let metrics_service = MetricsServiceServer::new(OtlpService::new(otlp_handler))
|
||||
.accept_compressed(CompressionEncoding::Gzip)
|
||||
.accept_compressed(CompressionEncoding::Zstd)
|
||||
.send_compressed(CompressionEncoding::Gzip)
|
||||
.send_compressed(CompressionEncoding::Zstd);
|
||||
|
||||
let metrics_server = ServiceBuilder::new()
|
||||
.layer(AuthMiddlewareLayer::with(user_provider))
|
||||
.service(metrics_service);
|
||||
self.routes_builder.add_service(metrics_server);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
|
||||
&mut self.routes_builder
|
||||
}
|
||||
|
||||
@@ -205,7 +205,7 @@ pub async fn delete_pipeline(
|
||||
reason: "version is required",
|
||||
})?;
|
||||
|
||||
let version = to_pipeline_version(Some(version_str.clone())).context(PipelineSnafu)?;
|
||||
let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;
|
||||
|
||||
query_ctx.set_channel(Channel::Http);
|
||||
let query_ctx = Arc::new(query_ctx);
|
||||
@@ -445,8 +445,8 @@ pub async fn pipeline_dryrun(
|
||||
|
||||
match params.pipeline {
|
||||
None => {
|
||||
let version =
|
||||
to_pipeline_version(params.pipeline_version).context(PipelineSnafu)?;
|
||||
let version = to_pipeline_version(params.pipeline_version.as_deref())
|
||||
.context(PipelineSnafu)?;
|
||||
let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
|
||||
let pipeline = handler
|
||||
.get_pipeline(&pipeline_name, version, query_ctx.clone())
|
||||
@@ -486,7 +486,8 @@ pub async fn pipeline_dryrun(
|
||||
// is specified using query param.
|
||||
let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;
|
||||
|
||||
let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
|
||||
let version =
|
||||
to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
|
||||
|
||||
let ignore_errors = query_params.ignore_errors.unwrap_or(false);
|
||||
|
||||
@@ -532,7 +533,7 @@ pub async fn log_ingester(
|
||||
reason: "table is required",
|
||||
})?;
|
||||
|
||||
let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
|
||||
let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
|
||||
|
||||
let ignore_errors = query_params.ignore_errors.unwrap_or(false);
|
||||
|
||||
|
||||
@@ -34,11 +34,11 @@ use crate::error::{
|
||||
};
|
||||
use crate::http::HttpRecordsOutput;
|
||||
use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
|
||||
use crate::otlp::trace::{
|
||||
use crate::otlp::trace::v0::{
|
||||
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN,
|
||||
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
|
||||
TRACE_TABLE_NAME,
|
||||
};
|
||||
use crate::otlp::trace::TRACE_TABLE_NAME;
|
||||
use crate::query_handler::JaegerQueryHandlerRef;
|
||||
|
||||
/// JaegerAPIResponse is the response of Jaeger HTTP API.
|
||||
|
||||
@@ -29,8 +29,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::{
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::{
|
||||
ExportTraceServiceRequest, ExportTraceServiceResponse,
|
||||
};
|
||||
use pipeline::util::to_pipeline_version;
|
||||
use pipeline::{PipelineDefinition, PipelineWay};
|
||||
use pipeline::PipelineWay;
|
||||
use prost::Message;
|
||||
use session::context::{Channel, QueryContext};
|
||||
use snafu::prelude::*;
|
||||
@@ -75,6 +74,7 @@ pub async fn metrics(
|
||||
pub async fn traces(
|
||||
State(handler): State<OpenTelemetryProtocolHandlerRef>,
|
||||
TraceTableName(table_name): TraceTableName,
|
||||
pipeline_info: PipelineInfo,
|
||||
Extension(mut query_ctx): Extension<QueryContext>,
|
||||
bytes: Bytes,
|
||||
) -> Result<OtlpResponse<ExportTraceServiceResponse>> {
|
||||
@@ -88,8 +88,29 @@ pub async fn traces(
|
||||
.start_timer();
|
||||
let request =
|
||||
ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
|
||||
|
||||
let pipeline = PipelineWay::from_name_and_default(
|
||||
pipeline_info.pipeline_name.as_deref(),
|
||||
pipeline_info.pipeline_version.as_deref(),
|
||||
PipelineWay::OtlpTraceDirectV0,
|
||||
)
|
||||
.context(PipelineSnafu)?;
|
||||
|
||||
let pipeline_params = pipeline_info.pipeline_params;
|
||||
|
||||
// here we use nightly feature `trait_upcasting` to convert handler to
|
||||
// pipeline_handler
|
||||
let pipeline_handler: Arc<dyn PipelineHandler + Send + Sync> = handler.clone();
|
||||
|
||||
handler
|
||||
.traces(request, table_name, query_ctx)
|
||||
.traces(
|
||||
pipeline_handler,
|
||||
request,
|
||||
pipeline,
|
||||
pipeline_params,
|
||||
table_name,
|
||||
query_ctx,
|
||||
)
|
||||
.await
|
||||
.map(|o| OtlpResponse {
|
||||
resp_body: ExportTraceServiceResponse {
|
||||
@@ -118,15 +139,12 @@ pub async fn logs(
|
||||
.start_timer();
|
||||
let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
|
||||
|
||||
let pipeline = if let Some(pipeline_name) = pipeline_info.pipeline_name {
|
||||
PipelineWay::Pipeline(PipelineDefinition::from_name(
|
||||
&pipeline_name,
|
||||
to_pipeline_version(pipeline_info.pipeline_version).context(PipelineSnafu)?,
|
||||
))
|
||||
} else {
|
||||
PipelineWay::OtlpLogDirect(Box::new(select_info))
|
||||
};
|
||||
|
||||
let pipeline = PipelineWay::from_name_and_default(
|
||||
pipeline_info.pipeline_name.as_deref(),
|
||||
pipeline_info.pipeline_version.as_deref(),
|
||||
PipelineWay::OtlpLogDirect(Box::new(select_info)),
|
||||
)
|
||||
.context(PipelineSnafu)?;
|
||||
let pipeline_params = pipeline_info.pipeline_params;
|
||||
|
||||
// here we use nightly feature `trait_upcasting` to convert handler to
|
||||
|
||||
@@ -32,7 +32,8 @@ use snafu::{ensure, ResultExt};
|
||||
use super::trace::attributes::OtlpAnyValue;
|
||||
use super::utils::{bytes_to_hex_string, key_value_to_jsonb};
|
||||
use crate::error::{
|
||||
IncompatibleSchemaSnafu, PipelineTransformSnafu, Result, UnsupportedJsonDataTypeForTagSnafu,
|
||||
IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineTransformSnafu, Result,
|
||||
UnsupportedJsonDataTypeForTagSnafu,
|
||||
};
|
||||
use crate::pipeline::run_pipeline;
|
||||
use crate::query_handler::PipelineHandlerRef;
|
||||
@@ -98,6 +99,10 @@ pub async fn to_grpc_insert_requests(
|
||||
let insert_requests = RowInsertRequests { inserts };
|
||||
Ok((insert_requests, len))
|
||||
}
|
||||
_ => NotSupportedSnafu {
|
||||
feat: "Unsupported pipeline for logs",
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,183 +12,42 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, RowInsertRequests};
|
||||
use common_grpc::precision::Precision;
|
||||
use itertools::Itertools;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use opentelemetry_proto::tonic::common::v1::any_value;
|
||||
|
||||
use self::span::{parse_span, TraceSpan, TraceSpans};
|
||||
use crate::error::Result;
|
||||
use crate::otlp::utils::{make_column_data, make_string_column_data};
|
||||
use crate::row_writer::{self, MultiTableData, TableData};
|
||||
|
||||
const APPROXIMATE_COLUMN_COUNT: usize = 24;
|
||||
|
||||
pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces";
|
||||
pub const SERVICE_NAME_COLUMN: &str = "service_name";
|
||||
pub const TRACE_ID_COLUMN: &str = "trace_id";
|
||||
pub const TIMESTAMP_COLUMN: &str = "timestamp";
|
||||
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
|
||||
pub const SPAN_ID_COLUMN: &str = "span_id";
|
||||
pub const SPAN_NAME_COLUMN: &str = "span_name";
|
||||
pub const SPAN_KIND_COLUMN: &str = "span_kind";
|
||||
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
|
||||
|
||||
/// The span kind prefix in the database.
|
||||
/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
|
||||
pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
|
||||
|
||||
pub mod attributes;
|
||||
pub mod span;
|
||||
pub mod v0;
|
||||
|
||||
/// Convert OpenTelemetry traces to SpanTraces
|
||||
///
|
||||
/// See
|
||||
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
|
||||
/// for data structure of OTLP traces.
|
||||
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
|
||||
let span_size = request
|
||||
.resource_spans
|
||||
.iter()
|
||||
.flat_map(|res| res.scope_spans.iter())
|
||||
.flat_map(|scope| scope.spans.iter())
|
||||
.count();
|
||||
let mut spans = Vec::with_capacity(span_size);
|
||||
for resource_spans in request.resource_spans {
|
||||
let resource_attrs = resource_spans
|
||||
.resource
|
||||
.map(|r| r.attributes)
|
||||
.unwrap_or_default();
|
||||
let service_name = resource_attrs
|
||||
.iter()
|
||||
.find_or_first(|kv| kv.key == "service.name")
|
||||
.and_then(|kv| kv.value.clone())
|
||||
.and_then(|v| match v.value {
|
||||
Some(any_value::Value::StringValue(s)) => Some(s),
|
||||
Some(any_value::Value::BytesValue(b)) => {
|
||||
Some(String::from_utf8_lossy(&b).to_string())
|
||||
}
|
||||
_ => None,
|
||||
});
|
||||
use api::v1::RowInsertRequests;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use pipeline::{GreptimePipelineParams, PipelineWay};
|
||||
use session::context::QueryContextRef;
|
||||
|
||||
for scope_spans in resource_spans.scope_spans {
|
||||
let scope = scope_spans.scope.unwrap_or_default();
|
||||
for span in scope_spans.spans {
|
||||
spans.push(parse_span(
|
||||
service_name.clone(),
|
||||
&resource_attrs,
|
||||
&scope,
|
||||
span,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
spans
|
||||
}
|
||||
use crate::error::{NotSupportedSnafu, Result};
|
||||
use crate::query_handler::PipelineHandlerRef;
|
||||
|
||||
pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces";
|
||||
|
||||
/// Convert SpanTraces to GreptimeDB row insert requests.
|
||||
/// Returns `InsertRequests` and total number of rows to ingest
|
||||
pub fn to_grpc_insert_requests(
|
||||
request: ExportTraceServiceRequest,
|
||||
pipeline: PipelineWay,
|
||||
pipeline_params: GreptimePipelineParams,
|
||||
table_name: String,
|
||||
spans: TraceSpans,
|
||||
query_ctx: &QueryContextRef,
|
||||
pipeline_handler: PipelineHandlerRef,
|
||||
) -> Result<(RowInsertRequests, usize)> {
|
||||
let mut multi_table_writer = MultiTableData::default();
|
||||
let one_table_writer = multi_table_writer.get_or_default_table_data(
|
||||
table_name,
|
||||
APPROXIMATE_COLUMN_COUNT,
|
||||
spans.len(),
|
||||
);
|
||||
|
||||
for span in spans {
|
||||
write_span_to_row(one_table_writer, span)?;
|
||||
}
|
||||
|
||||
Ok(multi_table_writer.into_row_insert_requests())
|
||||
}
|
||||
|
||||
pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
|
||||
let mut row = writer.alloc_one_row();
|
||||
|
||||
// write ts
|
||||
row_writer::write_ts_to_nanos(
|
||||
writer,
|
||||
"timestamp",
|
||||
Some(span.start_in_nanosecond as i64),
|
||||
Precision::Nanosecond,
|
||||
&mut row,
|
||||
)?;
|
||||
// write ts fields
|
||||
let fields = vec![
|
||||
make_column_data(
|
||||
"timestamp_end",
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64),
|
||||
match pipeline {
|
||||
PipelineWay::OtlpTraceDirectV0 => v0::v0_to_grpc_insert_requests(
|
||||
request,
|
||||
pipeline,
|
||||
pipeline_params,
|
||||
table_name,
|
||||
query_ctx,
|
||||
pipeline_handler,
|
||||
),
|
||||
make_column_data(
|
||||
"duration_nano",
|
||||
ColumnDataType::Uint64,
|
||||
ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond),
|
||||
),
|
||||
];
|
||||
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
|
||||
|
||||
if let Some(service_name) = span.service_name {
|
||||
row_writer::write_tag(writer, "service_name", service_name, &mut row)?;
|
||||
_ => NotSupportedSnafu {
|
||||
feat: "Unsupported pipeline for logs",
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
|
||||
// tags
|
||||
let iter = vec![
|
||||
("trace_id", span.trace_id),
|
||||
("span_id", span.span_id),
|
||||
("parent_span_id", span.parent_span_id),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|(col, val)| (col.to_string(), val));
|
||||
row_writer::write_tags(writer, iter, &mut row)?;
|
||||
|
||||
// write fields
|
||||
let fields = vec![
|
||||
make_string_column_data("span_kind", span.span_kind),
|
||||
make_string_column_data("span_name", span.span_name),
|
||||
make_string_column_data("span_status_code", span.span_status_code),
|
||||
make_string_column_data("span_status_message", span.span_status_message),
|
||||
make_string_column_data("trace_state", span.trace_state),
|
||||
];
|
||||
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
|
||||
|
||||
row_writer::write_json(
|
||||
writer,
|
||||
"span_attributes",
|
||||
span.span_attributes.into(),
|
||||
&mut row,
|
||||
)?;
|
||||
row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?;
|
||||
row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
|
||||
|
||||
// write fields
|
||||
let fields = vec![
|
||||
make_string_column_data("scope_name", span.scope_name),
|
||||
make_string_column_data("scope_version", span.scope_version),
|
||||
];
|
||||
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
|
||||
|
||||
row_writer::write_json(
|
||||
writer,
|
||||
"scope_attributes",
|
||||
span.scope_attributes.into(),
|
||||
&mut row,
|
||||
)?;
|
||||
|
||||
row_writer::write_json(
|
||||
writer,
|
||||
"resource_attributes",
|
||||
span.resource_attributes.into(),
|
||||
&mut row,
|
||||
)?;
|
||||
|
||||
writer.add_row(row);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
198
src/servers/src/otlp/trace/v0.rs
Normal file
198
src/servers/src/otlp/trace/v0.rs
Normal file
@@ -0,0 +1,198 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, RowInsertRequests};
|
||||
use common_grpc::precision::Precision;
|
||||
use itertools::Itertools;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use opentelemetry_proto::tonic::common::v1::any_value;
|
||||
use pipeline::{GreptimePipelineParams, PipelineWay};
|
||||
use session::context::QueryContextRef;
|
||||
|
||||
use super::span::{parse_span, TraceSpan, TraceSpans};
|
||||
use crate::error::Result;
|
||||
use crate::otlp::utils::{make_column_data, make_string_column_data};
|
||||
use crate::query_handler::PipelineHandlerRef;
|
||||
use crate::row_writer::{self, MultiTableData, TableData};
|
||||
|
||||
const APPROXIMATE_COLUMN_COUNT: usize = 24;
|
||||
|
||||
pub const SERVICE_NAME_COLUMN: &str = "service_name";
|
||||
pub const TRACE_ID_COLUMN: &str = "trace_id";
|
||||
pub const TIMESTAMP_COLUMN: &str = "timestamp";
|
||||
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
|
||||
pub const SPAN_ID_COLUMN: &str = "span_id";
|
||||
pub const SPAN_NAME_COLUMN: &str = "span_name";
|
||||
pub const SPAN_KIND_COLUMN: &str = "span_kind";
|
||||
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
|
||||
|
||||
/// The span kind prefix in the database.
|
||||
/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
|
||||
pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
|
||||
|
||||
/// Convert OpenTelemetry traces to SpanTraces
|
||||
///
|
||||
/// See
|
||||
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
|
||||
/// for data structure of OTLP traces.
|
||||
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
|
||||
let span_size = request
|
||||
.resource_spans
|
||||
.iter()
|
||||
.flat_map(|res| res.scope_spans.iter())
|
||||
.flat_map(|scope| scope.spans.iter())
|
||||
.count();
|
||||
let mut spans = Vec::with_capacity(span_size);
|
||||
for resource_spans in request.resource_spans {
|
||||
let resource_attrs = resource_spans
|
||||
.resource
|
||||
.map(|r| r.attributes)
|
||||
.unwrap_or_default();
|
||||
let service_name = resource_attrs
|
||||
.iter()
|
||||
.find_or_first(|kv| kv.key == "service.name")
|
||||
.and_then(|kv| kv.value.clone())
|
||||
.and_then(|v| match v.value {
|
||||
Some(any_value::Value::StringValue(s)) => Some(s),
|
||||
Some(any_value::Value::BytesValue(b)) => {
|
||||
Some(String::from_utf8_lossy(&b).to_string())
|
||||
}
|
||||
_ => None,
|
||||
});
|
||||
|
||||
for scope_spans in resource_spans.scope_spans {
|
||||
let scope = scope_spans.scope.unwrap_or_default();
|
||||
for span in scope_spans.spans {
|
||||
spans.push(parse_span(
|
||||
service_name.clone(),
|
||||
&resource_attrs,
|
||||
&scope,
|
||||
span,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
spans
|
||||
}
|
||||
|
||||
/// Convert SpanTraces to GreptimeDB row insert requests.
|
||||
/// Returns `InsertRequests` and total number of rows to ingest
|
||||
pub fn v0_to_grpc_insert_requests(
|
||||
request: ExportTraceServiceRequest,
|
||||
_pipeline: PipelineWay,
|
||||
_pipeline_params: GreptimePipelineParams,
|
||||
table_name: String,
|
||||
_query_ctx: &QueryContextRef,
|
||||
_pipeline_handler: PipelineHandlerRef,
|
||||
) -> Result<(RowInsertRequests, usize)> {
|
||||
let spans = parse(request);
|
||||
let mut multi_table_writer = MultiTableData::default();
|
||||
let one_table_writer = multi_table_writer.get_or_default_table_data(
|
||||
table_name,
|
||||
APPROXIMATE_COLUMN_COUNT,
|
||||
spans.len(),
|
||||
);
|
||||
|
||||
for span in spans {
|
||||
write_span_to_row(one_table_writer, span)?;
|
||||
}
|
||||
|
||||
Ok(multi_table_writer.into_row_insert_requests())
|
||||
}
|
||||
|
||||
pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
|
||||
let mut row = writer.alloc_one_row();
|
||||
|
||||
// write ts
|
||||
row_writer::write_ts_to_nanos(
|
||||
writer,
|
||||
"timestamp",
|
||||
Some(span.start_in_nanosecond as i64),
|
||||
Precision::Nanosecond,
|
||||
&mut row,
|
||||
)?;
|
||||
// write ts fields
|
||||
let fields = vec![
|
||||
make_column_data(
|
||||
"timestamp_end",
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64),
|
||||
),
|
||||
make_column_data(
|
||||
"duration_nano",
|
||||
ColumnDataType::Uint64,
|
||||
ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond),
|
||||
),
|
||||
];
|
||||
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
|
||||
|
||||
if let Some(service_name) = span.service_name {
|
||||
row_writer::write_tag(writer, "service_name", service_name, &mut row)?;
|
||||
}
|
||||
|
||||
// tags
|
||||
let iter = vec![
|
||||
("trace_id", span.trace_id),
|
||||
("span_id", span.span_id),
|
||||
("parent_span_id", span.parent_span_id),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|(col, val)| (col.to_string(), val));
|
||||
row_writer::write_tags(writer, iter, &mut row)?;
|
||||
|
||||
// write fields
|
||||
let fields = vec![
|
||||
make_string_column_data("span_kind", span.span_kind),
|
||||
make_string_column_data("span_name", span.span_name),
|
||||
make_string_column_data("span_status_code", span.span_status_code),
|
||||
make_string_column_data("span_status_message", span.span_status_message),
|
||||
make_string_column_data("trace_state", span.trace_state),
|
||||
];
|
||||
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
|
||||
|
||||
row_writer::write_json(
|
||||
writer,
|
||||
"span_attributes",
|
||||
span.span_attributes.into(),
|
||||
&mut row,
|
||||
)?;
|
||||
row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?;
|
||||
row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
|
||||
|
||||
// write fields
|
||||
let fields = vec![
|
||||
make_string_column_data("scope_name", span.scope_name),
|
||||
make_string_column_data("scope_version", span.scope_version),
|
||||
];
|
||||
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
|
||||
|
||||
row_writer::write_json(
|
||||
writer,
|
||||
"scope_attributes",
|
||||
span.scope_attributes.into(),
|
||||
&mut row,
|
||||
)?;
|
||||
|
||||
row_writer::write_json(
|
||||
writer,
|
||||
"resource_attributes",
|
||||
span.resource_attributes.into(),
|
||||
&mut row,
|
||||
)?;
|
||||
|
||||
writer.add_row(row);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -107,7 +107,10 @@ pub trait OpenTelemetryProtocolHandler: PipelineHandler {
|
||||
/// Handling opentelemetry traces request
|
||||
async fn traces(
|
||||
&self,
|
||||
pipeline_handler: PipelineHandlerRef,
|
||||
request: ExportTraceServiceRequest,
|
||||
pipeline: PipelineWay,
|
||||
pipeline_params: GreptimePipelineParams,
|
||||
table_name: String,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<Output>;
|
||||
|
||||
@@ -284,3 +284,45 @@ FROM (
|
||||
| [-4,-20,-54] |
|
||||
+-------------------------------+
|
||||
|
||||
SELECT vec_dim('[7.0, 8.0, 9.0, 10.0]');
|
||||
|
||||
+----------------------------------------+
|
||||
| vec_dim(Utf8("[7.0, 8.0, 9.0, 10.0]")) |
|
||||
+----------------------------------------+
|
||||
| 4 |
|
||||
+----------------------------------------+
|
||||
|
||||
SELECT v, vec_dim(v)
|
||||
FROM (
|
||||
SELECT '[1.0, 2.0, 3.0]' AS v
|
||||
UNION ALL
|
||||
SELECT '[-1.0]' AS v
|
||||
UNION ALL
|
||||
SELECT '[4.0, 5.0, 6.0]' AS v
|
||||
) Order By vec_dim(v) ASC;
|
||||
|
||||
+-----------------+------------+
|
||||
| v | vec_dim(v) |
|
||||
+-----------------+------------+
|
||||
| [-1.0] | 1 |
|
||||
| [1.0, 2.0, 3.0] | 3 |
|
||||
| [4.0, 5.0, 6.0] | 3 |
|
||||
+-----------------+------------+
|
||||
|
||||
SELECT v, vec_dim(v)
|
||||
FROM (
|
||||
SELECT '[1.0, 2.0, 3.0]' AS v
|
||||
UNION ALL
|
||||
SELECT '[-1.0]' AS v
|
||||
UNION ALL
|
||||
SELECT '[7.0, 8.0, 9.0, 10.0]' AS v
|
||||
) Order By vec_dim(v) ASC;
|
||||
|
||||
+-----------------------+------------+
|
||||
| v | vec_dim(v) |
|
||||
+-----------------------+------------+
|
||||
| [-1.0] | 1 |
|
||||
| [1.0, 2.0, 3.0] | 3 |
|
||||
| [7.0, 8.0, 9.0, 10.0] | 4 |
|
||||
+-----------------------+------------+
|
||||
|
||||
|
||||
@@ -79,3 +79,23 @@ FROM (
|
||||
UNION ALL
|
||||
SELECT '[4.0, 5.0, 6.0]' AS v
|
||||
);
|
||||
|
||||
SELECT vec_dim('[7.0, 8.0, 9.0, 10.0]');
|
||||
|
||||
SELECT v, vec_dim(v)
|
||||
FROM (
|
||||
SELECT '[1.0, 2.0, 3.0]' AS v
|
||||
UNION ALL
|
||||
SELECT '[-1.0]' AS v
|
||||
UNION ALL
|
||||
SELECT '[4.0, 5.0, 6.0]' AS v
|
||||
) Order By vec_dim(v) ASC;
|
||||
|
||||
SELECT v, vec_dim(v)
|
||||
FROM (
|
||||
SELECT '[1.0, 2.0, 3.0]' AS v
|
||||
UNION ALL
|
||||
SELECT '[-1.0]' AS v
|
||||
UNION ALL
|
||||
SELECT '[7.0, 8.0, 9.0, 10.0]' AS v
|
||||
) Order By vec_dim(v) ASC;
|
||||
|
||||
@@ -3,6 +3,8 @@ Pn = "Pn"
|
||||
ue = "ue"
|
||||
worl = "worl"
|
||||
ot = "ot"
|
||||
typ = "typ"
|
||||
typs = "typs"
|
||||
unqualifed = "unqualifed"
|
||||
|
||||
[files]
|
||||
|
||||
Reference in New Issue
Block a user