Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-25 23:49:58 +00:00
Compare commits
17 Commits
feat/prefi ... v0.14.0-ni
| SHA1 |
|---|
| 4eb0771afe |
| a0739a96e4 |
| 77ccf1eac8 |
| 1dc4a196bf |
| 2431cd3bdf |
| cd730e0486 |
| a19441bed8 |
| 162e3b8620 |
| 83642dab87 |
| 46070958c9 |
| eea8b1c730 |
| 1ab4ddab8d |
| 9e63018198 |
| 594bec8c36 |
| 1586732d20 |
| 16fddd97a7 |
| 2260782c12 |
@@ -8,7 +8,7 @@ inputs:
|
||||
default: 2
|
||||
description: "Number of Datanode replicas"
|
||||
meta-replicas:
|
||||
default: 1
|
||||
default: 2
|
||||
description: "Number of Metasrv replicas"
|
||||
image-registry:
|
||||
default: "docker.io"
|
||||
|
||||
5  .github/workflows/develop.yml (vendored)
@@ -576,9 +576,12 @@ jobs:
|
||||
- name: "Remote WAL"
|
||||
opts: "-w kafka -k 127.0.0.1:9092"
|
||||
kafka: true
|
||||
- name: "Pg Kvbackend"
|
||||
- name: "PostgreSQL KvBackend"
|
||||
opts: "--setup-pg"
|
||||
kafka: false
|
||||
- name: "MySQL Kvbackend"
|
||||
opts: "--setup-mysql"
|
||||
kafka: false
|
||||
timeout-minutes: 60
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
28  Cargo.lock (generated)
@@ -4119,12 +4119,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.1.0"
|
||||
version = "1.0.34"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "11faaf5a5236997af9848be0bef4db95824b1d534ebc64d0f0c6cf3e67bd38dc"
|
||||
checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0"
|
||||
dependencies = [
|
||||
"crc32fast",
|
||||
"libz-rs-sys",
|
||||
"libz-sys",
|
||||
"miniz_oxide",
|
||||
]
|
||||
@@ -4706,7 +4705,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=c5419bbd20cb42e568ec325a4d71a3c94cc327e1#c5419bbd20cb42e568ec325a4d71a3c94cc327e1"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a7274ddce299f33d23dbe8af5bbe6219f07c559a#a7274ddce299f33d23dbe8af5bbe6219f07c559a"
|
||||
dependencies = [
|
||||
"prost 0.13.3",
|
||||
"serde",
|
||||
@@ -6279,15 +6278,6 @@ dependencies = [
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libz-rs-sys"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "902bc563b5d65ad9bba616b490842ef0651066a1a1dc3ce1087113ffcb873c8d"
|
||||
dependencies = [
|
||||
"zlib-rs",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libz-sys"
|
||||
version = "1.1.20"
|
||||
@@ -6730,6 +6720,7 @@ dependencies = [
|
||||
"servers",
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
"sqlx",
|
||||
"store-api",
|
||||
"strum 0.25.0",
|
||||
"table",
|
||||
@@ -6832,9 +6823,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.8.5"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5"
|
||||
checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
|
||||
dependencies = [
|
||||
"adler2",
|
||||
]
|
||||
@@ -11930,6 +11921,7 @@ dependencies = [
|
||||
"operator",
|
||||
"partition",
|
||||
"paste",
|
||||
"pipeline",
|
||||
"prost 0.13.3",
|
||||
"query",
|
||||
"rand",
|
||||
@@ -13964,12 +13956,6 @@ dependencies = [
|
||||
"syn 2.0.96",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zlib-rs"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b20717f0917c908dc63de2e44e97f1e6b126ca58d0e391cee86d504eb8fbd05"
|
||||
|
||||
[[package]]
|
||||
name = "zstd"
|
||||
version = "0.11.2+zstd.1.5.2"
|
||||
|
||||
@@ -126,11 +126,10 @@ deadpool-postgres = "0.12"
|
||||
derive_builder = "0.12"
|
||||
dotenv = "0.15"
|
||||
etcd-client = "0.14"
|
||||
flate2 = { version = "1.1.0", default-features = false, features = ["zlib-rs"] }
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c5419bbd20cb42e568ec325a4d71a3c94cc327e1" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a7274ddce299f33d23dbe8af5bbe6219f07c559a" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
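The `flate2` bump in the hunk above also moves the workspace onto the pure-Rust `zlib-rs` backend, which is why `libz-rs-sys` and `zlib-rs` appear in the Cargo.lock diff. A minimal sketch of a caller, assuming payloads are compressed through flate2's streaming writer API; call sites stay the same regardless of which backend feature is enabled:

```rust
use std::io::Write;

use flate2::write::GzEncoder;
use flate2::Compression;

// Gzip a byte slice; behaves identically whether flate2 is built against
// miniz_oxide, libz-sys, or the new zlib-rs backend selected above.
fn gzip(data: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(data)?;
    encoder.finish()
}
```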
@@ -192,6 +191,8 @@ snafu = "0.8"
|
||||
sqlx = { version = "0.8", features = [
|
||||
"runtime-tokio-rustls",
|
||||
"mysql",
|
||||
"postgres",
|
||||
"chrono",
|
||||
] }
|
||||
sysinfo = "0.30"
|
||||
# on branch v0.52.x
|
||||
|
||||
12  README.md
@@ -6,7 +6,7 @@
|
||||
</picture>
|
||||
</p>
|
||||
|
||||
<h2 align="center">Unified & Cost-Effective Time Series Database for Metrics, Logs, and Events</h2>
|
||||
<h2 align="center">Unified & Cost-Effective Observerability Database for Metrics, Logs, and Events</h2>
|
||||
|
||||
<div align="center">
|
||||
<h3 align="center">
|
||||
@@ -62,15 +62,19 @@
|
||||
|
||||
## Introduction
|
||||
|
||||
**GreptimeDB** is an open-source unified & cost-effective time-series database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at Any Scale.
|
||||
**GreptimeDB** is an open-source unified & cost-effective observerability database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at Any Scale.
|
||||
|
||||
## News
|
||||
|
||||
**[GreptimeDB achieves #1 in the JSONBench 1 billion row cold run!](https://greptime.com/blogs/2025-03-18-jsonbench-greptimedb-performance)**
|
||||
|
||||
## Why GreptimeDB
|
||||
|
||||
Our core developers have been building time-series data platforms for years. Based on our best practices, GreptimeDB was born to give you:
|
||||
Our core developers have been building observability data platforms for years. Based on our best practices, GreptimeDB was born to give you:
|
||||
|
||||
* **Unified Processing of Metrics, Logs, and Events**
|
||||
|
||||
GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
|
||||
GreptimeDB unifies observability data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
|
||||
|
||||
* **Cloud-native Distributed Database**
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@
|
||||
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
|
||||
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
|
||||
@@ -222,7 +222,7 @@
|
||||
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
|
||||
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
|
||||
@@ -390,7 +390,7 @@
|
||||
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `grpc` | -- | -- | The gRPC server options. |
|
||||
| `grpc.bind_addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. |
|
||||
@@ -563,7 +563,7 @@
|
||||
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `meta_client` | -- | -- | The metasrv client options. |
|
||||
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
|
||||
|
||||
@@ -27,7 +27,7 @@ max_concurrent_queries = 0
|
||||
## The address to bind the HTTP server.
|
||||
addr = "127.0.0.1:4000"
|
||||
## HTTP request timeout. Set to 0 to disable timeout.
|
||||
timeout = "30s"
|
||||
timeout = "0s"
|
||||
## HTTP request body limit.
|
||||
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
|
||||
## Set to 0 to disable limit.
|
||||
|
||||
@@ -30,7 +30,7 @@ max_send_message_size = "512MB"
|
||||
## The address to bind the HTTP server.
|
||||
addr = "127.0.0.1:4000"
|
||||
## HTTP request timeout. Set to 0 to disable timeout.
|
||||
timeout = "30s"
|
||||
timeout = "0s"
|
||||
## HTTP request body limit.
|
||||
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
|
||||
## Set to 0 to disable limit.
|
||||
|
||||
@@ -26,7 +26,7 @@ retry_interval = "3s"
|
||||
## The address to bind the HTTP server.
|
||||
addr = "127.0.0.1:4000"
|
||||
## HTTP request timeout. Set to 0 to disable timeout.
|
||||
timeout = "30s"
|
||||
timeout = "0s"
|
||||
## HTTP request body limit.
|
||||
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
|
||||
## Set to 0 to disable limit.
|
||||
|
||||
@@ -34,7 +34,7 @@ max_concurrent_queries = 0
|
||||
## The address to bind the HTTP server.
|
||||
addr = "127.0.0.1:4000"
|
||||
## HTTP request timeout. Set to 0 to disable timeout.
|
||||
timeout = "30s"
|
||||
timeout = "0s"
|
||||
## HTTP request body limit.
|
||||
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
|
||||
## Set to 0 to disable limit.
|
||||
|
||||
@@ -4782,7 +4782,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Current counts for stalled write requests by instance\n\nWrite stalls when memtable is full and pending for flush\n\n",
|
||||
"description": "Ingestion size by row counts.",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -4844,7 +4844,7 @@
|
||||
"x": 12,
|
||||
"y": 138
|
||||
},
|
||||
"id": 221,
|
||||
"id": 277,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [],
|
||||
@@ -4864,14 +4864,14 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum by(pod) (greptime_mito_write_stall_total{pod=~\"$datanode\"})",
|
||||
"expr": "rate(greptime_mito_write_rows_total{pod=~\"$datanode\"}[$__rate_interval])",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Write Stall per Instance",
|
||||
"title": "Write Rows per Instance",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
@@ -4976,7 +4976,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Cache size by instance.\n",
|
||||
"description": "Current counts for stalled write requests by instance\n\nWrite stalls when memtable is full and pending for flush\n\n",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -5028,7 +5028,7 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"unit": "decbytes"
|
||||
"unit": "none"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
@@ -5038,7 +5038,7 @@
|
||||
"x": 12,
|
||||
"y": 146
|
||||
},
|
||||
"id": 229,
|
||||
"id": 221,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [],
|
||||
@@ -5058,14 +5058,14 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "greptime_mito_cache_bytes{pod=~\"$datanode\"}",
|
||||
"expr": "sum by(pod) (greptime_mito_write_stall_total{pod=~\"$datanode\"})",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}-{{type}}",
|
||||
"legendFormat": "{{pod}}",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Cached Bytes per Instance",
|
||||
"title": "Write Stall per Instance",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
@@ -5172,7 +5172,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "P99 latency of each type of reads by instance",
|
||||
"description": "Cache size by instance.\n",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -5224,7 +5224,7 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"unit": "s"
|
||||
"unit": "decbytes"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
@@ -5234,17 +5234,13 @@
|
||||
"x": 12,
|
||||
"y": 154
|
||||
},
|
||||
"id": 228,
|
||||
"id": 229,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"calcs": [],
|
||||
"displayMode": "table",
|
||||
"placement": "bottom",
|
||||
"showLegend": true,
|
||||
"sortBy": "Last *",
|
||||
"sortDesc": true
|
||||
"showLegend": true
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "single",
|
||||
@@ -5258,14 +5254,14 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
|
||||
"expr": "greptime_mito_cache_bytes{pod=~\"$datanode\"}",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}-{{stage}}-p99",
|
||||
"legendFormat": "{{pod}}-{{type}}",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Read Stage P99 per Instance",
|
||||
"title": "Cached Bytes per Instance",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
@@ -5317,7 +5313,8 @@
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "green"
|
||||
"color": "green",
|
||||
"value": null
|
||||
},
|
||||
{
|
||||
"color": "red",
|
||||
@@ -5370,7 +5367,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Latency of compaction task, at p99",
|
||||
"description": "P99 latency of each type of reads by instance",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -5414,7 +5411,8 @@
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "green"
|
||||
"color": "green",
|
||||
"value": null
|
||||
},
|
||||
{
|
||||
"color": "red",
|
||||
@@ -5432,7 +5430,7 @@
|
||||
"x": 12,
|
||||
"y": 162
|
||||
},
|
||||
"id": 230,
|
||||
"id": 228,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
@@ -5440,7 +5438,9 @@
|
||||
],
|
||||
"displayMode": "table",
|
||||
"placement": "bottom",
|
||||
"showLegend": true
|
||||
"showLegend": true,
|
||||
"sortBy": "Last *",
|
||||
"sortDesc": true
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "single",
|
||||
@@ -5454,14 +5454,14 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99, sum by(pod, le) (rate(greptime_mito_compaction_total_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
|
||||
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
|
||||
"instant": false,
|
||||
"legendFormat": "[{{pod}}]-compaction-p99",
|
||||
"legendFormat": "{{pod}}-{{stage}}-p99",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Compaction P99 per Instance",
|
||||
"title": "Read Stage P99 per Instance",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
@@ -5570,7 +5570,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Compaction latency by stage",
|
||||
"description": "Latency of compaction task, at p99",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -5632,7 +5632,7 @@
|
||||
"x": 12,
|
||||
"y": 170
|
||||
},
|
||||
"id": 232,
|
||||
"id": 230,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
@@ -5654,9 +5654,9 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
|
||||
"expr": "histogram_quantile(0.99, sum by(pod, le) (rate(greptime_mito_compaction_total_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}-{{stage}}-p99",
|
||||
"legendFormat": "[{{pod}}]-compaction-p99",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
@@ -5794,7 +5794,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Write-ahead log operations latency at p99",
|
||||
"description": "Compaction latency by stage",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -5856,13 +5856,13 @@
|
||||
"x": 12,
|
||||
"y": 178
|
||||
},
|
||||
"id": 269,
|
||||
"id": 232,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"displayMode": "list",
|
||||
"displayMode": "table",
|
||||
"placement": "bottom",
|
||||
"showLegend": true
|
||||
},
|
||||
@@ -5878,14 +5878,14 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99, sum by(le,logstore,optype,pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))",
|
||||
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}-{{logstore}}-{{optype}}-p99",
|
||||
"legendFormat": "{{pod}}-{{stage}}-p99",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Log Store op duration seconds",
|
||||
"title": "Compaction P99 per Instance",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
@@ -5993,7 +5993,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Ongoing compaction task count",
|
||||
"description": "Write-ahead log operations latency at p99",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -6045,7 +6045,7 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"unit": "none"
|
||||
"unit": "s"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
@@ -6055,13 +6055,13 @@
|
||||
"x": 12,
|
||||
"y": 186
|
||||
},
|
||||
"id": 271,
|
||||
"id": 269,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"displayMode": "table",
|
||||
"displayMode": "list",
|
||||
"placement": "bottom",
|
||||
"showLegend": true
|
||||
},
|
||||
@@ -6078,14 +6078,14 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "greptime_mito_inflight_compaction_count",
|
||||
"expr": "histogram_quantile(0.99, sum by(le,logstore,optype,pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}",
|
||||
"legendFormat": "{{pod}}-{{logstore}}-{{optype}}-p99",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Inflight Compaction",
|
||||
"title": "Log Store op duration seconds",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
@@ -6188,6 +6188,105 @@
|
||||
"title": "Inflight Flush",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
"datasource": {
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"description": "Ongoing compaction task count",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
"mode": "palette-classic"
|
||||
},
|
||||
"custom": {
|
||||
"axisBorderShow": false,
|
||||
"axisCenteredZero": false,
|
||||
"axisColorMode": "text",
|
||||
"axisLabel": "",
|
||||
"axisPlacement": "auto",
|
||||
"barAlignment": 0,
|
||||
"drawStyle": "points",
|
||||
"fillOpacity": 0,
|
||||
"gradientMode": "none",
|
||||
"hideFrom": {
|
||||
"legend": false,
|
||||
"tooltip": false,
|
||||
"viz": false
|
||||
},
|
||||
"insertNulls": false,
|
||||
"lineInterpolation": "linear",
|
||||
"lineWidth": 1,
|
||||
"pointSize": 5,
|
||||
"scaleDistribution": {
|
||||
"type": "linear"
|
||||
},
|
||||
"showPoints": "auto",
|
||||
"spanNulls": false,
|
||||
"stacking": {
|
||||
"group": "A",
|
||||
"mode": "none"
|
||||
},
|
||||
"thresholdsStyle": {
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"mappings": [],
|
||||
"thresholds": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "green"
|
||||
},
|
||||
{
|
||||
"color": "red",
|
||||
"value": 80
|
||||
}
|
||||
]
|
||||
},
|
||||
"unit": "none"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 194
|
||||
},
|
||||
"id": 271,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"displayMode": "table",
|
||||
"placement": "bottom",
|
||||
"showLegend": true
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "single",
|
||||
"sort": "none"
|
||||
}
|
||||
},
|
||||
"pluginVersion": "11.1.3",
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "greptime_mito_inflight_compaction_count",
|
||||
"instant": false,
|
||||
"legendFormat": "{{pod}}",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Inflight Compaction",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
"collapsed": false,
|
||||
"gridPos": {
|
||||
|
||||
@@ -440,7 +440,7 @@ mod tests {
|
||||
|
||||
[http]
|
||||
addr = "127.0.0.1:4000"
|
||||
timeout = "30s"
|
||||
timeout = "0s"
|
||||
body_limit = "2GB"
|
||||
|
||||
[opentsdb]
|
||||
@@ -461,7 +461,7 @@ mod tests {
|
||||
let fe_opts = command.load_options(&Default::default()).unwrap().component;
|
||||
|
||||
assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
|
||||
assert_eq!(Duration::from_secs(30), fe_opts.http.timeout);
|
||||
assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
|
||||
|
||||
assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
|
||||
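The default `http.timeout` above changes from `30s` to `0s`, and the updated test asserts that `0s` deserializes to `Duration::from_secs(0)`. A minimal sketch (not the actual server wiring, which lives in the servers crate) of how a zero duration can be interpreted as "timeout disabled":

```rust
use std::time::Duration;

// Hypothetical helper: a configured `0s` timeout means "no timeout".
fn effective_timeout(configured: Duration) -> Option<Duration> {
    (!configured.is_zero()).then_some(configured)
}

fn main() {
    assert_eq!(
        effective_timeout(Duration::from_secs(30)),
        Some(Duration::from_secs(30))
    );
    // `timeout = "0s"` disables the HTTP request timeout entirely.
    assert_eq!(effective_timeout(Duration::from_secs(0)), None);
}
```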
|
||||
|
||||
@@ -135,5 +135,6 @@ pub fn is_readonly_schema(schema: &str) -> bool {
|
||||
pub const TRACE_ID_COLUMN: &str = "trace_id";
|
||||
pub const SPAN_ID_COLUMN: &str = "span_id";
|
||||
pub const SPAN_NAME_COLUMN: &str = "span_name";
|
||||
pub const SERVICE_NAME_COLUMN: &str = "service_name";
|
||||
pub const PARENT_SPAN_ID_COLUMN: &str = "parent_span_id";
|
||||
// ---- End of special table and fields ----
|
||||
|
||||
@@ -24,6 +24,7 @@ pub(crate) mod sum;
|
||||
mod vector_add;
|
||||
mod vector_dim;
|
||||
mod vector_div;
|
||||
mod vector_kth_elem;
|
||||
mod vector_mul;
|
||||
mod vector_norm;
|
||||
mod vector_sub;
|
||||
@@ -57,6 +58,7 @@ impl VectorFunction {
|
||||
registry.register(Arc::new(vector_div::VectorDivFunction));
|
||||
registry.register(Arc::new(vector_norm::VectorNormFunction));
|
||||
registry.register(Arc::new(vector_dim::VectorDimFunction));
|
||||
registry.register(Arc::new(vector_kth_elem::VectorKthElemFunction));
|
||||
registry.register(Arc::new(vector_subvector::VectorSubvectorFunction));
|
||||
registry.register(Arc::new(elem_sum::ElemSumFunction));
|
||||
registry.register(Arc::new(elem_product::ElemProductFunction));
|
||||
|
||||
211  src/common/function/src/scalars/vector/vector_kth_elem.rs (Normal file)
@@ -0,0 +1,211 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::Display;
|
||||
|
||||
use common_query::error::{InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::Signature;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::helper;
|
||||
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const};
|
||||
|
||||
const NAME: &str = "vec_kth_elem";
|
||||
|
||||
/// Returns the k-th (0-based index) element of the vector.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```sql
|
||||
/// SELECT vec_kth_elem("[2, 4, 6]",1) as result;
|
||||
///
|
||||
/// +---------+
|
||||
/// | result |
|
||||
/// +---------+
|
||||
/// | 4 |
|
||||
/// +---------+
|
||||
///
|
||||
/// ```
|
||||
///
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct VectorKthElemFunction;
|
||||
|
||||
impl Function for VectorKthElemFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(
|
||||
&self,
|
||||
_input_types: &[ConcreteDataType],
|
||||
) -> common_query::error::Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::float32_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
helper::one_of_sigs2(
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(),
|
||||
ConcreteDataType::binary_datatype(),
|
||||
],
|
||||
vec![ConcreteDataType::int64_datatype()],
|
||||
)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly two, have: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let arg0 = &columns[0];
|
||||
let arg1 = &columns[1];
|
||||
|
||||
let len = arg0.len();
|
||||
let mut result = Float32VectorBuilder::with_capacity(len);
|
||||
if len == 0 {
|
||||
return Ok(result.to_vector());
|
||||
};
|
||||
|
||||
let arg0_const = as_veclit_if_const(arg0)?;
|
||||
|
||||
for i in 0..len {
|
||||
let arg0 = match arg0_const.as_ref() {
|
||||
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
|
||||
None => as_veclit(arg0.get_ref(i))?,
|
||||
};
|
||||
let Some(arg0) = arg0 else {
|
||||
result.push_null();
|
||||
continue;
|
||||
};
|
||||
|
||||
let arg1 = arg1.get(i).as_f64_lossy();
|
||||
let Some(arg1) = arg1 else {
|
||||
result.push_null();
|
||||
continue;
|
||||
};
|
||||
|
||||
ensure!(
|
||||
arg1 >= 0.0 && arg1.fract() == 0.0,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"Invalid argument: k must be a non-negative integer, but got k = {}.",
|
||||
arg1
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let k = arg1 as usize;
|
||||
|
||||
ensure!(
|
||||
k < arg0.len(),
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"Out of range: k must be in the range [0, {}], but got k = {}.",
|
||||
arg0.len() - 1,
|
||||
k
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let value = arg0[k];
|
||||
|
||||
result.push(Some(value));
|
||||
}
|
||||
Ok(result.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for VectorKthElemFunction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", NAME.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::error;
|
||||
use datatypes::vectors::{Int64Vector, StringVector};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_vec_kth_elem() {
|
||||
let func = VectorKthElemFunction;
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,2.0,3.0]".to_string()),
|
||||
Some("[4.0,5.0,6.0]".to_string()),
|
||||
Some("[7.0,8.0,9.0]".to_string()),
|
||||
None,
|
||||
]));
|
||||
let input1 = Arc::new(Int64Vector::from(vec![Some(0), Some(2), None, Some(1)]));
|
||||
|
||||
let result = func
|
||||
.eval(&FunctionContext::default(), &[input0, input1])
|
||||
.unwrap();
|
||||
|
||||
let result = result.as_ref();
|
||||
assert_eq!(result.len(), 4);
|
||||
assert_eq!(result.get_ref(0).as_f32().unwrap(), Some(1.0));
|
||||
assert_eq!(result.get_ref(1).as_f32().unwrap(), Some(6.0));
|
||||
assert!(result.get_ref(2).is_null());
|
||||
assert!(result.get_ref(3).is_null());
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,2.0,3.0]".to_string())]));
|
||||
let input1 = Arc::new(Int64Vector::from(vec![Some(3)]));
|
||||
|
||||
let err = func
|
||||
.eval(&FunctionContext::default(), &[input0, input1])
|
||||
.unwrap_err();
|
||||
match err {
|
||||
error::Error::InvalidFuncArgs { err_msg, .. } => {
|
||||
assert_eq!(
|
||||
err_msg,
|
||||
format!("Out of range: k must be in the range [0, 2], but got k = 3.")
|
||||
)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,2.0,3.0]".to_string())]));
|
||||
let input1 = Arc::new(Int64Vector::from(vec![Some(-1)]));
|
||||
|
||||
let err = func
|
||||
.eval(&FunctionContext::default(), &[input0, input1])
|
||||
.unwrap_err();
|
||||
match err {
|
||||
error::Error::InvalidFuncArgs { err_msg, .. } => {
|
||||
assert_eq!(
|
||||
err_msg,
|
||||
format!("Invalid argument: k must be a non-negative integer, but got k = -1.")
|
||||
)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -155,21 +155,21 @@ impl<'a> MySqlTemplateFactory<'a> {
|
||||
table_name: table_name.to_string(),
|
||||
create_table_statement: format!(
|
||||
// Cannot be more than 3072 bytes in PRIMARY KEY
|
||||
"CREATE TABLE IF NOT EXISTS {table_name}(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
|
||||
"CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
|
||||
),
|
||||
range_template: RangeTemplate {
|
||||
point: format!("SELECT k, v FROM {table_name} WHERE k = ?"),
|
||||
range: format!("SELECT k, v FROM {table_name} WHERE k >= ? AND k < ? ORDER BY k"),
|
||||
full: format!("SELECT k, v FROM {table_name} ? ORDER BY k"),
|
||||
left_bounded: format!("SELECT k, v FROM {table_name} WHERE k >= ? ORDER BY k"),
|
||||
prefix: format!("SELECT k, v FROM {table_name} WHERE k LIKE ? ORDER BY k"),
|
||||
point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),
|
||||
range: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? AND k < ? ORDER BY k"),
|
||||
full: format!("SELECT k, v FROM `{table_name}` ? ORDER BY k"),
|
||||
left_bounded: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? ORDER BY k"),
|
||||
prefix: format!("SELECT k, v FROM `{table_name}` WHERE k LIKE ? ORDER BY k"),
|
||||
},
|
||||
delete_template: RangeTemplate {
|
||||
point: format!("DELETE FROM {table_name} WHERE k = ?;"),
|
||||
range: format!("DELETE FROM {table_name} WHERE k >= ? AND k < ?;"),
|
||||
full: format!("DELETE FROM {table_name}"),
|
||||
left_bounded: format!("DELETE FROM {table_name} WHERE k >= ?;"),
|
||||
prefix: format!("DELETE FROM {table_name} WHERE k LIKE ?;"),
|
||||
point: format!("DELETE FROM `{table_name}` WHERE k = ?;"),
|
||||
range: format!("DELETE FROM `{table_name}` WHERE k >= ? AND k < ?;"),
|
||||
full: format!("DELETE FROM `{table_name}`"),
|
||||
left_bounded: format!("DELETE FROM `{table_name}` WHERE k >= ?;"),
|
||||
prefix: format!("DELETE FROM `{table_name}` WHERE k LIKE ?;"),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -189,14 +189,17 @@ impl MySqlTemplateSet {
|
||||
fn generate_batch_get_query(&self, key_len: usize) -> String {
|
||||
let table_name = &self.table_name;
|
||||
let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
|
||||
format!("SELECT k, v FROM {table_name} WHERE k in ({});", in_clause)
|
||||
format!(
|
||||
"SELECT k, v FROM `{table_name}` WHERE k in ({});",
|
||||
in_clause
|
||||
)
|
||||
}
|
||||
|
||||
/// Generates the sql for batch delete.
|
||||
fn generate_batch_delete_query(&self, key_len: usize) -> String {
|
||||
let table_name = &self.table_name;
|
||||
let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
|
||||
format!("DELETE FROM {table_name} WHERE k in ({});", in_clause)
|
||||
format!("DELETE FROM `{table_name}` WHERE k in ({});", in_clause)
|
||||
}
|
||||
|
||||
/// Generates the sql for batch upsert.
|
||||
@@ -212,9 +215,9 @@ impl MySqlTemplateSet {
|
||||
let values_clause = values_placeholders.join(", ");
|
||||
|
||||
(
|
||||
format!(r#"SELECT k, v FROM {table_name} WHERE k IN ({in_clause})"#,),
|
||||
format!(r#"SELECT k, v FROM `{table_name}` WHERE k IN ({in_clause})"#,),
|
||||
format!(
|
||||
r#"INSERT INTO {table_name} (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
|
||||
r#"INSERT INTO `{table_name}` (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -157,21 +157,25 @@ impl<'a> PgSqlTemplateFactory<'a> {
|
||||
PgSqlTemplateSet {
|
||||
table_name: table_name.to_string(),
|
||||
create_table_statement: format!(
|
||||
"CREATE TABLE IF NOT EXISTS {table_name}(k bytea PRIMARY KEY, v bytea)",
|
||||
"CREATE TABLE IF NOT EXISTS \"{table_name}\"(k bytea PRIMARY KEY, v bytea)",
|
||||
),
|
||||
range_template: RangeTemplate {
|
||||
point: format!("SELECT k, v FROM {table_name} WHERE k = $1"),
|
||||
range: format!("SELECT k, v FROM {table_name} WHERE k >= $1 AND k < $2 ORDER BY k"),
|
||||
full: format!("SELECT k, v FROM {table_name} $1 ORDER BY k"),
|
||||
left_bounded: format!("SELECT k, v FROM {table_name} WHERE k >= $1 ORDER BY k"),
|
||||
prefix: format!("SELECT k, v FROM {table_name} WHERE k LIKE $1 ORDER BY k"),
|
||||
point: format!("SELECT k, v FROM \"{table_name}\" WHERE k = $1"),
|
||||
range: format!(
|
||||
"SELECT k, v FROM \"{table_name}\" WHERE k >= $1 AND k < $2 ORDER BY k"
|
||||
),
|
||||
full: format!("SELECT k, v FROM \"{table_name}\" $1 ORDER BY k"),
|
||||
left_bounded: format!("SELECT k, v FROM \"{table_name}\" WHERE k >= $1 ORDER BY k"),
|
||||
prefix: format!("SELECT k, v FROM \"{table_name}\" WHERE k LIKE $1 ORDER BY k"),
|
||||
},
|
||||
delete_template: RangeTemplate {
|
||||
point: format!("DELETE FROM {table_name} WHERE k = $1 RETURNING k,v;"),
|
||||
range: format!("DELETE FROM {table_name} WHERE k >= $1 AND k < $2 RETURNING k,v;"),
|
||||
full: format!("DELETE FROM {table_name} RETURNING k,v"),
|
||||
left_bounded: format!("DELETE FROM {table_name} WHERE k >= $1 RETURNING k,v;"),
|
||||
prefix: format!("DELETE FROM {table_name} WHERE k LIKE $1 RETURNING k,v;"),
|
||||
point: format!("DELETE FROM \"{table_name}\" WHERE k = $1 RETURNING k,v;"),
|
||||
range: format!(
|
||||
"DELETE FROM \"{table_name}\" WHERE k >= $1 AND k < $2 RETURNING k,v;"
|
||||
),
|
||||
full: format!("DELETE FROM \"{table_name}\" RETURNING k,v"),
|
||||
left_bounded: format!("DELETE FROM \"{table_name}\" WHERE k >= $1 RETURNING k,v;"),
|
||||
prefix: format!("DELETE FROM \"{table_name}\" WHERE k LIKE $1 RETURNING k,v;"),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -191,7 +195,10 @@ impl PgSqlTemplateSet {
|
||||
fn generate_batch_get_query(&self, key_len: usize) -> String {
|
||||
let table_name = &self.table_name;
|
||||
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
|
||||
format!("SELECT k, v FROM {table_name} WHERE k in ({});", in_clause)
|
||||
format!(
|
||||
"SELECT k, v FROM \"{table_name}\" WHERE k in ({});",
|
||||
in_clause
|
||||
)
|
||||
}
|
||||
|
||||
/// Generates the sql for batch delete.
|
||||
@@ -199,7 +206,7 @@ impl PgSqlTemplateSet {
|
||||
let table_name = &self.table_name;
|
||||
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
|
||||
format!(
|
||||
"DELETE FROM {table_name} WHERE k in ({}) RETURNING k,v;",
|
||||
"DELETE FROM \"{table_name}\" WHERE k in ({}) RETURNING k,v;",
|
||||
in_clause
|
||||
)
|
||||
}
|
||||
@@ -220,9 +227,9 @@ impl PgSqlTemplateSet {
|
||||
format!(
|
||||
r#"
|
||||
WITH prev AS (
|
||||
SELECT k,v FROM {table_name} WHERE k IN ({in_clause})
|
||||
SELECT k,v FROM "{table_name}" WHERE k IN ({in_clause})
|
||||
), update AS (
|
||||
INSERT INTO {table_name} (k, v) VALUES
|
||||
INSERT INTO "{table_name}" (k, v) VALUES
|
||||
{values_clause}
|
||||
ON CONFLICT (
|
||||
k
|
||||
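The hunks above wrap every generated table name in MySQL backticks or PostgreSQL double quotes. A small illustration (hypothetical helper names, not part of the diff) of why this matters: an unquoted metadata table name containing a dash, such as `greptime-metadata`, would be a syntax error in both dialects, while the quoted form is accepted:

```rust
// Sketch of the quoting the templates above apply to table identifiers.
fn mysql_select(table_name: &str) -> String {
    format!("SELECT k, v FROM `{table_name}` WHERE k = ?")
}

fn pg_select(table_name: &str) -> String {
    format!("SELECT k, v FROM \"{table_name}\" WHERE k = $1")
}

fn main() {
    // A table name with a dash must be quoted to be valid SQL.
    assert_eq!(
        mysql_select("greptime-metadata"),
        "SELECT k, v FROM `greptime-metadata` WHERE k = ?"
    );
    assert_eq!(
        pg_select("greptime-metadata"),
        r#"SELECT k, v FROM "greptime-metadata" WHERE k = $1"#
    );
}
```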
|
||||
@@ -12,63 +12,13 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::Peer as PbPeer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
pub use api::v1::meta::Peer;
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::{DatanodeId, FlownodeId};
|
||||
|
||||
#[derive(Debug, Default, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
|
||||
pub struct Peer {
|
||||
/// Node identifier. Unique in a cluster.
|
||||
pub id: u64,
|
||||
pub addr: String,
|
||||
}
|
||||
|
||||
impl From<PbPeer> for Peer {
|
||||
fn from(p: PbPeer) -> Self {
|
||||
Self {
|
||||
id: p.id,
|
||||
addr: p.addr,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Peer> for PbPeer {
|
||||
fn from(p: Peer) -> Self {
|
||||
Self {
|
||||
id: p.id,
|
||||
addr: p.addr,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
pub fn new(id: u64, addr: impl Into<String>) -> Self {
|
||||
Self {
|
||||
id,
|
||||
addr: addr.into(),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub fn empty(id: u64) -> Self {
|
||||
Self {
|
||||
id,
|
||||
addr: String::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Peer {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "peer-{}({})", self.id, self.addr)
|
||||
}
|
||||
}
|
||||
|
||||
/// can query peer given a node id
|
||||
#[async_trait::async_trait]
|
||||
pub trait PeerLookupService {
|
||||
|
||||
@@ -25,6 +25,6 @@ pub mod heartbeat;
|
||||
pub mod metrics;
|
||||
pub mod region_server;
|
||||
pub mod service;
|
||||
mod store;
|
||||
pub mod store;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod tests;
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
//! object storage utilities
|
||||
|
||||
mod azblob;
|
||||
mod fs;
|
||||
pub mod fs;
|
||||
mod gcs;
|
||||
mod oss;
|
||||
mod s3;
|
||||
|
||||
@@ -24,7 +24,8 @@ use crate::config::FileConfig;
|
||||
use crate::error::{self, Result};
|
||||
use crate::store;
|
||||
|
||||
pub(crate) async fn new_fs_object_store(
|
||||
/// A helper function to create a file system object store.
|
||||
pub async fn new_fs_object_store(
|
||||
data_home: &str,
|
||||
_file_config: &FileConfig,
|
||||
) -> Result<ObjectStore> {
|
||||
|
||||
@@ -28,14 +28,14 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
||||
use datafusion::dataframe::DataFrame;
|
||||
use datafusion::execution::context::SessionContext;
|
||||
use datafusion::execution::SessionStateBuilder;
|
||||
use datafusion_expr::{col, lit, lit_timestamp_nano, Expr};
|
||||
use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr};
|
||||
use query::QueryEngineRef;
|
||||
use serde_json::Value as JsonValue;
|
||||
use servers::error::{
|
||||
CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
|
||||
TableNotFoundSnafu,
|
||||
};
|
||||
use servers::http::jaeger::{QueryTraceParams, FIND_TRACES_COLS};
|
||||
use servers::http::jaeger::{QueryTraceParams, JAEGER_QUERY_TABLE_NAME_KEY};
|
||||
use servers::otlp::trace::{
|
||||
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
|
||||
SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
|
||||
@@ -43,6 +43,7 @@ use servers::otlp::trace::{
|
||||
use servers::query_handler::JaegerQueryHandler;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
|
||||
use table::table::adapter::DfTableProviderAdapter;
|
||||
|
||||
use super::Instance;
|
||||
@@ -82,7 +83,19 @@ impl JaegerQueryHandler for Instance {
|
||||
))));
|
||||
}
|
||||
|
||||
// It's equivalent to `SELECT span_name, span_kind FROM {db}.{trace_table} WHERE service_name = '{service_name}'`.
|
||||
// It's equivalent to
|
||||
//
|
||||
// ```
|
||||
// SELECT
|
||||
// span_name,
|
||||
// span_kind
|
||||
// FROM
|
||||
// {db}.{trace_table}
|
||||
// WHERE
|
||||
// service_name = '{service_name}'
|
||||
// ORDER BY
|
||||
// timestamp
|
||||
// ```.
|
||||
Ok(query_trace_table(
|
||||
ctx,
|
||||
self.catalog_manager(),
|
||||
@@ -101,9 +114,19 @@ impl JaegerQueryHandler for Instance {
|
||||
}
|
||||
|
||||
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
|
||||
// It's equivalent to `SELECT trace_id, timestamp, duration_nano, service_name, span_name, span_id, span_attributes, resource_attributes, parent_span_id
|
||||
// FROM {db}.{trace_table} WHERE trace_id = '{trace_id}'`.
|
||||
let selects: Vec<Expr> = FIND_TRACES_COLS.clone();
|
||||
// It's equivalent to
|
||||
//
|
||||
// ```
|
||||
// SELECT
|
||||
// *
|
||||
// FROM
|
||||
// {db}.{trace_table}
|
||||
// WHERE
|
||||
// trace_id = '{trace_id}'
|
||||
// ORDER BY
|
||||
// timestamp
|
||||
// ```.
|
||||
let selects = vec![wildcard()];
|
||||
|
||||
let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
|
||||
|
||||
@@ -125,7 +148,7 @@ impl JaegerQueryHandler for Instance {
|
||||
ctx: QueryContextRef,
|
||||
query_params: QueryTraceParams,
|
||||
) -> ServerResult<Output> {
|
||||
let selects: Vec<Expr> = FIND_TRACES_COLS.clone();
|
||||
let selects = vec![wildcard()];
|
||||
|
||||
let mut filters = vec![];
|
||||
|
||||
@@ -174,17 +197,34 @@ async fn query_trace_table(
|
||||
tags: Option<HashMap<String, JsonValue>>,
|
||||
distinct: bool,
|
||||
) -> ServerResult<Output> {
|
||||
let db = ctx.get_db_string();
|
||||
let table_name = ctx
|
||||
.extension(JAEGER_QUERY_TABLE_NAME_KEY)
|
||||
.unwrap_or(TRACE_TABLE_NAME);
|
||||
|
||||
let table = catalog_manager
|
||||
.table(ctx.current_catalog(), &db, TRACE_TABLE_NAME, Some(&ctx))
|
||||
.table(
|
||||
ctx.current_catalog(),
|
||||
&ctx.current_schema(),
|
||||
table_name,
|
||||
Some(&ctx),
|
||||
)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table: TRACE_TABLE_NAME,
|
||||
table: table_name,
|
||||
catalog: ctx.current_catalog(),
|
||||
schema: db,
|
||||
schema: ctx.current_schema(),
|
||||
})?;
|
||||
|
||||
let is_data_model_v1 = table
|
||||
.table_info()
|
||||
.meta
|
||||
.options
|
||||
.extra_options
|
||||
.get(TABLE_DATA_MODEL)
|
||||
.map(|s| s.as_str())
|
||||
== Some(TABLE_DATA_MODEL_TRACE_V1);
|
||||
|
||||
let df_context = create_df_context(query_engine, ctx.clone())?;
|
||||
|
||||
let dataframe = df_context
|
||||
@@ -196,7 +236,9 @@ async fn query_trace_table(
|
||||
// Apply all filters.
|
||||
let dataframe = filters
|
||||
.into_iter()
|
||||
.chain(tags.map_or(Ok(vec![]), |t| tags_filters(&dataframe, t))?)
|
||||
.chain(tags.map_or(Ok(vec![]), |t| {
|
||||
tags_filters(&dataframe, t, is_data_model_v1)
|
||||
})?)
|
||||
.try_fold(dataframe, |df, expr| {
|
||||
df.filter(expr).context(DataFusionSnafu)
|
||||
})?;
|
||||
@@ -205,7 +247,10 @@ async fn query_trace_table(
|
||||
let dataframe = if distinct {
|
||||
dataframe.distinct().context(DataFusionSnafu)?
|
||||
} else {
|
||||
// for non distinct query, sort by timestamp to make results stable
|
||||
dataframe
|
||||
.sort_by(vec![col(TIMESTAMP_COLUMN)])
|
||||
.context(DataFusionSnafu)?
|
||||
};
|
||||
|
||||
// Apply the limit if needed.
|
||||
@@ -237,7 +282,7 @@ fn create_df_context(
|
||||
SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
|
||||
);
|
||||
|
||||
// The following JSON UDFs will be used for tags filters.
|
||||
// The following JSON UDFs will be used for tags filters on v0 data model.
|
||||
let udfs: Vec<FunctionRef> = vec![
|
||||
Arc::new(JsonGetInt),
|
||||
Arc::new(JsonGetFloat),
|
||||
@@ -256,7 +301,7 @@ fn create_df_context(
|
||||
Ok(df_context)
|
||||
}
|
||||
|
||||
fn tags_filters(
|
||||
fn json_tag_filters(
|
||||
dataframe: &DataFrame,
|
||||
tags: HashMap<String, JsonValue>,
|
||||
) -> ServerResult<Vec<Expr>> {
|
||||
@@ -322,3 +367,41 @@ fn tags_filters(
|
||||
|
||||
Ok(filters)
|
||||
}
|
||||
|
||||
fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
|
||||
let filters = tags
|
||||
.into_iter()
|
||||
.filter_map(|(key, value)| {
|
||||
let key = format!("\"span_attributes.{}\"", key);
|
||||
match value {
|
||||
JsonValue::String(value) => Some(col(key).eq(lit(value))),
|
||||
JsonValue::Number(value) => {
|
||||
if value.is_f64() {
|
||||
// safe to unwrap as checked previously
|
||||
Some(col(key).eq(lit(value.as_f64().unwrap())))
|
||||
} else {
|
||||
Some(col(key).eq(lit(value.as_i64().unwrap())))
|
||||
}
|
||||
}
|
||||
JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
|
||||
JsonValue::Null => Some(col(key).is_null()),
|
||||
// not supported at the moment
|
||||
JsonValue::Array(_value) => None,
|
||||
JsonValue::Object(_value) => None,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
Ok(filters)
|
||||
}
|
||||
|
||||
fn tags_filters(
|
||||
dataframe: &DataFrame,
|
||||
tags: HashMap<String, JsonValue>,
|
||||
is_data_model_v1: bool,
|
||||
) -> ServerResult<Vec<Expr>> {
|
||||
if is_data_model_v1 {
|
||||
flatten_tag_filters(tags)
|
||||
} else {
|
||||
json_tag_filters(dataframe, tags)
|
||||
}
|
||||
}
|
||||
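The dispatch above selects between the two tag-filter strategies: on the v1 ("flatten") trace model span attributes are promoted to real columns, so a Jaeger tag becomes a plain column comparison, while the v0 model keeps attributes as JSON and routes through the registered `json_get_*` UDFs. A minimal sketch of the v1 case, using the same `datafusion_expr` helpers the file imports (the literal tag name is only an example):

```rust
use datafusion_expr::{col, lit, Expr};

// For a Jaeger query like `tags={"http.status_code": 200}` on the v1 model,
// the flattened column "span_attributes.http.status_code" is compared directly.
fn v1_tag_filter() -> Expr {
    col(r#""span_attributes.http.status_code""#).eq(lit(200i64))
}
```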
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::ops::Deref;
|
||||
|
||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
use client::Output;
|
||||
use common_error::ext::BoxedError;
|
||||
@@ -20,7 +22,7 @@ use server_error::Result as ServerResult;
|
||||
use servers::error::{self as server_error, AuthSnafu, ExecuteQuerySnafu};
|
||||
use servers::interceptor::{LogQueryInterceptor, LogQueryInterceptorRef};
|
||||
use servers::query_handler::LogQueryHandler;
|
||||
use session::context::QueryContextRef;
|
||||
use session::context::{QueryContext, QueryContextRef};
|
||||
use snafu::ResultExt;
|
||||
use tonic::async_trait;
|
||||
|
||||
@@ -64,4 +66,8 @@ impl LogQueryHandler for Instance {
|
||||
|
||||
Ok(interceptor.as_ref().post_query(output, ctx.clone())?)
|
||||
}
|
||||
|
||||
fn catalog_manager(&self, _ctx: &QueryContext) -> ServerResult<&dyn catalog::CatalogManager> {
|
||||
Ok(self.catalog_manager.deref())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,6 +90,8 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
|
||||
interceptor_ref.pre_execute(ctx.clone())?;
|
||||
|
||||
let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
|
||||
|
||||
let (requests, rows) = otlp::trace::to_grpc_insert_requests(
|
||||
request,
|
||||
pipeline,
|
||||
@@ -101,10 +103,17 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
|
||||
OTLP_TRACES_ROWS.inc_by(rows as u64);
|
||||
|
||||
self.handle_trace_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)
|
||||
if is_trace_v1_model {
|
||||
self.handle_trace_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)
|
||||
} else {
|
||||
self.handle_log_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
|
||||
@@ -284,7 +284,7 @@ impl ClusterInfo for MetaClient {
|
||||
followers
|
||||
.into_iter()
|
||||
.map(|node| NodeInfo {
|
||||
peer: node.peer.map(|p| p.into()).unwrap_or_default(),
|
||||
peer: node.peer.unwrap_or_default(),
|
||||
last_activity_ts,
|
||||
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
|
||||
version: node.version,
|
||||
@@ -292,7 +292,7 @@ impl ClusterInfo for MetaClient {
|
||||
start_time_ms: node.start_time_ms,
|
||||
})
|
||||
.chain(leader.into_iter().map(|node| NodeInfo {
|
||||
peer: node.peer.map(|p| p.into()).unwrap_or_default(),
|
||||
peer: node.peer.unwrap_or_default(),
|
||||
last_activity_ts,
|
||||
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
|
||||
version: node.version,
|
||||
|
||||
@@ -6,7 +6,8 @@ license.workspace = true
|
||||
|
||||
[features]
|
||||
mock = []
|
||||
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"]
|
||||
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend", "dep:deadpool-postgres", "dep:deadpool"]
|
||||
mysql_kvbackend = ["dep:sqlx", "common-meta/mysql_kvbackend"]
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
@@ -38,8 +39,8 @@ common-version.workspace = true
|
||||
common-wal.workspace = true
|
||||
dashmap.workspace = true
|
||||
datatypes.workspace = true
|
||||
deadpool.workspace = true
|
||||
deadpool-postgres.workspace = true
|
||||
deadpool = { workspace = true, optional = true }
|
||||
deadpool-postgres = { workspace = true, optional = true }
|
||||
derive_builder.workspace = true
|
||||
etcd-client.workspace = true
|
||||
futures.workspace = true
|
||||
@@ -60,6 +61,7 @@ serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
servers.workspace = true
|
||||
snafu.workspace = true
|
||||
sqlx = { workspace = true, optional = true }
|
||||
store-api.workspace = true
|
||||
strum.workspace = true
|
||||
table.workspace = true
|
||||
|
||||
@@ -23,6 +23,8 @@ use common_config::Configurable;
|
||||
use common_meta::kv_backend::chroot::ChrootKvBackend;
|
||||
use common_meta::kv_backend::etcd::EtcdStore;
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
use common_meta::kv_backend::rds::MySqlStore;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use common_meta::kv_backend::rds::PgStore;
|
||||
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
|
||||
@@ -38,9 +40,15 @@ use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::http::{HttpServer, HttpServerBuilder};
|
||||
use servers::metrics_handler::MetricsHandler;
|
||||
use servers::server::Server;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
use snafu::OptionExt;
|
||||
use snafu::ResultExt;
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
use sqlx::mysql::MySqlConnectOptions;
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
use sqlx::mysql::{MySqlConnection, MySqlPool};
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
use sqlx::Connection;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::mpsc::{self, Receiver, Sender};
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
@@ -49,9 +57,11 @@ use tonic::codec::CompressionEncoding;
|
||||
use tonic::transport::server::{Router, TcpIncoming};
|
||||
|
||||
use crate::election::etcd::EtcdElection;
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
use crate::election::mysql::MySqlElection;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use crate::election::postgres::PgElection;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
use crate::election::CANDIDATE_LEASE_SECS;
|
||||
use crate::metasrv::builder::MetasrvBuilder;
|
||||
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
|
||||
@@ -229,7 +239,6 @@ pub async fn metasrv_builder(
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
(None, BackendImpl::PostgresStore) => {
|
||||
let pool = create_postgres_pool(opts).await?;
|
||||
// TODO(CookiePie): use table name from config.
|
||||
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
|
||||
.await
|
||||
.context(error::KvBackendSnafu)?;
|
||||
@@ -246,6 +255,26 @@ pub async fn metasrv_builder(
|
||||
.await?;
|
||||
(kv_backend, Some(election))
|
||||
}
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
(None, BackendImpl::MysqlStore) => {
|
||||
let pool = create_mysql_pool(opts).await?;
|
||||
let kv_backend =
|
||||
MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
|
||||
.await
|
||||
.context(error::KvBackendSnafu)?;
|
||||
// Since the election will acquire a lock on the table, we need a separate table for election.
|
||||
let election_table_name = opts.meta_table_name.clone() + "_election";
|
||||
let election_client = create_mysql_client(opts).await?;
|
||||
let election = MySqlElection::with_mysql_client(
|
||||
opts.server_addr.clone(),
|
||||
election_client,
|
||||
opts.store_key_prefix.clone(),
|
||||
CANDIDATE_LEASE_SECS,
|
||||
&election_table_name,
|
||||
)
|
||||
.await?;
|
||||
(kv_backend, Some(election))
|
||||
}
|
||||
};
|
||||
|
||||
if !opts.store_key_prefix.is_empty() {
|
||||
@@ -323,3 +352,41 @@ async fn create_postgres_pool(opts: &MetasrvOptions) -> Result<deadpool_postgres
|
||||
.context(error::CreatePostgresPoolSnafu)?;
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
async fn setup_mysql_options(opts: &MetasrvOptions) -> Result<MySqlConnectOptions> {
|
||||
let mysql_url = opts
|
||||
.store_addrs
|
||||
.first()
|
||||
.context(error::InvalidArgumentsSnafu {
|
||||
err_msg: "empty store addrs",
|
||||
})?;
|
||||
// Avoid `SET` commands in sqlx
|
||||
let opts: MySqlConnectOptions = mysql_url
|
||||
.parse()
|
||||
.context(error::ParseMySqlUrlSnafu { mysql_url })?;
|
||||
let opts = opts
|
||||
.no_engine_substitution(false)
|
||||
.pipes_as_concat(false)
|
||||
.timezone(None)
|
||||
.set_names(false);
|
||||
Ok(opts)
|
||||
}
|
||||
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
async fn create_mysql_pool(opts: &MetasrvOptions) -> Result<MySqlPool> {
|
||||
let opts = setup_mysql_options(opts).await?;
|
||||
let pool = MySqlPool::connect_with(opts)
|
||||
.await
|
||||
.context(error::CreateMySqlPoolSnafu)?;
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
async fn create_mysql_client(opts: &MetasrvOptions) -> Result<MySqlConnection> {
|
||||
let opts = setup_mysql_options(opts).await?;
|
||||
let client = MySqlConnection::connect_with(&opts)
|
||||
.await
|
||||
.context(error::ConnectMySqlSnafu)?;
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
@@ -13,6 +13,8 @@
|
||||
// limitations under the License.
|
||||
|
||||
pub mod etcd;
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
pub mod mysql;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
pub mod postgres;
|
||||
|
||||
|
||||
800
src/meta-srv/src/election/mysql.rs
Normal file
@@ -0,0 +1,800 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
|
||||
use common_telemetry::{error, warn};
|
||||
use common_time::Timestamp;
|
||||
use itertools::Itertools;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sqlx::mysql::{MySqlArguments, MySqlRow};
|
||||
use sqlx::query::Query;
|
||||
use sqlx::{MySql, MySqlConnection, MySqlTransaction, Row};
|
||||
use tokio::sync::{broadcast, Mutex, MutexGuard};
|
||||
use tokio::time::{Interval, MissedTickBehavior};
|
||||
|
||||
use crate::election::{
|
||||
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY,
|
||||
};
|
||||
use crate::error::{
|
||||
DeserializeFromJsonSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result, SerializeToJsonSnafu,
|
||||
UnexpectedSnafu,
|
||||
};
|
||||
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
|
||||
|
||||
// Separator between value and expire time.
|
||||
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
|
||||
|
||||
/// Lease information.
|
||||
/// TODO(CookiePie): PgElection can also use this struct. Refactor it to a common module.
|
||||
#[derive(Default, Clone)]
|
||||
struct Lease {
|
||||
leader_value: String,
|
||||
expire_time: Timestamp,
|
||||
current: Timestamp,
|
||||
// The original raw string of the lease, used for CAS (compare-and-swap) updates.
|
||||
origin: String,
|
||||
}
|
||||
|
||||
struct ElectionSqlFactory<'a> {
|
||||
table_name: &'a str,
|
||||
}
|
||||
|
||||
struct ElectionSqlSet {
|
||||
campaign: String,
|
||||
// SQL to put a value with expire time.
|
||||
//
|
||||
// Parameters for the query:
|
||||
// `$1`: key,
|
||||
// `$2`: value,
|
||||
// `$3`: lease time in seconds
|
||||
//
|
||||
// Returns:
|
||||
// If the key already exists, return the previous value.
|
||||
put_value_with_lease: String,
|
||||
// SQL to update a value with expire time.
|
||||
//
|
||||
// Parameters for the query:
|
||||
// `$1`: updated value,
|
||||
// `$2`: lease time in seconds
|
||||
// `$3`: key,
|
||||
// `$4`: previous value,
|
||||
update_value_with_lease: String,
|
||||
// SQL to get a value with expire time.
|
||||
//
|
||||
// Parameters:
|
||||
// `$1`: key
|
||||
get_value_with_lease: String,
|
||||
// SQL to get all values with expire time with the given key prefix.
|
||||
//
|
||||
// Parameters:
|
||||
// `$1`: key prefix like 'prefix%'
|
||||
//
|
||||
// Returns:
|
||||
// column 0: value,
|
||||
// column 1: current timestamp
|
||||
get_value_with_lease_by_prefix: String,
|
||||
// SQL to delete a value.
|
||||
//
|
||||
// Parameters:
|
||||
// `?`: key
|
||||
//
|
||||
// Returns:
|
||||
// Rows affected
|
||||
delete_value: String,
|
||||
}
|
||||
|
||||
impl<'a> ElectionSqlFactory<'a> {
|
||||
fn new(table_name: &'a str) -> Self {
|
||||
Self { table_name }
|
||||
}
|
||||
|
||||
fn build(self) -> ElectionSqlSet {
|
||||
ElectionSqlSet {
|
||||
campaign: self.campaign_sql(),
|
||||
put_value_with_lease: self.put_value_with_lease_sql(),
|
||||
update_value_with_lease: self.update_value_with_lease_sql(),
|
||||
get_value_with_lease: self.get_value_with_lease_sql(),
|
||||
get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
|
||||
delete_value: self.delete_value_sql(),
|
||||
}
|
||||
}
|
||||
|
||||
// Currently the session timeout is longer than the leader lease time.
|
||||
// So the leader will renew the lease twice before the session timeout if everything goes well.
|
||||
fn set_idle_session_timeout_sql(&self) -> String {
|
||||
format!("SET SESSION wait_timeout = {};", META_LEASE_SECS + 1)
|
||||
}
|
||||
|
||||
fn set_lock_wait_timeout_sql(&self) -> &str {
|
||||
"SET SESSION innodb_lock_wait_timeout = 1;"
|
||||
}
|
||||
|
||||
fn create_table_sql(&self) -> String {
|
||||
format!(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS `{}` (
|
||||
k VARBINARY(3072) PRIMARY KEY,
|
||||
v BLOB
|
||||
);
|
||||
"#,
|
||||
self.table_name
|
||||
)
|
||||
}
|
||||
|
||||
fn insert_once(&self) -> String {
|
||||
format!(
|
||||
"INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');",
|
||||
self.table_name
|
||||
)
|
||||
}
|
||||
|
||||
fn check_version(&self) -> &str {
|
||||
"SELECT @@version;"
|
||||
}
|
||||
|
||||
fn campaign_sql(&self) -> String {
|
||||
format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name)
|
||||
}
|
||||
|
||||
fn put_value_with_lease_sql(&self) -> String {
|
||||
format!(
|
||||
r#"
|
||||
INSERT INTO `{}` (k, v) VALUES (
|
||||
?,
|
||||
CONCAT(
|
||||
?,
|
||||
'{}',
|
||||
DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f')
|
||||
)
|
||||
)
|
||||
ON DUPLICATE KEY UPDATE v = VALUES(v);
|
||||
"#,
|
||||
self.table_name, LEASE_SEP
|
||||
)
|
||||
}
|
||||
|
||||
fn update_value_with_lease_sql(&self) -> String {
|
||||
format!(
|
||||
r#"UPDATE `{}`
|
||||
SET v = CONCAT(?, '{}', DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f'))
|
||||
WHERE k = ? AND v = ?"#,
|
||||
self.table_name, LEASE_SEP
|
||||
)
|
||||
}
|
||||
|
||||
fn get_value_with_lease_sql(&self) -> String {
|
||||
format!(
|
||||
r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k = ?"#,
|
||||
self.table_name
|
||||
)
|
||||
}
|
||||
|
||||
fn get_value_with_lease_by_prefix_sql(&self) -> String {
|
||||
format!(
|
||||
r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k LIKE ?"#,
|
||||
self.table_name
|
||||
)
|
||||
}
|
||||
|
||||
fn delete_value_sql(&self) -> String {
|
||||
format!("DELETE FROM {} WHERE k = ?;", self.table_name)
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time".
|
||||
fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
|
||||
let (value, expire_time) =
|
||||
value
|
||||
.split(LEASE_SEP)
|
||||
.collect_tuple()
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"Invalid value {}, expect node info || {} || expire time",
|
||||
value, LEASE_SEP
|
||||
),
|
||||
})?;
|
||||
// The expire_time string was produced by DATE_FORMAT(..., '%Y-%m-%d %T.%f').
|
||||
let expire_time = match Timestamp::from_str(expire_time, None) {
|
||||
Ok(ts) => ts,
|
||||
Err(_) => UnexpectedSnafu {
|
||||
violated: format!("Invalid timestamp: {}", expire_time),
|
||||
}
|
||||
.fail()?,
|
||||
};
|
||||
Ok((value.to_string(), expire_time))
|
||||
}
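For illustration, a minimal sketch (not part of this change) of the lease string layout that `parse_value_and_expire_time` expects; the node value and timestamp below are hypothetical:
// A stored lease value is "<leader value><LEASE_SEP><expire time>".
let raw = format!("node-1{}2024-01-01 00:00:00.123456", LEASE_SEP);
let (value, expire_time) = parse_value_and_expire_time(&raw)?;
assert_eq!(value, "node-1");
// `expire_time` is the `Timestamp` parsed from the trailing literal.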
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct MySqlLeaderKey {
|
||||
name: Vec<u8>,
|
||||
key: Vec<u8>,
|
||||
rev: i64,
|
||||
lease: i64,
|
||||
}
|
||||
|
||||
impl LeaderKey for MySqlLeaderKey {
|
||||
fn name(&self) -> &[u8] {
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn key(&self) -> &[u8] {
|
||||
&self.key
|
||||
}
|
||||
|
||||
fn revision(&self) -> i64 {
|
||||
self.rev
|
||||
}
|
||||
|
||||
fn lease_id(&self) -> i64 {
|
||||
self.lease
|
||||
}
|
||||
}
|
||||
|
||||
enum Executor<'a> {
|
||||
Default(MutexGuard<'a, MySqlConnection>),
|
||||
Txn(MySqlTransaction<'a>),
|
||||
}
|
||||
|
||||
impl Executor<'_> {
|
||||
async fn query(
|
||||
&mut self,
|
||||
query: Query<'_, MySql, MySqlArguments>,
|
||||
sql: &str,
|
||||
) -> Result<Vec<MySqlRow>> {
|
||||
match self {
|
||||
Executor::Default(client) => {
|
||||
let res = query
|
||||
.fetch_all(&mut **client)
|
||||
.await
|
||||
.context(MySqlExecutionSnafu { sql })?;
|
||||
Ok(res)
|
||||
}
|
||||
Executor::Txn(txn) => {
|
||||
let res = query
|
||||
.fetch_all(&mut **txn)
|
||||
.await
|
||||
.context(MySqlExecutionSnafu { sql })?;
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
|
||||
match self {
|
||||
Executor::Default(client) => {
|
||||
let res = query
|
||||
.execute(&mut **client)
|
||||
.await
|
||||
.context(MySqlExecutionSnafu { sql })?;
|
||||
Ok(res.rows_affected())
|
||||
}
|
||||
Executor::Txn(txn) => {
|
||||
let res = query
|
||||
.execute(&mut **txn)
|
||||
.await
|
||||
.context(MySqlExecutionSnafu { sql })?;
|
||||
Ok(res.rows_affected())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn commit(self) -> Result<()> {
|
||||
match self {
|
||||
Executor::Txn(txn) => {
|
||||
txn.commit()
|
||||
.await
|
||||
.context(MySqlExecutionSnafu { sql: "COMMIT" })?;
|
||||
Ok(())
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// MySQL implementation of Election.
|
||||
pub struct MySqlElection {
|
||||
leader_value: String,
|
||||
client: Mutex<MySqlConnection>,
|
||||
is_leader: AtomicBool,
|
||||
leader_infancy: AtomicBool,
|
||||
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
|
||||
store_key_prefix: String,
|
||||
candidate_lease_ttl_secs: u64,
|
||||
sql_set: ElectionSqlSet,
|
||||
}
|
||||
|
||||
impl MySqlElection {
|
||||
pub async fn with_mysql_client(
|
||||
leader_value: String,
|
||||
mut client: sqlx::MySqlConnection,
|
||||
store_key_prefix: String,
|
||||
candidate_lease_ttl_secs: u64,
|
||||
table_name: &str,
|
||||
) -> Result<ElectionRef> {
|
||||
let sql_factory = ElectionSqlFactory::new(table_name);
|
||||
sqlx::query(&sql_factory.create_table_sql())
|
||||
.execute(&mut client)
|
||||
.await
|
||||
.context(MySqlExecutionSnafu {
|
||||
sql: &sql_factory.create_table_sql(),
|
||||
})?;
|
||||
// Set the idle session timeout so that a stale leader's session (and its lock) is released, avoiding deadlock.
|
||||
sqlx::query(&sql_factory.set_idle_session_timeout_sql())
|
||||
.execute(&mut client)
|
||||
.await
|
||||
.context(MySqlExecutionSnafu {
|
||||
sql: &sql_factory.set_idle_session_timeout_sql(),
|
||||
})?;
|
||||
// Set the innodb lock wait timeout to avoid blocking too long when campaigning for the lock.
|
||||
sqlx::query(sql_factory.set_lock_wait_timeout_sql())
|
||||
.execute(&mut client)
|
||||
.await
|
||||
.context(MySqlExecutionSnafu {
|
||||
sql: sql_factory.set_lock_wait_timeout_sql(),
|
||||
})?;
|
||||
// Insert at least one row for `SELECT * FOR UPDATE` to work.
|
||||
sqlx::query(&sql_factory.insert_once())
|
||||
.execute(&mut client)
|
||||
.await
|
||||
.context(MySqlExecutionSnafu {
|
||||
sql: &sql_factory.insert_once(),
|
||||
})?;
|
||||
// Check MySQL version
|
||||
Self::check_version(&mut client, sql_factory.check_version()).await?;
|
||||
let tx = listen_leader_change(leader_value.clone());
|
||||
Ok(Arc::new(Self {
|
||||
leader_value,
|
||||
client: Mutex::new(client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(false),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix,
|
||||
candidate_lease_ttl_secs,
|
||||
sql_set: sql_factory.build(),
|
||||
}))
|
||||
}
|
||||
|
||||
fn election_key(&self) -> String {
|
||||
format!("{}{}", self.store_key_prefix, ELECTION_KEY)
|
||||
}
|
||||
|
||||
fn candidate_root(&self) -> String {
|
||||
format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
|
||||
}
|
||||
|
||||
fn candidate_key(&self) -> String {
|
||||
format!("{}{}", self.candidate_root(), self.leader_value)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Election for MySqlElection {
|
||||
type Leader = LeaderValue;
|
||||
|
||||
fn is_leader(&self) -> bool {
|
||||
self.is_leader.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn in_leader_infancy(&self) -> bool {
|
||||
self.leader_infancy
|
||||
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
}
|
||||
|
||||
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
|
||||
let key = self.candidate_key();
|
||||
let node_info =
|
||||
serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
|
||||
input: format!("{node_info:?}"),
|
||||
})?;
|
||||
|
||||
{
|
||||
let client = self.client.lock().await;
|
||||
let mut executor = Executor::Default(client);
|
||||
let res = self
|
||||
.put_value_with_lease(
|
||||
&key,
|
||||
&node_info,
|
||||
self.candidate_lease_ttl_secs,
|
||||
&mut executor,
|
||||
)
|
||||
.await?;
|
||||
// May have been registered before; just update the lease.
|
||||
if !res {
|
||||
warn!("Candidate already registered, update the lease");
|
||||
self.delete_value(&key, &mut executor).await?;
|
||||
self.put_value_with_lease(
|
||||
&key,
|
||||
&node_info,
|
||||
self.candidate_lease_ttl_secs,
|
||||
&mut executor,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the current lease has expired and renew the lease.
|
||||
let mut keep_alive_interval =
|
||||
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
|
||||
loop {
|
||||
let _ = keep_alive_interval.tick().await;
|
||||
let client = self.client.lock().await;
|
||||
let mut executor = Executor::Default(client);
|
||||
let lease = self
|
||||
.get_value_with_lease(&key, &mut executor)
|
||||
.await?
|
||||
.unwrap_or_default();
|
||||
|
||||
ensure!(
|
||||
lease.expire_time > lease.current,
|
||||
UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"Candidate lease expired at {:?} (current time: {:?}), key: {:?}",
|
||||
lease.expire_time,
|
||||
lease.current,
|
||||
String::from_utf8_lossy(&key.into_bytes())
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
self.update_value_with_lease(&key, &lease.origin, &node_info, &mut executor)
|
||||
.await?;
|
||||
std::mem::drop(executor);
|
||||
}
|
||||
}
|
||||
|
||||
async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
|
||||
let key_prefix = self.candidate_root();
|
||||
let client = self.client.lock().await;
|
||||
let mut executor = Executor::Default(client);
|
||||
let (mut candidates, current) = self
|
||||
.get_value_with_lease_by_prefix(&key_prefix, &mut executor)
|
||||
.await?;
|
||||
// Remove expired candidates
|
||||
candidates.retain(|c| c.1 > current);
|
||||
let mut valid_candidates = Vec::with_capacity(candidates.len());
|
||||
for (c, _) in candidates {
|
||||
let node_info: MetasrvNodeInfo =
|
||||
serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
|
||||
input: format!("{:?}", c),
|
||||
})?;
|
||||
valid_candidates.push(node_info);
|
||||
}
|
||||
Ok(valid_candidates)
|
||||
}
|
||||
|
||||
async fn campaign(&self) -> Result<()> {
|
||||
let mut keep_alive_interval =
|
||||
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
|
||||
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||
loop {
|
||||
let _ = self.do_campaign(&mut keep_alive_interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn leader(&self) -> Result<Self::Leader> {
|
||||
if self.is_leader.load(Ordering::Relaxed) {
|
||||
Ok(self.leader_value.as_bytes().into())
|
||||
} else {
|
||||
let key = self.election_key();
|
||||
|
||||
let client = self.client.lock().await;
|
||||
let mut executor = Executor::Default(client);
|
||||
if let Some(lease) = self.get_value_with_lease(&key, &mut executor).await? {
|
||||
ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
|
||||
Ok(lease.leader_value.as_bytes().into())
|
||||
} else {
|
||||
NoLeaderSnafu.fail()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn resign(&self) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
|
||||
self.leader_watcher.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
impl MySqlElection {
|
||||
/// Returns the lease (leader value, expire time, current DB time and the original raw string) for the given key, if any.
|
||||
async fn get_value_with_lease(
|
||||
&self,
|
||||
key: &str,
|
||||
executor: &mut Executor<'_>,
|
||||
) -> Result<Option<Lease>> {
|
||||
let key = key.as_bytes();
|
||||
let query = sqlx::query(&self.sql_set.get_value_with_lease).bind(key);
|
||||
let res = executor
|
||||
.query(query, &self.sql_set.get_value_with_lease)
|
||||
.await?;
|
||||
|
||||
if res.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
// Safety: Checked if res is empty above.
|
||||
let current_time_str = String::from_utf8_lossy(res[0].try_get(1).unwrap());
|
||||
let current_time = match Timestamp::from_str(&current_time_str, None) {
|
||||
Ok(ts) => ts,
|
||||
Err(_) => UnexpectedSnafu {
|
||||
violated: format!("Invalid timestamp: {}", current_time_str),
|
||||
}
|
||||
.fail()?,
|
||||
};
|
||||
// Safety: Checked if res is empty above.
|
||||
let value_and_expire_time = String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
|
||||
let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
|
||||
|
||||
Ok(Some(Lease {
|
||||
leader_value: value,
|
||||
expire_time,
|
||||
current: current_time,
|
||||
origin: value_and_expire_time.to_string(),
|
||||
}))
|
||||
}
|
||||
|
||||
/// Returns all values and their expire times under the given key prefix, along with the current DB time.
|
||||
async fn get_value_with_lease_by_prefix(
|
||||
&self,
|
||||
key_prefix: &str,
|
||||
executor: &mut Executor<'_>,
|
||||
) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
|
||||
let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
|
||||
let query = sqlx::query(&self.sql_set.get_value_with_lease_by_prefix).bind(key_prefix);
|
||||
let res = executor
|
||||
.query(query, &self.sql_set.get_value_with_lease_by_prefix)
|
||||
.await?;
|
||||
|
||||
let mut values_with_leases = vec![];
|
||||
let mut current = Timestamp::default();
|
||||
for row in res {
|
||||
let current_time_str = row.try_get(1).unwrap_or_default();
|
||||
current = match Timestamp::from_str(current_time_str, None) {
|
||||
Ok(ts) => ts,
|
||||
Err(_) => UnexpectedSnafu {
|
||||
violated: format!("Invalid timestamp: {}", current_time_str),
|
||||
}
|
||||
.fail()?,
|
||||
};
|
||||
|
||||
let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
|
||||
let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
|
||||
|
||||
values_with_leases.push((value, expire_time));
|
||||
}
|
||||
Ok((values_with_leases, current))
|
||||
}
|
||||
|
||||
async fn update_value_with_lease(
|
||||
&self,
|
||||
key: &str,
|
||||
prev: &str,
|
||||
updated: &str,
|
||||
executor: &mut Executor<'_>,
|
||||
) -> Result<()> {
|
||||
let key = key.as_bytes();
|
||||
let prev = prev.as_bytes();
|
||||
let updated = updated.as_bytes();
|
||||
|
||||
let query = sqlx::query(&self.sql_set.update_value_with_lease)
|
||||
.bind(updated)
|
||||
.bind(self.candidate_lease_ttl_secs as f64)
|
||||
.bind(key)
|
||||
.bind(prev);
|
||||
let res = executor
|
||||
.execute(query, &self.sql_set.update_value_with_lease)
|
||||
.await?;
|
||||
|
||||
ensure!(
|
||||
res == 1,
|
||||
UnexpectedSnafu {
|
||||
violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns `true` if the insertion is successful
|
||||
async fn put_value_with_lease(
|
||||
&self,
|
||||
key: &str,
|
||||
value: &str,
|
||||
lease_ttl_secs: u64,
|
||||
executor: &mut Executor<'_>,
|
||||
) -> Result<bool> {
|
||||
let key = key.as_bytes();
|
||||
let lease_ttl_secs = lease_ttl_secs as f64;
|
||||
let query = sqlx::query(&self.sql_set.put_value_with_lease)
|
||||
.bind(key)
|
||||
.bind(value)
|
||||
.bind(lease_ttl_secs);
|
||||
let res = executor
|
||||
.query(query, &self.sql_set.put_value_with_lease)
|
||||
.await?;
|
||||
Ok(res.is_empty())
|
||||
}
|
||||
|
||||
/// Returns `true` if the deletion is successful.
|
||||
/// Caution: Should only delete the key if the lease is expired.
|
||||
async fn delete_value(&self, key: &str, executor: &mut Executor<'_>) -> Result<bool> {
|
||||
let key = key.as_bytes();
|
||||
let query = sqlx::query(&self.sql_set.delete_value).bind(key);
|
||||
let res = executor.execute(query, &self.sql_set.delete_value).await?;
|
||||
|
||||
Ok(res == 1)
|
||||
}
|
||||
|
||||
/// Attempts to acquire leadership by executing a campaign. This function continuously checks
|
||||
/// if the current lease is still valid.
|
||||
async fn do_campaign(&self, interval: &mut Interval) -> Result<()> {
|
||||
// Import `Acquire` only in this scope to avoid ambiguous `begin` overloads on the client elsewhere.
|
||||
use sqlx::Acquire;
|
||||
|
||||
loop {
|
||||
let client = self.client.lock().await;
|
||||
let executor = Executor::Default(client);
|
||||
let mut lease = Lease::default();
|
||||
match (
|
||||
self.lease_check(executor, &mut lease).await,
|
||||
self.is_leader(),
|
||||
) {
|
||||
// If the leader lease is valid and I'm the leader, renew the lease.
|
||||
(Ok(_), true) => {
|
||||
let mut client = self.client.lock().await;
|
||||
let txn = client
|
||||
.begin()
|
||||
.await
|
||||
.context(MySqlExecutionSnafu { sql: "BEGIN" })?;
|
||||
let mut executor = Executor::Txn(txn);
|
||||
let query = sqlx::query(&self.sql_set.campaign);
|
||||
executor.query(query, &self.sql_set.campaign).await?;
|
||||
self.renew_lease(executor, lease).await?;
|
||||
}
|
||||
// If the leader lease expires and I'm the leader, notify the leader watcher and step down.
|
||||
// Another instance should be elected as the leader in this case.
|
||||
(Err(_), true) => {
|
||||
warn!("Leader lease expired, re-initiate the campaign");
|
||||
self.step_down_without_lock().await?;
|
||||
}
|
||||
// If the leader lease expires and I'm not the leader, elect myself.
|
||||
(Err(_), false) => {
|
||||
warn!("Leader lease expired, re-initiate the campaign");
|
||||
let mut client = self.client.lock().await;
|
||||
let txn = client
|
||||
.begin()
|
||||
.await
|
||||
.context(MySqlExecutionSnafu { sql: "BEGIN" })?;
|
||||
let mut executor = Executor::Txn(txn);
|
||||
let query = sqlx::query(&self.sql_set.campaign);
|
||||
executor.query(query, &self.sql_set.campaign).await?;
|
||||
self.elected(&mut executor).await?;
|
||||
executor.commit().await?;
|
||||
}
|
||||
// If the leader lease is valid and I'm not the leader, do nothing.
|
||||
(Ok(_), false) => {}
|
||||
}
|
||||
interval.tick().await;
|
||||
}
|
||||
}
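For context, a hedged sketch of how the election loop might be driven; the surrounding wiring (how Metasrv spawns the task) is assumed, not taken from this diff:
// `election` is the ElectionRef returned by MySqlElection::with_mysql_client above.
let campaign_election = election.clone();
tokio::spawn(async move {
    // Loops forever; each round performs a lease check and, if needed, a campaign.
    let _ = campaign_election.campaign().await;
});
// Leader changes (Elected / StepDown) can be observed via the broadcast channel.
let mut leader_events = election.subscribe_leader_change();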
|
||||
|
||||
/// Renew the lease
|
||||
async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> {
|
||||
let key = self.election_key();
|
||||
self.update_value_with_lease(&key, &lease.origin, &self.leader_value, &mut executor)
|
||||
.await?;
|
||||
executor.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Performs a lease check during the election process.
|
||||
///
|
||||
/// This function performs the following checks and actions:
|
||||
///
|
||||
/// - **Case 1**: If the current instance is not the leader but the lease has expired, it raises an error
|
||||
/// to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
|
||||
/// will be released.
|
||||
/// - **Case 2**: If all checks pass, the function returns without performing any actions.
|
||||
async fn lease_check(&self, mut executor: Executor<'_>, lease: &mut Lease) -> Result<()> {
|
||||
let key = self.election_key();
|
||||
let check_lease = self
|
||||
.get_value_with_lease(&key, &mut executor)
|
||||
.await?
|
||||
.context(NoLeaderSnafu)?;
|
||||
*lease = check_lease;
|
||||
// Case 1: Lease expired
|
||||
ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
|
||||
// Case 2: Everything is fine
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// The node still considers itself the leader locally but failed to acquire the lock. Steps down without deleting the key.
|
||||
async fn step_down_without_lock(&self) -> Result<()> {
|
||||
let key = self.election_key().into_bytes();
|
||||
let leader_key = MySqlLeaderKey {
|
||||
name: self.leader_value.clone().into_bytes(),
|
||||
key: key.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
if self
|
||||
.is_leader
|
||||
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
|
||||
{
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Elected as leader. The leader should put the key and notify the leader watcher.
|
||||
/// Caution: Should only be called while holding the lock.
|
||||
async fn elected(&self, executor: &mut Executor<'_>) -> Result<()> {
|
||||
let key = self.election_key();
|
||||
let leader_key = MySqlLeaderKey {
|
||||
name: self.leader_value.clone().into_bytes(),
|
||||
key: key.clone().into_bytes(),
|
||||
..Default::default()
|
||||
};
|
||||
self.delete_value(&key, executor).await?;
|
||||
self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS, executor)
|
||||
.await?;
|
||||
|
||||
if self
|
||||
.is_leader
|
||||
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
self.leader_infancy.store(true, Ordering::Relaxed);
|
||||
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
|
||||
{
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if the MySQL version is supported.
|
||||
async fn check_version(client: &mut MySqlConnection, sql: &str) -> Result<()> {
|
||||
let query = sqlx::query(sql);
|
||||
match query.fetch_one(client).await {
|
||||
Ok(row) => {
|
||||
let version: String = row.try_get(0).unwrap();
|
||||
if !version.starts_with("8.0") && !version.starts_with("5.7") {
|
||||
warn!(
|
||||
"Unsupported MySQL version: {}, expected: [5.7, 8.0]",
|
||||
version
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(e; "Failed to check MySQL version through sql: {}", sql);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -109,10 +109,10 @@ impl<'a> ElectionSqlFactory<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive.
|
||||
// Either the leader reconnects and step down or the session expires and the lock is released.
|
||||
fn set_idle_session_timeout_sql(&self) -> &str {
|
||||
"SET idle_session_timeout = '10s';"
|
||||
// Currently the session timeout is longer than the leader lease time.
|
||||
// So the leader will renew the lease twice before the session timeout if everything goes well.
|
||||
fn set_idle_session_timeout_sql(&self) -> String {
|
||||
format!("SET idle_session_timeout = '{}s';", META_LEASE_SECS + 1)
|
||||
}
|
||||
|
||||
fn campaign_sql(&self) -> String {
|
||||
@@ -126,9 +126,9 @@ impl<'a> ElectionSqlFactory<'a> {
|
||||
fn put_value_with_lease_sql(&self) -> String {
|
||||
format!(
|
||||
r#"WITH prev AS (
|
||||
SELECT k, v FROM {} WHERE k = $1
|
||||
SELECT k, v FROM "{}" WHERE k = $1
|
||||
), insert AS (
|
||||
INSERT INTO {}
|
||||
INSERT INTO "{}"
|
||||
VALUES($1, convert_to($2 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
|
||||
ON CONFLICT (k) DO NOTHING
|
||||
)
|
||||
@@ -140,7 +140,7 @@ impl<'a> ElectionSqlFactory<'a> {
|
||||
|
||||
fn update_value_with_lease_sql(&self) -> String {
|
||||
format!(
|
||||
r#"UPDATE {}
|
||||
r#"UPDATE "{}"
|
||||
SET v = convert_to($3 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
|
||||
WHERE k = $1 AND v = $2"#,
|
||||
self.table_name, LEASE_SEP
|
||||
@@ -149,21 +149,21 @@ impl<'a> ElectionSqlFactory<'a> {
|
||||
|
||||
fn get_value_with_lease_sql(&self) -> String {
|
||||
format!(
|
||||
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k = $1"#,
|
||||
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k = $1"#,
|
||||
self.table_name
|
||||
)
|
||||
}
|
||||
|
||||
fn get_value_with_lease_by_prefix_sql(&self) -> String {
|
||||
format!(
|
||||
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k LIKE $1"#,
|
||||
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k LIKE $1"#,
|
||||
self.table_name
|
||||
)
|
||||
}
|
||||
|
||||
fn delete_value_sql(&self) -> String {
|
||||
format!(
|
||||
"DELETE FROM {} WHERE k = $1 RETURNING k,v;",
|
||||
"DELETE FROM \"{}\" WHERE k = $1 RETURNING k,v;",
|
||||
self.table_name
|
||||
)
|
||||
}
|
||||
@@ -241,7 +241,7 @@ impl PgElection {
|
||||
let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
|
||||
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
|
||||
client
|
||||
.execute(sql_factory.set_idle_session_timeout_sql(), &[])
|
||||
.execute(&sql_factory.set_idle_session_timeout_sql(), &[])
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
|
||||
@@ -285,7 +285,6 @@ impl Election for PgElection {
|
||||
.is_ok()
|
||||
}
|
||||
|
||||
/// TODO(CookiePie): Split the candidate registration and keep alive logic into separate methods, so that upper layers can call them separately.
|
||||
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
|
||||
let key = self.candidate_key();
|
||||
let node_info =
|
||||
@@ -317,7 +316,9 @@ impl Election for PgElection {
|
||||
prev_expire_time > current_time,
|
||||
UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"Candidate lease expired, key: {:?}",
|
||||
"Candidate lease expired at {:?} (current time {:?}), key: {:?}",
|
||||
prev_expire_time,
|
||||
current_time,
|
||||
String::from_utf8_lossy(&key.into_bytes())
|
||||
),
|
||||
}
|
||||
@@ -369,23 +370,19 @@ impl Election for PgElection {
|
||||
.query(&self.sql_set.campaign, &[])
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
if let Some(row) = res.first() {
|
||||
match row.try_get(0) {
|
||||
Ok(true) => self.leader_action().await?,
|
||||
Ok(false) => self.follower_action().await?,
|
||||
Err(_) => {
|
||||
return UnexpectedSnafu {
|
||||
violated: "Failed to get the result of acquiring advisory lock"
|
||||
.to_string(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
let row = res.first().context(UnexpectedSnafu {
|
||||
violated: "Failed to get the result of acquiring advisory lock",
|
||||
})?;
|
||||
let is_leader = row.try_get(0).map_err(|_| {
|
||||
UnexpectedSnafu {
|
||||
violated: "Failed to get the result of get lock",
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
if is_leader {
|
||||
self.leader_action().await?;
|
||||
} else {
|
||||
return UnexpectedSnafu {
|
||||
violated: "Failed to get the result of acquiring advisory lock".to_string(),
|
||||
}
|
||||
.fail();
|
||||
self.follower_action().await?;
|
||||
}
|
||||
let _ = keep_alive_interval.tick().await;
|
||||
}
|
||||
@@ -747,7 +744,7 @@ mod tests {
|
||||
});
|
||||
if let Some(table_name) = table_name {
|
||||
let create_table_sql = format!(
|
||||
"CREATE TABLE IF NOT EXISTS {}(k bytea PRIMARY KEY, v bytea);",
|
||||
"CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
|
||||
table_name
|
||||
);
|
||||
client.execute(&create_table_sql, &[]).await.unwrap();
|
||||
@@ -756,7 +753,7 @@ mod tests {
|
||||
}
|
||||
|
||||
async fn drop_table(client: &Client, table_name: &str) {
|
||||
let sql = format!("DROP TABLE IF EXISTS {};", table_name);
|
||||
let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
|
||||
client.execute(&sql, &[]).await.unwrap();
|
||||
}
|
||||
|
||||
|
||||
@@ -343,6 +343,16 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
#[snafu(display("Failed to parse mysql url: {}", mysql_url))]
|
||||
ParseMySqlUrl {
|
||||
#[snafu(source)]
|
||||
error: sqlx::error::Error,
|
||||
mysql_url: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to find table route for {table_id}"))]
|
||||
TableRouteNotFound {
|
||||
table_id: TableId,
|
||||
@@ -729,6 +739,34 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
#[snafu(display("Failed to execute via mysql, sql: {}", sql))]
|
||||
MySqlExecution {
|
||||
#[snafu(source)]
|
||||
error: sqlx::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
sql: String,
|
||||
},
|
||||
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
#[snafu(display("Failed to create mysql pool"))]
|
||||
CreateMySqlPool {
|
||||
#[snafu(source)]
|
||||
error: sqlx::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
#[snafu(display("Failed to connect to mysql"))]
|
||||
ConnectMySql {
|
||||
#[snafu(source)]
|
||||
error: sqlx::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Handler not found: {}", name))]
|
||||
HandlerNotFound {
|
||||
name: String,
|
||||
@@ -911,6 +949,11 @@ impl ErrorExt for Error {
|
||||
| Error::GetPostgresConnection { .. }
|
||||
| Error::PostgresExecution { .. }
|
||||
| Error::ConnectPostgres { .. } => StatusCode::Internal,
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
Error::MySqlExecution { .. }
|
||||
| Error::CreateMySqlPool { .. }
|
||||
| Error::ConnectMySql { .. }
|
||||
| Error::ParseMySqlUrl { .. } => StatusCode::Internal,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -153,7 +153,7 @@ fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, P
|
||||
return None;
|
||||
};
|
||||
|
||||
Some((key, Peer::from(peer.clone()), info.clone()))
|
||||
Some((key, peer.clone(), info.clone()))
|
||||
}
|
||||
|
||||
async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
|
||||
|
||||
@@ -70,7 +70,7 @@ impl HeartbeatHandler for RemapFlowPeerHandler {
|
||||
|
||||
async fn rewrite_node_address(ctx: &mut Context, peer: &Peer) {
|
||||
let key = NodeAddressKey::with_flownode(peer.id).to_bytes();
|
||||
if let Ok(value) = NodeAddressValue::new(peer.clone().into()).try_as_raw_value() {
|
||||
if let Ok(value) = NodeAddressValue::new(peer.clone()).try_as_raw_value() {
|
||||
let put = PutRequest {
|
||||
key,
|
||||
value,
|
||||
|
||||
@@ -72,9 +72,9 @@ pub const TABLE_ID_SEQ: &str = "table_id";
|
||||
pub const FLOW_ID_SEQ: &str = "flow_id";
|
||||
pub const METASRV_HOME: &str = "./greptimedb_data/metasrv";
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
|
||||
|
||||
// The datastores that implement the metadata kvbackend.
|
||||
@@ -89,6 +89,9 @@ pub enum BackendImpl {
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
// Postgres as metadata storage.
|
||||
PostgresStore,
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
// MySql as metadata storage.
|
||||
MysqlStore,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
@@ -146,7 +149,7 @@ pub struct MetasrvOptions {
|
||||
pub tracing: TracingOptions,
|
||||
/// The datastore for kv metadata.
|
||||
pub backend: BackendImpl,
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
/// Table name of rds kv backend.
|
||||
pub meta_table_name: String,
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
@@ -191,7 +194,7 @@ impl Default for MetasrvOptions {
|
||||
flush_stats_factor: 3,
|
||||
tracing: TracingOptions::default(),
|
||||
backend: BackendImpl::EtcdStore,
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,
|
||||
|
||||
@@ -59,7 +59,7 @@ pub mod engine;
|
||||
pub mod error;
|
||||
mod metadata_region;
|
||||
mod metrics;
|
||||
mod row_modifier;
|
||||
pub mod row_modifier;
|
||||
#[cfg(test)]
|
||||
mod test_util;
|
||||
mod utils;
|
||||
|
||||
@@ -40,7 +40,7 @@ const TSID_HASH_SEED: u32 = 846793005;
|
||||
///
|
||||
/// - For [`PrimaryKeyEncoding::Dense`] encoding,
|
||||
/// it adds two columns(`__table_id`, `__tsid`) to the row.
|
||||
pub struct RowModifier {
|
||||
pub(crate) struct RowModifier {
|
||||
codec: SparsePrimaryKeyCodec,
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ impl RowModifier {
|
||||
}
|
||||
|
||||
/// Modify rows with the given primary key encoding.
|
||||
pub fn modify_rows(
|
||||
pub(crate) fn modify_rows(
|
||||
&self,
|
||||
iter: RowsIter,
|
||||
table_id: TableId,
|
||||
@@ -145,16 +145,14 @@ impl RowModifier {
|
||||
|
||||
/// Fills internal columns of a row with table name and a hash of tag values.
|
||||
fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
|
||||
let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
|
||||
let mut hasher = TsidGenerator::default();
|
||||
for (name, value) in iter.primary_keys_with_name() {
|
||||
// The type is checked before. So only null is ignored.
|
||||
if let Some(ValueData::StringValue(string)) = &value.value_data {
|
||||
name.hash(&mut hasher);
|
||||
string.hash(&mut hasher);
|
||||
hasher.write_label(name, string);
|
||||
}
|
||||
}
|
||||
// TSID is 64 bits, simply truncate the 128 bits hash
|
||||
let (hash, _) = hasher.finish128();
|
||||
let hash = hasher.finish();
|
||||
|
||||
(
|
||||
ValueData::U32Value(table_id).into(),
|
||||
@@ -163,6 +161,34 @@ impl RowModifier {
|
||||
}
|
||||
}
|
||||
|
||||
/// Tsid generator.
|
||||
pub struct TsidGenerator {
|
||||
hasher: mur3::Hasher128,
|
||||
}
|
||||
|
||||
impl Default for TsidGenerator {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TsidGenerator {
|
||||
/// Writes a label pair to the generator.
|
||||
pub fn write_label(&mut self, name: &str, value: &str) {
|
||||
name.hash(&mut self.hasher);
|
||||
value.hash(&mut self.hasher);
|
||||
}
|
||||
|
||||
/// Generates a new TSID.
|
||||
pub fn finish(&mut self) -> u64 {
|
||||
// TSID is 64 bits, simply truncate the 128 bits hash
|
||||
let (hash, _) = self.hasher.finish128();
|
||||
hash
|
||||
}
|
||||
}
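Since `TsidGenerator` is now public, a short usage sketch (the label names and values here are hypothetical):
let mut generator = TsidGenerator::default();
// Labels should be written in a consistent order to produce a stable TSID.
generator.write_label("namespace", "public");
generator.write_label("host", "node-1");
let tsid: u64 = generator.finish();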
|
||||
|
||||
/// Index of a value.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct ValueIndex {
|
||||
|
||||
@@ -121,7 +121,7 @@ impl AccessLayer {
|
||||
/// Writes a SST with specific `file_id` and `metadata` to the layer.
|
||||
///
|
||||
/// Returns the info of the SST. If no data is written, returns `None`.
|
||||
pub(crate) async fn write_sst(
|
||||
pub async fn write_sst(
|
||||
&self,
|
||||
request: SstWriteRequest,
|
||||
write_opts: &WriteOptions,
|
||||
@@ -191,26 +191,26 @@ impl AccessLayer {
|
||||
|
||||
/// `OperationType` represents the origin of the `SstWriteRequest`.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub(crate) enum OperationType {
|
||||
pub enum OperationType {
|
||||
Flush,
|
||||
Compact,
|
||||
}
|
||||
|
||||
/// Contents to build a SST.
|
||||
pub(crate) struct SstWriteRequest {
|
||||
pub(crate) op_type: OperationType,
|
||||
pub(crate) metadata: RegionMetadataRef,
|
||||
pub(crate) source: Source,
|
||||
pub(crate) cache_manager: CacheManagerRef,
|
||||
pub struct SstWriteRequest {
|
||||
pub op_type: OperationType,
|
||||
pub metadata: RegionMetadataRef,
|
||||
pub source: Source,
|
||||
pub cache_manager: CacheManagerRef,
|
||||
#[allow(dead_code)]
|
||||
pub(crate) storage: Option<String>,
|
||||
pub(crate) max_sequence: Option<SequenceNumber>,
|
||||
pub storage: Option<String>,
|
||||
pub max_sequence: Option<SequenceNumber>,
|
||||
|
||||
/// Configs for index
|
||||
pub(crate) index_options: IndexOptions,
|
||||
pub(crate) inverted_index_config: InvertedIndexConfig,
|
||||
pub(crate) fulltext_index_config: FulltextIndexConfig,
|
||||
pub(crate) bloom_filter_index_config: BloomFilterConfig,
|
||||
pub index_options: IndexOptions,
|
||||
pub inverted_index_config: InvertedIndexConfig,
|
||||
pub fulltext_index_config: FulltextIndexConfig,
|
||||
pub bloom_filter_index_config: BloomFilterConfig,
|
||||
}
|
||||
|
||||
pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
|
||||
|
||||
@@ -46,6 +46,7 @@ const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
|
||||
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
|
||||
|
||||
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
|
||||
/// Before using the config, make sure to call `MitoConfig::validate()` to check if the config is valid.
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct MitoConfig {
|
||||
|
||||
@@ -42,6 +42,14 @@ use crate::worker::WorkerId;
|
||||
#[snafu(visibility(pub))]
|
||||
#[stack_trace_debug]
|
||||
pub enum Error {
|
||||
#[snafu(display("External error, context: {}", context))]
|
||||
External {
|
||||
source: BoxedError,
|
||||
context: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to encode sparse primary key, reason: {}", reason))]
|
||||
EncodeSparsePrimaryKey {
|
||||
reason: String,
|
||||
@@ -773,6 +781,50 @@ pub enum Error {
|
||||
#[snafu(display("checksum mismatch (actual: {}, expected: {})", actual, expected))]
|
||||
ChecksumMismatch { actual: u32, expected: u32 },
|
||||
|
||||
#[snafu(display(
|
||||
"No checkpoint found, region: {}, last_version: {}",
|
||||
region_id,
|
||||
last_version
|
||||
))]
|
||||
NoCheckpoint {
|
||||
region_id: RegionId,
|
||||
last_version: ManifestVersion,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"No manifests found in range: [{}..{}), region: {}, last_version: {}",
|
||||
start_version,
|
||||
end_version,
|
||||
region_id,
|
||||
last_version
|
||||
))]
|
||||
NoManifests {
|
||||
region_id: RegionId,
|
||||
start_version: ManifestVersion,
|
||||
end_version: ManifestVersion,
|
||||
last_version: ManifestVersion,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to install manifest to {}, region: {}, available manifest version: {}, last version: {}",
|
||||
target_version,
|
||||
region_id,
|
||||
available_version,
|
||||
last_version
|
||||
))]
|
||||
InstallManifestTo {
|
||||
region_id: RegionId,
|
||||
target_version: ManifestVersion,
|
||||
available_version: ManifestVersion,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
last_version: ManifestVersion,
|
||||
},
|
||||
|
||||
#[snafu(display("Region {} is stopped", region_id))]
|
||||
RegionStopped {
|
||||
region_id: RegionId,
|
||||
@@ -1011,7 +1063,10 @@ impl ErrorExt for Error {
|
||||
| OperateAbortedIndex { .. }
|
||||
| UnexpectedReplay { .. }
|
||||
| IndexEncodeNull { .. }
|
||||
| UnexpectedImpureDefault { .. } => StatusCode::Unexpected,
|
||||
| UnexpectedImpureDefault { .. }
|
||||
| NoCheckpoint { .. }
|
||||
| NoManifests { .. }
|
||||
| InstallManifestTo { .. } => StatusCode::Unexpected,
|
||||
RegionNotFound { .. } => StatusCode::RegionNotFound,
|
||||
ObjectStoreNotFound { .. }
|
||||
| InvalidScanIndex { .. }
|
||||
@@ -1090,6 +1145,8 @@ impl ErrorExt for Error {
|
||||
InvalidConfig { .. } => StatusCode::InvalidArguments,
|
||||
StaleLogEntry { .. } => StatusCode::Unexpected,
|
||||
|
||||
External { source, .. } => source.status_code(),
|
||||
|
||||
FilterRecordBatch { source, .. } => source.status_code(),
|
||||
|
||||
Download { .. } | Upload { .. } => StatusCode::StorageUnavailable,
|
||||
|
||||
@@ -23,8 +23,8 @@
|
||||
#[cfg_attr(feature = "test", allow(unused))]
|
||||
pub mod test_util;
|
||||
|
||||
mod access_layer;
|
||||
mod cache;
|
||||
pub mod access_layer;
|
||||
pub mod cache;
|
||||
pub mod compaction;
|
||||
pub mod config;
|
||||
pub mod engine;
|
||||
|
||||
@@ -23,7 +23,9 @@ use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
|
||||
use crate::error::{self, RegionStoppedSnafu, Result};
|
||||
use crate::error::{
|
||||
self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
|
||||
};
|
||||
use crate::manifest::action::{
|
||||
RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
|
||||
RegionMetaActionList,
|
||||
@@ -197,9 +199,9 @@ impl RegionManifestManager {
|
||||
let checkpoint = Self::last_checkpoint(&mut store).await?;
|
||||
let last_checkpoint_version = checkpoint
|
||||
.as_ref()
|
||||
.map(|checkpoint| checkpoint.last_version)
|
||||
.map(|(checkpoint, _)| checkpoint.last_version)
|
||||
.unwrap_or(MIN_VERSION);
|
||||
let mut manifest_builder = if let Some(checkpoint) = checkpoint {
|
||||
let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
|
||||
info!(
|
||||
"Recover region manifest {} from checkpoint version {}",
|
||||
options.manifest_dir, checkpoint.last_version
|
||||
@@ -275,6 +277,153 @@ impl RegionManifestManager {
|
||||
self.stopped = true;
|
||||
}
|
||||
|
||||
/// Installs the manifest changes from the current version to the target version (inclusive).
|
||||
///
|
||||
/// Returns installed version.
|
||||
/// **Note**: This function is not guaranteed to install the target version strictly.
|
||||
/// The installed version may be greater than the target version.
|
||||
pub async fn install_manifest_to(
|
||||
&mut self,
|
||||
target_version: ManifestVersion,
|
||||
) -> Result<ManifestVersion> {
|
||||
let _t = MANIFEST_OP_ELAPSED
|
||||
.with_label_values(&["install_manifest_to"])
|
||||
.start_timer();
|
||||
|
||||
// Case 1: If the target version is less than the current version, return the current version.
|
||||
if self.last_version >= target_version {
|
||||
debug!(
|
||||
"Target version {} is less than or equal to the current version {}, region: {}, skip install",
|
||||
target_version, self.last_version, self.manifest.metadata.region_id
|
||||
);
|
||||
return Ok(self.last_version);
|
||||
}
|
||||
|
||||
ensure!(
|
||||
!self.stopped,
|
||||
RegionStoppedSnafu {
|
||||
region_id: self.manifest.metadata.region_id,
|
||||
}
|
||||
);
|
||||
|
||||
// Fetches manifests from the last version strictly.
|
||||
let mut manifests = self
|
||||
.store
|
||||
// Invariant: last_version < target_version.
|
||||
.fetch_manifests_strict_from(self.last_version + 1, target_version + 1)
|
||||
.await?;
|
||||
|
||||
// Case 2: No manifests in range: [current_version+1, target_version+1)
|
||||
//
|
||||
// |---------Has been deleted------------| [Checkpoint Version]...[Latest Version]
|
||||
// [Leader region]
|
||||
// [Current Version]......[Target Version]
|
||||
// [Follower region]
|
||||
if manifests.is_empty() {
|
||||
debug!(
|
||||
"Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
|
||||
self.last_version, self.manifest.metadata.region_id
|
||||
);
|
||||
let last_version = self.install_last_checkpoint().await?;
|
||||
// Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version.
|
||||
if last_version >= target_version {
|
||||
return Ok(last_version);
|
||||
}
|
||||
|
||||
// Fetches manifests from the installed version strictly.
|
||||
manifests = self
|
||||
.store
|
||||
// Invariant: last_version < target_version.
|
||||
.fetch_manifests_strict_from(last_version + 1, target_version + 1)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if manifests.is_empty() {
|
||||
return NoManifestsSnafu {
|
||||
region_id: self.manifest.metadata.region_id,
|
||||
start_version: self.last_version + 1,
|
||||
end_version: target_version + 1,
|
||||
last_version: self.last_version,
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
debug_assert_eq!(manifests.first().unwrap().0, self.last_version + 1);
|
||||
let mut manifest_builder =
|
||||
RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
|
||||
|
||||
for (manifest_version, raw_action_list) in manifests {
|
||||
self.store
|
||||
.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
|
||||
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
|
||||
for action in action_list.actions {
|
||||
match action {
|
||||
RegionMetaAction::Change(action) => {
|
||||
manifest_builder.apply_change(manifest_version, action);
|
||||
}
|
||||
RegionMetaAction::Edit(action) => {
|
||||
manifest_builder.apply_edit(manifest_version, action);
|
||||
}
|
||||
RegionMetaAction::Remove(_) => {
|
||||
debug!(
|
||||
"Unhandled action for region {}, action: {:?}",
|
||||
self.manifest.metadata.region_id, action
|
||||
);
|
||||
}
|
||||
RegionMetaAction::Truncate(action) => {
|
||||
manifest_builder.apply_truncate(manifest_version, action);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let new_manifest = manifest_builder.try_build()?;
|
||||
ensure!(
|
||||
new_manifest.manifest_version >= target_version,
|
||||
InstallManifestToSnafu {
|
||||
region_id: self.manifest.metadata.region_id,
|
||||
target_version,
|
||||
available_version: new_manifest.manifest_version,
|
||||
last_version: self.last_version,
|
||||
}
|
||||
);
|
||||
|
||||
let version = self.last_version;
|
||||
self.manifest = Arc::new(new_manifest);
|
||||
self.last_version = self.manifest.manifest_version;
|
||||
info!(
|
||||
"Install manifest changes from {} to {}, region: {}",
|
||||
version, self.last_version, self.manifest.metadata.region_id
|
||||
);
|
||||
|
||||
Ok(self.last_version)
|
||||
}
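A hedged sketch of how a follower region could catch up using this API; `leader_version` and its source are hypothetical, while the call pattern matches the tests further below:
// `manager` is the follower's RegionManifestManager.
let installed = manager.install_manifest_to(leader_version).await?;
// The installed version may exceed the requested one, e.g. after falling back to a checkpoint.
assert!(installed >= leader_version);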
|
||||
|
||||
/// Installs the last checkpoint.
|
||||
pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
|
||||
let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
|
||||
else {
|
||||
return NoCheckpointSnafu {
|
||||
region_id: self.manifest.metadata.region_id,
|
||||
last_version: self.last_version,
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
self.store.reset_manifest_size();
|
||||
self.store
|
||||
.set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
|
||||
let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
|
||||
let manifest = builder.try_build()?;
|
||||
self.last_version = manifest.manifest_version;
|
||||
self.manifest = Arc::new(manifest);
|
||||
info!(
|
||||
"Installed region manifest from checkpoint: {}, region: {}",
|
||||
checkpoint.last_version, self.manifest.metadata.region_id
|
||||
);
|
||||
|
||||
Ok(self.last_version)
|
||||
}
|
||||
|
||||
/// Updates the manifest. Returns the current manifest version number.
|
||||
pub async fn update(&mut self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
|
||||
let _t = MANIFEST_OP_ELAPSED
|
||||
@@ -371,14 +520,17 @@ impl RegionManifestManager {
|
||||
}
|
||||
|
||||
/// Fetches the last [RegionCheckpoint] from storage.
|
||||
///
|
||||
/// If the checkpoint is not found, returns `None`.
|
||||
/// Otherwise, returns the checkpoint and the size of the checkpoint.
|
||||
pub(crate) async fn last_checkpoint(
|
||||
store: &mut ManifestObjectStore,
|
||||
) -> Result<Option<RegionCheckpoint>> {
|
||||
) -> Result<Option<(RegionCheckpoint, u64)>> {
|
||||
let last_checkpoint = store.load_last_checkpoint().await?;
|
||||
|
||||
if let Some((_, bytes)) = last_checkpoint {
|
||||
let checkpoint = RegionCheckpoint::decode(&bytes)?;
|
||||
Ok(Some(checkpoint))
|
||||
Ok(Some((checkpoint, bytes.len() as u64)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
@@ -236,7 +236,31 @@ impl ManifestObjectStore {
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
/// Fetch all manifests in concurrent.
|
||||
/// Fetches manifests in range [start_version, end_version).
|
||||
///
|
||||
/// This function is guaranteed to return manifests strictly from `start_version` (the result must contain `start_version`).
|
||||
pub async fn fetch_manifests_strict_from(
|
||||
&self,
|
||||
start_version: ManifestVersion,
|
||||
end_version: ManifestVersion,
|
||||
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
|
||||
let mut manifests = self.fetch_manifests(start_version, end_version).await?;
|
||||
let start_index = manifests.iter().position(|(v, _)| *v == start_version);
|
||||
debug!(
|
||||
"fetches manifests in range [{},{}), start_index: {:?}",
|
||||
start_version, end_version, start_index
|
||||
);
|
||||
if let Some(start_index) = start_index {
|
||||
Ok(manifests.split_off(start_index))
|
||||
} else {
|
||||
Ok(vec![])
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetches all manifests concurrently and returns those in range [start_version, end_version).
|
||||
///
|
||||
/// **Notes**: This function is not guaranteed to return manifests strictly from `start_version`.
/// Use [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from) to get manifests strictly from `start_version`.
|
||||
pub async fn fetch_manifests(
|
||||
&self,
|
||||
start_version: ManifestVersion,
|
||||
@@ -576,6 +600,12 @@ impl ManifestObjectStore {
|
||||
self.manifest_size_map.read().unwrap().values().sum()
|
||||
}
|
||||
|
||||
/// Resets the size of all files.
|
||||
pub(crate) fn reset_manifest_size(&mut self) {
|
||||
self.manifest_size_map.write().unwrap().clear();
|
||||
self.total_manifest_size.store(0, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Set the size of the delta file by delta version.
|
||||
pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
|
||||
let mut m = self.manifest_size_map.write().unwrap();
|
||||
@@ -585,7 +615,7 @@ impl ManifestObjectStore {
|
||||
}
|
||||
|
||||
/// Set the size of the checkpoint file by checkpoint version.
|
||||
fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
|
||||
pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
|
||||
let mut m = self.manifest_size_map.write().unwrap();
|
||||
m.insert(FileKey::Checkpoint(version), size);
|
||||
|
||||
@@ -595,6 +625,7 @@ impl ManifestObjectStore {
|
||||
fn unset_file_size(&self, key: &FileKey) {
|
||||
let mut m = self.manifest_size_map.write().unwrap();
|
||||
if let Some(val) = m.remove(key) {
|
||||
debug!("Unset file size: {:?}, size: {}", key, val);
|
||||
self.dec_total_manifest_size(val);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,6 +44,18 @@ async fn build_manager(
|
||||
(env, manager)
|
||||
}
|
||||
|
||||
async fn build_manager_with_initial_metadata(
|
||||
env: &TestEnv,
|
||||
checkpoint_distance: u64,
|
||||
compress_type: CompressionType,
|
||||
) -> RegionManifestManager {
|
||||
let metadata = Arc::new(basic_region_metadata());
|
||||
env.create_manifest_manager(compress_type, checkpoint_distance, Some(metadata.clone()))
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn reopen_manager(
|
||||
env: &TestEnv,
|
||||
checkpoint_distance: u64,
|
||||
@@ -265,4 +277,142 @@ async fn generate_checkpoint_with_compression_types(
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.0
|
||||
}
|
||||
|
||||
fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>) {
|
||||
let mut files = vec![];
|
||||
let mut actions = vec![];
|
||||
for _ in 0..num {
|
||||
let file_id = FileId::random();
|
||||
files.push(file_id);
|
||||
let file_meta = FileMeta {
|
||||
region_id: RegionId::new(123, 456),
|
||||
file_id,
|
||||
time_range: (0.into(), 10000000.into()),
|
||||
level: 0,
|
||||
file_size: 1024000,
|
||||
available_indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
sequence: None,
|
||||
};
|
||||
let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
|
||||
files_to_add: vec![file_meta],
|
||||
files_to_remove: vec![],
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
})]);
|
||||
actions.push(action);
|
||||
}
|
||||
(files, actions)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn manifest_install_manifest_to() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (env, mut manager) = build_manager(0, CompressionType::Uncompressed).await;
|
||||
let (files, actions) = generate_action_lists(10);
|
||||
for action in actions {
|
||||
manager.update(action).await.unwrap();
|
||||
}
|
||||
|
||||
// Nothing to install
|
||||
let target_version = manager.manifest().manifest_version;
|
||||
let installed_version = manager.install_manifest_to(target_version).await.unwrap();
|
||||
assert_eq!(target_version, installed_version);
|
||||
|
||||
let mut another_manager =
|
||||
build_manager_with_initial_metadata(&env, 0, CompressionType::Uncompressed).await;
|
||||
|
||||
// install manifest changes
|
||||
let target_version = manager.manifest().manifest_version;
|
||||
let installed_version = another_manager
|
||||
.install_manifest_to(target_version - 1)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(target_version - 1, installed_version);
|
||||
for file_id in files[0..9].iter() {
|
||||
assert!(another_manager.manifest().files.contains_key(file_id));
|
||||
}
|
||||
|
||||
let installed_version = another_manager
|
||||
.install_manifest_to(target_version)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(target_version, installed_version);
|
||||
for file_id in files.iter() {
|
||||
assert!(another_manager.manifest().files.contains_key(file_id));
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn manifest_install_manifest_to_with_checkpoint() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (env, mut manager) = build_manager(3, CompressionType::Uncompressed).await;
|
||||
let (files, actions) = generate_action_lists(10);
|
||||
for action in actions {
|
||||
manager.update(action).await.unwrap();
|
||||
|
||||
while manager.checkpointer().is_doing_checkpoint() {
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// has checkpoint
|
||||
assert!(manager
|
||||
.store()
|
||||
.load_last_checkpoint()
|
||||
.await
|
||||
.unwrap()
|
||||
.is_some());
|
||||
|
||||
// check files
|
||||
let mut expected = vec![
|
||||
"/",
|
||||
"00000000000000000006.checkpoint",
|
||||
"00000000000000000007.json",
|
||||
"00000000000000000008.json",
|
||||
"00000000000000000009.checkpoint",
|
||||
"00000000000000000009.json",
|
||||
"00000000000000000010.json",
|
||||
"_last_checkpoint",
|
||||
];
|
||||
expected.sort_unstable();
|
||||
let mut paths = manager
|
||||
.store()
|
||||
.get_paths(|e| Some(e.name().to_string()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
paths.sort_unstable();
|
||||
assert_eq!(expected, paths);
|
||||
|
||||
let mut another_manager =
|
||||
build_manager_with_initial_metadata(&env, 0, CompressionType::Uncompressed).await;
|
||||
|
||||
// Install 9 manifests
|
||||
let target_version = manager.manifest().manifest_version;
|
||||
let installed_version = another_manager
|
||||
.install_manifest_to(target_version - 1)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(target_version - 1, installed_version);
|
||||
for file_id in files[0..9].iter() {
|
||||
assert!(another_manager.manifest().files.contains_key(file_id));
|
||||
}
|
||||
|
||||
    // Install all manifests
    let target_version = manager.manifest().manifest_version;
    let installed_version = another_manager
        .install_manifest_to(target_version)
        .await
        .unwrap();
    assert_eq!(target_version, installed_version);
    for file_id in files.iter() {
        assert!(another_manager.manifest().files.contains_key(file_id));
    }
    assert_eq!(4217, another_manager.store().total_manifest_size());
}
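The test above waits for the background checkpoint task by polling in a sleep loop. A small standalone sketch of such a wait loop with a timeout guard, assuming a Tokio runtime with the macros feature (a hypothetical helper, not part of the crate):

```rust
use std::time::Duration;

// Poll `done` every 10ms until it returns true or `timeout` elapses.
async fn wait_until(mut done: impl FnMut() -> bool, timeout: Duration) -> bool {
    let start = tokio::time::Instant::now();
    while !done() {
        if start.elapsed() > timeout {
            return false;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    true
}

#[tokio::main]
async fn main() {
    let mut remaining = 3;
    let finished = wait_until(
        || {
            remaining -= 1;
            remaining == 0
        },
        Duration::from_secs(1),
    )
    .await;
    assert!(finished);
}
```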
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
//! Mito region.
|
||||
|
||||
pub(crate) mod opener;
|
||||
pub mod opener;
|
||||
pub mod options;
|
||||
pub(crate) mod version;
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
use std::any::TypeId;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::{AtomicI64, AtomicU64};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
@@ -30,7 +30,9 @@ use object_store::util::{join_dir, normalize_dir};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::logstore::provider::Provider;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
|
||||
use store_api::metadata::{
|
||||
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
|
||||
};
|
||||
use store_api::region_engine::RegionRole;
|
||||
use store_api::storage::{ColumnId, RegionId};
|
||||
|
||||
@@ -42,6 +44,7 @@ use crate::error::{
|
||||
EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
|
||||
Result, StaleLogEntrySnafu,
|
||||
};
|
||||
use crate::manifest::action::RegionManifest;
|
||||
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
|
||||
use crate::manifest::storage::manifest_compress_type;
|
||||
use crate::memtable::time_partition::TimePartitions;
|
||||
@@ -207,11 +210,16 @@ impl RegionOpener {
|
||||
}
|
||||
// Safety: must be set before calling this method.
|
||||
let options = self.options.take().unwrap();
|
||||
let object_store = self.object_store(&options.storage)?.clone();
|
||||
let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
|
||||
let provider = self.provider::<S>(&options.wal_options)?;
|
||||
let metadata = Arc::new(metadata);
|
||||
// Create a manifest manager for this region and writes regions to the manifest file.
|
||||
let region_manifest_options = self.manifest_options(config, &options)?;
|
||||
let region_manifest_options = Self::manifest_options(
|
||||
config,
|
||||
&options,
|
||||
&self.region_dir,
|
||||
&self.object_store_manager,
|
||||
)?;
|
||||
let manifest_manager = RegionManifestManager::new(
|
||||
metadata.clone(),
|
||||
region_manifest_options,
|
||||
@@ -334,7 +342,12 @@ impl RegionOpener {
|
||||
) -> Result<Option<MitoRegion>> {
|
||||
let region_options = self.options.as_ref().unwrap().clone();
|
||||
|
||||
let region_manifest_options = self.manifest_options(config, ®ion_options)?;
|
||||
let region_manifest_options = Self::manifest_options(
|
||||
config,
|
||||
®ion_options,
|
||||
&self.region_dir,
|
||||
&self.object_store_manager,
|
||||
)?;
|
||||
let Some(manifest_manager) = RegionManifestManager::open(
|
||||
region_manifest_options,
|
||||
self.stats.total_manifest_size.clone(),
|
||||
@@ -354,7 +367,7 @@ impl RegionOpener {
|
||||
.take()
|
||||
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
|
||||
let on_region_opened = wal.on_region_opened();
|
||||
let object_store = self.object_store(®ion_options.storage)?.clone();
|
||||
let object_store = get_object_store(®ion_options.storage, &self.object_store_manager)?;
|
||||
|
||||
debug!("Open region {} with options: {:?}", region_id, self.options);
|
||||
|
||||
@@ -444,13 +457,14 @@ impl RegionOpener {
|
||||
|
||||
/// Returns a new manifest options.
|
||||
fn manifest_options(
|
||||
&self,
|
||||
config: &MitoConfig,
|
||||
options: &RegionOptions,
|
||||
region_dir: &str,
|
||||
object_store_manager: &ObjectStoreManagerRef,
|
||||
) -> Result<RegionManifestOptions> {
|
||||
let object_store = self.object_store(&options.storage)?.clone();
|
||||
let object_store = get_object_store(&options.storage, object_store_manager)?;
|
||||
Ok(RegionManifestOptions {
|
||||
manifest_dir: new_manifest_dir(&self.region_dir),
|
||||
manifest_dir: new_manifest_dir(region_dir),
|
||||
object_store,
|
||||
// We don't allow users to set the compression algorithm as we use it as a file suffix.
|
||||
// Currently, the manifest storage doesn't have good support for changing compression algorithms.
|
||||
@@ -458,20 +472,72 @@ impl RegionOpener {
|
||||
checkpoint_distance: config.manifest_checkpoint_distance,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
|
||||
fn object_store(&self, name: &Option<String>) -> Result<&object_store::ObjectStore> {
|
||||
if let Some(name) = name {
|
||||
Ok(self
|
||||
.object_store_manager
|
||||
.find(name)
|
||||
.context(ObjectStoreNotFoundSnafu {
|
||||
object_store: name.to_string(),
|
||||
})?)
|
||||
} else {
|
||||
Ok(self.object_store_manager.default_object_store())
|
||||
/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.to_string(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}
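get_object_store resolves a named store or silently falls back to the default when no name is given. A self-contained sketch of that lookup-or-default behavior, with a hypothetical StoreManager and plain strings standing in for ObjectStoreManagerRef and the crate's error type:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the object store manager: named stores plus a default.
struct StoreManager {
    default: String,
    named: HashMap<String, String>,
}

impl StoreManager {
    // Mirrors get_object_store: an explicit name must resolve or it is an error,
    // while no name silently falls back to the default store.
    fn get(&self, name: &Option<String>) -> Result<String, String> {
        match name {
            Some(n) => self
                .named
                .get(n)
                .cloned()
                .ok_or_else(|| format!("object store `{n}` not found")),
            None => Ok(self.default.clone()),
        }
    }
}

fn main() {
    let manager = StoreManager {
        default: "fs".to_string(),
        named: HashMap::from([("s3".to_string(), "s3".to_string())]),
    };
    assert_eq!(manager.get(&None).as_deref(), Ok("fs"));
    assert!(manager.get(&Some("gcs".to_string())).is_err());
}
```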
|
||||
|
||||
/// A loader for loading metadata from a region dir.
|
||||
pub struct RegionMetadataLoader {
|
||||
config: Arc<MitoConfig>,
|
||||
object_store_manager: ObjectStoreManagerRef,
|
||||
}
|
||||
|
||||
impl RegionMetadataLoader {
|
||||
/// Creates a new `RegionMetadataLoader`.
|
||||
pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
|
||||
Self {
|
||||
config,
|
||||
object_store_manager,
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads the metadata of the region from the region dir.
|
||||
pub async fn load(
|
||||
&self,
|
||||
region_dir: &str,
|
||||
region_options: &RegionOptions,
|
||||
) -> Result<Option<RegionMetadataRef>> {
|
||||
let manifest = self.load_manifest(region_dir, region_options).await?;
|
||||
Ok(manifest.map(|m| m.metadata.clone()))
|
||||
}
|
||||
|
||||
/// Loads the manifest of the region from the region dir.
|
||||
pub async fn load_manifest(
|
||||
&self,
|
||||
region_dir: &str,
|
||||
region_options: &RegionOptions,
|
||||
) -> Result<Option<Arc<RegionManifest>>> {
|
||||
let region_manifest_options = RegionOpener::manifest_options(
|
||||
&self.config,
|
||||
region_options,
|
||||
region_dir,
|
||||
&self.object_store_manager,
|
||||
)?;
|
||||
let Some(manifest_manager) =
|
||||
RegionManifestManager::open(region_manifest_options, Arc::new(AtomicU64::new(0)))
|
||||
.await?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let manifest = manifest_manager.manifest();
|
||||
Ok(Some(manifest))
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks whether the recovered region has the same schema as region to create.
|
||||
|
||||
@@ -33,6 +33,8 @@ use crate::row_converter::dense::SortField;
|
||||
use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
|
||||
|
||||
/// A codec for the sparse primary key of metrics.
/// It requires that the input primary key columns are sorted by column name in lexicographical order.
/// It encodes the column id of the physical region.
#[derive(Clone, Debug)]
|
||||
pub struct SparsePrimaryKeyCodec {
|
||||
inner: Arc<SparsePrimaryKeyCodecInner>,
|
||||
|
||||
@@ -16,9 +16,9 @@ pub(crate) mod bloom_filter;
|
||||
mod codec;
|
||||
pub(crate) mod fulltext_index;
|
||||
mod indexer;
|
||||
pub(crate) mod intermediate;
|
||||
pub mod intermediate;
|
||||
pub(crate) mod inverted_index;
|
||||
pub(crate) mod puffin_manager;
|
||||
pub mod puffin_manager;
|
||||
mod statistics;
|
||||
pub(crate) mod store;
|
||||
|
||||
|
||||
@@ -49,6 +49,11 @@ impl IntermediateManager {
|
||||
/// Create a new `IntermediateManager` with the given root path.
|
||||
/// It will clean up all garbage intermediate files from previous runs.
|
||||
pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
|
||||
common_telemetry::info!(
|
||||
"Initializing intermediate manager, aux_path: {}",
|
||||
aux_path.as_ref()
|
||||
);
|
||||
|
||||
let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
|
||||
let store = InstrumentedStore::new(store);
|
||||
|
||||
|
||||
@@ -61,6 +61,7 @@ impl Default for WriteOptions {
|
||||
}
|
||||
|
||||
/// Parquet SST info returned by the writer.
|
||||
#[derive(Debug)]
|
||||
pub struct SstInfo {
|
||||
/// SST file id.
|
||||
pub file_id: FileId,
|
||||
|
||||
@@ -28,7 +28,7 @@ use api::v1::{
|
||||
use catalog::CatalogManagerRef;
|
||||
use client::{OutputData, OutputMeta};
|
||||
use common_catalog::consts::{
|
||||
default_engine, PARENT_SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN,
|
||||
default_engine, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN,
|
||||
};
|
||||
use common_grpc_expr::util::ColumnExpr;
|
||||
use common_meta::cache::TableFlownodeSetCacheRef;
|
||||
@@ -54,7 +54,10 @@ use store_api::metric_engine_consts::{
|
||||
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
use table::metadata::TableInfo;
|
||||
use table::requests::{InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TTL_KEY};
|
||||
use table::requests::{
|
||||
InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TABLE_DATA_MODEL,
|
||||
TABLE_DATA_MODEL_TRACE_V1, TTL_KEY,
|
||||
};
|
||||
use table::table_reference::TableReference;
|
||||
use table::TableRef;
|
||||
|
||||
@@ -578,7 +581,8 @@ impl Inserter {
|
||||
// - trace_id: when searching by trace id
|
||||
// - parent_span_id: when searching root span
|
||||
// - span_name: when searching certain types of span
|
||||
let index_columns = [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SPAN_NAME_COLUMN];
|
||||
let index_columns =
|
||||
[TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
|
||||
for index_column in index_columns {
|
||||
if let Some(col) = create_table
|
||||
.column_defs
|
||||
@@ -595,6 +599,12 @@ impl Inserter {
|
||||
}
|
||||
}
|
||||
|
||||
// use table_options to mark table model version
|
||||
create_table.table_options.insert(
|
||||
TABLE_DATA_MODEL.to_string(),
|
||||
TABLE_DATA_MODEL_TRACE_V1.to_string(),
|
||||
);
|
||||
|
||||
let table = self
|
||||
.create_physical_table(
|
||||
create_table,
|
||||
|
||||
@@ -13,7 +13,8 @@
|
||||
// limitations under the License.
|
||||
|
||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||
use pipeline::{json_to_intermediate_state, parse, Content, GreptimeTransformer, Pipeline, Result};
|
||||
use pipeline::error::Result;
|
||||
use pipeline::{json_to_intermediate_state, parse, Content, GreptimeTransformer, Pipeline};
|
||||
use serde_json::{Deserializer, Value};
|
||||
|
||||
fn processor_mut(
|
||||
|
||||
@@ -16,7 +16,7 @@ use common_telemetry::debug;
|
||||
use snafu::OptionExt;
|
||||
use yaml_rust::Yaml;
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
Error, FieldRequiredForDispatcherSnafu, Result, TableSuffixRequiredForDispatcherRuleSnafu,
|
||||
ValueRequiredForDispatcherRuleSnafu,
|
||||
};
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::any::Any;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use datatypes::timestamp::TimestampNanosecond;
|
||||
use snafu::{Location, Snafu};
|
||||
|
||||
#[derive(Snafu)]
|
||||
@@ -51,7 +52,7 @@ pub enum Error {
|
||||
#[snafu(display("Processor {processor}: expect string value, but got {v:?}"))]
|
||||
ProcessorExpectString {
|
||||
processor: String,
|
||||
v: crate::etl::Value,
|
||||
v: crate::Value,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
@@ -607,13 +608,197 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Pipeline table not found"))]
|
||||
PipelineTableNotFound {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to insert pipeline to pipelines table"))]
|
||||
InsertPipeline {
|
||||
#[snafu(source)]
|
||||
source: operator::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Pipeline not found, name: {}, version: {}", name, version.map(|ts| ts.0.to_iso8601_string()).unwrap_or("latest".to_string())))]
|
||||
PipelineNotFound {
|
||||
name: String,
|
||||
version: Option<TimestampNanosecond>,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to collect record batch"))]
|
||||
CollectRecords {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
source: common_recordbatch::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to cast type, msg: {}", msg))]
|
||||
CastType {
|
||||
msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to build DataFusion logical plan"))]
|
||||
BuildDfLogicalPlan {
|
||||
#[snafu(source)]
|
||||
error: datafusion_common::DataFusionError,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to execute internal statement"))]
|
||||
ExecuteInternalStatement {
|
||||
#[snafu(source)]
|
||||
source: query::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create dataframe"))]
|
||||
DataFrame {
|
||||
#[snafu(source)]
|
||||
source: query::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("General catalog error"))]
|
||||
Catalog {
|
||||
#[snafu(source)]
|
||||
source: catalog::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create table"))]
|
||||
CreateTable {
|
||||
#[snafu(source)]
|
||||
source: operator::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid pipeline version format: {}", version))]
|
||||
InvalidPipelineVersion {
|
||||
version: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
StatusCode::InvalidArguments
|
||||
use Error::*;
|
||||
match self {
|
||||
CastType { .. } => StatusCode::Unexpected,
|
||||
PipelineTableNotFound { .. } => StatusCode::TableNotFound,
|
||||
InsertPipeline { source, .. } => source.status_code(),
|
||||
CollectRecords { source, .. } => source.status_code(),
|
||||
PipelineNotFound { .. } | InvalidPipelineVersion { .. } => StatusCode::InvalidArguments,
|
||||
BuildDfLogicalPlan { .. } => StatusCode::Internal,
|
||||
ExecuteInternalStatement { source, .. } => source.status_code(),
|
||||
DataFrame { source, .. } => source.status_code(),
|
||||
Catalog { source, .. } => source.status_code(),
|
||||
CreateTable { source, .. } => source.status_code(),
|
||||
|
||||
EmptyInputField { .. }
|
||||
| MissingInputField { .. }
|
||||
| ProcessorMustBeMap { .. }
|
||||
| ProcessorMissingField { .. }
|
||||
| ProcessorExpectString { .. }
|
||||
| ProcessorUnsupportedValue { .. }
|
||||
| ProcessorKeyMustBeString { .. }
|
||||
| ProcessorFailedToParseString { .. }
|
||||
| ProcessorMustHaveStringKey { .. }
|
||||
| UnsupportedProcessor { .. }
|
||||
| FieldMustBeType { .. }
|
||||
| FailedParseFieldFromString { .. }
|
||||
| FailedToParseIntKey { .. }
|
||||
| FailedToParseInt { .. }
|
||||
| FailedToParseFloatKey { .. }
|
||||
| IntermediateKeyIndex { .. }
|
||||
| CmcdMissingValue { .. }
|
||||
| CmcdMissingKey { .. }
|
||||
| KeyMustBeString { .. }
|
||||
| CsvRead { .. }
|
||||
| CsvNoRecord { .. }
|
||||
| CsvSeparatorName { .. }
|
||||
| CsvQuoteName { .. }
|
||||
| DateParseTimezone { .. }
|
||||
| DateParse { .. }
|
||||
| DateFailedToGetLocalTimezone { .. }
|
||||
| DateFailedToGetTimestamp { .. }
|
||||
| DateInvalidFormat { .. }
|
||||
| DissectInvalidPattern { .. }
|
||||
| DissectEmptyPattern { .. }
|
||||
| DissectSplitExceedsInput { .. }
|
||||
| DissectSplitNotMatchInput { .. }
|
||||
| DissectConsecutiveNames { .. }
|
||||
| DissectNoMatchingPattern { .. }
|
||||
| DissectModifierAlreadySet { .. }
|
||||
| DissectAppendOrderAlreadySet { .. }
|
||||
| DissectOrderOnlyAppend { .. }
|
||||
| DissectOrderOnlyAppendModifier { .. }
|
||||
| DissectEndModifierAlreadySet { .. }
|
||||
| EpochInvalidResolution { .. }
|
||||
| GsubPatternRequired { .. }
|
||||
| GsubReplacementRequired { .. }
|
||||
| Regex { .. }
|
||||
| JoinSeparatorRequired { .. }
|
||||
| LetterInvalidMethod { .. }
|
||||
| RegexNamedGroupNotFound { .. }
|
||||
| RegexNoValidField { .. }
|
||||
| RegexNoValidPattern { .. }
|
||||
| UrlEncodingInvalidMethod { .. }
|
||||
| DigestPatternInvalid { .. }
|
||||
| UrlEncodingDecode { .. }
|
||||
| TransformOnFailureInvalidValue { .. }
|
||||
| TransformElementMustBeMap { .. }
|
||||
| TransformTypeMustBeSet { .. }
|
||||
| TransformEmpty { .. }
|
||||
| TransformColumnNameMustBeUnique { .. }
|
||||
| TransformMultipleTimestampIndex { .. }
|
||||
| TransformTimestampIndexCount { .. }
|
||||
| CoerceUnsupportedNullType { .. }
|
||||
| CoerceUnsupportedNullTypeTo { .. }
|
||||
| CoerceUnsupportedEpochType { .. }
|
||||
| CoerceStringToType { .. }
|
||||
| CoerceJsonTypeTo { .. }
|
||||
| CoerceTypeToJson { .. }
|
||||
| CoerceIncompatibleTypes { .. }
|
||||
| ValueInvalidResolution { .. }
|
||||
| ValueParseType { .. }
|
||||
| ValueParseInt { .. }
|
||||
| ValueParseFloat { .. }
|
||||
| ValueParseBoolean { .. }
|
||||
| ValueDefaultValueUnsupported { .. }
|
||||
| ValueUnsupportedNumberType { .. }
|
||||
| ValueUnsupportedYamlType { .. }
|
||||
| ValueYamlKeyMustBeString { .. }
|
||||
| YamlLoad { .. }
|
||||
| YamlParse { .. }
|
||||
| PrepareValueMustBeObject { .. }
|
||||
| ColumnOptions { .. }
|
||||
| UnsupportedIndexType { .. }
|
||||
| UnsupportedNumberType { .. }
|
||||
            | IdentifyPipelineColumnTypeMismatch { .. }
            | JsonPathParse { .. }
            | JsonPathParseResultIndex { .. }
            | FieldRequiredForDispatcher
            | TableSuffixRequiredForDispatcherRule
            | ValueRequiredForDispatcherRule
            | ReachedMaxNestedLevels { .. } => StatusCode::InvalidArguments,
        }
    }
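The status_code mapping above mixes two strategies: leaf errors map to a fixed code, while wrapper errors delegate to the status code of their source. A minimal sketch of that pattern with made-up error types, not the pipeline crate's:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum StatusCode {
    InvalidArguments,
    Internal,
}

#[derive(Debug)]
enum SourceError {
    BadPlan,
}

impl SourceError {
    fn status_code(&self) -> StatusCode {
        StatusCode::Internal
    }
}

#[derive(Debug)]
enum Error {
    // Leaf error: always maps to a fixed status code.
    InvalidVersion { version: String },
    // Wrapper error: the status code is whatever the underlying source reports.
    ExecuteStatement { source: SourceError },
}

impl Error {
    fn status_code(&self) -> StatusCode {
        match self {
            Error::InvalidVersion { .. } => StatusCode::InvalidArguments,
            Error::ExecuteStatement { source } => source.status_code(),
        }
    }
}

fn main() {
    let e = Error::ExecuteStatement { source: SourceError::BadPlan };
    assert_eq!(e.status_code(), StatusCode::Internal);
    let leaf = Error::InvalidVersion { version: "abc".into() };
    assert_eq!(leaf.status_code(), StatusCode::InvalidArguments);
}
```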
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
@@ -13,16 +13,11 @@
|
||||
// limitations under the License.
|
||||
|
||||
#![allow(dead_code)]
|
||||
|
||||
pub mod error;
|
||||
pub mod field;
|
||||
pub mod processor;
|
||||
pub mod transform;
|
||||
pub mod value;
|
||||
|
||||
use error::{
|
||||
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu, YamlParseSnafu,
|
||||
};
|
||||
use processor::{Processor, Processors};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use transform::{Transformer, Transforms};
|
||||
@@ -30,7 +25,9 @@ use value::Value;
|
||||
use yaml_rust::YamlLoader;
|
||||
|
||||
use crate::dispatcher::{Dispatcher, Rule};
|
||||
use crate::etl::error::Result;
|
||||
use crate::error::{
|
||||
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result, YamlLoadSnafu, YamlParseSnafu,
|
||||
};
|
||||
|
||||
const DESCRIPTION: &str = "description";
|
||||
const PROCESSORS: &str = "processors";
|
||||
|
||||
@@ -17,8 +17,7 @@ use std::str::FromStr;
|
||||
|
||||
use snafu::OptionExt;
|
||||
|
||||
use super::error::{EmptyInputFieldSnafu, MissingInputFieldSnafu};
|
||||
use crate::etl::error::{Error, Result};
|
||||
use crate::error::{EmptyInputFieldSnafu, Error, MissingInputFieldSnafu, Result};
|
||||
|
||||
/// Raw processor-defined inputs and outputs
|
||||
#[derive(Debug, Default, Clone)]
|
||||
|
||||
@@ -45,15 +45,13 @@ use snafu::{OptionExt, ResultExt};
|
||||
use timestamp::TimestampProcessor;
|
||||
use urlencoding::UrlEncodingProcessor;
|
||||
|
||||
use super::error::{
|
||||
FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu,
|
||||
ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
|
||||
};
|
||||
use super::field::{Field, Fields};
|
||||
use super::PipelineMap;
|
||||
use crate::etl::error::{Error, Result};
|
||||
use crate::error::{
|
||||
Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu,
|
||||
ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu, Result, UnsupportedProcessorSnafu,
|
||||
};
|
||||
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
|
||||
use crate::etl_error::UnsupportedProcessorSnafu;
|
||||
|
||||
const FIELD_NAME: &str = "field";
|
||||
const FIELDS_NAME: &str = "fields";
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use urlencoding::decode;
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu,
|
||||
FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
|
||||
ProcessorMissingFieldSnafu, Result,
|
||||
|
||||
@@ -19,7 +19,7 @@ use itertools::EitherOrBoth::{Both, Left, Right};
|
||||
use itertools::Itertools;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error,
|
||||
KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
|
||||
};
|
||||
|
||||
@@ -19,7 +19,7 @@ use chrono_tz::Tz;
|
||||
use lazy_static::lazy_static;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu,
|
||||
DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
|
||||
ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, Result,
|
||||
|
||||
@@ -22,7 +22,7 @@ use once_cell::sync::Lazy;
|
||||
use regex::Regex;
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
|
||||
};
|
||||
use crate::etl::field::Fields;
|
||||
|
||||
@@ -24,8 +24,9 @@ use std::borrow::Cow;
|
||||
use regex::Regex;
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::etl::error::{
|
||||
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
|
||||
use crate::error::{
|
||||
DigestPatternInvalidSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
|
||||
ProcessorMissingFieldSnafu, Result,
|
||||
};
|
||||
use crate::etl::field::Fields;
|
||||
use crate::etl::processor::{
|
||||
@@ -33,7 +34,6 @@ use crate::etl::processor::{
|
||||
};
|
||||
use crate::etl::value::Value;
|
||||
use crate::etl::PipelineMap;
|
||||
use crate::etl_error::DigestPatternInvalidSnafu;
|
||||
|
||||
pub(crate) const PROCESSOR_DIGEST: &str = "digest";
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
|
||||
use itertools::Itertools;
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
DissectAppendOrderAlreadySetSnafu, DissectConsecutiveNamesSnafu, DissectEmptyPatternSnafu,
|
||||
DissectEndModifierAlreadySetSnafu, DissectInvalidPatternSnafu, DissectModifierAlreadySetSnafu,
|
||||
DissectNoMatchingPatternSnafu, DissectOrderOnlyAppendModifierSnafu,
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu,
|
||||
ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result,
|
||||
};
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use regex::Regex;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu,
|
||||
ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result,
|
||||
};
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
|
||||
ProcessorMissingFieldSnafu, Result,
|
||||
};
|
||||
|
||||
@@ -19,12 +19,11 @@ use super::{
|
||||
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, PipelineMap, Processor, FIELDS_NAME,
|
||||
FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
|
||||
};
|
||||
use crate::etl::error::{Error, Result};
|
||||
use crate::etl::field::Fields;
|
||||
use crate::etl_error::{
|
||||
JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu,
|
||||
ProcessorMissingFieldSnafu,
|
||||
use crate::error::{
|
||||
Error, JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu,
|
||||
ProcessorMissingFieldSnafu, Result,
|
||||
};
|
||||
use crate::etl::field::Fields;
|
||||
use crate::Value;
|
||||
|
||||
pub(crate) const PROCESSOR_JSON_PATH: &str = "json_path";
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
Error, KeyMustBeStringSnafu, LetterInvalidMethodSnafu, ProcessorExpectStringSnafu,
|
||||
ProcessorMissingFieldSnafu, Result,
|
||||
};
|
||||
|
||||
@@ -22,7 +22,7 @@ use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu,
|
||||
RegexNamedGroupNotFoundSnafu, RegexNoValidFieldSnafu, RegexNoValidPatternSnafu, RegexSnafu,
|
||||
Result,
|
||||
|
||||
@@ -14,13 +14,12 @@
|
||||
|
||||
use snafu::OptionExt as _;
|
||||
|
||||
use crate::etl::error::{Error, Result};
|
||||
use crate::error::{Error, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, Result};
|
||||
use crate::etl::field::Fields;
|
||||
use crate::etl::processor::{
|
||||
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
|
||||
IGNORE_MISSING_NAME, SIMPLE_EXTRACT_KEY_NAME,
|
||||
};
|
||||
use crate::etl_error::{KeyMustBeStringSnafu, ProcessorMissingFieldSnafu};
|
||||
use crate::{PipelineMap, Processor, Value};
|
||||
|
||||
pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract";
|
||||
|
||||
@@ -19,7 +19,7 @@ use chrono_tz::Tz;
|
||||
use lazy_static::lazy_static;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu,
|
||||
DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error,
|
||||
KeyMustBeStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu,
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use urlencoding::{decode, encode};
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
|
||||
UrlEncodingDecodeSnafu, UrlEncodingInvalidMethodSnafu,
|
||||
};
|
||||
|
||||
@@ -17,7 +17,14 @@ pub mod transformer;
|
||||
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::etl::error::{Error, Result};
|
||||
use super::field::Fields;
|
||||
use super::processor::{yaml_new_field, yaml_new_fields, yaml_string};
|
||||
use super::value::Timestamp;
|
||||
use super::PipelineMap;
|
||||
use crate::error::{
|
||||
Error, KeyMustBeStringSnafu, Result, TransformElementMustBeMapSnafu,
|
||||
TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu,
|
||||
};
|
||||
use crate::etl::processor::yaml_bool;
|
||||
use crate::etl::transform::index::Index;
|
||||
use crate::etl::value::Value;
|
||||
@@ -32,15 +39,6 @@ const TRANSFORM_ON_FAILURE: &str = "on_failure";
|
||||
|
||||
pub use transformer::greptime::GreptimeTransformer;
|
||||
|
||||
use super::error::{
|
||||
KeyMustBeStringSnafu, TransformElementMustBeMapSnafu, TransformOnFailureInvalidValueSnafu,
|
||||
TransformTypeMustBeSetSnafu,
|
||||
};
|
||||
use super::field::Fields;
|
||||
use super::processor::{yaml_new_field, yaml_new_fields, yaml_string};
|
||||
use super::value::Timestamp;
|
||||
use super::PipelineMap;
|
||||
|
||||
pub trait Transformer: std::fmt::Debug + Sized + Send + Sync + 'static {
|
||||
type Output;
|
||||
type VecOutput;
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::etl::error::{Error, Result, UnsupportedIndexTypeSnafu};
|
||||
use crate::error::{Error, Result, UnsupportedIndexTypeSnafu};
|
||||
|
||||
const INDEX_TIMESTAMP: &str = "timestamp";
|
||||
const INDEX_TIMEINDEX: &str = "time";
|
||||
|
||||
@@ -27,7 +27,7 @@ use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
|
||||
use itertools::Itertools;
|
||||
use serde_json::Number;
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
|
||||
TransformColumnNameMustBeUniqueSnafu, TransformEmptySnafu,
|
||||
TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu,
|
||||
|
||||
@@ -20,7 +20,7 @@ use greptime_proto::v1::value::ValueData;
|
||||
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::etl::error::{
|
||||
use crate::error::{
|
||||
CoerceIncompatibleTypesSnafu, CoerceJsonTypeToSnafu, CoerceStringToTypeSnafu,
|
||||
CoerceTypeToJsonSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
|
||||
CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result,
|
||||
|
||||
@@ -28,13 +28,12 @@ use regex::Regex;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
pub use time::Timestamp;
|
||||
|
||||
use super::error::{
|
||||
ValueDefaultValueUnsupportedSnafu, ValueInvalidResolutionSnafu, ValueParseBooleanSnafu,
|
||||
ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu, ValueUnsupportedNumberTypeSnafu,
|
||||
ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu,
|
||||
};
|
||||
use super::PipelineMap;
|
||||
use crate::etl::error::{Error, Result};
|
||||
use crate::error::{
|
||||
Error, Result, ValueDefaultValueUnsupportedSnafu, ValueInvalidResolutionSnafu,
|
||||
ValueParseBooleanSnafu, ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu,
|
||||
ValueUnsupportedNumberTypeSnafu, ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu,
|
||||
};
|
||||
|
||||
/// Value can be used as type
|
||||
/// acts as value: the enclosed value is the actual value
|
||||
|
||||
@@ -13,22 +13,22 @@
|
||||
// limitations under the License.
|
||||
|
||||
mod dispatcher;
|
||||
pub mod error;
|
||||
mod etl;
|
||||
mod manager;
|
||||
mod metrics;
|
||||
|
||||
pub use etl::error::Result;
|
||||
pub use etl::processor::Processor;
|
||||
pub use etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo};
|
||||
pub use etl::transform::transformer::identity_pipeline;
|
||||
pub use etl::transform::{GreptimeTransformer, Transformer};
|
||||
pub use etl::value::{Array, Map, Value};
|
||||
pub use etl::{
|
||||
error as etl_error, json_array_to_intermediate_state, json_to_intermediate_state, parse,
|
||||
Content, DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
|
||||
json_array_to_intermediate_state, json_to_intermediate_state, parse, Content, DispatchedTo,
|
||||
Pipeline, PipelineExecOutput, PipelineMap,
|
||||
};
|
||||
pub use manager::{
|
||||
error, pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef,
|
||||
pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef,
|
||||
PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo,
|
||||
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
|
||||
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME,
|
||||
};
|
||||
|
||||
@@ -19,10 +19,10 @@ use datatypes::timestamp::TimestampNanosecond;
|
||||
use itertools::Itertools;
|
||||
use util::to_pipeline_version;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::table::PipelineTable;
|
||||
use crate::{GreptimeTransformer, Pipeline};
|
||||
|
||||
pub mod error;
|
||||
pub mod pipeline_operator;
|
||||
pub mod table;
|
||||
pub mod util;
|
||||
@@ -99,7 +99,7 @@ impl PipelineWay {
|
||||
name: Option<&str>,
|
||||
version: Option<&str>,
|
||||
default_pipeline: PipelineWay,
|
||||
) -> error::Result<PipelineWay> {
|
||||
) -> Result<PipelineWay> {
|
||||
if let Some(pipeline_name) = name {
|
||||
if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME {
|
||||
Ok(PipelineWay::OtlpTraceDirectV1)
|
||||
|
||||
@@ -1,153 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use datatypes::timestamp::TimestampNanosecond;
|
||||
use snafu::{Location, Snafu};
|
||||
|
||||
#[derive(Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
#[stack_trace_debug]
|
||||
pub enum Error {
|
||||
#[snafu(display("Pipeline table not found"))]
|
||||
PipelineTableNotFound {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to insert pipeline to pipelines table"))]
|
||||
InsertPipeline {
|
||||
#[snafu(source)]
|
||||
source: operator::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse pipeline"))]
|
||||
CompilePipeline {
|
||||
#[snafu(source)]
|
||||
source: crate::etl::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Pipeline not found, name: {}, version: {}", name, version.map(|ts| ts.0.to_iso8601_string()).unwrap_or("latest".to_string())))]
|
||||
PipelineNotFound {
|
||||
name: String,
|
||||
version: Option<TimestampNanosecond>,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to collect record batch"))]
|
||||
CollectRecords {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
source: common_recordbatch::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to cast type, msg: {}", msg))]
|
||||
CastType {
|
||||
msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to build DataFusion logical plan"))]
|
||||
BuildDfLogicalPlan {
|
||||
#[snafu(source)]
|
||||
error: datafusion_common::DataFusionError,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to execute internal statement"))]
|
||||
ExecuteInternalStatement {
|
||||
#[snafu(source)]
|
||||
source: query::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create dataframe"))]
|
||||
DataFrame {
|
||||
#[snafu(source)]
|
||||
source: query::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("General catalog error"))]
|
||||
Catalog {
|
||||
#[snafu(source)]
|
||||
source: catalog::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create table"))]
|
||||
CreateTable {
|
||||
#[snafu(source)]
|
||||
source: operator::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to execute pipeline"))]
|
||||
PipelineTransform {
|
||||
#[snafu(source)]
|
||||
source: crate::etl::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid pipeline version format: {}", version))]
|
||||
InvalidPipelineVersion {
|
||||
version: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
use Error::*;
|
||||
match self {
|
||||
CastType { .. } => StatusCode::Unexpected,
|
||||
PipelineTableNotFound { .. } => StatusCode::TableNotFound,
|
||||
InsertPipeline { source, .. } => source.status_code(),
|
||||
CollectRecords { source, .. } => source.status_code(),
|
||||
PipelineNotFound { .. }
|
||||
| CompilePipeline { .. }
|
||||
| PipelineTransform { .. }
|
||||
| InvalidPipelineVersion { .. } => StatusCode::InvalidArguments,
|
||||
BuildDfLogicalPlan { .. } => StatusCode::Internal,
|
||||
ExecuteInternalStatement { source, .. } => source.status_code(),
|
||||
DataFrame { source, .. } => source.status_code(),
|
||||
Catalog { source, .. } => source.status_code(),
|
||||
CreateTable { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
@@ -41,9 +41,9 @@ use table::metadata::TableInfo;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{
|
||||
BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, CompilePipelineSnafu,
|
||||
DataFrameSnafu, ExecuteInternalStatementSnafu, InsertPipelineSnafu,
|
||||
InvalidPipelineVersionSnafu, PipelineNotFoundSnafu, Result,
|
||||
BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu,
|
||||
ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
|
||||
PipelineNotFoundSnafu, Result,
|
||||
};
|
||||
use crate::etl::transform::GreptimeTransformer;
|
||||
use crate::etl::{parse, Content, Pipeline};
|
||||
@@ -204,7 +204,7 @@ impl PipelineTable {
    /// Compiles a pipeline from a string.
    pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
        let yaml_content = Content::Yaml(pipeline);
        parse::<GreptimeTransformer>(&yaml_content).context(CompilePipelineSnafu)
        parse::<GreptimeTransformer>(&yaml_content)
    }
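compile_pipeline is essentially YAML parsing plus validation. A standalone sketch of loading a tiny pipeline document with yaml_rust (the crate the parser already imports); the document body here is only illustrative, the real schema is defined by the pipeline crate:

```rust
use yaml_rust::YamlLoader;

fn main() {
    // A tiny pipeline document; only `description` and `processors` are shown here.
    let doc = r#"
description: demo pipeline
processors:
  - date:
      field: time
"#;
    let docs = YamlLoader::load_from_str(doc).expect("valid yaml");
    let root = &docs[0];
    assert_eq!(root["description"].as_str(), Some("demo pipeline"));
    // `processors` is a YAML array; each element configures one processor.
    assert_eq!(root["processors"].as_vec().map(|v| v.len()), Some(1));
}
```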
|
||||
/// Insert a pipeline into the pipeline table.
|
||||
|
||||
@@ -151,7 +151,7 @@ pub enum Error {
|
||||
#[snafu(display("Failed to describe statement"))]
|
||||
DescribeStatement { source: BoxedError },
|
||||
|
||||
#[snafu(display("Pipeline management api error"))]
|
||||
#[snafu(display("Pipeline error"))]
|
||||
Pipeline {
|
||||
#[snafu(source)]
|
||||
source: pipeline::error::Error,
|
||||
@@ -159,14 +159,6 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Pipeline transform error"))]
|
||||
PipelineTransform {
|
||||
#[snafu(source)]
|
||||
source: pipeline::etl_error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Not supported: {}", feat))]
|
||||
NotSupported { feat: String },
|
||||
|
||||
@@ -661,7 +653,6 @@ impl ErrorExt for Error {
|
||||
| CheckDatabaseValidity { source, .. } => source.status_code(),
|
||||
|
||||
Pipeline { source, .. } => source.status_code(),
|
||||
PipelineTransform { source, .. } => source.status_code(),
|
||||
|
||||
NotSupported { .. }
|
||||
| InvalidParameter { .. }
|
||||
|
||||
@@ -154,7 +154,7 @@ impl Default for HttpOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
addr: "127.0.0.1:4000".to_string(),
|
||||
timeout: Duration::from_secs(30),
|
||||
timeout: Duration::from_secs(0),
|
||||
disable_dashboard: false,
|
||||
body_limit: DEFAULT_BODY_LIMIT,
|
||||
is_strict_mode: false,
|
||||
@@ -1384,7 +1384,7 @@ mod test {
    fn test_http_options_default() {
        let default = HttpOptions::default();
        assert_eq!("127.0.0.1:4000".to_string(), default.addr);
        assert_eq!(Duration::from_secs(30), default.timeout)
        assert_eq!(Duration::from_secs(0), default.timeout)
    }
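The default HTTP timeout changes from 30 seconds to zero here. Assuming the zero duration is meant as a "no timeout" sentinel, a small sketch of how such a value might be interpreted when wiring up the server:

```rust
use std::time::Duration;

// Interpret a configured timeout of zero as "timeout disabled".
fn effective_timeout(configured: Duration) -> Option<Duration> {
    if configured.is_zero() {
        None
    } else {
        Some(configured)
    }
}

fn main() {
    assert_eq!(effective_timeout(Duration::from_secs(0)), None);
    assert_eq!(
        effective_timeout(Duration::from_secs(30)),
        Some(Duration::from_secs(30))
    );
}
```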
|
||||
#[tokio::test]
|
||||
|
||||
@@ -32,7 +32,6 @@ use common_telemetry::{error, warn};
|
||||
use datatypes::value::column_data_to_json;
|
||||
use headers::ContentType;
|
||||
use lazy_static::lazy_static;
|
||||
use pipeline::error::PipelineTransformSnafu;
|
||||
use pipeline::util::to_pipeline_version;
|
||||
use pipeline::{GreptimePipelineParams, GreptimeTransformer, PipelineDefinition, PipelineVersion};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -284,9 +283,7 @@ async fn dryrun_pipeline_inner(
|
||||
&pipeline_handler,
|
||||
PipelineDefinition::Resolved(pipeline),
|
||||
¶ms,
|
||||
pipeline::json_array_to_intermediate_state(value)
|
||||
.context(PipelineTransformSnafu)
|
||||
.context(PipelineSnafu)?,
|
||||
pipeline::json_array_to_intermediate_state(value).context(PipelineSnafu)?,
|
||||
"dry_run".to_owned(),
|
||||
query_ctx,
|
||||
true,
|
||||
@@ -636,9 +633,7 @@ pub(crate) async fn ingest_logs_inner(
|
||||
&state,
|
||||
PipelineDefinition::from_name(&pipeline_name, version),
|
||||
&pipeline_params,
|
||||
pipeline::json_array_to_intermediate_state(request.values)
|
||||
.context(PipelineTransformSnafu)
|
||||
.context(PipelineSnafu)?,
|
||||
pipeline::json_array_to_intermediate_state(request.values).context(PipelineSnafu)?,
|
||||
request.table,
|
||||
&query_ctx,
|
||||
true,
|
||||
|
||||
@@ -23,7 +23,8 @@ use pipeline::{GreptimePipelineParams, SelectInfo};
|
||||
use crate::http::header::constants::{
|
||||
GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
|
||||
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
|
||||
GREPTIME_PIPELINE_PARAMS_HEADER, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
|
||||
GREPTIME_PIPELINE_NAME_HEADER_NAME, GREPTIME_PIPELINE_PARAMS_HEADER,
|
||||
GREPTIME_PIPELINE_VERSION_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
|
||||
};
|
||||
|
||||
/// Axum extractor for optional target log table name from HTTP header
|
||||
@@ -38,7 +39,7 @@ where
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
|
||||
let headers = &parts.headers;
|
||||
string_value_from_header(headers, GREPTIME_LOG_TABLE_NAME_HEADER_NAME).map(LogTableName)
|
||||
string_value_from_header(headers, &[GREPTIME_LOG_TABLE_NAME_HEADER_NAME]).map(LogTableName)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,7 +55,8 @@ where
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
|
||||
let headers = &parts.headers;
|
||||
string_value_from_header(headers, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME).map(TraceTableName)
|
||||
string_value_from_header(headers, &[GREPTIME_TRACE_TABLE_NAME_HEADER_NAME])
|
||||
.map(TraceTableName)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,7 +73,7 @@ where
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
|
||||
let select =
|
||||
string_value_from_header(&parts.headers, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?;
|
||||
string_value_from_header(&parts.headers, &[GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME])?;
|
||||
|
||||
match select {
|
||||
Some(name) => {
|
||||
@@ -102,12 +104,22 @@ where
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
|
||||
let headers = &parts.headers;
|
||||
let pipeline_name =
|
||||
string_value_from_header(headers, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME)?;
|
||||
let pipeline_version =
|
||||
string_value_from_header(headers, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME)?;
|
||||
let pipeline_name = string_value_from_header(
|
||||
headers,
|
||||
&[
|
||||
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
|
||||
GREPTIME_PIPELINE_NAME_HEADER_NAME,
|
||||
],
|
||||
)?;
|
||||
let pipeline_version = string_value_from_header(
|
||||
headers,
|
||||
&[
|
||||
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
|
||||
GREPTIME_PIPELINE_VERSION_HEADER_NAME,
|
||||
],
|
||||
)?;
|
||||
let pipeline_parameters =
|
||||
string_value_from_header(headers, GREPTIME_PIPELINE_PARAMS_HEADER)?;
|
||||
string_value_from_header(headers, &[GREPTIME_PIPELINE_PARAMS_HEADER])?;
|
||||
|
||||
Ok(PipelineInfo {
|
||||
pipeline_name,
|
||||
@@ -120,17 +132,19 @@ where
|
||||
#[inline]
|
||||
fn string_value_from_header(
|
||||
headers: &HeaderMap,
|
||||
header_key: &str,
|
||||
header_keys: &[&str],
|
||||
) -> Result<Option<String>, (StatusCode, String)> {
|
||||
headers
|
||||
.get(header_key)
|
||||
.map(|value| {
|
||||
String::from_utf8(value.as_bytes().to_vec()).map_err(|_| {
|
||||
for header_key in header_keys {
|
||||
if let Some(value) = headers.get(*header_key) {
|
||||
return Some(String::from_utf8(value.as_bytes().to_vec()).map_err(|_| {
|
||||
(
|
||||
StatusCode::BAD_REQUEST,
|
||||
format!("`{}` header is not valid UTF-8 string type.", header_key),
|
||||
)
|
||||
})
|
||||
})
|
||||
.transpose()
|
||||
}))
|
||||
.transpose();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
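string_value_from_header now takes a list of header names and returns the first one present, which lets the deprecated x-greptime-log-pipeline-* headers keep working next to the newer generic names. A standalone sketch of that first-match lookup over a plain map, using std only rather than axum's HeaderMap:

```rust
use std::collections::HashMap;

// Return the value of the first header in `keys` that is present, validating UTF-8.
fn first_header_value(
    headers: &HashMap<String, Vec<u8>>,
    keys: &[&str],
) -> Result<Option<String>, String> {
    for key in keys {
        if let Some(raw) = headers.get(*key) {
            return String::from_utf8(raw.clone())
                .map(Some)
                .map_err(|_| format!("`{key}` header is not a valid UTF-8 string."));
        }
    }
    Ok(None)
}

fn main() {
    let mut headers = HashMap::new();
    headers.insert("x-greptime-log-pipeline-name".to_string(), b"nginx".to_vec());
    // The deprecated name still resolves when the newer one is absent.
    let value = first_header_value(
        &headers,
        &["x-greptime-pipeline-name", "x-greptime-log-pipeline-name"],
    );
    assert_eq!(value, Ok(Some("nginx".to_string())));
}
```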
|
||||
|
||||
@@ -45,8 +45,15 @@ pub mod constants {
|
||||
pub const GREPTIME_DB_HEADER_NAME: &str = "x-greptime-db-name";
|
||||
pub const GREPTIME_TIMEZONE_HEADER_NAME: &str = "x-greptime-timezone";
|
||||
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = common_error::GREPTIME_DB_HEADER_ERROR_CODE;
|
||||
|
||||
    // Deprecated: pipelines are also used for traces now, so the newer header names drop the `log` segment.
    pub const GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-log-pipeline-name";
    pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version";

    // More generic pipeline header names
    pub const GREPTIME_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-pipeline-name";
    pub const GREPTIME_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-pipeline-version";
|
||||
pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name";
|
||||
pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys";
|
||||
pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name";
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::hash_map::Entry::{Occupied, Vacant};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -26,8 +25,6 @@ use common_error::status_code::StatusCode;
|
||||
use common_query::{Output, OutputData};
|
||||
use common_recordbatch::util;
|
||||
use common_telemetry::{debug, error, tracing, warn};
|
||||
use datafusion_expr::{col, Expr};
|
||||
use lazy_static::lazy_static;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value as JsonValue;
|
||||
use session::context::{Channel, QueryContext};
|
||||
@@ -36,6 +33,7 @@ use snafu::{OptionExt, ResultExt};
|
||||
use crate::error::{
|
||||
status_code_to_http_status, CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result,
|
||||
};
|
||||
use crate::http::extractor::TraceTableName;
|
||||
use crate::http::HttpRecordsOutput;
|
||||
use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
|
||||
use crate::otlp::trace::{
|
||||
@@ -47,43 +45,9 @@ use crate::otlp::trace::{
|
||||
};
|
||||
use crate::query_handler::JaegerQueryHandlerRef;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref FIND_TRACES_COLS: Vec<Expr> = vec![
|
||||
col(TRACE_ID_COLUMN),
|
||||
col(TIMESTAMP_COLUMN),
|
||||
col(DURATION_NANO_COLUMN),
|
||||
col(SERVICE_NAME_COLUMN),
|
||||
col(SPAN_NAME_COLUMN),
|
||||
col(SPAN_ID_COLUMN),
|
||||
col(SPAN_ATTRIBUTES_COLUMN),
|
||||
col(RESOURCE_ATTRIBUTES_COLUMN),
|
||||
col(PARENT_SPAN_ID_COLUMN),
|
||||
col(SPAN_EVENTS_COLUMN),
|
||||
col(SCOPE_NAME_COLUMN),
|
||||
col(SCOPE_VERSION_COLUMN),
|
||||
col(SPAN_KIND_COLUMN),
|
||||
col(SPAN_STATUS_CODE),
|
||||
];
|
||||
static ref FIND_TRACES_SCHEMA: Vec<(&'static str, &'static str)> = vec![
|
||||
(TRACE_ID_COLUMN, "String"),
|
||||
(TIMESTAMP_COLUMN, "TimestampNanosecond"),
|
||||
(DURATION_NANO_COLUMN, "UInt64"),
|
||||
(SERVICE_NAME_COLUMN, "String"),
|
||||
(SPAN_NAME_COLUMN, "String"),
|
||||
(SPAN_ID_COLUMN, "String"),
|
||||
(SPAN_ATTRIBUTES_COLUMN, "Json"),
|
||||
(RESOURCE_ATTRIBUTES_COLUMN, "Json"),
|
||||
(PARENT_SPAN_ID_COLUMN, "String"),
|
||||
(SPAN_EVENTS_COLUMN, "Json"),
|
||||
(SCOPE_NAME_COLUMN, "String"),
|
||||
(SCOPE_VERSION_COLUMN, "String"),
|
||||
(SPAN_KIND_COLUMN, "String"),
|
||||
(SPAN_STATUS_CODE, "String"),
|
||||
];
|
||||
}
|
||||
pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";
|
||||
|
||||
const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
|
||||
|
||||
const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];
|
||||
|
||||
/// JaegerAPIResponse is the response of Jaeger HTTP API.
|
||||
@@ -240,9 +204,6 @@ pub enum ValueType {
|
||||
#[derive(Default, Debug, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct JaegerQueryParams {
|
||||
/// Database that the trace data stored in.
|
||||
pub db: Option<String>,
|
||||
|
||||
/// Service name of the trace.
|
||||
#[serde(rename = "service")]
|
||||
pub service_name: Option<String>,
|
||||
@@ -275,26 +236,27 @@ pub struct JaegerQueryParams {
|
||||
pub span_kind: Option<String>,
|
||||
}
|
||||
|
||||
fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
    // db should already be handled by the middlewares
    query_ctx.set_channel(Channel::Jaeger);
    if let Some(table) = table_name {
        query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
    }
}
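from_jaeger_query_params below converts Jaeger's microsecond start/end timestamps to nanoseconds and parses human-readable durations via humantime. A standalone sketch of those conversions (hypothetical helpers, assuming the humantime crate):

```rust
use std::time::Duration;

// Jaeger's HTTP API uses microseconds for start/end and human-readable strings
// (e.g. "100ms", "1h") for min/max duration; internally everything becomes nanoseconds.
fn micros_to_nanos(us: Option<u64>) -> Option<u64> {
    us.map(|v| v * 1_000)
}

fn duration_to_nanos(s: &str) -> Result<u64, String> {
    let d: Duration = humantime::parse_duration(s).map_err(|e| e.to_string())?;
    Ok(d.as_nanos() as u64)
}

fn main() {
    assert_eq!(
        micros_to_nanos(Some(1_700_000_000_000_000)),
        Some(1_700_000_000_000_000_000)
    );
    assert_eq!(duration_to_nanos("100ms"), Ok(100_000_000));
}
```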
|
||||
impl QueryTraceParams {
|
||||
fn from_jaeger_query_params(db: &str, query_params: JaegerQueryParams) -> Result<Self> {
|
||||
fn from_jaeger_query_params(query_params: JaegerQueryParams) -> Result<Self> {
|
||||
let mut internal_query_params: QueryTraceParams = QueryTraceParams {
|
||||
db: db.to_string(),
|
||||
service_name: query_params.service_name.context(InvalidJaegerQuerySnafu {
|
||||
reason: "service_name is required".to_string(),
|
||||
})?,
|
||||
operation_name: query_params.operation_name,
|
||||
// Convert start time from microseconds to nanoseconds.
|
||||
start_time: query_params.start.map(|start| start * 1000),
|
||||
end_time: query_params.end.map(|end| end * 1000),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
internal_query_params.service_name =
|
||||
query_params.service_name.context(InvalidJaegerQuerySnafu {
|
||||
reason: "service_name is required".to_string(),
|
||||
})?;
|
||||
|
||||
internal_query_params.operation_name = query_params.operation_name;
|
||||
|
||||
// Convert start time from microseconds to nanoseconds.
|
||||
internal_query_params.start_time = query_params.start.map(|start| start * 1000);
|
||||
|
||||
// Convert end time from microseconds to nanoseconds.
|
||||
internal_query_params.end_time = query_params.end.map(|end| end * 1000);
|
||||
|
||||
if let Some(max_duration) = query_params.max_duration {
|
||||
let duration = humantime::parse_duration(&max_duration).map_err(|e| {
|
||||
InvalidJaegerQuerySnafu {
|
||||
@@ -343,7 +305,6 @@ impl QueryTraceParams {
|
||||
|
||||
#[derive(Debug, Default, PartialEq)]
|
||||
pub struct QueryTraceParams {
|
||||
pub db: String,
|
||||
pub service_name: String,
|
||||
pub operation_name: Option<String>,
|
||||
|
||||
@@ -367,12 +328,14 @@ pub async fn handle_get_services(
State(handler): State<JaegerQueryHandlerRef>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}",
query_params, query_ctx
);
query_ctx.set_channel(Channel::Jaeger);

update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();

@@ -418,12 +381,14 @@ pub async fn handle_get_trace(
Path(trace_id): Path<String>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}",
trace_id, query_params, query_ctx
);
query_ctx.set_channel(Channel::Jaeger);

update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();

@@ -472,12 +437,14 @@ pub async fn handle_find_traces(
State(handler): State<JaegerQueryHandlerRef>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
query_params, query_ctx
);
query_ctx.set_channel(Channel::Jaeger);

update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();

@@ -486,7 +453,7 @@ pub async fn handle_find_traces(
.with_label_values(&[&db, "/api/traces"])
.start_timer();

match QueryTraceParams::from_jaeger_query_params(&db, query_params) {
match QueryTraceParams::from_jaeger_query_params(query_params) {
Ok(query_params) => {
let output = handler.find_traces(query_ctx, query_params).await;
match output {
@@ -521,13 +488,14 @@ pub async fn handle_get_operations(
State(handler): State<JaegerQueryHandlerRef>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}",
query_params, query_ctx
);
if let Some(service_name) = query_params.service_name {
query_ctx.set_channel(Channel::Jaeger);
if let Some(service_name) = &query_params.service_name {
update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();

@@ -537,7 +505,7 @@ pub async fn handle_get_operations(
.start_timer();

match handler
.get_operations(query_ctx, &service_name, query_params.span_kind.as_deref())
.get_operations(query_ctx, service_name, query_params.span_kind.as_deref())
.await
{
Ok(output) => match covert_to_records(output).await {
@@ -593,12 +561,14 @@ pub async fn handle_get_operations_by_service(
Path(service_name): Path<String>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}",
service_name, query_params, query_ctx
);
query_ctx.set_channel(Channel::Jaeger);

update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();

@@ -690,11 +660,8 @@ fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>)
}),
)
}
|
||||
// Construct Jaeger traces from records.
|
||||
fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
|
||||
let expected_schema = FIND_TRACES_SCHEMA.clone();
|
||||
check_schema(&records, &expected_schema)?;
|
||||
|
||||
fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
|
||||
// maintain the mapping: trace_id -> (process_id -> service_name).
|
||||
let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
|
||||
// maintain the mapping: trace_id -> spans.
|
||||
@@ -702,38 +669,202 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
|
||||
// maintain the mapping: service.name -> resource.attributes.
|
||||
let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
|
||||
|
||||
let is_span_attributes_flatten = !records
|
||||
.schema
|
||||
.column_schemas
|
||||
.iter()
|
||||
.any(|c| c.name == SPAN_ATTRIBUTES_COLUMN);
|
||||
|
||||
for row in records.rows.into_iter() {
|
||||
let mut span = Span::default();
|
||||
let mut row_iter = row.into_iter();
|
||||
let mut service_name = None;
|
||||
let mut resource_tags = vec![];
|
||||
|
||||
// Set trace id.
|
||||
if let Some(JsonValue::String(trace_id)) = row_iter.next() {
|
||||
span.trace_id = trace_id.clone();
|
||||
trace_id_to_processes.entry(trace_id).or_default();
|
||||
}
|
||||
for (idx, cell) in row.into_iter().enumerate() {
|
||||
// safe to use index here
|
||||
let column_name = &records.schema.column_schemas[idx].name;
|
||||
|
||||
// Convert timestamp from nanoseconds to microseconds.
|
||||
if let Some(JsonValue::Number(timestamp)) = row_iter.next() {
|
||||
span.start_time = timestamp.as_u64().ok_or_else(|| {
|
||||
InvalidJaegerQuerySnafu {
|
||||
reason: "Failed to convert timestamp to u64".to_string(),
|
||||
match column_name.as_str() {
|
||||
TRACE_ID_COLUMN => {
|
||||
if let JsonValue::String(trace_id) = cell {
|
||||
span.trace_id = trace_id.clone();
|
||||
trace_id_to_processes.entry(trace_id).or_default();
|
||||
}
|
||||
}
|
||||
.build()
|
||||
})? / 1000;
|
||||
}
|
||||
|
||||
// Convert duration from nanoseconds to microseconds.
|
||||
if let Some(JsonValue::Number(duration)) = row_iter.next() {
|
||||
span.duration = duration.as_u64().ok_or_else(|| {
|
||||
InvalidJaegerQuerySnafu {
|
||||
reason: "Failed to convert duration to u64".to_string(),
|
||||
TIMESTAMP_COLUMN => {
|
||||
span.start_time = cell.as_u64().context(InvalidJaegerQuerySnafu {
|
||||
reason: "Failed to convert timestamp to u64".to_string(),
|
||||
})? / 1000;
|
||||
}
|
||||
.build()
|
||||
})? / 1000;
|
||||
DURATION_NANO_COLUMN => {
|
||||
span.duration = cell.as_u64().context(InvalidJaegerQuerySnafu {
|
||||
reason: "Failed to convert duration to u64".to_string(),
|
||||
})? / 1000;
|
||||
}
|
||||
SERVICE_NAME_COLUMN => {
|
||||
if let JsonValue::String(name) = cell {
|
||||
service_name = Some(name);
|
||||
}
|
||||
}
|
||||
SPAN_NAME_COLUMN => {
|
||||
if let JsonValue::String(span_name) = cell {
|
||||
span.operation_name = span_name;
|
||||
}
|
||||
}
|
||||
SPAN_ID_COLUMN => {
|
||||
if let JsonValue::String(span_id) = cell {
|
||||
span.span_id = span_id;
|
||||
}
|
||||
}
|
||||
SPAN_ATTRIBUTES_COLUMN => {
|
||||
// for v0 data model, span_attributes are nested as a json
|
||||
// data structure
|
||||
if let JsonValue::Object(span_attrs) = cell {
|
||||
span.tags.extend(object_to_tags(span_attrs));
|
||||
}
|
||||
}
|
||||
RESOURCE_ATTRIBUTES_COLUMN => {
|
||||
// for v0 data model, resource_attributes are nested as a json
|
||||
// data structure
|
||||
|
||||
if let JsonValue::Object(mut resource_attrs) = cell {
|
||||
resource_attrs.remove(KEY_SERVICE_NAME);
|
||||
resource_tags = object_to_tags(resource_attrs);
|
||||
}
|
||||
}
|
||||
PARENT_SPAN_ID_COLUMN => {
|
||||
if let JsonValue::String(parent_span_id) = cell {
|
||||
if !parent_span_id.is_empty() {
|
||||
span.references.push(Reference {
|
||||
trace_id: span.trace_id.clone(),
|
||||
span_id: parent_span_id,
|
||||
ref_type: REF_TYPE_CHILD_OF.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
SPAN_EVENTS_COLUMN => {
|
||||
if let JsonValue::Array(events) = cell {
|
||||
for event in events {
|
||||
if let JsonValue::Object(mut obj) = event {
|
||||
let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(t) =
|
||||
obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
|
||||
SPAN_KIND_TIME_FMTS
|
||||
.iter()
|
||||
.find_map(|fmt| {
|
||||
chrono::DateTime::parse_from_str(s, fmt).ok()
|
||||
})
|
||||
.map(|dt| dt.timestamp_micros() as u64)
|
||||
})
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut fields = vec![KeyValue {
|
||||
key: "event".to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(action.to_string()),
|
||||
}];
|
||||
|
||||
// Add event attributes as fields
|
||||
if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
|
||||
fields.extend(object_to_tags(attrs));
|
||||
}
|
||||
|
||||
span.logs.push(Log {
|
||||
timestamp: t,
|
||||
fields,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
SCOPE_NAME_COLUMN => {
|
||||
if let JsonValue::String(scope_name) = cell {
|
||||
if !scope_name.is_empty() {
|
||||
span.tags.push(KeyValue {
|
||||
key: KEY_OTEL_SCOPE_NAME.to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(scope_name),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
SCOPE_VERSION_COLUMN => {
|
||||
if let JsonValue::String(scope_version) = cell {
|
||||
if !scope_version.is_empty() {
|
||||
span.tags.push(KeyValue {
|
||||
key: KEY_OTEL_SCOPE_VERSION.to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(scope_version),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
SPAN_KIND_COLUMN => {
|
||||
if let JsonValue::String(span_kind) = cell {
|
||||
if !span_kind.is_empty() {
|
||||
span.tags.push(KeyValue {
|
||||
key: KEY_SPAN_KIND.to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(normalize_span_kind(&span_kind)),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
SPAN_STATUS_CODE => {
|
||||
if let JsonValue::String(span_status) = cell {
|
||||
if span_status != SPAN_STATUS_UNSET && !span_status.is_empty() {
|
||||
span.tags.push(KeyValue {
|
||||
key: KEY_OTEL_STATUS_CODE.to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(normalize_status_code(&span_status)),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_ => {
// this is the v1 data model
if is_span_attributes_flatten {
|
||||
const SPAN_ATTR_PREFIX: &str = "span_attributes.";
|
||||
const RESOURCE_ATTR_PREFIX: &str = "resource_attributes.";
|
||||
// a span attributes column
|
||||
if column_name.starts_with(SPAN_ATTR_PREFIX) {
|
||||
if let Some(keyvalue) = to_keyvalue(
|
||||
column_name
|
||||
.strip_prefix(SPAN_ATTR_PREFIX)
|
||||
.unwrap_or_default()
|
||||
.to_string(),
|
||||
cell,
|
||||
) {
|
||||
span.tags.push(keyvalue);
|
||||
}
|
||||
} else if column_name.starts_with(RESOURCE_ATTR_PREFIX) {
|
||||
if let Some(keyvalue) = to_keyvalue(
|
||||
column_name
|
||||
.strip_prefix(RESOURCE_ATTR_PREFIX)
|
||||
.unwrap_or_default()
|
||||
.to_string(),
|
||||
cell,
|
||||
) {
|
||||
resource_tags.push(keyvalue);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Collect services to construct processes.
|
||||
if let Some(JsonValue::String(service_name)) = row_iter.next() {
|
||||
if let Some(service_name) = service_name {
|
||||
if !service_to_resource_attributes.contains_key(&service_name) {
|
||||
service_to_resource_attributes.insert(service_name.clone(), resource_tags);
|
||||
}
|
||||
|
||||
if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) {
|
||||
if let Some(process_id) = process.get(&service_name) {
|
||||
span.process_id = process_id.clone();
|
||||
@@ -746,127 +877,8 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
|
||||
}
|
||||
}
|
||||
|
||||
// Set operation name. In Jaeger, the operation name is the span name.
|
||||
if let Some(JsonValue::String(span_name)) = row_iter.next() {
|
||||
span.operation_name = span_name;
|
||||
}
|
||||
|
||||
// Set span id.
|
||||
if let Some(JsonValue::String(span_id)) = row_iter.next() {
|
||||
span.span_id = span_id;
|
||||
}
|
||||
|
||||
// Convert span attributes to tags.
|
||||
if let Some(JsonValue::Object(object)) = row_iter.next() {
|
||||
span.tags = object_to_tags(object);
|
||||
}
|
||||
|
||||
// Save resource attributes with service name.
|
||||
if let Some(JsonValue::Object(mut object)) = row_iter.next() {
|
||||
if let Some(service_name) = object
|
||||
.remove(KEY_SERVICE_NAME)
|
||||
.and_then(|v| v.as_str().map(|s| s.to_string()))
|
||||
{
|
||||
match service_to_resource_attributes.entry(service_name) {
|
||||
Occupied(_) => {}
|
||||
Vacant(vacant) => {
|
||||
let _ = vacant.insert(object_to_tags(object));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set parent span id.
|
||||
if let Some(JsonValue::String(parent_span_id)) = row_iter.next()
|
||||
&& !parent_span_id.is_empty()
|
||||
{
|
||||
span.references.push(Reference {
|
||||
trace_id: span.trace_id.clone(),
|
||||
span_id: parent_span_id,
|
||||
ref_type: REF_TYPE_CHILD_OF.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Set span events to logs.
|
||||
if let Some(JsonValue::Array(events)) = row_iter.next() {
|
||||
for event in events {
|
||||
if let JsonValue::Object(mut obj) = event {
|
||||
let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(t) = obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
|
||||
SPAN_KIND_TIME_FMTS
|
||||
.iter()
|
||||
.find_map(|fmt| chrono::DateTime::parse_from_str(s, fmt).ok())
|
||||
.map(|dt| dt.timestamp_micros() as u64)
|
||||
}) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut fields = vec![KeyValue {
|
||||
key: "event".to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(action.to_string()),
|
||||
}];
|
||||
|
||||
// Add event attributes as fields
|
||||
if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
|
||||
fields.extend(object_to_tags(attrs));
|
||||
}
|
||||
|
||||
span.logs.push(Log {
|
||||
timestamp: t,
|
||||
fields,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set scope name.
|
||||
if let Some(JsonValue::String(scope_name)) = row_iter.next()
|
||||
&& !scope_name.is_empty()
|
||||
{
|
||||
span.tags.push(KeyValue {
|
||||
key: KEY_OTEL_SCOPE_NAME.to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(scope_name),
|
||||
});
|
||||
}
|
||||
|
||||
// Set scope version.
|
||||
if let Some(JsonValue::String(scope_version)) = row_iter.next()
|
||||
&& !scope_version.is_empty()
|
||||
{
|
||||
span.tags.push(KeyValue {
|
||||
key: KEY_OTEL_SCOPE_VERSION.to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(scope_version),
|
||||
});
|
||||
}
|
||||
|
||||
// Set span kind.
|
||||
if let Some(JsonValue::String(span_kind)) = row_iter.next()
|
||||
&& !span_kind.is_empty()
|
||||
{
|
||||
span.tags.push(KeyValue {
|
||||
key: KEY_SPAN_KIND.to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(normalize_span_kind(&span_kind)),
|
||||
});
|
||||
}
|
||||
|
||||
// Set span status code.
|
||||
if let Some(JsonValue::String(span_status_code)) = row_iter.next()
|
||||
&& span_status_code != SPAN_STATUS_UNSET
|
||||
&& !span_status_code.is_empty()
|
||||
{
|
||||
span.tags.push(KeyValue {
|
||||
key: KEY_OTEL_STATUS_CODE.to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String(normalize_status_code(&span_status_code)),
|
||||
});
|
||||
}
|
||||
// ensure span tags order
|
||||
span.tags.sort_by(|a, b| a.key.cmp(&b.key));
|
||||
|
||||
if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
|
||||
spans.push(span);
|
||||
@@ -899,42 +911,41 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
|
||||
Ok(traces)
|
||||
}

#[inline]
fn to_keyvalue(key: String, value: JsonValue) -> Option<KeyValue> {
match value {
JsonValue::String(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(value.to_string()),
}),
JsonValue::Number(value) => Some(KeyValue {
key,
value_type: ValueType::Int64,
value: Value::Int64(value.as_i64().unwrap_or(0)),
}),
JsonValue::Bool(value) => Some(KeyValue {
key,
value_type: ValueType::Boolean,
value: Value::Boolean(value),
}),
JsonValue::Array(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(serde_json::to_string(&value).unwrap()),
}),
JsonValue::Object(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(serde_json::to_string(&value).unwrap()),
}),
JsonValue::Null => None,
}
}

fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
object
.into_iter()
.filter_map(|(key, value)| match value {
JsonValue::String(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(value.to_string()),
}),
JsonValue::Number(value) => Some(KeyValue {
key,
value_type: ValueType::Int64,
value: Value::Int64(value.as_i64().unwrap_or(0)),
}),
JsonValue::Bool(value) => Some(KeyValue {
key,
value_type: ValueType::Boolean,
value: Value::Boolean(value),
}),
JsonValue::Array(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(serde_json::to_string(&value).unwrap()),
}),
JsonValue::Object(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(serde_json::to_string(&value).unwrap()),
}),
// FIXME(zyy17): Do we need to support other types?
_ => {
warn!("Unsupported value type: {:?}", value);
None
}
})
.filter_map(|(key, value)| to_keyvalue(key, value))
.collect()
}
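As an aside, a minimal sketch of the mapping rule these helpers implement, using simplified local stand-ins for the crate's KeyValue/ValueType/Value types (the stand-ins and sample values are illustrative only):

use serde_json::{json, Value as JsonValue};

// Simplified stand-in for a Jaeger tag value; the real code builds KeyValue/ValueType/Value.
#[derive(Debug, PartialEq)]
enum TagValue {
    S(String),
    I(i64),
    B(bool),
}

fn to_tag(key: &str, value: JsonValue) -> Option<(String, TagValue)> {
    match value {
        // Null attributes are dropped.
        JsonValue::Null => None,
        JsonValue::String(s) => Some((key.to_string(), TagValue::S(s))),
        // Numbers fall back to 0 when they do not fit into i64.
        JsonValue::Number(n) => Some((key.to_string(), TagValue::I(n.as_i64().unwrap_or(0)))),
        JsonValue::Bool(b) => Some((key.to_string(), TagValue::B(b))),
        // Arrays and objects are serialized back into a JSON string.
        other => Some((key.to_string(), TagValue::S(other.to_string()))),
    }
}

fn main() {
    assert_eq!(
        to_tag("http.status_code", json!(200)),
        Some(("http.status_code".to_string(), TagValue::I(200)))
    );
    assert_eq!(to_tag("dropped.attribute", JsonValue::Null), None);
}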
|
||||
|
||||
@@ -1055,7 +1066,6 @@ fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Va
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
|
||||
use serde_json::{json, Number, Value as JsonValue};
|
||||
|
||||
use super::*;
|
||||
@@ -1301,6 +1311,151 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
fn test_traces_from_v1_records() {
// Each test case is a tuple of `(test_records, expected)`.
let tests = vec![(
|
||||
HttpRecordsOutput {
|
||||
schema: OutputSchema {
|
||||
column_schemas: vec![
|
||||
ColumnSchema {
|
||||
name: "trace_id".to_string(),
|
||||
data_type: "String".to_string(),
|
||||
},
|
||||
ColumnSchema {
|
||||
name: "timestamp".to_string(),
|
||||
data_type: "TimestampNanosecond".to_string(),
|
||||
},
|
||||
ColumnSchema {
|
||||
name: "duration_nano".to_string(),
|
||||
data_type: "UInt64".to_string(),
|
||||
},
|
||||
ColumnSchema {
|
||||
name: "service_name".to_string(),
|
||||
data_type: "String".to_string(),
|
||||
},
|
||||
ColumnSchema {
|
||||
name: "span_name".to_string(),
|
||||
data_type: "String".to_string(),
|
||||
},
|
||||
ColumnSchema {
|
||||
name: "span_id".to_string(),
|
||||
data_type: "String".to_string(),
|
||||
},
|
||||
ColumnSchema {
|
||||
name: "span_attributes.http.request.method".to_string(),
|
||||
data_type: "String".to_string(),
|
||||
},
|
||||
ColumnSchema {
|
||||
name: "span_attributes.http.request.url".to_string(),
|
||||
data_type: "String".to_string(),
|
||||
},
|
||||
ColumnSchema {
|
||||
name: "span_attributes.http.status_code".to_string(),
|
||||
data_type: "UInt64".to_string(),
|
||||
},
|
||||
],
|
||||
},
|
||||
rows: vec![
|
||||
vec![
|
||||
JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
|
||||
JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
|
||||
JsonValue::Number(Number::from_u128(100000000).unwrap()),
|
||||
JsonValue::String("test-service-0".to_string()),
|
||||
JsonValue::String("access-mysql".to_string()),
|
||||
JsonValue::String("008421dbbd33a3e9".to_string()),
|
||||
JsonValue::String("GET".to_string()),
|
||||
JsonValue::String("/data".to_string()),
|
||||
JsonValue::Number(Number::from_u128(200).unwrap()),
|
||||
],
|
||||
vec![
|
||||
JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
|
||||
JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
|
||||
JsonValue::Number(Number::from_u128(100000000).unwrap()),
|
||||
JsonValue::String("test-service-0".to_string()),
|
||||
JsonValue::String("access-redis".to_string()),
|
||||
JsonValue::String("ffa03416a7b9ea48".to_string()),
|
||||
JsonValue::String("POST".to_string()),
|
||||
JsonValue::String("/create".to_string()),
|
||||
JsonValue::Number(Number::from_u128(400).unwrap()),
|
||||
],
|
||||
],
|
||||
total_rows: 2,
|
||||
metrics: HashMap::new(),
|
||||
},
|
||||
vec![Trace {
|
||||
trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
|
||||
spans: vec![
|
||||
Span {
|
||||
trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
|
||||
span_id: "008421dbbd33a3e9".to_string(),
|
||||
operation_name: "access-mysql".to_string(),
|
||||
start_time: 1738726754492422,
|
||||
duration: 100000,
|
||||
tags: vec![
|
||||
KeyValue {
|
||||
key: "http.request.method".to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String("GET".to_string()),
|
||||
},
|
||||
KeyValue {
|
||||
key: "http.request.url".to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String("/data".to_string()),
|
||||
},
|
||||
KeyValue {
|
||||
key: "http.status_code".to_string(),
|
||||
value_type: ValueType::Int64,
|
||||
value: Value::Int64(200),
|
||||
},
|
||||
],
|
||||
process_id: "p1".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
Span {
|
||||
trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
|
||||
span_id: "ffa03416a7b9ea48".to_string(),
|
||||
operation_name: "access-redis".to_string(),
|
||||
start_time: 1738726754642422,
|
||||
duration: 100000,
|
||||
tags: vec![
|
||||
KeyValue {
|
||||
key: "http.request.method".to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String("POST".to_string()),
|
||||
},
|
||||
KeyValue {
|
||||
key: "http.request.url".to_string(),
|
||||
value_type: ValueType::String,
|
||||
value: Value::String("/create".to_string()),
|
||||
},
|
||||
KeyValue {
|
||||
key: "http.status_code".to_string(),
|
||||
value_type: ValueType::Int64,
|
||||
value: Value::Int64(400),
|
||||
},
|
||||
],
|
||||
process_id: "p1".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
],
|
||||
processes: HashMap::from([(
|
||||
"p1".to_string(),
|
||||
Process {
|
||||
service_name: "test-service-0".to_string(),
|
||||
tags: vec![],
|
||||
},
|
||||
)]),
|
||||
..Default::default()
|
||||
}],
|
||||
)];
|
||||
|
||||
for (records, expected) in tests {
|
||||
let traces = traces_from_records(records).unwrap();
|
||||
assert_eq!(traces, expected);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
fn test_from_jaeger_query_params() {
// Each test case is a tuple of `(test_query_params, expected)`.
@@ -1311,7 +1466,6 @@ mod tests {
|
||||
..Default::default()
|
||||
},
|
||||
QueryTraceParams {
|
||||
db: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
service_name: "test-service-0".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
@@ -1329,7 +1483,6 @@ mod tests {
|
||||
..Default::default()
|
||||
},
|
||||
QueryTraceParams {
|
||||
db: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
service_name: "test-service-0".to_string(),
|
||||
operation_name: Some("access-mysql".to_string()),
|
||||
start_time: Some(1738726754492422000),
|
||||
@@ -1349,9 +1502,7 @@ mod tests {
|
||||
];
|
||||
|
||||
for (query_params, expected) in tests {
|
||||
let query_params =
|
||||
QueryTraceParams::from_jaeger_query_params(DEFAULT_SCHEMA_NAME, query_params)
|
||||
.unwrap();
|
||||
let query_params = QueryTraceParams::from_jaeger_query_params(query_params).unwrap();
|
||||
assert_eq!(query_params, expected);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ use tokio::io::AsyncWrite;
|
||||
use crate::error::{self, DataFrameSnafu, InvalidPrepareStatementSnafu, Result};
|
||||
use crate::metrics::METRIC_AUTH_FAILURE;
|
||||
use crate::mysql::helper::{
|
||||
self, format_placeholder, replace_placeholders, transform_placeholders,
|
||||
self, fix_placeholder_types, format_placeholder, replace_placeholders, transform_placeholders,
|
||||
};
|
||||
use crate::mysql::writer;
|
||||
use crate::mysql::writer::{create_mysql_column, handle_err};
|
||||
@@ -183,7 +183,7 @@ impl MysqlInstanceShim {
|
||||
let describe_result = self
|
||||
.do_describe(statement.clone(), query_ctx.clone())
|
||||
.await?;
|
||||
let (plan, schema) = if let Some(DescribeResult {
|
||||
let (mut plan, schema) = if let Some(DescribeResult {
|
||||
logical_plan,
|
||||
schema,
|
||||
}) = describe_result
|
||||
@@ -193,7 +193,8 @@ impl MysqlInstanceShim {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
let params = if let Some(plan) = &plan {
|
||||
let params = if let Some(plan) = &mut plan {
|
||||
fix_placeholder_types(plan)?;
|
||||
prepared_params(
|
||||
&plan
|
||||
.get_parameter_types()
|
||||
@@ -258,7 +259,8 @@ impl MysqlInstanceShim {
|
||||
};
|
||||
|
||||
let outputs = match sql_plan.plan {
|
||||
Some(plan) => {
|
||||
Some(mut plan) => {
|
||||
fix_placeholder_types(&mut plan)?;
|
||||
let param_types = plan
|
||||
.get_parameter_types()
|
||||
.context(DataFrameSnafu)?
|
||||
@@ -295,6 +297,10 @@ impl MysqlInstanceShim {
|
||||
}
|
||||
Params::CliParams(params) => params.iter().map(|x| x.to_string()).collect(),
|
||||
};
|
||||
debug!(
|
||||
"do_execute Replacing with Params: {:?}, Original Query: {}",
|
||||
param_strs, sql_plan.query
|
||||
);
|
||||
let query = replace_params(param_strs, sql_plan.query);
|
||||
debug!("Mysql execute replaced query: {}", query);
|
||||
self.do_query(&query, query_ctx.clone()).await
|
||||
@@ -412,6 +418,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
|
||||
let (params, columns) = self
|
||||
.do_prepare(raw_query, query_ctx.clone(), stmt_key)
|
||||
.await?;
|
||||
debug!("on_prepare: Params: {:?}, Columns: {:?}", params, columns);
|
||||
w.reply(stmt_id, ¶ms, &columns).await?;
|
||||
crate::metrics::METRIC_MYSQL_PREPARED_COUNT
|
||||
.with_label_values(&[query_ctx.get_db_string().as_str()])
|
||||
@@ -641,12 +648,13 @@ fn replace_params_with_values(
|
||||
debug_assert_eq!(param_types.len(), params.len());
|
||||
|
||||
debug!(
|
||||
"replace_params_with_values(param_types: {:#?}, params: {:#?})",
|
||||
"replace_params_with_values(param_types: {:#?}, params: {:#?}, plan: {:#?})",
|
||||
param_types,
|
||||
params
|
||||
.iter()
|
||||
.map(|x| format!("({:?}, {:?})", x.value, x.coltype))
|
||||
.join(", ")
|
||||
.join(", "),
|
||||
plan
|
||||
);
|
||||
|
||||
let mut values = Vec::with_capacity(params.len());
|
||||
@@ -672,9 +680,10 @@ fn replace_params_with_exprs(
|
||||
debug_assert_eq!(param_types.len(), params.len());
|
||||
|
||||
debug!(
|
||||
"replace_params_with_exprs(param_types: {:#?}, params: {:#?})",
|
||||
"replace_params_with_exprs(param_types: {:#?}, params: {:#?}, plan: {:#?})",
|
||||
param_types,
|
||||
params.iter().map(|x| format!("({:?})", x)).join(", ")
|
||||
params.iter().map(|x| format!("({:?})", x)).join(", "),
|
||||
plan
|
||||
);
|
||||
|
||||
let mut values = Vec::with_capacity(params.len());
|
||||
|
||||
@@ -18,6 +18,8 @@ use std::time::Duration;
|
||||
use chrono::NaiveDate;
|
||||
use common_query::prelude::ScalarValue;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode};
|
||||
use datafusion_expr::LogicalPlan;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::types::TimestampType;
|
||||
use datatypes::value::{self, Value};
|
||||
@@ -28,7 +30,7 @@ use sql::ast::{visit_expressions_mut, Expr, Value as ValueExpr, VisitMut};
|
||||
use sql::statements::sql_value_to_value;
|
||||
use sql::statements::statement::Statement;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::error::{self, DataFusionSnafu, Result};
|
||||
|
||||
/// Returns the placeholder string "$i".
|
||||
pub fn format_placeholder(i: usize) -> String {
|
||||
@@ -77,6 +79,47 @@ pub fn transform_placeholders(stmt: Statement) -> Statement {
|
||||
}
|
||||
}

/// Give placeholders in the limit/skip position the `int64` data type if none is specified.
///
/// DataFusion does not seem to assign a data type to placeholders in the limit/skip position (it is still unclear whether this is a feature or a bug), and a placeholder expression without a data type cannot be extracted with `LogicalPlan::get_parameter_types`.
pub fn fix_placeholder_types(plan: &mut LogicalPlan) -> Result<()> {
|
||||
let give_placeholder_types = |mut e| {
|
||||
if let datafusion_expr::Expr::Placeholder(ph) = &mut e {
|
||||
if ph.data_type.is_none() {
|
||||
ph.data_type = Some(arrow_schema::DataType::Int64);
|
||||
Ok(Transformed::yes(e))
|
||||
} else {
|
||||
Ok(Transformed::no(e))
|
||||
}
|
||||
} else {
|
||||
Ok(Transformed::no(e))
|
||||
}
|
||||
};
|
||||
*plan = std::mem::take(plan)
|
||||
.transform(|p| {
|
||||
let LogicalPlan::Limit(mut limit) = p else {
|
||||
return Ok(Transformed::no(p));
|
||||
};
|
||||
|
||||
if let Some(fetch) = &mut limit.fetch {
|
||||
*fetch = Box::new(
|
||||
std::mem::take(fetch)
|
||||
.transform(give_placeholder_types)?
|
||||
.data,
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(skip) = &mut limit.skip {
|
||||
*skip = Box::new(std::mem::take(skip).transform(give_placeholder_types)?.data);
|
||||
}
|
||||
|
||||
Ok(Transformed::yes(LogicalPlan::Limit(limit)))
|
||||
})
|
||||
.context(DataFusionSnafu)?
|
||||
.data;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn visit_placeholders<V>(v: &mut V)
|
||||
where
|
||||
V: VisitMut,
|
||||
@@ -106,6 +149,7 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarV
|
||||
ConcreteDataType::UInt64(_) => Ok(ScalarValue::UInt64(Some(i as u64))),
|
||||
ConcreteDataType::Float32(_) => Ok(ScalarValue::Float32(Some(i as f32))),
|
||||
ConcreteDataType::Float64(_) => Ok(ScalarValue::Float64(Some(i as f64))),
|
||||
ConcreteDataType::Boolean(_) => Ok(ScalarValue::Boolean(Some(i != 0))),
|
||||
ConcreteDataType::Timestamp(ts_type) => Value::Timestamp(ts_type.create_timestamp(i))
|
||||
.try_to_scalar_value(t)
|
||||
.context(error::ConvertScalarValueSnafu),
|
||||
@@ -127,6 +171,7 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarV
|
||||
ConcreteDataType::UInt64(_) => Ok(ScalarValue::UInt64(Some(u))),
|
||||
ConcreteDataType::Float32(_) => Ok(ScalarValue::Float32(Some(u as f32))),
|
||||
ConcreteDataType::Float64(_) => Ok(ScalarValue::Float64(Some(u as f64))),
|
||||
ConcreteDataType::Boolean(_) => Ok(ScalarValue::Boolean(Some(u != 0))),
|
||||
ConcreteDataType::Timestamp(ts_type) => {
|
||||
Value::Timestamp(ts_type.create_timestamp(u as i64))
|
||||
.try_to_scalar_value(t)
|
||||
|
||||
@@ -32,7 +32,7 @@ use snafu::{ensure, ResultExt};
|
||||
use super::trace::attributes::OtlpAnyValue;
|
||||
use super::utils::{bytes_to_hex_string, key_value_to_jsonb};
|
||||
use crate::error::{
|
||||
IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineTransformSnafu, Result,
|
||||
IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineSnafu, Result,
|
||||
UnsupportedJsonDataTypeForTagSnafu,
|
||||
};
|
||||
use crate::pipeline::run_pipeline;
|
||||
@@ -72,8 +72,7 @@ pub async fn to_grpc_insert_requests(
|
||||
}
|
||||
PipelineWay::Pipeline(pipeline_def) => {
|
||||
let data = parse_export_logs_service_request(request);
|
||||
let array =
|
||||
pipeline::json_array_to_intermediate_state(data).context(PipelineTransformSnafu)?;
|
||||
let array = pipeline::json_array_to_intermediate_state(data).context(PipelineSnafu)?;
|
||||
|
||||
let inserts = run_pipeline(
|
||||
&pipeline_handler,
|
||||
|
||||
@@ -87,21 +87,14 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
|
||||
row_writer::write_tag(writer, SERVICE_NAME_COLUMN, service_name, &mut row)?;
|
||||
}
|
||||
|
||||
// tags
|
||||
let iter = vec![
|
||||
(TRACE_ID_COLUMN, span.trace_id),
|
||||
(SPAN_ID_COLUMN, span.span_id),
|
||||
(
|
||||
// write fields
|
||||
let fields = vec![
|
||||
make_string_column_data(TRACE_ID_COLUMN, span.trace_id),
|
||||
make_string_column_data(SPAN_ID_COLUMN, span.span_id),
|
||||
make_string_column_data(
|
||||
PARENT_SPAN_ID_COLUMN,
|
||||
span.parent_span_id.unwrap_or_default(),
|
||||
),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|(col, val)| (col.to_string(), val));
|
||||
row_writer::write_tags(writer, iter, &mut row)?;
|
||||
|
||||
// write fields
|
||||
let fields = vec![
|
||||
make_string_column_data(SPAN_KIND_COLUMN, span.span_kind),
|
||||
make_string_column_data(SPAN_NAME_COLUMN, span.span_name),
|
||||
make_string_column_data("span_status_code", span.span_status_code),
|
||||
|
||||
@@ -95,13 +95,6 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
|
||||
];
|
||||
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
|
||||
|
||||
// tags
|
||||
let tags = vec![
|
||||
(TRACE_ID_COLUMN.to_string(), span.trace_id),
|
||||
(SPAN_ID_COLUMN.to_string(), span.span_id),
|
||||
];
|
||||
row_writer::write_tags(writer, tags.into_iter(), &mut row)?;
|
||||
|
||||
// write fields
|
||||
if let Some(parent_span_id) = span.parent_span_id {
|
||||
row_writer::write_fields(
|
||||
@@ -115,6 +108,8 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
|
||||
}
|
||||
|
||||
let fields = vec![
|
||||
make_string_column_data(TRACE_ID_COLUMN, span.trace_id),
|
||||
make_string_column_data(SPAN_ID_COLUMN, span.span_id),
|
||||
make_string_column_data(SPAN_KIND_COLUMN, span.span_kind),
|
||||
make_string_column_data(SPAN_NAME_COLUMN, span.span_name),
|
||||
make_string_column_data("span_status_code", span.span_status_code),
|
||||
@@ -126,9 +121,9 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
|
||||
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
|
||||
|
||||
if let Some(service_name) = span.service_name {
|
||||
row_writer::write_fields(
|
||||
row_writer::write_tags(
|
||||
writer,
|
||||
std::iter::once(make_string_column_data(SERVICE_NAME_COLUMN, service_name)),
|
||||
std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
|
||||
&mut row,
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ use pipeline::{
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{CatalogSnafu, PipelineTransformSnafu, Result};
|
||||
use crate::error::{CatalogSnafu, PipelineSnafu, Result};
|
||||
use crate::metrics::{
|
||||
METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
|
||||
};
|
||||
@@ -74,7 +74,7 @@ pub(crate) async fn run_pipeline(
|
||||
table_name,
|
||||
}]
|
||||
})
|
||||
.context(PipelineTransformSnafu)
|
||||
.context(PipelineSnafu)
|
||||
} else {
|
||||
let pipeline = get_pipeline(pipeline_definition, state, query_ctx).await?;
|
||||
|
||||
@@ -91,7 +91,7 @@ pub(crate) async fn run_pipeline(
|
||||
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
|
||||
.observe(transform_timer.elapsed().as_secs_f64());
|
||||
})
|
||||
.context(PipelineTransformSnafu)?;
|
||||
.context(PipelineSnafu)?;
|
||||
|
||||
match r {
|
||||
PipelineExecOutput::Transformed(row) => {
|
||||
|
||||
@@ -32,6 +32,7 @@ use std::sync::Arc;
|
||||
use api::prom_store::remote::ReadRequest;
|
||||
use api::v1::RowInsertRequests;
|
||||
use async_trait::async_trait;
|
||||
use catalog::CatalogManager;
|
||||
use common_query::Output;
|
||||
use headers::HeaderValue;
|
||||
use log_query::LogQuery;
|
||||
@@ -172,7 +173,11 @@ pub trait PipelineHandler {
|
||||
/// Handle log query requests.
|
||||
#[async_trait]
|
||||
pub trait LogQueryHandler {
|
||||
/// Execute a log query.
|
||||
async fn query(&self, query: LogQuery, ctx: QueryContextRef) -> Result<Output>;
|
||||
|
||||
/// Get catalog manager.
|
||||
fn catalog_manager(&self, ctx: &QueryContext) -> Result<&dyn CatalogManager>;
|
||||
}
|
||||
|
||||
/// Handle Jaeger query requests.
|
||||
|
||||
@@ -386,7 +386,7 @@ async fn test_config() {
|
||||
|
||||
[http]
|
||||
addr = "127.0.0.1:4000"
|
||||
timeout = "30s"
|
||||
timeout = "0s"
|
||||
body_limit = "2GB"
|
||||
|
||||
[logging]
|
||||
|
||||
@@ -43,6 +43,9 @@ pub const FILE_TABLE_LOCATION_KEY: &str = "location";
|
||||
pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
|
||||
pub const FILE_TABLE_FORMAT_KEY: &str = "format";
|
||||
|
||||
pub const TABLE_DATA_MODEL: &str = "table_data_model";
|
||||
pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";
|
||||
|
||||
/// Returns true if the `key` is a valid key for any engine or storage.
|
||||
pub fn validate_table_option(key: &str) -> bool {
|
||||
if is_supported_in_s3(key) {
|
||||
@@ -70,6 +73,8 @@ pub fn validate_table_option(key: &str) -> bool {
|
||||
// metric engine keys:
|
||||
PHYSICAL_TABLE_METADATA_KEY,
|
||||
LOGICAL_TABLE_METADATA_KEY,
|
||||
// table model info
|
||||
TABLE_DATA_MODEL,
|
||||
]
|
||||
.contains(&key)
|
||||
}
|
||||
|
||||
@@ -53,12 +53,7 @@ serde_yaml = "0.9"
|
||||
snafu = { workspace = true }
|
||||
sql = { workspace = true }
|
||||
sqlparser.workspace = true
|
||||
sqlx = { version = "0.8", features = [
|
||||
"runtime-tokio-rustls",
|
||||
"mysql",
|
||||
"postgres",
|
||||
"chrono",
|
||||
] }
|
||||
sqlx.workspace = true
|
||||
store-api = { workspace = true }
|
||||
strum.workspace = true
|
||||
tinytemplate = "1.2"
|
||||
|
||||
@@ -37,16 +37,16 @@ common-telemetry.workspace = true
|
||||
common-test-util.workspace = true
|
||||
common-time.workspace = true
|
||||
common-wal.workspace = true
|
||||
datanode.workspace = true
|
||||
datanode = { workspace = true }
|
||||
datatypes.workspace = true
|
||||
dotenv.workspace = true
|
||||
flate2.workspace = true
|
||||
flate2 = "1.0"
|
||||
flow.workspace = true
|
||||
frontend = { workspace = true, features = ["testing"] }
|
||||
futures.workspace = true
|
||||
futures-util.workspace = true
|
||||
hyper-util = { workspace = true, features = ["tokio"] }
|
||||
log-query.workspace = true
|
||||
log-query = { workspace = true }
|
||||
loki-proto.workspace = true
|
||||
meta-client.workspace = true
|
||||
meta-srv = { workspace = true, features = ["mock"] }
|
||||
@@ -92,9 +92,10 @@ itertools.workspace = true
|
||||
opentelemetry-proto.workspace = true
|
||||
partition.workspace = true
|
||||
paste.workspace = true
|
||||
pipeline.workspace = true
|
||||
prost.workspace = true
|
||||
rand.workspace = true
|
||||
session = { workspace = true, features = ["testing"] }
|
||||
store-api.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
tokio-postgres = { workspace = true }
|
||||
url = "2.3"
|
||||
|
||||
@@ -28,6 +28,7 @@ use loki_proto::prost_types::Timestamp;
|
||||
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
|
||||
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use pipeline::GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME;
|
||||
use prost::Message;
|
||||
use serde_json::{json, Value};
|
||||
use servers::http::handler::HealthResponse;
|
||||
@@ -105,6 +106,7 @@ macro_rules! http_tests {
|
||||
test_elasticsearch_logs_with_index,
|
||||
test_log_query,
|
||||
test_jaeger_query_api,
|
||||
test_jaeger_query_api_for_trace_v1,
|
||||
);
|
||||
)*
|
||||
};
|
||||
@@ -941,7 +943,7 @@ init_regions_parallelism = 16
|
||||
|
||||
[http]
|
||||
addr = "127.0.0.1:4000"
|
||||
timeout = "30s"
|
||||
timeout = "0s"
|
||||
body_limit = "64MiB"
|
||||
is_strict_mode = false
|
||||
cors_allowed_origins = []
|
||||
@@ -2114,7 +2116,7 @@ pub async fn test_otlp_traces_v0(store_type: StorageType) {
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
|
||||
// select traces data
|
||||
let expected = r#"[[1736480942444376000,1736480942444499000,123000,"telemetrygen","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444376000,1736480942444499000,123000,"telemetrygen","c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"telemetrygen","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"telemetrygen","cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}]]"#;
|
||||
let expected = r#"[[1736480942444376000,1736480942444499000,123000,"telemetrygen","c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444376000,1736480942444499000,123000,"telemetrygen","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"telemetrygen","cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"telemetrygen","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}]]"#;
|
||||
validate_data(
|
||||
"otlp_traces",
|
||||
&client,
|
||||
@@ -2160,7 +2162,6 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
// init
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await;
|
||||
const TRACE_V1: &str = "greptime_trace_v1";
|
||||
|
||||
let content = r#"
|
||||
{"resourceSpans":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"telemetrygen"}}],"droppedAttributesCount":0},"scopeSpans":[{"scope":{"name":"telemetrygen","version":"","attributes":[],"droppedAttributesCount":0},"spans":[{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"9630f2916e2f7909","traceState":"","parentSpanId":"d24f921c75f68e23","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"d24f921c75f68e23","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"8f847259b0f6e1ab","traceState":"","parentSpanId":"eba7be77e3558179","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"eba7be77e3558179","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]}
|
||||
@@ -2181,8 +2182,8 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
HeaderValue::from_static("application/x-protobuf"),
|
||||
),
|
||||
(
|
||||
HeaderName::from_static("x-greptime-log-pipeline-name"),
|
||||
HeaderValue::from_static(TRACE_V1),
|
||||
HeaderName::from_static("x-greptime-pipeline-name"),
|
||||
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
|
||||
),
|
||||
(
|
||||
HeaderName::from_static("x-greptime-trace-table-name"),
|
||||
@@ -2197,10 +2198,10 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
|
||||
// select traces data
|
||||
let expected = r#"[[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]]]"#;
|
||||
let expected = r#"[[1736480942444376000,1736480942444499000,123000,null,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444376000,1736480942444499000,123000,"d24f921c75f68e23","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,null,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"eba7be77e3558179","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]]]"#;
|
||||
validate_data("otlp_traces", &client, "select * from mytable;", expected).await;
|
||||
|
||||
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL,\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"trace_id\", \"span_id\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'A',\n trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true'\n)"]]"#;
|
||||
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'A',\n trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
|
||||
validate_data(
|
||||
"otlp_traces",
|
||||
&client,
|
||||
@@ -2222,8 +2223,8 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
HeaderValue::from_static("application/x-protobuf"),
|
||||
),
|
||||
(
|
HeaderName::from_static("x-greptime-log-pipeline-name"),
HeaderValue::from_static(TRACE_V1),
HeaderName::from_static("x-greptime-pipeline-name"),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
@@ -2729,8 +2730,8 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"spanId": "008421dbbd33a3e9",
"name": "access-mysql",
"kind": 2,
"startTimeUnixNano": "1738726754492422000",
"endTimeUnixNano": "1738726754592422000",
"startTimeUnixNano": "1738726754492421000",
"endTimeUnixNano": "1738726754592421000",
"attributes": [
{
"key": "operation.type",
@@ -2807,6 +2808,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {

let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
let body = req.encode_to_vec();

// write traces data.
let res = send_req(
&client,
@@ -2906,7 +2908,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492422,
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
@@ -2919,11 +2921,6 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "access-mysql"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.name",
"type": "string",
@@ -2934,6 +2931,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
@@ -2961,11 +2963,6 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "access-redis"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.name",
"type": "string",
@@ -2976,6 +2973,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
@@ -3008,7 +3010,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {

// Test `/api/traces` API.
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492422&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492421&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
@@ -3023,7 +3025,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492422,
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
@@ -3036,11 +3038,6 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "access-mysql"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.name",
"type": "string",
@@ -3051,6 +3048,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
@@ -3084,6 +3086,429 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
guard.remove_all().await;
}

pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_jaeger_query_api_v1").await;

let client = TestClient::new(app).await;

// Test empty response for `/api/services` API before writing any traces.
let res = client
.get("/v1/jaeger/api/services")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": null,
"total": 0,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);

let content = r#"
{
"resourceSpans": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "test-jaeger-query-api"
}
}
]
},
"scopeSpans": [
{
"scope": {
"name": "test-jaeger-query-api",
"version": "1.0.0"
},
"spans": [
{
"traceId": "5611dce1bc9ebed65352d99a027b08ea",
"spanId": "008421dbbd33a3e9",
"name": "access-mysql",
"kind": 2,
"startTimeUnixNano": "1738726754492421000",
"endTimeUnixNano": "1738726754592421000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-mysql"
}
},
{
"key": "net.peer.ip",
"value": {
"stringValue": "1.2.3.4"
}
},
{
"key": "peer.service",
"value": {
"stringValue": "test-jaeger-query-api"
}
}
],
"status": {
"message": "success",
"code": 0
}
}
]
},
{
"scope": {
"name": "test-jaeger-query-api",
"version": "1.0.0"
},
"spans": [
{
"traceId": "5611dce1bc9ebed65352d99a027b08ea",
"spanId": "ffa03416a7b9ea48",
"name": "access-redis",
"kind": 2,
"startTimeUnixNano": "1738726754492422000",
"endTimeUnixNano": "1738726754592422000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-redis"
}
},
{
"key": "net.peer.ip",
"value": {
"stringValue": "1.2.3.4"
}
},
{
"key": "peer.service",
"value": {
"stringValue": "test-jaeger-query-api"
}
}
],
"status": {
"message": "success",
"code": 0
}
}
]
}
],
"schemaUrl": "https://opentelemetry.io/schemas/1.4.0"
}
]
}
"#;

let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
let body = req.encode_to_vec();
// write traces data.
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-log-pipeline-name"),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
HeaderValue::from_static("mytable"),
),
],
"/v1/otlp/v1/traces",
body.clone(),
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());

// Test `/api/services` API.
let res = client
.get("/v1/jaeger/api/services")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
"test-jaeger-query-api"
],
"total": 1,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);

// Test `/api/operations` API.
let res = client
.get("/v1/jaeger/api/operations?service=test-jaeger-query-api")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
{
"name": "access-mysql",
"spanKind": "server"
},
{
"name": "access-redis",
"spanKind": "server"
}
],
"total": 2,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);

// Test `/api/services/{service_name}/operations` API.
let res = client
.get("/v1/jaeger/api/services/test-jaeger-query-api/operations")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
"access-mysql",
"access-redis"
],
"total": 2,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);

// Test `/api/traces/{trace_id}` API.
let res = client
.get("/v1/jaeger/api/traces/5611dce1bc9ebed65352d99a027b08ea")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"{
"data": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spans": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-mysql"
},
{
"key": "otel.scope.name",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.version",
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
"value": "server"
}
],
"logs": [],
"processID": "p1"
},
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "ffa03416a7b9ea48",
"operationName": "access-redis",
"references": [],
"startTime": 1738726754492422,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-redis"
},
{
"key": "otel.scope.name",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.version",
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
"value": "server"
}
],
"logs": [],
"processID": "p1"
}
],
"processes": {
"p1": {
"serviceName": "test-jaeger-query-api",
"tags": []
}
}
}
],
"total": 0,
"limit": 0,
"offset": 0,
"errors": []
}
"#;

let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);

// Test `/api/traces` API.
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492421&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spans": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-mysql"
},
{
"key": "otel.scope.name",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.version",
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
"value": "server"
}
],
"logs": [],
"processID": "p1"
}
],
"processes": {
"p1": {
"serviceName": "test-jaeger-query-api",
"tags": []
}
}
}
],
"total": 0,
"limit": 0,
"offset": 0,
"errors": []
}
"#;

let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);

guard.remove_all().await;
}

async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())

@@ -326,6 +326,31 @@ FROM (
| [7.0, 8.0, 9.0, 10.0] | 4 |
+-----------------------+------------+

SELECT vec_kth_elem('[1.0, 2.0, 3.0]', 2);

+------------------------------------------------+
| vec_kth_elem(Utf8("[1.0, 2.0, 3.0]"),Int64(2)) |
+------------------------------------------------+
| 3.0 |
+------------------------------------------------+

SELECT v, vec_kth_elem(v, 0) AS first_elem
FROM (
SELECT '[1.0, 2.0, 3.0]' AS v
UNION ALL
SELECT '[4.0, 5.0, 6.0, 7.0]' AS v
UNION ALL
SELECT '[8.0]' AS v
)
WHERE vec_kth_elem(v, 0) > 2.0;

+----------------------+------------+
| v | first_elem |
+----------------------+------------+
| [4.0, 5.0, 6.0, 7.0] | 4.0 |
| [8.0] | 8.0 |
+----------------------+------------+

SELECT vec_to_string(vec_subvector('[1.0,2.0,3.0,4.0,5.0]', 0, 3));

+-------------------------------------------------------------------------------+

@@ -100,6 +100,18 @@ FROM (
SELECT '[7.0, 8.0, 9.0, 10.0]' AS v
) Order By vec_dim(v) ASC;

SELECT vec_kth_elem('[1.0, 2.0, 3.0]', 2);

SELECT v, vec_kth_elem(v, 0) AS first_elem
FROM (
SELECT '[1.0, 2.0, 3.0]' AS v
UNION ALL
SELECT '[4.0, 5.0, 6.0, 7.0]' AS v
UNION ALL
SELECT '[8.0]' AS v
)
WHERE vec_kth_elem(v, 0) > 2.0;

SELECT vec_to_string(vec_subvector('[1.0,2.0,3.0,4.0,5.0]', 0, 3));

SELECT vec_to_string(vec_subvector('[1.0,2.0,3.0,4.0,5.0]', 5, 5));
@@ -121,4 +133,3 @@ FROM (
UNION ALL
SELECT '[4.0, 5.0, 6.0, 10, -8, 100]' AS v
) ORDER BY v;

Some files were not shown because too many files have changed in this diff