Compare commits

...

17 Commits

Author SHA1 Message Date
Weny Xu
4eb0771afe feat: introduce install_manifest_to for RegionManifestManager (#5742)
* feat: introduce `install_manifest_changes` for `RegionManifestManager`

* chore: rename function to `install_manifest_to`

* Apply suggestions from code review

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

* chore: add comments

* chore: add comments

* chore: update logic and add comments

* chore: add more check

* Update src/mito2/src/manifest/manager.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Co-authored-by: jeremyhi <jiachun_feng@proton.me>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-03-21 05:19:23 +00:00
Yohan Wal
a0739a96e4 fix: wrap table name with `` (#5748)
* fix: wrap table name with quotes

* fix: minor fix
2025-03-20 09:38:54 +00:00
Ning Sun
77ccf1eac8 chore: add datanode write rows to grafana dashboard (#5745) 2025-03-20 03:39:40 +00:00
Yohan Wal
1dc4a196bf feat: add mysql election logic (#5694)
* feat: add mysql election

* feat: add mysql election

* chore: fix deps

* chore: fix deps

* fix: duplicate container

* fix: duplicate setup for sqlness

* fix: call once

* fix: do not use NOWAIT for mysql 5.7

* chore: apply comments

* fix: no parallel sqlness for mysql

* chore: comments and minor revert

* chore: apply comments

* chore: apply comments

* chore: add `` to table name

* ci: use 2 metasrv to detect election bugs

* refactor: better election logic

* chore: apply comments

* chore: apply comments

* feat: version check before startup
2025-03-19 11:31:18 +00:00
shuiyisong
2431cd3bdf chore: merge error files under pipeline crate (#5738) 2025-03-19 09:55:51 +00:00
discord9
cd730e0486 fix: mysql prepare limit&offset param (#5734)
* fix: prepare limit&offset param

* test: sqlness

* chore: per review

* chore: per review
2025-03-19 07:49:26 +00:00
zyy17
a19441bed8 refactor: remove trace id from primary key in opentelemetry_traces table (#5733)
* refactor: remove trace id in primary key

* refactor: remove trace id in primary key in v0 model

* refactor: add span id in v1

* fix: integration test
2025-03-19 06:17:58 +00:00
dennis zhuang
162e3b8620 docs: adds news to readme (#5735) 2025-03-19 01:33:46 +00:00
Wenbin
83642dab87 feat: remove duplicated peer definition (#5728)
* remove duplicate peer

* fix
2025-03-18 11:30:25 +00:00
discord9
46070958c9 fix: mysql prepare bool value (#5732) 2025-03-18 10:50:45 +00:00
pikady
eea8b1c730 feat: add vec_kth_elem function (#5674)
* feat: add vec_kth_elem function

Signed-off-by: pikady <2652917633@qq.com>

* code format

Signed-off-by: pikady <2652917633@qq.com>

* add test sql

Signed-off-by: pikady <2652917633@qq.com>

* change indexing from 1-based to 0-based

Signed-off-by: pikady <2652917633@qq.com>

* improve code formatting and correct spelling errors

Signed-off-by: pikady <2652917633@qq.com>

* Update tests/cases/standalone/common/function/vector/vector.sql

I noticed the two lines are identical. Could you clarify the reason for the change? Thanks!

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: pikady <2652917633@qq.com>
Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
2025-03-18 07:25:53 +00:00
Ning Sun
1ab4ddab8d feat: update pipeline header name to x-greptime-pipeline-name (#5710)
* feat: update pipeline header name to x-greptime-pipeline-name

* refactor: update string_value_from_header
2025-03-18 02:39:54 +00:00
Ning Sun
9e63018198 feat: disable http timeout (#5721)
* feat: update to disable http timeout by default

* feat: make http timeout default to 0

* test: correct test case

* chore: generate new config doc

* test: correct tests
2025-03-18 01:18:56 +00:00
discord9
594bec8c36 feat: load manifest manually in mito engine (#5725)
* feat: load manifest and some

* chore: per review
2025-03-18 01:18:08 +00:00
localhost
1586732d20 chore: add some method for log query handler (#5685)
* chore: add some method for log query handler

* chore: make clippy happy

* chore: add some method for log query handler

* Update src/frontend/src/instance/logs.rs

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2025-03-17 18:36:43 +00:00
yihong
16fddd97a7 chore: revert commit "update flate2 version (#5706)" (#5715)
Revert "chore: update flate2 version (#5706)"

This reverts commit a5df3954f3.
2025-03-17 12:16:26 +00:00
Ning Sun
2260782c12 refactor: update jaeger api implementation for new trace modeling (#5655)
* refactor: update jaeger api implementation

* test: add tests for v1 data model

* feat: customize trace table name

* fix: update column requirements to use Column type instead of String

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: lint fix

* refactor: accumulate resource attributes for v1

* fix: add empty check for additional string

* feat: add table option to mark data model version

* fix: do not overwrite all tags

* feat: use table option to mark table data model version and process accordingly

* chore: update comments to reflect query changes

* feat: use header for jaeger table name

* feat: update index for service_name, drop index for span_name

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: zyy17 <zyylsxm@gmail.com>
2025-03-17 07:31:32 +00:00
107 changed files with 3550 additions and 886 deletions

View File

@@ -8,7 +8,7 @@ inputs:
default: 2
description: "Number of Datanode replicas"
meta-replicas:
default: 1
default: 2
description: "Number of Metasrv replicas"
image-registry:
default: "docker.io"

View File

@@ -576,9 +576,12 @@ jobs:
- name: "Remote WAL"
opts: "-w kafka -k 127.0.0.1:9092"
kafka: true
- name: "Pg Kvbackend"
- name: "PostgreSQL KvBackend"
opts: "--setup-pg"
kafka: false
- name: "MySQL Kvbackend"
opts: "--setup-mysql"
kafka: false
timeout-minutes: 60
steps:
- uses: actions/checkout@v4

Cargo.lock (generated, 28 lines changed)
View File

@@ -4119,12 +4119,11 @@ dependencies = [
[[package]]
name = "flate2"
version = "1.1.0"
version = "1.0.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11faaf5a5236997af9848be0bef4db95824b1d534ebc64d0f0c6cf3e67bd38dc"
checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0"
dependencies = [
"crc32fast",
"libz-rs-sys",
"libz-sys",
"miniz_oxide",
]
@@ -4706,7 +4705,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=c5419bbd20cb42e568ec325a4d71a3c94cc327e1#c5419bbd20cb42e568ec325a4d71a3c94cc327e1"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a7274ddce299f33d23dbe8af5bbe6219f07c559a#a7274ddce299f33d23dbe8af5bbe6219f07c559a"
dependencies = [
"prost 0.13.3",
"serde",
@@ -6279,15 +6278,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "libz-rs-sys"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "902bc563b5d65ad9bba616b490842ef0651066a1a1dc3ce1087113ffcb873c8d"
dependencies = [
"zlib-rs",
]
[[package]]
name = "libz-sys"
version = "1.1.20"
@@ -6730,6 +6720,7 @@ dependencies = [
"servers",
"session",
"snafu 0.8.5",
"sqlx",
"store-api",
"strum 0.25.0",
"table",
@@ -6832,9 +6823,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.8.5"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5"
checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
dependencies = [
"adler2",
]
@@ -11930,6 +11921,7 @@ dependencies = [
"operator",
"partition",
"paste",
"pipeline",
"prost 0.13.3",
"query",
"rand",
@@ -13964,12 +13956,6 @@ dependencies = [
"syn 2.0.96",
]
[[package]]
name = "zlib-rs"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b20717f0917c908dc63de2e44e97f1e6b126ca58d0e391cee86d504eb8fbd05"
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"

View File

@@ -126,11 +126,10 @@ deadpool-postgres = "0.12"
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.14"
flate2 = { version = "1.1.0", default-features = false, features = ["zlib-rs"] }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c5419bbd20cb42e568ec325a4d71a3c94cc327e1" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a7274ddce299f33d23dbe8af5bbe6219f07c559a" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -192,6 +191,8 @@ snafu = "0.8"
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"mysql",
"postgres",
"chrono",
] }
sysinfo = "0.30"
# on branch v0.52.x

View File

@@ -6,7 +6,7 @@
</picture>
</p>
<h2 align="center">Unified & Cost-Effective Time Series Database for Metrics, Logs, and Events</h2>
<h2 align="center">Unified & Cost-Effective Observerability Database for Metrics, Logs, and Events</h2>
<div align="center">
<h3 align="center">
@@ -62,15 +62,19 @@
## Introduction
**GreptimeDB** is an open-source unified & cost-effective time-series database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at Any Scale.
**GreptimeDB** is an open-source unified & cost-effective observability database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at Any Scale.
## News
**[GreptimeDB achieves 1 billion cold run #1 in JSONBench!](https://greptime.com/blogs/2025-03-18-jsonbench-greptimedb-performance)**
## Why GreptimeDB
Our core developers have been building time-series data platforms for years. Based on our best practices, GreptimeDB was born to give you:
Our core developers have been building observability data platforms for years. Based on our best practices, GreptimeDB was born to give you:
* **Unified Processing of Metrics, Logs, and Events**
GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
GreptimeDB unifies observability data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
* **Cloud-native Distributed Database**

View File

@@ -24,7 +24,7 @@
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
@@ -222,7 +222,7 @@
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
@@ -390,7 +390,7 @@
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. |
@@ -563,7 +563,7 @@
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |

View File

@@ -27,7 +27,7 @@ max_concurrent_queries = 0
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.

View File

@@ -30,7 +30,7 @@ max_send_message_size = "512MB"
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.

View File

@@ -26,7 +26,7 @@ retry_interval = "3s"
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.

View File

@@ -34,7 +34,7 @@ max_concurrent_queries = 0
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.

View File

@@ -4782,7 +4782,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Current counts for stalled write requests by instance\n\nWrite stalls when memtable is full and pending for flush\n\n",
"description": "Ingestion size by row counts.",
"fieldConfig": {
"defaults": {
"color": {
@@ -4844,7 +4844,7 @@
"x": 12,
"y": 138
},
"id": 221,
"id": 277,
"options": {
"legend": {
"calcs": [],
@@ -4864,14 +4864,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum by(pod) (greptime_mito_write_stall_total{pod=~\"$datanode\"})",
"expr": "rate(greptime_mito_write_rows_total{pod=~\"$datanode\"}[$__rate_interval])",
"instant": false,
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Write Stall per Instance",
"title": "Write Rows per Instance",
"type": "timeseries"
},
{
@@ -4976,7 +4976,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Cache size by instance.\n",
"description": "Current counts for stalled write requests by instance\n\nWrite stalls when memtable is full and pending for flush\n\n",
"fieldConfig": {
"defaults": {
"color": {
@@ -5028,7 +5028,7 @@
}
]
},
"unit": "decbytes"
"unit": "none"
},
"overrides": []
},
@@ -5038,7 +5038,7 @@
"x": 12,
"y": 146
},
"id": 229,
"id": 221,
"options": {
"legend": {
"calcs": [],
@@ -5058,14 +5058,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "greptime_mito_cache_bytes{pod=~\"$datanode\"}",
"expr": "sum by(pod) (greptime_mito_write_stall_total{pod=~\"$datanode\"})",
"instant": false,
"legendFormat": "{{pod}}-{{type}}",
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Cached Bytes per Instance",
"title": "Write Stall per Instance",
"type": "timeseries"
},
{
@@ -5172,7 +5172,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "P99 latency of each type of reads by instance",
"description": "Cache size by instance.\n",
"fieldConfig": {
"defaults": {
"color": {
@@ -5224,7 +5224,7 @@
}
]
},
"unit": "s"
"unit": "decbytes"
},
"overrides": []
},
@@ -5234,17 +5234,13 @@
"x": 12,
"y": 154
},
"id": 228,
"id": 229,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"calcs": [],
"displayMode": "table",
"placement": "bottom",
"showLegend": true,
"sortBy": "Last *",
"sortDesc": true
"showLegend": true
},
"tooltip": {
"mode": "single",
@@ -5258,14 +5254,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"expr": "greptime_mito_cache_bytes{pod=~\"$datanode\"}",
"instant": false,
"legendFormat": "{{pod}}-{{stage}}-p99",
"legendFormat": "{{pod}}-{{type}}",
"range": true,
"refId": "A"
}
],
"title": "Read Stage P99 per Instance",
"title": "Cached Bytes per Instance",
"type": "timeseries"
},
{
@@ -5317,7 +5313,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@@ -5370,7 +5367,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Latency of compaction task, at p99",
"description": "P99 latency of each type of reads by instance",
"fieldConfig": {
"defaults": {
"color": {
@@ -5414,7 +5411,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@@ -5432,7 +5430,7 @@
"x": 12,
"y": 162
},
"id": 230,
"id": 228,
"options": {
"legend": {
"calcs": [
@@ -5440,7 +5438,9 @@
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
"showLegend": true,
"sortBy": "Last *",
"sortDesc": true
},
"tooltip": {
"mode": "single",
@@ -5454,14 +5454,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(pod, le) (rate(greptime_mito_compaction_total_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "[{{pod}}]-compaction-p99",
"legendFormat": "{{pod}}-{{stage}}-p99",
"range": true,
"refId": "A"
}
],
"title": "Compaction P99 per Instance",
"title": "Read Stage P99 per Instance",
"type": "timeseries"
},
{
@@ -5570,7 +5570,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Compaction latency by stage",
"description": "Latency of compaction task, at p99",
"fieldConfig": {
"defaults": {
"color": {
@@ -5632,7 +5632,7 @@
"x": 12,
"y": 170
},
"id": 232,
"id": 230,
"options": {
"legend": {
"calcs": [
@@ -5654,9 +5654,9 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"expr": "histogram_quantile(0.99, sum by(pod, le) (rate(greptime_mito_compaction_total_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "{{pod}}-{{stage}}-p99",
"legendFormat": "[{{pod}}]-compaction-p99",
"range": true,
"refId": "A"
}
@@ -5794,7 +5794,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Write-ahead log operations latency at p99",
"description": "Compaction latency by stage",
"fieldConfig": {
"defaults": {
"color": {
@@ -5856,13 +5856,13 @@
"x": 12,
"y": 178
},
"id": 269,
"id": 232,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "list",
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
@@ -5878,14 +5878,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(le,logstore,optype,pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "{{pod}}-{{logstore}}-{{optype}}-p99",
"legendFormat": "{{pod}}-{{stage}}-p99",
"range": true,
"refId": "A"
}
],
"title": "Log Store op duration seconds",
"title": "Compaction P99 per Instance",
"type": "timeseries"
},
{
@@ -5993,7 +5993,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Ongoing compaction task count",
"description": "Write-ahead log operations latency at p99",
"fieldConfig": {
"defaults": {
"color": {
@@ -6045,7 +6045,7 @@
}
]
},
"unit": "none"
"unit": "s"
},
"overrides": []
},
@@ -6055,13 +6055,13 @@
"x": 12,
"y": 186
},
"id": 271,
"id": 269,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "table",
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
@@ -6078,14 +6078,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "greptime_mito_inflight_compaction_count",
"expr": "histogram_quantile(0.99, sum by(le,logstore,optype,pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))",
"instant": false,
"legendFormat": "{{pod}}",
"legendFormat": "{{pod}}-{{logstore}}-{{optype}}-p99",
"range": true,
"refId": "A"
}
],
"title": "Inflight Compaction",
"title": "Log Store op duration seconds",
"type": "timeseries"
},
{
@@ -6188,6 +6188,105 @@
"title": "Inflight Flush",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Ongoing compaction task count",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "points",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 194
},
"id": 271,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.1.3",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "greptime_mito_inflight_compaction_count",
"instant": false,
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Inflight Compaction",
"type": "timeseries"
},
{
"collapsed": false,
"gridPos": {

View File

@@ -440,7 +440,7 @@ mod tests {
[http]
addr = "127.0.0.1:4000"
timeout = "30s"
timeout = "0s"
body_limit = "2GB"
[opentsdb]
@@ -461,7 +461,7 @@ mod tests {
let fe_opts = command.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
assert_eq!(Duration::from_secs(30), fe_opts.http.timeout);
assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);

View File

@@ -135,5 +135,6 @@ pub fn is_readonly_schema(schema: &str) -> bool {
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const SERVICE_NAME_COLUMN: &str = "service_name";
pub const PARENT_SPAN_ID_COLUMN: &str = "parent_span_id";
// ---- End of special table and fields ----

View File

@@ -24,6 +24,7 @@ pub(crate) mod sum;
mod vector_add;
mod vector_dim;
mod vector_div;
mod vector_kth_elem;
mod vector_mul;
mod vector_norm;
mod vector_sub;
@@ -57,6 +58,7 @@ impl VectorFunction {
registry.register(Arc::new(vector_div::VectorDivFunction));
registry.register(Arc::new(vector_norm::VectorNormFunction));
registry.register(Arc::new(vector_dim::VectorDimFunction));
registry.register(Arc::new(vector_kth_elem::VectorKthElemFunction));
registry.register(Arc::new(vector_subvector::VectorSubvectorFunction));
registry.register(Arc::new(elem_sum::ElemSumFunction));
registry.register(Arc::new(elem_product::ElemProductFunction));

View File

@@ -0,0 +1,211 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::helper;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const};
const NAME: &str = "vec_kth_elem";
/// Returns the k-th(0-based index) element of the vector.
///
/// # Example
///
/// ```sql
/// SELECT vec_kth_elem("[2, 4, 6]",1) as result;
///
/// +---------+
/// | result |
/// +---------+
/// | 4 |
/// +---------+
///
/// ```
///
#[derive(Debug, Clone, Default)]
pub struct VectorKthElemFunction;
impl Function for VectorKthElemFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(
&self,
_input_types: &[ConcreteDataType],
) -> common_query::error::Result<ConcreteDataType> {
Ok(ConcreteDataType::float32_datatype())
}
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![ConcreteDataType::int64_datatype()],
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let arg0 = &columns[0];
let arg1 = &columns[1];
let len = arg0.len();
let mut result = Float32VectorBuilder::with_capacity(len);
if len == 0 {
return Ok(result.to_vector());
};
let arg0_const = as_veclit_if_const(arg0)?;
for i in 0..len {
let arg0 = match arg0_const.as_ref() {
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};
let Some(arg0) = arg0 else {
result.push_null();
continue;
};
let arg1 = arg1.get(i).as_f64_lossy();
let Some(arg1) = arg1 else {
result.push_null();
continue;
};
ensure!(
arg1 >= 0.0 && arg1.fract() == 0.0,
InvalidFuncArgsSnafu {
err_msg: format!(
"Invalid argument: k must be a non-negative integer, but got k = {}.",
arg1
),
}
);
let k = arg1 as usize;
ensure!(
k < arg0.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"Out of range: k must be in the range [0, {}], but got k = {}.",
arg0.len() - 1,
k
),
}
);
let value = arg0[k];
result.push(Some(value));
}
Ok(result.to_vector())
}
}
impl Display for VectorKthElemFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::error;
use datatypes::vectors::{Int64Vector, StringVector};
use super::*;
#[test]
fn test_vec_kth_elem() {
let func = VectorKthElemFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
None,
]));
let input1 = Arc::new(Int64Vector::from(vec![Some(0), Some(2), None, Some(1)]));
let result = func
.eval(&FunctionContext::default(), &[input0, input1])
.unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 4);
assert_eq!(result.get_ref(0).as_f32().unwrap(), Some(1.0));
assert_eq!(result.get_ref(1).as_f32().unwrap(), Some(6.0));
assert!(result.get_ref(2).is_null());
assert!(result.get_ref(3).is_null());
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,2.0,3.0]".to_string())]));
let input1 = Arc::new(Int64Vector::from(vec![Some(3)]));
let err = func
.eval(&FunctionContext::default(), &[input0, input1])
.unwrap_err();
match err {
error::Error::InvalidFuncArgs { err_msg, .. } => {
assert_eq!(
err_msg,
format!("Out of range: k must be in the range [0, 2], but got k = 3.")
)
}
_ => unreachable!(),
}
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,2.0,3.0]".to_string())]));
let input1 = Arc::new(Int64Vector::from(vec![Some(-1)]));
let err = func
.eval(&FunctionContext::default(), &[input0, input1])
.unwrap_err();
match err {
error::Error::InvalidFuncArgs { err_msg, .. } => {
assert_eq!(
err_msg,
format!("Invalid argument: k must be a non-negative integer, but got k = -1.")
)
}
_ => unreachable!(),
}
}
}
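
Note on the new function added above: per its doc comment and tests, vec_kth_elem uses 0-based indexing and rejects out-of-range indices. A minimal SQL usage sketch (the vector literals and alias are illustrative only):

    -- 0-based indexing: returns 6 (the element at index 2)
    SELECT vec_kth_elem('[4.0, 5.0, 6.0]', 2) AS third_elem;
    -- An index outside [0, len - 1] raises an "Out of range" error instead of returning NULL:
    -- SELECT vec_kth_elem('[1.0, 2.0, 3.0]', 3);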

View File

@@ -155,21 +155,21 @@ impl<'a> MySqlTemplateFactory<'a> {
table_name: table_name.to_string(),
create_table_statement: format!(
// Cannot be more than 3072 bytes in PRIMARY KEY
"CREATE TABLE IF NOT EXISTS {table_name}(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
"CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
),
range_template: RangeTemplate {
point: format!("SELECT k, v FROM {table_name} WHERE k = ?"),
range: format!("SELECT k, v FROM {table_name} WHERE k >= ? AND k < ? ORDER BY k"),
full: format!("SELECT k, v FROM {table_name} ? ORDER BY k"),
left_bounded: format!("SELECT k, v FROM {table_name} WHERE k >= ? ORDER BY k"),
prefix: format!("SELECT k, v FROM {table_name} WHERE k LIKE ? ORDER BY k"),
point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),
range: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? AND k < ? ORDER BY k"),
full: format!("SELECT k, v FROM `{table_name}` ? ORDER BY k"),
left_bounded: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? ORDER BY k"),
prefix: format!("SELECT k, v FROM `{table_name}` WHERE k LIKE ? ORDER BY k"),
},
delete_template: RangeTemplate {
point: format!("DELETE FROM {table_name} WHERE k = ?;"),
range: format!("DELETE FROM {table_name} WHERE k >= ? AND k < ?;"),
full: format!("DELETE FROM {table_name}"),
left_bounded: format!("DELETE FROM {table_name} WHERE k >= ?;"),
prefix: format!("DELETE FROM {table_name} WHERE k LIKE ?;"),
point: format!("DELETE FROM `{table_name}` WHERE k = ?;"),
range: format!("DELETE FROM `{table_name}` WHERE k >= ? AND k < ?;"),
full: format!("DELETE FROM `{table_name}`"),
left_bounded: format!("DELETE FROM `{table_name}` WHERE k >= ?;"),
prefix: format!("DELETE FROM `{table_name}` WHERE k LIKE ?;"),
},
}
}
@@ -189,14 +189,17 @@ impl MySqlTemplateSet {
fn generate_batch_get_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
format!("SELECT k, v FROM {table_name} WHERE k in ({});", in_clause)
format!(
"SELECT k, v FROM `{table_name}` WHERE k in ({});",
in_clause
)
}
/// Generates the sql for batch delete.
fn generate_batch_delete_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
format!("DELETE FROM {table_name} WHERE k in ({});", in_clause)
format!("DELETE FROM `{table_name}` WHERE k in ({});", in_clause)
}
/// Generates the sql for batch upsert.
@@ -212,9 +215,9 @@ impl MySqlTemplateSet {
let values_clause = values_placeholders.join(", ");
(
format!(r#"SELECT k, v FROM {table_name} WHERE k IN ({in_clause})"#,),
format!(r#"SELECT k, v FROM `{table_name}` WHERE k IN ({in_clause})"#,),
format!(
r#"INSERT INTO {table_name} (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
r#"INSERT INTO `{table_name}` (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
),
)
}
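
For context on why this change (and the matching PostgreSQL change below) quotes identifiers: an unquoted table name that contains special characters or collides with a reserved word fails to parse. A hedged SQL illustration with a hypothetical table name:

    -- Hypothetical metadata table name containing a dash:
    -- SELECT k, v FROM greptime-metakv WHERE k = ?;      -- syntax error when left unquoted
    SELECT k, v FROM `greptime-metakv` WHERE k = ?;        -- accepted with MySQL backtick quoting
    -- The PostgreSQL templates use double quotes for the same purpose.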

View File

@@ -157,21 +157,25 @@ impl<'a> PgSqlTemplateFactory<'a> {
PgSqlTemplateSet {
table_name: table_name.to_string(),
create_table_statement: format!(
"CREATE TABLE IF NOT EXISTS {table_name}(k bytea PRIMARY KEY, v bytea)",
"CREATE TABLE IF NOT EXISTS \"{table_name}\"(k bytea PRIMARY KEY, v bytea)",
),
range_template: RangeTemplate {
point: format!("SELECT k, v FROM {table_name} WHERE k = $1"),
range: format!("SELECT k, v FROM {table_name} WHERE k >= $1 AND k < $2 ORDER BY k"),
full: format!("SELECT k, v FROM {table_name} $1 ORDER BY k"),
left_bounded: format!("SELECT k, v FROM {table_name} WHERE k >= $1 ORDER BY k"),
prefix: format!("SELECT k, v FROM {table_name} WHERE k LIKE $1 ORDER BY k"),
point: format!("SELECT k, v FROM \"{table_name}\" WHERE k = $1"),
range: format!(
"SELECT k, v FROM \"{table_name}\" WHERE k >= $1 AND k < $2 ORDER BY k"
),
full: format!("SELECT k, v FROM \"{table_name}\" $1 ORDER BY k"),
left_bounded: format!("SELECT k, v FROM \"{table_name}\" WHERE k >= $1 ORDER BY k"),
prefix: format!("SELECT k, v FROM \"{table_name}\" WHERE k LIKE $1 ORDER BY k"),
},
delete_template: RangeTemplate {
point: format!("DELETE FROM {table_name} WHERE k = $1 RETURNING k,v;"),
range: format!("DELETE FROM {table_name} WHERE k >= $1 AND k < $2 RETURNING k,v;"),
full: format!("DELETE FROM {table_name} RETURNING k,v"),
left_bounded: format!("DELETE FROM {table_name} WHERE k >= $1 RETURNING k,v;"),
prefix: format!("DELETE FROM {table_name} WHERE k LIKE $1 RETURNING k,v;"),
point: format!("DELETE FROM \"{table_name}\" WHERE k = $1 RETURNING k,v;"),
range: format!(
"DELETE FROM \"{table_name}\" WHERE k >= $1 AND k < $2 RETURNING k,v;"
),
full: format!("DELETE FROM \"{table_name}\" RETURNING k,v"),
left_bounded: format!("DELETE FROM \"{table_name}\" WHERE k >= $1 RETURNING k,v;"),
prefix: format!("DELETE FROM \"{table_name}\" WHERE k LIKE $1 RETURNING k,v;"),
},
}
}
@@ -191,7 +195,10 @@ impl PgSqlTemplateSet {
fn generate_batch_get_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
format!("SELECT k, v FROM {table_name} WHERE k in ({});", in_clause)
format!(
"SELECT k, v FROM \"{table_name}\" WHERE k in ({});",
in_clause
)
}
/// Generates the sql for batch delete.
@@ -199,7 +206,7 @@ impl PgSqlTemplateSet {
let table_name = &self.table_name;
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
format!(
"DELETE FROM {table_name} WHERE k in ({}) RETURNING k,v;",
"DELETE FROM \"{table_name}\" WHERE k in ({}) RETURNING k,v;",
in_clause
)
}
@@ -220,9 +227,9 @@ impl PgSqlTemplateSet {
format!(
r#"
WITH prev AS (
SELECT k,v FROM {table_name} WHERE k IN ({in_clause})
SELECT k,v FROM "{table_name}" WHERE k IN ({in_clause})
), update AS (
INSERT INTO {table_name} (k, v) VALUES
INSERT INTO "{table_name}" (k, v) VALUES
{values_clause}
ON CONFLICT (
k

View File

@@ -12,63 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use api::v1::meta::Peer as PbPeer;
use serde::{Deserialize, Serialize};
pub use api::v1::meta::Peer;
use crate::error::Error;
use crate::{DatanodeId, FlownodeId};
#[derive(Debug, Default, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct Peer {
/// Node identifier. Unique in a cluster.
pub id: u64,
pub addr: String,
}
impl From<PbPeer> for Peer {
fn from(p: PbPeer) -> Self {
Self {
id: p.id,
addr: p.addr,
}
}
}
impl From<Peer> for PbPeer {
fn from(p: Peer) -> Self {
Self {
id: p.id,
addr: p.addr,
}
}
}
impl Peer {
pub fn new(id: u64, addr: impl Into<String>) -> Self {
Self {
id,
addr: addr.into(),
}
}
#[cfg(any(test, feature = "testing"))]
pub fn empty(id: u64) -> Self {
Self {
id,
addr: String::new(),
}
}
}
impl Display for Peer {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "peer-{}({})", self.id, self.addr)
}
}
/// can query peer given a node id
#[async_trait::async_trait]
pub trait PeerLookupService {

View File

@@ -25,6 +25,6 @@ pub mod heartbeat;
pub mod metrics;
pub mod region_server;
pub mod service;
mod store;
pub mod store;
#[cfg(any(test, feature = "testing"))]
pub mod tests;

View File

@@ -15,7 +15,7 @@
//! object storage utilities
mod azblob;
mod fs;
pub mod fs;
mod gcs;
mod oss;
mod s3;

View File

@@ -24,7 +24,8 @@ use crate::config::FileConfig;
use crate::error::{self, Result};
use crate::store;
pub(crate) async fn new_fs_object_store(
/// A helper function to create a file system object store.
pub async fn new_fs_object_store(
data_home: &str,
_file_config: &FileConfig,
) -> Result<ObjectStore> {

View File

@@ -28,14 +28,14 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use datafusion::dataframe::DataFrame;
use datafusion::execution::context::SessionContext;
use datafusion::execution::SessionStateBuilder;
use datafusion_expr::{col, lit, lit_timestamp_nano, Expr};
use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr};
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
TableNotFoundSnafu,
};
use servers::http::jaeger::{QueryTraceParams, FIND_TRACES_COLS};
use servers::http::jaeger::{QueryTraceParams, JAEGER_QUERY_TABLE_NAME_KEY};
use servers::otlp::trace::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
@@ -43,6 +43,7 @@ use servers::otlp::trace::{
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
use table::table::adapter::DfTableProviderAdapter;
use super::Instance;
@@ -82,7 +83,19 @@ impl JaegerQueryHandler for Instance {
))));
}
// It's equivalent to `SELECT span_name, span_kind FROM {db}.{trace_table} WHERE service_name = '{service_name}'`.
// It's equivalent to
//
// ```
// SELECT
// span_name,
// span_kind
// FROM
// {db}.{trace_table}
// WHERE
// service_name = '{service_name}'
// ORDER BY
// timestamp
// ```.
Ok(query_trace_table(
ctx,
self.catalog_manager(),
@@ -101,9 +114,19 @@ impl JaegerQueryHandler for Instance {
}
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
// It's equivalent to `SELECT trace_id, timestamp, duration_nano, service_name, span_name, span_id, span_attributes, resource_attributes, parent_span_id
// FROM {db}.{trace_table} WHERE trace_id = '{trace_id}'`.
let selects: Vec<Expr> = FIND_TRACES_COLS.clone();
// It's equivalent to
//
// ```
// SELECT
// *
// FROM
// {db}.{trace_table}
// WHERE
// trace_id = '{trace_id}'
// ORDER BY
// timestamp
// ```.
let selects = vec![wildcard()];
let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
@@ -125,7 +148,7 @@ impl JaegerQueryHandler for Instance {
ctx: QueryContextRef,
query_params: QueryTraceParams,
) -> ServerResult<Output> {
let selects: Vec<Expr> = FIND_TRACES_COLS.clone();
let selects = vec![wildcard()];
let mut filters = vec![];
@@ -174,17 +197,34 @@ async fn query_trace_table(
tags: Option<HashMap<String, JsonValue>>,
distinct: bool,
) -> ServerResult<Output> {
let db = ctx.get_db_string();
let table_name = ctx
.extension(JAEGER_QUERY_TABLE_NAME_KEY)
.unwrap_or(TRACE_TABLE_NAME);
let table = catalog_manager
.table(ctx.current_catalog(), &db, TRACE_TABLE_NAME, Some(&ctx))
.table(
ctx.current_catalog(),
&ctx.current_schema(),
table_name,
Some(&ctx),
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table: TRACE_TABLE_NAME,
table: table_name,
catalog: ctx.current_catalog(),
schema: db,
schema: ctx.current_schema(),
})?;
let is_data_model_v1 = table
.table_info()
.meta
.options
.extra_options
.get(TABLE_DATA_MODEL)
.map(|s| s.as_str())
== Some(TABLE_DATA_MODEL_TRACE_V1);
let df_context = create_df_context(query_engine, ctx.clone())?;
let dataframe = df_context
@@ -196,7 +236,9 @@ async fn query_trace_table(
// Apply all filters.
let dataframe = filters
.into_iter()
.chain(tags.map_or(Ok(vec![]), |t| tags_filters(&dataframe, t))?)
.chain(tags.map_or(Ok(vec![]), |t| {
tags_filters(&dataframe, t, is_data_model_v1)
})?)
.try_fold(dataframe, |df, expr| {
df.filter(expr).context(DataFusionSnafu)
})?;
@@ -205,7 +247,10 @@ async fn query_trace_table(
let dataframe = if distinct {
dataframe.distinct().context(DataFusionSnafu)?
} else {
// for non distinct query, sort by timestamp to make results stable
dataframe
.sort_by(vec![col(TIMESTAMP_COLUMN)])
.context(DataFusionSnafu)?
};
// Apply the limit if needed.
@@ -237,7 +282,7 @@ fn create_df_context(
SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
);
// The following JSON UDFs will be used for tags filters.
// The following JSON UDFs will be used for tags filters on v0 data model.
let udfs: Vec<FunctionRef> = vec![
Arc::new(JsonGetInt),
Arc::new(JsonGetFloat),
@@ -256,7 +301,7 @@ fn create_df_context(
Ok(df_context)
}
fn tags_filters(
fn json_tag_filters(
dataframe: &DataFrame,
tags: HashMap<String, JsonValue>,
) -> ServerResult<Vec<Expr>> {
@@ -322,3 +367,41 @@ fn tags_filters(
Ok(filters)
}
fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
let filters = tags
.into_iter()
.filter_map(|(key, value)| {
let key = format!("\"span_attributes.{}\"", key);
match value {
JsonValue::String(value) => Some(col(key).eq(lit(value))),
JsonValue::Number(value) => {
if value.is_f64() {
// safe to unwrap as checked previously
Some(col(key).eq(lit(value.as_f64().unwrap())))
} else {
Some(col(key).eq(lit(value.as_i64().unwrap())))
}
}
JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
JsonValue::Null => Some(col(key).is_null()),
// not supported at the moment
JsonValue::Array(_value) => None,
JsonValue::Object(_value) => None,
}
})
.collect();
Ok(filters)
}
fn tags_filters(
dataframe: &DataFrame,
tags: HashMap<String, JsonValue>,
is_data_model_v1: bool,
) -> ServerResult<Vec<Expr>> {
if is_data_model_v1 {
flatten_tag_filters(tags)
} else {
json_tag_filters(dataframe, tags)
}
}
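
To make the v0/v1 split above concrete: in the v1 trace model each span attribute is stored as its own flattened column, so a tag filter becomes a plain column comparison instead of a JSON UDF call, and non-distinct results are ordered by timestamp. A hedged sketch of the resulting query shape (table and tag names illustrative):

    -- v1 data model: filter on a flattened attribute column
    SELECT * FROM opentelemetry_traces
    WHERE "span_attributes.http.method" = 'GET'
    ORDER BY timestamp;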

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Deref;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
@@ -20,7 +22,7 @@ use server_error::Result as ServerResult;
use servers::error::{self as server_error, AuthSnafu, ExecuteQuerySnafu};
use servers::interceptor::{LogQueryInterceptor, LogQueryInterceptorRef};
use servers::query_handler::LogQueryHandler;
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextRef};
use snafu::ResultExt;
use tonic::async_trait;
@@ -64,4 +66,8 @@ impl LogQueryHandler for Instance {
Ok(interceptor.as_ref().post_query(output, ctx.clone())?)
}
fn catalog_manager(&self, _ctx: &QueryContext) -> ServerResult<&dyn catalog::CatalogManager> {
Ok(self.catalog_manager.deref())
}
}

View File

@@ -90,6 +90,8 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
let (requests, rows) = otlp::trace::to_grpc_insert_requests(
request,
pipeline,
@@ -101,10 +103,17 @@ impl OpenTelemetryProtocolHandler for Instance {
OTLP_TRACES_ROWS.inc_by(rows as u64);
self.handle_trace_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
if is_trace_v1_model {
self.handle_trace_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
} else {
self.handle_log_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
}
}
#[tracing::instrument(skip_all)]

View File

@@ -284,7 +284,7 @@ impl ClusterInfo for MetaClient {
followers
.into_iter()
.map(|node| NodeInfo {
peer: node.peer.map(|p| p.into()).unwrap_or_default(),
peer: node.peer.unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
version: node.version,
@@ -292,7 +292,7 @@ impl ClusterInfo for MetaClient {
start_time_ms: node.start_time_ms,
})
.chain(leader.into_iter().map(|node| NodeInfo {
peer: node.peer.map(|p| p.into()).unwrap_or_default(),
peer: node.peer.unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
version: node.version,

View File

@@ -6,7 +6,8 @@ license.workspace = true
[features]
mock = []
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"]
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend", "dep:deadpool-postgres", "dep:deadpool"]
mysql_kvbackend = ["dep:sqlx", "common-meta/mysql_kvbackend"]
[lints]
workspace = true
@@ -38,8 +39,8 @@ common-version.workspace = true
common-wal.workspace = true
dashmap.workspace = true
datatypes.workspace = true
deadpool.workspace = true
deadpool-postgres.workspace = true
deadpool = { workspace = true, optional = true }
deadpool-postgres = { workspace = true, optional = true }
derive_builder.workspace = true
etcd-client.workspace = true
futures.workspace = true
@@ -60,6 +61,7 @@ serde.workspace = true
serde_json.workspace = true
servers.workspace = true
snafu.workspace = true
sqlx = { workspace = true, optional = true }
store-api.workspace = true
strum.workspace = true
table.workspace = true

View File

@@ -23,6 +23,8 @@ use common_config::Configurable;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
#[cfg(feature = "mysql_kvbackend")]
use common_meta::kv_backend::rds::MySqlStore;
#[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::rds::PgStore;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
@@ -38,9 +40,15 @@ use servers::export_metrics::ExportMetricsTask;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use snafu::OptionExt;
use snafu::ResultExt;
#[cfg(feature = "mysql_kvbackend")]
use sqlx::mysql::MySqlConnectOptions;
#[cfg(feature = "mysql_kvbackend")]
use sqlx::mysql::{MySqlConnection, MySqlPool};
#[cfg(feature = "mysql_kvbackend")]
use sqlx::Connection;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Receiver, Sender};
#[cfg(feature = "pg_kvbackend")]
@@ -49,9 +57,11 @@ use tonic::codec::CompressionEncoding;
use tonic::transport::server::{Router, TcpIncoming};
use crate::election::etcd::EtcdElection;
#[cfg(feature = "mysql_kvbackend")]
use crate::election::mysql::MySqlElection;
#[cfg(feature = "pg_kvbackend")]
use crate::election::postgres::PgElection;
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use crate::election::CANDIDATE_LEASE_SECS;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
@@ -229,7 +239,6 @@ pub async fn metasrv_builder(
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
let pool = create_postgres_pool(opts).await?;
// TODO(CookiePie): use table name from config.
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
@@ -246,6 +255,26 @@ pub async fn metasrv_builder(
.await?;
(kv_backend, Some(election))
}
#[cfg(feature = "mysql_kvbackend")]
(None, BackendImpl::MysqlStore) => {
let pool = create_mysql_pool(opts).await?;
let kv_backend =
MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
// Since election will acquire a lock of the table, we need a separate table for election.
let election_table_name = opts.meta_table_name.clone() + "_election";
let election_client = create_mysql_client(opts).await?;
let election = MySqlElection::with_mysql_client(
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
&election_table_name,
)
.await?;
(kv_backend, Some(election))
}
};
if !opts.store_key_prefix.is_empty() {
@@ -323,3 +352,41 @@ async fn create_postgres_pool(opts: &MetasrvOptions) -> Result<deadpool_postgres
.context(error::CreatePostgresPoolSnafu)?;
Ok(pool)
}
#[cfg(feature = "mysql_kvbackend")]
async fn setup_mysql_options(opts: &MetasrvOptions) -> Result<MySqlConnectOptions> {
let mysql_url = opts
.store_addrs
.first()
.context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
// Avoid `SET` commands in sqlx
let opts: MySqlConnectOptions = mysql_url
.parse()
.context(error::ParseMySqlUrlSnafu { mysql_url })?;
let opts = opts
.no_engine_substitution(false)
.pipes_as_concat(false)
.timezone(None)
.set_names(false);
Ok(opts)
}
#[cfg(feature = "mysql_kvbackend")]
async fn create_mysql_pool(opts: &MetasrvOptions) -> Result<MySqlPool> {
let opts = setup_mysql_options(opts).await?;
let pool = MySqlPool::connect_with(opts)
.await
.context(error::CreateMySqlPoolSnafu)?;
Ok(pool)
}
#[cfg(feature = "mysql_kvbackend")]
async fn create_mysql_client(opts: &MetasrvOptions) -> Result<MySqlConnection> {
let opts = setup_mysql_options(opts).await?;
let client = MySqlConnection::connect_with(&opts)
.await
.context(error::ConnectMySqlSnafu)?;
Ok(client)
}

View File

@@ -13,6 +13,8 @@
// limitations under the License.
pub mod etcd;
#[cfg(feature = "mysql_kvbackend")]
pub mod mysql;
#[cfg(feature = "pg_kvbackend")]
pub mod postgres;

View File

@@ -0,0 +1,800 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, warn};
use common_time::Timestamp;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use sqlx::mysql::{MySqlArguments, MySqlRow};
use sqlx::query::Query;
use sqlx::{MySql, MySqlConnection, MySqlTransaction, Row};
use tokio::sync::{broadcast, Mutex, MutexGuard};
use tokio::time::{Interval, MissedTickBehavior};
use crate::election::{
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY,
};
use crate::error::{
DeserializeFromJsonSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result, SerializeToJsonSnafu,
UnexpectedSnafu,
};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
// Separator between value and expire time.
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
/// Lease information.
/// TODO(CookiePie): PgElection can also use this struct. Refactor it to a common module.
#[derive(Default, Clone)]
struct Lease {
leader_value: String,
expire_time: Timestamp,
current: Timestamp,
// origin is the origin value of the lease, used for CAS.
origin: String,
}
struct ElectionSqlFactory<'a> {
table_name: &'a str,
}
struct ElectionSqlSet {
campaign: String,
// SQL to put a value with expire time.
//
// Parameters for the query:
// `$1`: key,
// `$2`: value,
// `$3`: lease time in seconds
//
// Returns:
// If the key already exists, return the previous value.
put_value_with_lease: String,
// SQL to update a value with expire time.
//
// Parameters for the query:
// `$1`: updated value,
// `$2`: lease time in seconds
// `$3`: key,
// `$4`: previous value,
update_value_with_lease: String,
// SQL to get a value with expire time.
//
// Parameters:
// `$1`: key
get_value_with_lease: String,
// SQL to get all values with expire time with the given key prefix.
//
// Parameters:
// `$1`: key prefix like 'prefix%'
//
// Returns:
// column 0: value,
// column 1: current timestamp
get_value_with_lease_by_prefix: String,
// SQL to delete a value.
//
// Parameters:
// `?`: key
//
// Returns:
// Rows affected
delete_value: String,
}
impl<'a> ElectionSqlFactory<'a> {
fn new(table_name: &'a str) -> Self {
Self { table_name }
}
fn build(self) -> ElectionSqlSet {
ElectionSqlSet {
campaign: self.campaign_sql(),
put_value_with_lease: self.put_value_with_lease_sql(),
update_value_with_lease: self.update_value_with_lease_sql(),
get_value_with_lease: self.get_value_with_lease_sql(),
get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
delete_value: self.delete_value_sql(),
}
}
// Currently the session timeout is longer than the leader lease time.
// So the leader will renew the lease twice before the session timeout if everything goes well.
fn set_idle_session_timeout_sql(&self) -> String {
format!("SET SESSION wait_timeout = {};", META_LEASE_SECS + 1)
}
fn set_lock_wait_timeout_sql(&self) -> &str {
"SET SESSION innodb_lock_wait_timeout = 1;"
}
fn create_table_sql(&self) -> String {
format!(
r#"
CREATE TABLE IF NOT EXISTS `{}` (
k VARBINARY(3072) PRIMARY KEY,
v BLOB
);
"#,
self.table_name
)
}
fn insert_once(&self) -> String {
format!(
"INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');",
self.table_name
)
}
fn check_version(&self) -> &str {
"SELECT @@version;"
}
fn campaign_sql(&self) -> String {
format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name)
}
fn put_value_with_lease_sql(&self) -> String {
format!(
r#"
INSERT INTO `{}` (k, v) VALUES (
?,
CONCAT(
?,
'{}',
DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f')
)
)
ON DUPLICATE KEY UPDATE v = VALUES(v);
"#,
self.table_name, LEASE_SEP
)
}
fn update_value_with_lease_sql(&self) -> String {
format!(
r#"UPDATE `{}`
SET v = CONCAT(?, '{}', DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f'))
WHERE k = ? AND v = ?"#,
self.table_name, LEASE_SEP
)
}
fn get_value_with_lease_sql(&self) -> String {
format!(
r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k = ?"#,
self.table_name
)
}
fn get_value_with_lease_by_prefix_sql(&self) -> String {
format!(
r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k LIKE ?"#,
self.table_name
)
}
fn delete_value_sql(&self) -> String {
format!("DELETE FROM {} WHERE k = ?;", self.table_name)
}
}
/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time".
fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
let (value, expire_time) =
value
.split(LEASE_SEP)
.collect_tuple()
.with_context(|| UnexpectedSnafu {
violated: format!(
"Invalid value {}, expect node info || {} || expire time",
value, LEASE_SEP
),
})?;
// The given expire_time is formatted as 'YYYY-MM-DD hh:mm:ss.ffffff' (MySQL DATE_FORMAT '%Y-%m-%d %T.%f').
let expire_time = match Timestamp::from_str(expire_time, None) {
Ok(ts) => ts,
Err(_) => UnexpectedSnafu {
violated: format!("Invalid timestamp: {}", expire_time),
}
.fail()?,
};
Ok((value.to_string(), expire_time))
}
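// A minimal round-trip sketch of the lease encoding parsed above (illustrative
// only, not part of the upstream test suite). It assumes `Timestamp::from_str`
// accepts the 'YYYY-MM-DD hh:mm:ss.ffffff' strings produced by MySQL's
// DATE_FORMAT('%Y-%m-%d %T.%f').
#[cfg(test)]
mod lease_value_format_tests {
    use super::*;

    #[test]
    fn parse_value_and_expire_time_round_trip() {
        // Build a lease string the same way `put_value_with_lease_sql` does:
        // value || LEASE_SEP || expire_time.
        let raw = format!("node-info{}2077-01-01 00:00:00.000000", LEASE_SEP);
        let (value, expire_time) = parse_value_and_expire_time(&raw).unwrap();
        assert_eq!(value, "node-info");
        // The parsed expire time must lie after the Unix epoch (Timestamp::default()).
        assert!(expire_time > Timestamp::default());
    }
}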
#[derive(Debug, Clone, Default)]
struct MySqlLeaderKey {
name: Vec<u8>,
key: Vec<u8>,
rev: i64,
lease: i64,
}
impl LeaderKey for MySqlLeaderKey {
fn name(&self) -> &[u8] {
&self.name
}
fn key(&self) -> &[u8] {
&self.key
}
fn revision(&self) -> i64 {
self.rev
}
fn lease_id(&self) -> i64 {
self.lease
}
}
enum Executor<'a> {
Default(MutexGuard<'a, MySqlConnection>),
Txn(MySqlTransaction<'a>),
}
impl Executor<'_> {
async fn query(
&mut self,
query: Query<'_, MySql, MySqlArguments>,
sql: &str,
) -> Result<Vec<MySqlRow>> {
match self {
Executor::Default(client) => {
let res = query
.fetch_all(&mut **client)
.await
.context(MySqlExecutionSnafu { sql })?;
Ok(res)
}
Executor::Txn(txn) => {
let res = query
.fetch_all(&mut **txn)
.await
.context(MySqlExecutionSnafu { sql })?;
Ok(res)
}
}
}
async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
match self {
Executor::Default(client) => {
let res = query
.execute(&mut **client)
.await
.context(MySqlExecutionSnafu { sql })?;
Ok(res.rows_affected())
}
Executor::Txn(txn) => {
let res = query
.execute(&mut **txn)
.await
.context(MySqlExecutionSnafu { sql })?;
Ok(res.rows_affected())
}
}
}
async fn commit(self) -> Result<()> {
match self {
Executor::Txn(txn) => {
txn.commit()
.await
.context(MySqlExecutionSnafu { sql: "COMMIT" })?;
Ok(())
}
_ => Ok(()),
}
}
}
/// MySQL implementation of Election.
pub struct MySqlElection {
leader_value: String,
client: Mutex<MySqlConnection>,
is_leader: AtomicBool,
leader_infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
sql_set: ElectionSqlSet,
}
impl MySqlElection {
pub async fn with_mysql_client(
leader_value: String,
mut client: sqlx::MySqlConnection,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
table_name: &str,
) -> Result<ElectionRef> {
let sql_factory = ElectionSqlFactory::new(table_name);
sqlx::query(&sql_factory.create_table_sql())
.execute(&mut client)
.await
.context(MySqlExecutionSnafu {
sql: &sql_factory.create_table_sql(),
})?;
// Set the idle session timeout (slightly longer than the leader lease) to avoid a dead lock when a session dies while holding the row lock.
sqlx::query(&sql_factory.set_idle_session_timeout_sql())
.execute(&mut client)
.await
.context(MySqlExecutionSnafu {
sql: &sql_factory.set_idle_session_timeout_sql(),
})?;
// Set a short lock wait timeout to avoid blocking too long on the campaign lock.
sqlx::query(sql_factory.set_lock_wait_timeout_sql())
.execute(&mut client)
.await
.context(MySqlExecutionSnafu {
sql: sql_factory.set_lock_wait_timeout_sql(),
})?;
// Insert at least one row for `SELECT * FOR UPDATE` to work.
sqlx::query(&sql_factory.insert_once())
.execute(&mut client)
.await
.context(MySqlExecutionSnafu {
sql: &sql_factory.insert_once(),
})?;
// Check MySQL version
Self::check_version(&mut client, sql_factory.check_version()).await?;
let tx = listen_leader_change(leader_value.clone());
Ok(Arc::new(Self {
leader_value,
client: Mutex::new(client),
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(false),
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
sql_set: sql_factory.build(),
}))
}
fn election_key(&self) -> String {
format!("{}{}", self.store_key_prefix, ELECTION_KEY)
}
fn candidate_root(&self) -> String {
format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
}
fn candidate_key(&self) -> String {
format!("{}{}", self.candidate_root(), self.leader_value)
}
}
#[async_trait::async_trait]
impl Election for MySqlElection {
type Leader = LeaderValue;
fn is_leader(&self) -> bool {
self.is_leader.load(Ordering::Relaxed)
}
fn in_leader_infancy(&self) -> bool {
self.leader_infancy
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
}
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
let key = self.candidate_key();
let node_info =
serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
input: format!("{node_info:?}"),
})?;
{
let client = self.client.lock().await;
let mut executor = Executor::Default(client);
let res = self
.put_value_with_lease(
&key,
&node_info,
self.candidate_lease_ttl_secs,
&mut executor,
)
.await?;
// May have been registered before; delete the stale entry and re-put the lease.
if !res {
warn!("Candidate already registered, update the lease");
self.delete_value(&key, &mut executor).await?;
self.put_value_with_lease(
&key,
&node_info,
self.candidate_lease_ttl_secs,
&mut executor,
)
.await?;
}
}
// Check if the current lease has expired and renew the lease.
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
loop {
let _ = keep_alive_interval.tick().await;
let client = self.client.lock().await;
let mut executor = Executor::Default(client);
let lease = self
.get_value_with_lease(&key, &mut executor)
.await?
.unwrap_or_default();
ensure!(
lease.expire_time > lease.current,
UnexpectedSnafu {
violated: format!(
"Candidate lease expired at {:?} (current time: {:?}), key: {:?}",
lease.expire_time,
lease.current,
String::from_utf8_lossy(&key.into_bytes())
),
}
);
self.update_value_with_lease(&key, &lease.origin, &node_info, &mut executor)
.await?;
std::mem::drop(executor);
}
}
async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
let key_prefix = self.candidate_root();
let client = self.client.lock().await;
let mut executor = Executor::Default(client);
let (mut candidates, current) = self
.get_value_with_lease_by_prefix(&key_prefix, &mut executor)
.await?;
// Remove expired candidates
candidates.retain(|c| c.1 > current);
let mut valid_candidates = Vec::with_capacity(candidates.len());
for (c, _) in candidates {
let node_info: MetasrvNodeInfo =
serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
input: format!("{:?}", c),
})?;
valid_candidates.push(node_info);
}
Ok(valid_candidates)
}
async fn campaign(&self) -> Result<()> {
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
let _ = self.do_campaign(&mut keep_alive_interval).await;
}
}
async fn leader(&self) -> Result<Self::Leader> {
if self.is_leader.load(Ordering::Relaxed) {
Ok(self.leader_value.as_bytes().into())
} else {
let key = self.election_key();
let client = self.client.lock().await;
let mut executor = Executor::Default(client);
if let Some(lease) = self.get_value_with_lease(&key, &mut executor).await? {
ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
Ok(lease.leader_value.as_bytes().into())
} else {
NoLeaderSnafu.fail()
}
}
}
async fn resign(&self) -> Result<()> {
todo!()
}
fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
self.leader_watcher.subscribe()
}
}
impl MySqlElection {
/// Returns the lease for the given key: the leader value, expire time, current DB time, and the raw origin string used for CAS. Returns `None` if the key does not exist.
async fn get_value_with_lease(
&self,
key: &str,
executor: &mut Executor<'_>,
) -> Result<Option<Lease>> {
let key = key.as_bytes();
let query = sqlx::query(&self.sql_set.get_value_with_lease).bind(key);
let res = executor
.query(query, &self.sql_set.get_value_with_lease)
.await?;
if res.is_empty() {
return Ok(None);
}
// Safety: Checked if res is empty above.
let current_time_str = String::from_utf8_lossy(res[0].try_get(1).unwrap());
let current_time = match Timestamp::from_str(&current_time_str, None) {
Ok(ts) => ts,
Err(_) => UnexpectedSnafu {
violated: format!("Invalid timestamp: {}", current_time_str),
}
.fail()?,
};
// Safety: Checked if res is empty above.
let value_and_expire_time = String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
Ok(Some(Lease {
leader_value: value,
expire_time,
current: current_time,
origin: value_and_expire_time.to_string(),
}))
}
/// Returns all values and their expire times with the given key prefix, along with the current DB time.
async fn get_value_with_lease_by_prefix(
&self,
key_prefix: &str,
executor: &mut Executor<'_>,
) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
let query = sqlx::query(&self.sql_set.get_value_with_lease_by_prefix).bind(key_prefix);
let res = executor
.query(query, &self.sql_set.get_value_with_lease_by_prefix)
.await?;
let mut values_with_leases = vec![];
let mut current = Timestamp::default();
for row in res {
let current_time_str = row.try_get(1).unwrap_or_default();
current = match Timestamp::from_str(current_time_str, None) {
Ok(ts) => ts,
Err(_) => UnexpectedSnafu {
violated: format!("Invalid timestamp: {}", current_time_str),
}
.fail()?,
};
let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
values_with_leases.push((value, expire_time));
}
Ok((values_with_leases, current))
}
async fn update_value_with_lease(
&self,
key: &str,
prev: &str,
updated: &str,
executor: &mut Executor<'_>,
) -> Result<()> {
let key = key.as_bytes();
let prev = prev.as_bytes();
let updated = updated.as_bytes();
let query = sqlx::query(&self.sql_set.update_value_with_lease)
.bind(updated)
.bind(self.candidate_lease_ttl_secs as f64)
.bind(key)
.bind(prev);
let res = executor
.execute(query, &self.sql_set.update_value_with_lease)
.await?;
ensure!(
res == 1,
UnexpectedSnafu {
violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
}
);
Ok(())
}
/// Returns `true` if the insertion is successful
async fn put_value_with_lease(
&self,
key: &str,
value: &str,
lease_ttl_secs: u64,
executor: &mut Executor<'_>,
) -> Result<bool> {
let key = key.as_bytes();
let lease_ttl_secs = lease_ttl_secs as f64;
let query = sqlx::query(&self.sql_set.put_value_with_lease)
.bind(key)
.bind(value)
.bind(lease_ttl_secs);
let res = executor
.query(query, &self.sql_set.put_value_with_lease)
.await?;
Ok(res.is_empty())
}
/// Returns `true` if the deletion is successful.
/// Caution: Should only delete the key if the lease is expired.
async fn delete_value(&self, key: &str, executor: &mut Executor<'_>) -> Result<bool> {
let key = key.as_bytes();
let query = sqlx::query(&self.sql_set.delete_value).bind(key);
let res = executor.execute(query, &self.sql_set.delete_value).await?;
Ok(res == 1)
}
/// Attempts to acquire leadership by executing a campaign. This function continuously checks
/// if the current lease is still valid.
async fn do_campaign(&self, interval: &mut Interval) -> Result<()> {
// Need to restrict the scope of the client to avoid ambiguous overloads.
use sqlx::Acquire;
loop {
let client = self.client.lock().await;
let executor = Executor::Default(client);
let mut lease = Lease::default();
match (
self.lease_check(executor, &mut lease).await,
self.is_leader(),
) {
// If the leader lease is valid and I'm the leader, renew the lease.
(Ok(_), true) => {
let mut client = self.client.lock().await;
let txn = client
.begin()
.await
.context(MySqlExecutionSnafu { sql: "BEGIN" })?;
let mut executor = Executor::Txn(txn);
let query = sqlx::query(&self.sql_set.campaign);
executor.query(query, &self.sql_set.campaign).await?;
self.renew_lease(executor, lease).await?;
}
// If the leader lease expires and I'm the leader, notify the leader watcher and step down.
// Another instance should be elected as the leader in this case.
(Err(_), true) => {
warn!("Leader lease expired, re-initiate the campaign");
self.step_down_without_lock().await?;
}
// If the leader lease expires and I'm not the leader, elect myself.
(Err(_), false) => {
warn!("Leader lease expired, re-initiate the campaign");
let mut client = self.client.lock().await;
let txn = client
.begin()
.await
.context(MySqlExecutionSnafu { sql: "BEGIN" })?;
let mut executor = Executor::Txn(txn);
let query = sqlx::query(&self.sql_set.campaign);
executor.query(query, &self.sql_set.campaign).await?;
self.elected(&mut executor).await?;
executor.commit().await?;
}
// If the leader lease is valid and I'm not the leader, do nothing.
(Ok(_), false) => {}
}
interval.tick().await;
}
}
/// Renew the lease
async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> {
let key = self.election_key();
self.update_value_with_lease(&key, &lease.origin, &self.leader_value, &mut executor)
.await?;
executor.commit().await?;
Ok(())
}
/// Performs a lease check during the election process.
///
/// This function performs the following checks and actions:
///
/// - **Case 1**: If the lease has expired, it raises an error so the caller re-initiates the campaign.
///   If the previous leader failed to renew the lease, its session will eventually time out and the row lock
///   will be released.
/// - **Case 2**: If all checks pass, the function returns without performing any actions.
async fn lease_check(&self, mut executor: Executor<'_>, lease: &mut Lease) -> Result<()> {
let key = self.election_key();
let check_lease = self
.get_value_with_lease(&key, &mut executor)
.await?
.context(NoLeaderSnafu)?;
*lease = check_lease;
// Case 1: Lease expired
ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
// Case 2: Everything is fine
Ok(())
}
/// The instance still considers itself the leader locally but failed to acquire the lock. Steps down without deleting the key.
async fn step_down_without_lock(&self) -> Result<()> {
let key = self.election_key().into_bytes();
let leader_key = MySqlLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone(),
..Default::default()
};
if self
.is_leader
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
/// Elected as leader. The leader should put the key and notify the leader watcher.
/// Caution: Should only be called while holding the lock.
async fn elected(&self, executor: &mut Executor<'_>) -> Result<()> {
let key = self.election_key();
let leader_key = MySqlLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone().into_bytes(),
..Default::default()
};
self.delete_value(&key, executor).await?;
self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS, executor)
.await?;
if self
.is_leader
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.leader_infancy.store(true, Ordering::Relaxed);
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
/// Check if the MySQL version is supported.
async fn check_version(client: &mut MySqlConnection, sql: &str) -> Result<()> {
let query = sqlx::query(sql);
match query.fetch_one(client).await {
Ok(row) => {
let version: String = row.try_get(0).unwrap();
// Warn only when the server is neither 8.0.x nor 5.7.x.
if !version.starts_with("8.0") && !version.starts_with("5.7") {
warn!(
"Unsupported MySQL version: {}, expected: [5.7, 8.0]",
version
);
}
}
Err(e) => {
warn!(e; "Failed to check MySQL version through sql: {}", sql);
}
}
Ok(())
}
}
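// Illustrative bootstrap sketch (assumed call-site; the URL handling and task
// spawning live in the metasrv bootstrap code, not in this file):
//
//     use sqlx::Connection;
//     let client = sqlx::MySqlConnection::connect(&mysql_url).await?;
//     let election = MySqlElection::with_mysql_client(
//         leader_value,              // e.g. this metasrv's advertised address
//         client,
//         store_key_prefix,
//         candidate_lease_ttl_secs,
//         &meta_table_name,
//     )
//     .await?;
//     // `campaign` loops forever, renewing the lease or re-acquiring leadership.
//     let _handle = tokio::spawn(async move { election.campaign().await });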

View File

@@ -109,10 +109,10 @@ impl<'a> ElectionSqlFactory<'a> {
}
}
// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive.
// Either the leader reconnects and step down or the session expires and the lock is released.
fn set_idle_session_timeout_sql(&self) -> &str {
"SET idle_session_timeout = '10s';"
// Currently the session timeout is longer than the leader lease time.
// So the leader will renew the lease twice before the session timeout if everything goes well.
fn set_idle_session_timeout_sql(&self) -> String {
format!("SET idle_session_timeout = '{}s';", META_LEASE_SECS + 1)
}
fn campaign_sql(&self) -> String {
@@ -126,9 +126,9 @@ impl<'a> ElectionSqlFactory<'a> {
fn put_value_with_lease_sql(&self) -> String {
format!(
r#"WITH prev AS (
SELECT k, v FROM {} WHERE k = $1
SELECT k, v FROM "{}" WHERE k = $1
), insert AS (
INSERT INTO {}
INSERT INTO "{}"
VALUES($1, convert_to($2 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
ON CONFLICT (k) DO NOTHING
)
@@ -140,7 +140,7 @@ impl<'a> ElectionSqlFactory<'a> {
fn update_value_with_lease_sql(&self) -> String {
format!(
r#"UPDATE {}
r#"UPDATE "{}"
SET v = convert_to($3 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
WHERE k = $1 AND v = $2"#,
self.table_name, LEASE_SEP
@@ -149,21 +149,21 @@ impl<'a> ElectionSqlFactory<'a> {
fn get_value_with_lease_sql(&self) -> String {
format!(
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k = $1"#,
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k = $1"#,
self.table_name
)
}
fn get_value_with_lease_by_prefix_sql(&self) -> String {
format!(
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k LIKE $1"#,
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k LIKE $1"#,
self.table_name
)
}
fn delete_value_sql(&self) -> String {
format!(
"DELETE FROM {} WHERE k = $1 RETURNING k,v;",
"DELETE FROM \"{}\" WHERE k = $1 RETURNING k,v;",
self.table_name
)
}
@@ -241,7 +241,7 @@ impl PgElection {
let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
client
.execute(sql_factory.set_idle_session_timeout_sql(), &[])
.execute(&sql_factory.set_idle_session_timeout_sql(), &[])
.await
.context(PostgresExecutionSnafu)?;
@@ -285,7 +285,6 @@ impl Election for PgElection {
.is_ok()
}
/// TODO(CookiePie): Split the candidate registration and keep alive logic into separate methods, so that upper layers can call them separately.
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
let key = self.candidate_key();
let node_info =
@@ -317,7 +316,9 @@ impl Election for PgElection {
prev_expire_time > current_time,
UnexpectedSnafu {
violated: format!(
"Candidate lease expired, key: {:?}",
"Candidate lease expired at {:?} (current time {:?}), key: {:?}",
prev_expire_time,
current_time,
String::from_utf8_lossy(&key.into_bytes())
),
}
@@ -369,23 +370,19 @@ impl Election for PgElection {
.query(&self.sql_set.campaign, &[])
.await
.context(PostgresExecutionSnafu)?;
if let Some(row) = res.first() {
match row.try_get(0) {
Ok(true) => self.leader_action().await?,
Ok(false) => self.follower_action().await?,
Err(_) => {
return UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock"
.to_string(),
}
.fail();
}
let row = res.first().context(UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock",
})?;
let is_leader = row.try_get(0).map_err(|_| {
UnexpectedSnafu {
violated: "Failed to get the result of get lock",
}
.build()
})?;
if is_leader {
self.leader_action().await?;
} else {
return UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock".to_string(),
}
.fail();
self.follower_action().await?;
}
let _ = keep_alive_interval.tick().await;
}
@@ -747,7 +744,7 @@ mod tests {
});
if let Some(table_name) = table_name {
let create_table_sql = format!(
"CREATE TABLE IF NOT EXISTS {}(k bytea PRIMARY KEY, v bytea);",
"CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
table_name
);
client.execute(&create_table_sql, &[]).await.unwrap();
@@ -756,7 +753,7 @@ mod tests {
}
async fn drop_table(client: &Client, table_name: &str) {
let sql = format!("DROP TABLE IF EXISTS {};", table_name);
let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
client.execute(&sql, &[]).await.unwrap();
}

View File

@@ -343,6 +343,16 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to parse mysql url: {}", mysql_url))]
ParseMySqlUrl {
#[snafu(source)]
error: sqlx::error::Error,
mysql_url: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to find table route for {table_id}"))]
TableRouteNotFound {
table_id: TableId,
@@ -729,6 +739,34 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to execute via mysql, sql: {}", sql))]
MySqlExecution {
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
location: Location,
sql: String,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to create mysql pool"))]
CreateMySqlPool {
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to connect to mysql"))]
ConnectMySql {
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Handler not found: {}", name))]
HandlerNotFound {
name: String,
@@ -911,6 +949,11 @@ impl ErrorExt for Error {
| Error::GetPostgresConnection { .. }
| Error::PostgresExecution { .. }
| Error::ConnectPostgres { .. } => StatusCode::Internal,
#[cfg(feature = "mysql_kvbackend")]
Error::MySqlExecution { .. }
| Error::CreateMySqlPool { .. }
| Error::ConnectMySql { .. }
| Error::ParseMySqlUrl { .. } => StatusCode::Internal,
}
}

View File

@@ -153,7 +153,7 @@ fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, P
return None;
};
Some((key, Peer::from(peer.clone()), info.clone()))
Some((key, peer.clone(), info.clone()))
}
async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {

View File

@@ -70,7 +70,7 @@ impl HeartbeatHandler for RemapFlowPeerHandler {
async fn rewrite_node_address(ctx: &mut Context, peer: &Peer) {
let key = NodeAddressKey::with_flownode(peer.id).to_bytes();
if let Ok(value) = NodeAddressValue::new(peer.clone().into()).try_as_raw_value() {
if let Ok(value) = NodeAddressValue::new(peer.clone()).try_as_raw_value() {
let put = PutRequest {
key,
value,

View File

@@ -72,9 +72,9 @@ pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_HOME: &str = "./greptimedb_data/metasrv";
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
// The datastores that implements metadata kvbackend.
@@ -89,6 +89,9 @@ pub enum BackendImpl {
#[cfg(feature = "pg_kvbackend")]
// Postgres as metadata storage.
PostgresStore,
#[cfg(feature = "mysql_kvbackend")]
// MySql as metadata storage.
MysqlStore,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -146,7 +149,7 @@ pub struct MetasrvOptions {
pub tracing: TracingOptions,
/// The datastore for kv metadata.
pub backend: BackendImpl,
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
/// Table name of rds kv backend.
pub meta_table_name: String,
#[cfg(feature = "pg_kvbackend")]
@@ -191,7 +194,7 @@ impl Default for MetasrvOptions {
flush_stats_factor: 3,
tracing: TracingOptions::default(),
backend: BackendImpl::EtcdStore,
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
#[cfg(feature = "pg_kvbackend")]
meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,

View File

@@ -59,7 +59,7 @@ pub mod engine;
pub mod error;
mod metadata_region;
mod metrics;
mod row_modifier;
pub mod row_modifier;
#[cfg(test)]
mod test_util;
mod utils;

View File

@@ -40,7 +40,7 @@ const TSID_HASH_SEED: u32 = 846793005;
///
/// - For [`PrimaryKeyEncoding::Dense`] encoding,
/// it adds two columns(`__table_id`, `__tsid`) to the row.
pub struct RowModifier {
pub(crate) struct RowModifier {
codec: SparsePrimaryKeyCodec,
}
@@ -52,7 +52,7 @@ impl RowModifier {
}
/// Modify rows with the given primary key encoding.
pub fn modify_rows(
pub(crate) fn modify_rows(
&self,
iter: RowsIter,
table_id: TableId,
@@ -145,16 +145,14 @@ impl RowModifier {
/// Fills internal columns of a row with table name and a hash of tag values.
fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
let mut hasher = TsidGenerator::default();
for (name, value) in iter.primary_keys_with_name() {
// The type is checked before. So only null is ignored.
if let Some(ValueData::StringValue(string)) = &value.value_data {
name.hash(&mut hasher);
string.hash(&mut hasher);
hasher.write_label(name, string);
}
}
// TSID is 64 bits, simply truncate the 128 bits hash
let (hash, _) = hasher.finish128();
let hash = hasher.finish();
(
ValueData::U32Value(table_id).into(),
@@ -163,6 +161,34 @@ impl RowModifier {
}
}
/// Tsid generator.
pub struct TsidGenerator {
hasher: mur3::Hasher128,
}
impl Default for TsidGenerator {
fn default() -> Self {
Self {
hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED),
}
}
}
impl TsidGenerator {
/// Writes a label pair to the generator.
pub fn write_label(&mut self, name: &str, value: &str) {
name.hash(&mut self.hasher);
value.hash(&mut self.hasher);
}
/// Generates a new TSID.
pub fn finish(&mut self) -> u64 {
// TSID is 64 bits, simply truncate the 128 bits hash
let (hash, _) = self.hasher.finish128();
hash
}
}
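// Illustrative usage of `TsidGenerator` (a sketch, not part of this diff):
//
//     let mut generator = TsidGenerator::default();
//     generator.write_label("host", "node-1");
//     generator.write_label("zone", "us-west-1a");
//     let tsid: u64 = generator.finish();
//
// Labels are hashed in the order they are written, so callers should feed them
// in a stable (e.g. lexicographically sorted) order to get a reproducible TSID.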
/// Index of a value.
#[derive(Debug, Clone, Copy)]
struct ValueIndex {

View File

@@ -121,7 +121,7 @@ impl AccessLayer {
/// Writes a SST with specific `file_id` and `metadata` to the layer.
///
/// Returns the info of the SST. If no data written, returns None.
pub(crate) async fn write_sst(
pub async fn write_sst(
&self,
request: SstWriteRequest,
write_opts: &WriteOptions,
@@ -191,26 +191,26 @@ impl AccessLayer {
/// `OperationType` represents the origin of the `SstWriteRequest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum OperationType {
pub enum OperationType {
Flush,
Compact,
}
/// Contents to build a SST.
pub(crate) struct SstWriteRequest {
pub(crate) op_type: OperationType,
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
pub struct SstWriteRequest {
pub op_type: OperationType,
pub metadata: RegionMetadataRef,
pub source: Source,
pub cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub(crate) storage: Option<String>,
pub(crate) max_sequence: Option<SequenceNumber>,
pub storage: Option<String>,
pub max_sequence: Option<SequenceNumber>,
/// Configs for index
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
pub(crate) bloom_filter_index_config: BloomFilterConfig,
pub index_options: IndexOptions,
pub inverted_index_config: InvertedIndexConfig,
pub fulltext_index_config: FulltextIndexConfig,
pub bloom_filter_index_config: BloomFilterConfig,
}
pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {

View File

@@ -46,6 +46,7 @@ const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
/// Before using the config, make sure to call `MitoConfig::validate()` to check if the config is valid.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {

View File

@@ -42,6 +42,14 @@ use crate::worker::WorkerId;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("External error, context: {}", context))]
External {
source: BoxedError,
context: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to encode sparse primary key, reason: {}", reason))]
EncodeSparsePrimaryKey {
reason: String,
@@ -773,6 +781,50 @@ pub enum Error {
#[snafu(display("checksum mismatch (actual: {}, expected: {})", actual, expected))]
ChecksumMismatch { actual: u32, expected: u32 },
#[snafu(display(
"No checkpoint found, region: {}, last_version: {}",
region_id,
last_version
))]
NoCheckpoint {
region_id: RegionId,
last_version: ManifestVersion,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"No manifests found in range: [{}..{}), region: {}, last_version: {}",
start_version,
end_version,
region_id,
last_version
))]
NoManifests {
region_id: RegionId,
start_version: ManifestVersion,
end_version: ManifestVersion,
last_version: ManifestVersion,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to install manifest to {}, region: {}, available manifest version: {}, last version: {}",
target_version,
region_id,
available_version,
last_version
))]
InstallManifestTo {
region_id: RegionId,
target_version: ManifestVersion,
available_version: ManifestVersion,
#[snafu(implicit)]
location: Location,
last_version: ManifestVersion,
},
#[snafu(display("Region {} is stopped", region_id))]
RegionStopped {
region_id: RegionId,
@@ -1011,7 +1063,10 @@ impl ErrorExt for Error {
| OperateAbortedIndex { .. }
| UnexpectedReplay { .. }
| IndexEncodeNull { .. }
| UnexpectedImpureDefault { .. } => StatusCode::Unexpected,
| UnexpectedImpureDefault { .. }
| NoCheckpoint { .. }
| NoManifests { .. }
| InstallManifestTo { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
| InvalidScanIndex { .. }
@@ -1090,6 +1145,8 @@ impl ErrorExt for Error {
InvalidConfig { .. } => StatusCode::InvalidArguments,
StaleLogEntry { .. } => StatusCode::Unexpected,
External { source, .. } => source.status_code(),
FilterRecordBatch { source, .. } => source.status_code(),
Download { .. } | Upload { .. } => StatusCode::StorageUnavailable,

View File

@@ -23,8 +23,8 @@
#[cfg_attr(feature = "test", allow(unused))]
pub mod test_util;
mod access_layer;
mod cache;
pub mod access_layer;
pub mod cache;
pub mod compaction;
pub mod config;
pub mod engine;

View File

@@ -23,7 +23,9 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION};
use store_api::metadata::RegionMetadataRef;
use crate::error::{self, RegionStoppedSnafu, Result};
use crate::error::{
self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
};
use crate::manifest::action::{
RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
RegionMetaActionList,
@@ -197,9 +199,9 @@ impl RegionManifestManager {
let checkpoint = Self::last_checkpoint(&mut store).await?;
let last_checkpoint_version = checkpoint
.as_ref()
.map(|checkpoint| checkpoint.last_version)
.map(|(checkpoint, _)| checkpoint.last_version)
.unwrap_or(MIN_VERSION);
let mut manifest_builder = if let Some(checkpoint) = checkpoint {
let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
info!(
"Recover region manifest {} from checkpoint version {}",
options.manifest_dir, checkpoint.last_version
@@ -275,6 +277,153 @@ impl RegionManifestManager {
self.stopped = true;
}
/// Installs the manifest changes from the current version to the target version (inclusive).
///
/// Returns installed version.
/// **Note**: This function is not guaranteed to install the target version strictly.
/// The installed version may be greater than the target version.
pub async fn install_manifest_to(
&mut self,
target_version: ManifestVersion,
) -> Result<ManifestVersion> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["install_manifest_to"])
.start_timer();
// Case 1: If the target version is less than or equal to the current version, return the current version.
if self.last_version >= target_version {
debug!(
"Target version {} is less than or equal to the current version {}, region: {}, skip install",
target_version, self.last_version, self.manifest.metadata.region_id
);
return Ok(self.last_version);
}
ensure!(
!self.stopped,
RegionStoppedSnafu {
region_id: self.manifest.metadata.region_id,
}
);
// Fetches manifests from the last version strictly.
let mut manifests = self
.store
// Invariant: last_version < target_version.
.fetch_manifests_strict_from(self.last_version + 1, target_version + 1)
.await?;
// Case 2: No manifests in range: [current_version+1, target_version+1)
//
// |---------Has been deleted------------| [Checkpoint Version]...[Latest Version]
// [Leader region]
// [Current Version]......[Target Version]
// [Follower region]
if manifests.is_empty() {
debug!(
"Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
self.last_version, self.manifest.metadata.region_id
);
let last_version = self.install_last_checkpoint().await?;
// Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version.
if last_version >= target_version {
return Ok(last_version);
}
// Fetches manifests from the installed version strictly.
manifests = self
.store
// Invariant: last_version < target_version.
.fetch_manifests_strict_from(last_version + 1, target_version + 1)
.await?;
}
if manifests.is_empty() {
return NoManifestsSnafu {
region_id: self.manifest.metadata.region_id,
start_version: self.last_version + 1,
end_version: target_version + 1,
last_version: self.last_version,
}
.fail();
}
debug_assert_eq!(manifests.first().unwrap().0, self.last_version + 1);
let mut manifest_builder =
RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
for (manifest_version, raw_action_list) in manifests {
self.store
.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {
manifest_builder.apply_change(manifest_version, action);
}
RegionMetaAction::Edit(action) => {
manifest_builder.apply_edit(manifest_version, action);
}
RegionMetaAction::Remove(_) => {
debug!(
"Unhandled action for region {}, action: {:?}",
self.manifest.metadata.region_id, action
);
}
RegionMetaAction::Truncate(action) => {
manifest_builder.apply_truncate(manifest_version, action);
}
}
}
}
let new_manifest = manifest_builder.try_build()?;
ensure!(
new_manifest.manifest_version >= target_version,
InstallManifestToSnafu {
region_id: self.manifest.metadata.region_id,
target_version,
available_version: new_manifest.manifest_version,
last_version: self.last_version,
}
);
let version = self.last_version;
self.manifest = Arc::new(new_manifest);
self.last_version = self.manifest.manifest_version;
info!(
"Install manifest changes from {} to {}, region: {}",
version, self.last_version, self.manifest.metadata.region_id
);
Ok(self.last_version)
}
/// Installs the last checkpoint.
pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
else {
return NoCheckpointSnafu {
region_id: self.manifest.metadata.region_id,
last_version: self.last_version,
}
.fail();
};
self.store.reset_manifest_size();
self.store
.set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
let manifest = builder.try_build()?;
self.last_version = manifest.manifest_version;
self.manifest = Arc::new(manifest);
info!(
"Installed region manifest from checkpoint: {}, region: {}",
checkpoint.last_version, self.manifest.metadata.region_id
);
Ok(self.last_version)
}
/// Updates the manifest. Returns the current manifest version number.
pub async fn update(&mut self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
let _t = MANIFEST_OP_ELAPSED
@@ -371,14 +520,17 @@ impl RegionManifestManager {
}
/// Fetches the last [RegionCheckpoint] from storage.
///
/// If the checkpoint is not found, returns `None`.
/// Otherwise, returns the checkpoint and the size of the checkpoint.
pub(crate) async fn last_checkpoint(
store: &mut ManifestObjectStore,
) -> Result<Option<RegionCheckpoint>> {
) -> Result<Option<(RegionCheckpoint, u64)>> {
let last_checkpoint = store.load_last_checkpoint().await?;
if let Some((_, bytes)) = last_checkpoint {
let checkpoint = RegionCheckpoint::decode(&bytes)?;
Ok(Some(checkpoint))
Ok(Some((checkpoint, bytes.len() as u64)))
} else {
Ok(None)
}

View File

@@ -236,7 +236,31 @@ impl ManifestObjectStore {
Ok(entries)
}
/// Fetch all manifests in concurrent.
/// Fetches manifests in range [start_version, end_version).
///
/// This function is guaranteed to return manifests strictly from `start_version` (the result must contain `start_version`); if that version is unavailable, it returns an empty list.
pub async fn fetch_manifests_strict_from(
&self,
start_version: ManifestVersion,
end_version: ManifestVersion,
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
let mut manifests = self.fetch_manifests(start_version, end_version).await?;
let start_index = manifests.iter().position(|(v, _)| *v == start_version);
debug!(
"fetches manifests in range [{},{}), start_index: {:?}",
start_version, end_version, start_index
);
if let Some(start_index) = start_index {
Ok(manifests.split_off(start_index))
} else {
Ok(vec![])
}
}
/// Fetches all manifests concurrently and returns the manifests in range [start_version, end_version).
///
/// **Notes**: This function does not guarantee that the returned manifests start strictly from `start_version`.
/// Use [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from) to get manifests strictly from `start_version`.
pub async fn fetch_manifests(
&self,
start_version: ManifestVersion,
@@ -576,6 +600,12 @@ impl ManifestObjectStore {
self.manifest_size_map.read().unwrap().values().sum()
}
/// Resets the size of all files.
pub(crate) fn reset_manifest_size(&mut self) {
self.manifest_size_map.write().unwrap().clear();
self.total_manifest_size.store(0, Ordering::Relaxed);
}
/// Set the size of the delta file by delta version.
pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
let mut m = self.manifest_size_map.write().unwrap();
@@ -585,7 +615,7 @@ impl ManifestObjectStore {
}
/// Set the size of the checkpoint file by checkpoint version.
fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
let mut m = self.manifest_size_map.write().unwrap();
m.insert(FileKey::Checkpoint(version), size);
@@ -595,6 +625,7 @@ impl ManifestObjectStore {
fn unset_file_size(&self, key: &FileKey) {
let mut m = self.manifest_size_map.write().unwrap();
if let Some(val) = m.remove(key) {
debug!("Unset file size: {:?}, size: {}", key, val);
self.dec_total_manifest_size(val);
}
}

View File

@@ -44,6 +44,18 @@ async fn build_manager(
(env, manager)
}
async fn build_manager_with_initial_metadata(
env: &TestEnv,
checkpoint_distance: u64,
compress_type: CompressionType,
) -> RegionManifestManager {
let metadata = Arc::new(basic_region_metadata());
env.create_manifest_manager(compress_type, checkpoint_distance, Some(metadata.clone()))
.await
.unwrap()
.unwrap()
}
async fn reopen_manager(
env: &TestEnv,
checkpoint_distance: u64,
@@ -265,4 +277,142 @@ async fn generate_checkpoint_with_compression_types(
.await
.unwrap()
.unwrap()
.0
}
fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>) {
let mut files = vec![];
let mut actions = vec![];
for _ in 0..num {
let file_id = FileId::random();
files.push(file_id);
let file_meta = FileMeta {
region_id: RegionId::new(123, 456),
file_id,
time_range: (0.into(), 10000000.into()),
level: 0,
file_size: 1024000,
available_indexes: Default::default(),
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
};
let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![file_meta],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
})]);
actions.push(action);
}
(files, actions)
}
#[tokio::test]
async fn manifest_install_manifest_to() {
common_telemetry::init_default_ut_logging();
let (env, mut manager) = build_manager(0, CompressionType::Uncompressed).await;
let (files, actions) = generate_action_lists(10);
for action in actions {
manager.update(action).await.unwrap();
}
// Nothing to install
let target_version = manager.manifest().manifest_version;
let installed_version = manager.install_manifest_to(target_version).await.unwrap();
assert_eq!(target_version, installed_version);
let mut another_manager =
build_manager_with_initial_metadata(&env, 0, CompressionType::Uncompressed).await;
// install manifest changes
let target_version = manager.manifest().manifest_version;
let installed_version = another_manager
.install_manifest_to(target_version - 1)
.await
.unwrap();
assert_eq!(target_version - 1, installed_version);
for file_id in files[0..9].iter() {
assert!(another_manager.manifest().files.contains_key(file_id));
}
let installed_version = another_manager
.install_manifest_to(target_version)
.await
.unwrap();
assert_eq!(target_version, installed_version);
for file_id in files.iter() {
assert!(another_manager.manifest().files.contains_key(file_id));
}
}
#[tokio::test]
async fn manifest_install_manifest_to_with_checkpoint() {
common_telemetry::init_default_ut_logging();
let (env, mut manager) = build_manager(3, CompressionType::Uncompressed).await;
let (files, actions) = generate_action_lists(10);
for action in actions {
manager.update(action).await.unwrap();
while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
// has checkpoint
assert!(manager
.store()
.load_last_checkpoint()
.await
.unwrap()
.is_some());
// check files
let mut expected = vec![
"/",
"00000000000000000006.checkpoint",
"00000000000000000007.json",
"00000000000000000008.json",
"00000000000000000009.checkpoint",
"00000000000000000009.json",
"00000000000000000010.json",
"_last_checkpoint",
];
expected.sort_unstable();
let mut paths = manager
.store()
.get_paths(|e| Some(e.name().to_string()))
.await
.unwrap();
paths.sort_unstable();
assert_eq!(expected, paths);
let mut another_manager =
build_manager_with_initial_metadata(&env, 0, CompressionType::Uncompressed).await;
// Install 9 manifests
let target_version = manager.manifest().manifest_version;
let installed_version = another_manager
.install_manifest_to(target_version - 1)
.await
.unwrap();
assert_eq!(target_version - 1, installed_version);
for file_id in files[0..9].iter() {
assert!(another_manager.manifest().files.contains_key(file_id));
}
// Install all manifests
let target_version = manager.manifest().manifest_version;
let installed_version = another_manager
.install_manifest_to(target_version)
.await
.unwrap();
assert_eq!(target_version, installed_version);
for file_id in files.iter() {
assert!(another_manager.manifest().files.contains_key(file_id));
}
assert_eq!(4217, another_manager.store().total_manifest_size());
}

View File

@@ -14,7 +14,7 @@
//! Mito region.
pub(crate) mod opener;
pub mod opener;
pub mod options;
pub(crate) mod version;

View File

@@ -16,7 +16,7 @@
use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;
use common_telemetry::{debug, error, info, warn};
@@ -30,7 +30,9 @@ use object_store::util::{join_dir, normalize_dir};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::storage::{ColumnId, RegionId};
@@ -42,6 +44,7 @@ use crate::error::{
EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::time_partition::TimePartitions;
@@ -207,11 +210,16 @@ impl RegionOpener {
}
// Safety: must be set before calling this method.
let options = self.options.take().unwrap();
let object_store = self.object_store(&options.storage)?.clone();
let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
let provider = self.provider::<S>(&options.wal_options)?;
let metadata = Arc::new(metadata);
// Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options = self.manifest_options(config, &options)?;
let region_manifest_options = Self::manifest_options(
config,
&options,
&self.region_dir,
&self.object_store_manager,
)?;
let manifest_manager = RegionManifestManager::new(
metadata.clone(),
region_manifest_options,
@@ -334,7 +342,12 @@ impl RegionOpener {
) -> Result<Option<MitoRegion>> {
let region_options = self.options.as_ref().unwrap().clone();
let region_manifest_options = self.manifest_options(config, &region_options)?;
let region_manifest_options = Self::manifest_options(
config,
&region_options,
&self.region_dir,
&self.object_store_manager,
)?;
let Some(manifest_manager) = RegionManifestManager::open(
region_manifest_options,
self.stats.total_manifest_size.clone(),
@@ -354,7 +367,7 @@ impl RegionOpener {
.take()
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
let on_region_opened = wal.on_region_opened();
let object_store = self.object_store(&region_options.storage)?.clone();
let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;
debug!("Open region {} with options: {:?}", region_id, self.options);
@@ -444,13 +457,14 @@ impl RegionOpener {
/// Returns a new manifest options.
fn manifest_options(
&self,
config: &MitoConfig,
options: &RegionOptions,
region_dir: &str,
object_store_manager: &ObjectStoreManagerRef,
) -> Result<RegionManifestOptions> {
let object_store = self.object_store(&options.storage)?.clone();
let object_store = get_object_store(&options.storage, object_store_manager)?;
Ok(RegionManifestOptions {
manifest_dir: new_manifest_dir(&self.region_dir),
manifest_dir: new_manifest_dir(region_dir),
object_store,
// We don't allow users to set the compression algorithm as we use it as a file suffix.
// Currently, the manifest storage doesn't have good support for changing compression algorithms.
@@ -458,20 +472,72 @@ impl RegionOpener {
checkpoint_distance: config.manifest_checkpoint_distance,
})
}
}
/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
fn object_store(&self, name: &Option<String>) -> Result<&object_store::ObjectStore> {
if let Some(name) = name {
Ok(self
.object_store_manager
.find(name)
.context(ObjectStoreNotFoundSnafu {
object_store: name.to_string(),
})?)
} else {
Ok(self.object_store_manager.default_object_store())
/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
pub fn get_object_store(
name: &Option<String>,
object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
if let Some(name) = name {
Ok(object_store_manager
.find(name)
.with_context(|| ObjectStoreNotFoundSnafu {
object_store: name.to_string(),
})?
.clone())
} else {
Ok(object_store_manager.default_object_store().clone())
}
}
/// A loader for loading metadata from a region dir.
pub struct RegionMetadataLoader {
config: Arc<MitoConfig>,
object_store_manager: ObjectStoreManagerRef,
}
impl RegionMetadataLoader {
/// Creates a new `RegionMetadataLoader`.
pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
Self {
config,
object_store_manager,
}
}
/// Loads the metadata of the region from the region dir.
pub async fn load(
&self,
region_dir: &str,
region_options: &RegionOptions,
) -> Result<Option<RegionMetadataRef>> {
let manifest = self.load_manifest(region_dir, region_options).await?;
Ok(manifest.map(|m| m.metadata.clone()))
}
/// Loads the manifest of the region from the region dir.
pub async fn load_manifest(
&self,
region_dir: &str,
region_options: &RegionOptions,
) -> Result<Option<Arc<RegionManifest>>> {
let region_manifest_options = RegionOpener::manifest_options(
&self.config,
region_options,
region_dir,
&self.object_store_manager,
)?;
let Some(manifest_manager) =
RegionManifestManager::open(region_manifest_options, Arc::new(AtomicU64::new(0)))
.await?
else {
return Ok(None);
};
let manifest = manifest_manager.manifest();
Ok(Some(manifest))
}
}
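// Illustrative call-site sketch for `RegionMetadataLoader` (assumed usage, not
// part of this diff); `config` is an `Arc<MitoConfig>` and `object_store_manager`
// an `ObjectStoreManagerRef`:
//
//     let loader = RegionMetadataLoader::new(config.clone(), object_store_manager.clone());
//     if let Some(metadata) = loader.load(&region_dir, &region_options).await? {
//         // `metadata` is the RegionMetadataRef recovered from the region's manifest.
//     }
//
// `load` returns `None` when the region dir does not contain a manifest yet.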
/// Checks whether the recovered region has the same schema as region to create.

View File

@@ -33,6 +33,8 @@ use crate::row_converter::dense::SortField;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
/// A codec for sparse key of metrics.
/// It requires the input primary key columns are sorted by the column name in lexicographical order.
/// It encodes the column id of the physical region.
#[derive(Clone, Debug)]
pub struct SparsePrimaryKeyCodec {
inner: Arc<SparsePrimaryKeyCodecInner>,

View File

@@ -16,9 +16,9 @@ pub(crate) mod bloom_filter;
mod codec;
pub(crate) mod fulltext_index;
mod indexer;
pub(crate) mod intermediate;
pub mod intermediate;
pub(crate) mod inverted_index;
pub(crate) mod puffin_manager;
pub mod puffin_manager;
mod statistics;
pub(crate) mod store;

View File

@@ -49,6 +49,11 @@ impl IntermediateManager {
/// Create a new `IntermediateManager` with the given root path.
/// It will clean up all garbage intermediate files from previous runs.
pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
common_telemetry::info!(
"Initializing intermediate manager, aux_path: {}",
aux_path.as_ref()
);
let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
let store = InstrumentedStore::new(store);

View File

@@ -61,6 +61,7 @@ impl Default for WriteOptions {
}
/// Parquet SST info returned by the writer.
#[derive(Debug)]
pub struct SstInfo {
/// SST file id.
pub file_id: FileId,

View File

@@ -28,7 +28,7 @@ use api::v1::{
use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
use common_catalog::consts::{
default_engine, PARENT_SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN,
default_engine, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN,
};
use common_grpc_expr::util::ColumnExpr;
use common_meta::cache::TableFlownodeSetCacheRef;
@@ -54,7 +54,10 @@ use store_api::metric_engine_consts::{
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId};
use table::metadata::TableInfo;
use table::requests::{InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TTL_KEY};
use table::requests::{
InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TABLE_DATA_MODEL,
TABLE_DATA_MODEL_TRACE_V1, TTL_KEY,
};
use table::table_reference::TableReference;
use table::TableRef;
@@ -578,7 +581,8 @@ impl Inserter {
// - trace_id: when searching by trace id
// - parent_span_id: when searching root span
// - span_name: when searching certain types of span
let index_columns = [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SPAN_NAME_COLUMN];
let index_columns =
[TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
for index_column in index_columns {
if let Some(col) = create_table
.column_defs
@@ -595,6 +599,12 @@ impl Inserter {
}
}
// use table_options to mark table model version
create_table.table_options.insert(
TABLE_DATA_MODEL.to_string(),
TABLE_DATA_MODEL_TRACE_V1.to_string(),
);
let table = self
.create_physical_table(
create_table,

View File

@@ -13,7 +13,8 @@
// limitations under the License.
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pipeline::{json_to_intermediate_state, parse, Content, GreptimeTransformer, Pipeline, Result};
use pipeline::error::Result;
use pipeline::{json_to_intermediate_state, parse, Content, GreptimeTransformer, Pipeline};
use serde_json::{Deserializer, Value};
fn processor_mut(

View File

@@ -16,7 +16,7 @@ use common_telemetry::debug;
use snafu::OptionExt;
use yaml_rust::Yaml;
use crate::etl::error::{
use crate::error::{
Error, FieldRequiredForDispatcherSnafu, Result, TableSuffixRequiredForDispatcherRuleSnafu,
ValueRequiredForDispatcherRuleSnafu,
};

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::timestamp::TimestampNanosecond;
use snafu::{Location, Snafu};
#[derive(Snafu)]
@@ -51,7 +52,7 @@ pub enum Error {
#[snafu(display("Processor {processor}: expect string value, but got {v:?}"))]
ProcessorExpectString {
processor: String,
v: crate::etl::Value,
v: crate::Value,
#[snafu(implicit)]
location: Location,
},
@@ -607,13 +608,197 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Pipeline table not found"))]
PipelineTableNotFound {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to insert pipeline to pipelines table"))]
InsertPipeline {
#[snafu(source)]
source: operator::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Pipeline not found, name: {}, version: {}", name, version.map(|ts| ts.0.to_iso8601_string()).unwrap_or("latest".to_string())))]
PipelineNotFound {
name: String,
version: Option<TimestampNanosecond>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to collect record batch"))]
CollectRecords {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to cast type, msg: {}", msg))]
CastType {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build DataFusion logical plan"))]
BuildDfLogicalPlan {
#[snafu(source)]
error: datafusion_common::DataFusionError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to execute internal statement"))]
ExecuteInternalStatement {
#[snafu(source)]
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to create dataframe"))]
DataFrame {
#[snafu(source)]
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("General catalog error"))]
Catalog {
#[snafu(source)]
source: catalog::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to create table"))]
CreateTable {
#[snafu(source)]
source: operator::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid pipeline version format: {}", version))]
InvalidPipelineVersion {
version: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
StatusCode::InvalidArguments
use Error::*;
match self {
CastType { .. } => StatusCode::Unexpected,
PipelineTableNotFound { .. } => StatusCode::TableNotFound,
InsertPipeline { source, .. } => source.status_code(),
CollectRecords { source, .. } => source.status_code(),
PipelineNotFound { .. } | InvalidPipelineVersion { .. } => StatusCode::InvalidArguments,
BuildDfLogicalPlan { .. } => StatusCode::Internal,
ExecuteInternalStatement { source, .. } => source.status_code(),
DataFrame { source, .. } => source.status_code(),
Catalog { source, .. } => source.status_code(),
CreateTable { source, .. } => source.status_code(),
EmptyInputField { .. }
| MissingInputField { .. }
| ProcessorMustBeMap { .. }
| ProcessorMissingField { .. }
| ProcessorExpectString { .. }
| ProcessorUnsupportedValue { .. }
| ProcessorKeyMustBeString { .. }
| ProcessorFailedToParseString { .. }
| ProcessorMustHaveStringKey { .. }
| UnsupportedProcessor { .. }
| FieldMustBeType { .. }
| FailedParseFieldFromString { .. }
| FailedToParseIntKey { .. }
| FailedToParseInt { .. }
| FailedToParseFloatKey { .. }
| IntermediateKeyIndex { .. }
| CmcdMissingValue { .. }
| CmcdMissingKey { .. }
| KeyMustBeString { .. }
| CsvRead { .. }
| CsvNoRecord { .. }
| CsvSeparatorName { .. }
| CsvQuoteName { .. }
| DateParseTimezone { .. }
| DateParse { .. }
| DateFailedToGetLocalTimezone { .. }
| DateFailedToGetTimestamp { .. }
| DateInvalidFormat { .. }
| DissectInvalidPattern { .. }
| DissectEmptyPattern { .. }
| DissectSplitExceedsInput { .. }
| DissectSplitNotMatchInput { .. }
| DissectConsecutiveNames { .. }
| DissectNoMatchingPattern { .. }
| DissectModifierAlreadySet { .. }
| DissectAppendOrderAlreadySet { .. }
| DissectOrderOnlyAppend { .. }
| DissectOrderOnlyAppendModifier { .. }
| DissectEndModifierAlreadySet { .. }
| EpochInvalidResolution { .. }
| GsubPatternRequired { .. }
| GsubReplacementRequired { .. }
| Regex { .. }
| JoinSeparatorRequired { .. }
| LetterInvalidMethod { .. }
| RegexNamedGroupNotFound { .. }
| RegexNoValidField { .. }
| RegexNoValidPattern { .. }
| UrlEncodingInvalidMethod { .. }
| DigestPatternInvalid { .. }
| UrlEncodingDecode { .. }
| TransformOnFailureInvalidValue { .. }
| TransformElementMustBeMap { .. }
| TransformTypeMustBeSet { .. }
| TransformEmpty { .. }
| TransformColumnNameMustBeUnique { .. }
| TransformMultipleTimestampIndex { .. }
| TransformTimestampIndexCount { .. }
| CoerceUnsupportedNullType { .. }
| CoerceUnsupportedNullTypeTo { .. }
| CoerceUnsupportedEpochType { .. }
| CoerceStringToType { .. }
| CoerceJsonTypeTo { .. }
| CoerceTypeToJson { .. }
| CoerceIncompatibleTypes { .. }
| ValueInvalidResolution { .. }
| ValueParseType { .. }
| ValueParseInt { .. }
| ValueParseFloat { .. }
| ValueParseBoolean { .. }
| ValueDefaultValueUnsupported { .. }
| ValueUnsupportedNumberType { .. }
| ValueUnsupportedYamlType { .. }
| ValueYamlKeyMustBeString { .. }
| YamlLoad { .. }
| YamlParse { .. }
| PrepareValueMustBeObject { .. }
| ColumnOptions { .. }
| UnsupportedIndexType { .. }
| UnsupportedNumberType { .. }
| IdentifyPipelineColumnTypeMismatch { .. }
| JsonPathParse { .. }
| JsonPathParseResultIndex { .. }
| FieldRequiredForDispatcher
| TableSuffixRequiredForDispatcherRule
| ValueRequiredForDispatcherRule
| ReachedMaxNestedLevels { .. } => StatusCode::InvalidArguments,
}
}
fn as_any(&self) -> &dyn Any {

View File

@@ -13,16 +13,11 @@
// limitations under the License.
#![allow(dead_code)]
pub mod error;
pub mod field;
pub mod processor;
pub mod transform;
pub mod value;
use error::{
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu, YamlParseSnafu,
};
use processor::{Processor, Processors};
use snafu::{ensure, OptionExt, ResultExt};
use transform::{Transformer, Transforms};
@@ -30,7 +25,9 @@ use value::Value;
use yaml_rust::YamlLoader;
use crate::dispatcher::{Dispatcher, Rule};
use crate::etl::error::Result;
use crate::error::{
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result, YamlLoadSnafu, YamlParseSnafu,
};
const DESCRIPTION: &str = "description";
const PROCESSORS: &str = "processors";

View File

@@ -17,8 +17,7 @@ use std::str::FromStr;
use snafu::OptionExt;
use super::error::{EmptyInputFieldSnafu, MissingInputFieldSnafu};
use crate::etl::error::{Error, Result};
use crate::error::{EmptyInputFieldSnafu, Error, MissingInputFieldSnafu, Result};
/// Raw processor-defined inputs and outputs
#[derive(Debug, Default, Clone)]

View File

@@ -45,15 +45,13 @@ use snafu::{OptionExt, ResultExt};
use timestamp::TimestampProcessor;
use urlencoding::UrlEncodingProcessor;
use super::error::{
FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu,
ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
};
use super::field::{Field, Fields};
use super::PipelineMap;
use crate::etl::error::{Error, Result};
use crate::error::{
Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu,
ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu, Result, UnsupportedProcessorSnafu,
};
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
use crate::etl_error::UnsupportedProcessorSnafu;
const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";

View File

@@ -19,7 +19,7 @@
use snafu::{OptionExt, ResultExt};
use urlencoding::decode;
use crate::etl::error::{
use crate::error::{
CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu,
FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,

View File

@@ -19,7 +19,7 @@ use itertools::EitherOrBoth::{Both, Left, Right};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
use crate::error::{
CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error,
KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
};

View File

@@ -19,7 +19,7 @@ use chrono_tz::Tz;
use lazy_static::lazy_static;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
use crate::error::{
DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu,
DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, Result,

View File

@@ -22,7 +22,7 @@ use once_cell::sync::Lazy;
use regex::Regex;
use snafu::OptionExt;
use crate::etl::error::{
use crate::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::Fields;

View File

@@ -24,8 +24,9 @@ use std::borrow::Cow;
use regex::Regex;
use snafu::OptionExt;
use crate::etl::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
use crate::error::{
DigestPatternInvalidSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::Fields;
use crate::etl::processor::{
@@ -33,7 +34,6 @@ use crate::etl::processor::{
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
use crate::etl_error::DigestPatternInvalidSnafu;
pub(crate) const PROCESSOR_DIGEST: &str = "digest";

View File

@@ -18,7 +18,7 @@ use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use itertools::Itertools;
use snafu::OptionExt;
use crate::etl::error::{
use crate::error::{
DissectAppendOrderAlreadySetSnafu, DissectConsecutiveNamesSnafu, DissectEmptyPatternSnafu,
DissectEndModifierAlreadySetSnafu, DissectInvalidPatternSnafu, DissectModifierAlreadySetSnafu,
DissectNoMatchingPatternSnafu, DissectOrderOnlyAppendModifierSnafu,

View File

@@ -14,7 +14,7 @@
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
use crate::error::{
EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu,
ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result,
};

View File

@@ -15,7 +15,7 @@
use regex::Regex;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
use crate::error::{
Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu,
ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result,
};

View File

@@ -14,7 +14,7 @@
use snafu::OptionExt;
use crate::etl::error::{
use crate::error::{
Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,
};

View File

@@ -19,12 +19,11 @@ use super::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, PipelineMap, Processor, FIELDS_NAME,
FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
};
use crate::etl::error::{Error, Result};
use crate::etl::field::Fields;
use crate::etl_error::{
JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu,
ProcessorMissingFieldSnafu,
use crate::error::{
Error, JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu,
ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::Fields;
use crate::Value;
pub(crate) const PROCESSOR_JSON_PATH: &str = "json_path";

View File

@@ -14,7 +14,7 @@
use snafu::OptionExt;
use crate::etl::error::{
use crate::error::{
Error, KeyMustBeStringSnafu, LetterInvalidMethodSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,
};

View File

@@ -22,7 +22,7 @@ use lazy_static::lazy_static;
use regex::Regex;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
use crate::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu,
RegexNamedGroupNotFoundSnafu, RegexNoValidFieldSnafu, RegexNoValidPatternSnafu, RegexSnafu,
Result,

View File

@@ -14,13 +14,12 @@
use snafu::OptionExt as _;
use crate::etl::error::{Error, Result};
use crate::error::{Error, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, Result};
use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME, SIMPLE_EXTRACT_KEY_NAME,
};
use crate::etl_error::{KeyMustBeStringSnafu, ProcessorMissingFieldSnafu};
use crate::{PipelineMap, Processor, Value};
pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract";

View File

@@ -19,7 +19,7 @@ use chrono_tz::Tz;
use lazy_static::lazy_static;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
use crate::error::{
DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu,
DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error,
KeyMustBeStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu,

View File

@@ -15,7 +15,7 @@
use snafu::{OptionExt, ResultExt};
use urlencoding::{decode, encode};
use crate::etl::error::{
use crate::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
UrlEncodingDecodeSnafu, UrlEncodingInvalidMethodSnafu,
};

View File

@@ -17,7 +17,14 @@ pub mod transformer;
use snafu::OptionExt;
use crate::etl::error::{Error, Result};
use super::field::Fields;
use super::processor::{yaml_new_field, yaml_new_fields, yaml_string};
use super::value::Timestamp;
use super::PipelineMap;
use crate::error::{
Error, KeyMustBeStringSnafu, Result, TransformElementMustBeMapSnafu,
TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu,
};
use crate::etl::processor::yaml_bool;
use crate::etl::transform::index::Index;
use crate::etl::value::Value;
@@ -32,15 +39,6 @@ const TRANSFORM_ON_FAILURE: &str = "on_failure";
pub use transformer::greptime::GreptimeTransformer;
use super::error::{
KeyMustBeStringSnafu, TransformElementMustBeMapSnafu, TransformOnFailureInvalidValueSnafu,
TransformTypeMustBeSetSnafu,
};
use super::field::Fields;
use super::processor::{yaml_new_field, yaml_new_fields, yaml_string};
use super::value::Timestamp;
use super::PipelineMap;
pub trait Transformer: std::fmt::Debug + Sized + Send + Sync + 'static {
type Output;
type VecOutput;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::etl::error::{Error, Result, UnsupportedIndexTypeSnafu};
use crate::error::{Error, Result, UnsupportedIndexTypeSnafu};
const INDEX_TIMESTAMP: &str = "timestamp";
const INDEX_TIMEINDEX: &str = "time";

View File

@@ -27,7 +27,7 @@ use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use serde_json::Number;
use crate::etl::error::{
use crate::error::{
IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
TransformColumnNameMustBeUniqueSnafu, TransformEmptySnafu,
TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu,

View File

@@ -20,7 +20,7 @@ use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use snafu::ResultExt;
use crate::etl::error::{
use crate::error::{
CoerceIncompatibleTypesSnafu, CoerceJsonTypeToSnafu, CoerceStringToTypeSnafu,
CoerceTypeToJsonSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result,

View File

@@ -28,13 +28,12 @@ use regex::Regex;
use snafu::{OptionExt, ResultExt};
pub use time::Timestamp;
use super::error::{
ValueDefaultValueUnsupportedSnafu, ValueInvalidResolutionSnafu, ValueParseBooleanSnafu,
ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu, ValueUnsupportedNumberTypeSnafu,
ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu,
};
use super::PipelineMap;
use crate::etl::error::{Error, Result};
use crate::error::{
Error, Result, ValueDefaultValueUnsupportedSnafu, ValueInvalidResolutionSnafu,
ValueParseBooleanSnafu, ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu,
ValueUnsupportedNumberTypeSnafu, ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu,
};
/// Value can be used as type
/// acts as value: the enclosed value is the actual value

View File

@@ -13,22 +13,22 @@
// limitations under the License.
mod dispatcher;
pub mod error;
mod etl;
mod manager;
mod metrics;
pub use etl::error::Result;
pub use etl::processor::Processor;
pub use etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo};
pub use etl::transform::transformer::identity_pipeline;
pub use etl::transform::{GreptimeTransformer, Transformer};
pub use etl::value::{Array, Map, Value};
pub use etl::{
error as etl_error, json_array_to_intermediate_state, json_to_intermediate_state, parse,
Content, DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
json_array_to_intermediate_state, json_to_intermediate_state, parse, Content, DispatchedTo,
Pipeline, PipelineExecOutput, PipelineMap,
};
pub use manager::{
error, pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef,
pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef,
PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME,
};

View File

@@ -19,10 +19,10 @@ use datatypes::timestamp::TimestampNanosecond;
use itertools::Itertools;
use util::to_pipeline_version;
use crate::error::Result;
use crate::table::PipelineTable;
use crate::{GreptimeTransformer, Pipeline};
pub mod error;
pub mod pipeline_operator;
pub mod table;
pub mod util;
@@ -99,7 +99,7 @@ impl PipelineWay {
name: Option<&str>,
version: Option<&str>,
default_pipeline: PipelineWay,
) -> error::Result<PipelineWay> {
) -> Result<PipelineWay> {
if let Some(pipeline_name) = name {
if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME {
Ok(PipelineWay::OtlpTraceDirectV1)

View File

@@ -1,153 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::timestamp::TimestampNanosecond;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Pipeline table not found"))]
PipelineTableNotFound {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to insert pipeline to pipelines table"))]
InsertPipeline {
#[snafu(source)]
source: operator::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse pipeline"))]
CompilePipeline {
#[snafu(source)]
source: crate::etl::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Pipeline not found, name: {}, version: {}", name, version.map(|ts| ts.0.to_iso8601_string()).unwrap_or("latest".to_string())))]
PipelineNotFound {
name: String,
version: Option<TimestampNanosecond>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to collect record batch"))]
CollectRecords {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to cast type, msg: {}", msg))]
CastType {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build DataFusion logical plan"))]
BuildDfLogicalPlan {
#[snafu(source)]
error: datafusion_common::DataFusionError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to execute internal statement"))]
ExecuteInternalStatement {
#[snafu(source)]
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to create dataframe"))]
DataFrame {
#[snafu(source)]
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("General catalog error"))]
Catalog {
#[snafu(source)]
source: catalog::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to create table"))]
CreateTable {
#[snafu(source)]
source: operator::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to execute pipeline"))]
PipelineTransform {
#[snafu(source)]
source: crate::etl::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid pipeline version format: {}", version))]
InvalidPipelineVersion {
version: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
CastType { .. } => StatusCode::Unexpected,
PipelineTableNotFound { .. } => StatusCode::TableNotFound,
InsertPipeline { source, .. } => source.status_code(),
CollectRecords { source, .. } => source.status_code(),
PipelineNotFound { .. }
| CompilePipeline { .. }
| PipelineTransform { .. }
| InvalidPipelineVersion { .. } => StatusCode::InvalidArguments,
BuildDfLogicalPlan { .. } => StatusCode::Internal,
ExecuteInternalStatement { source, .. } => source.status_code(),
DataFrame { source, .. } => source.status_code(),
Catalog { source, .. } => source.status_code(),
CreateTable { source, .. } => source.status_code(),
}
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -41,9 +41,9 @@ use table::metadata::TableInfo;
use table::TableRef;
use crate::error::{
BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, CompilePipelineSnafu,
DataFrameSnafu, ExecuteInternalStatementSnafu, InsertPipelineSnafu,
InvalidPipelineVersionSnafu, PipelineNotFoundSnafu, Result,
BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu,
ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
PipelineNotFoundSnafu, Result,
};
use crate::etl::transform::GreptimeTransformer;
use crate::etl::{parse, Content, Pipeline};
@@ -204,7 +204,7 @@ impl PipelineTable {
/// Compile a pipeline from a string.
pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
let yaml_content = Content::Yaml(pipeline);
parse::<GreptimeTransformer>(&yaml_content).context(CompilePipelineSnafu)
parse::<GreptimeTransformer>(&yaml_content)
}
/// Insert a pipeline into the pipeline table.

View File

@@ -151,7 +151,7 @@ pub enum Error {
#[snafu(display("Failed to describe statement"))]
DescribeStatement { source: BoxedError },
#[snafu(display("Pipeline management api error"))]
#[snafu(display("Pipeline error"))]
Pipeline {
#[snafu(source)]
source: pipeline::error::Error,
@@ -159,14 +159,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Pipeline transform error"))]
PipelineTransform {
#[snafu(source)]
source: pipeline::etl_error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Not supported: {}", feat))]
NotSupported { feat: String },
@@ -661,7 +653,6 @@ impl ErrorExt for Error {
| CheckDatabaseValidity { source, .. } => source.status_code(),
Pipeline { source, .. } => source.status_code(),
PipelineTransform { source, .. } => source.status_code(),
NotSupported { .. }
| InvalidParameter { .. }

View File

@@ -154,7 +154,7 @@ impl Default for HttpOptions {
fn default() -> Self {
Self {
addr: "127.0.0.1:4000".to_string(),
timeout: Duration::from_secs(30),
timeout: Duration::from_secs(0),
disable_dashboard: false,
body_limit: DEFAULT_BODY_LIMIT,
is_strict_mode: false,
@@ -1384,7 +1384,7 @@ mod test {
fn test_http_options_default() {
let default = HttpOptions::default();
assert_eq!("127.0.0.1:4000".to_string(), default.addr);
assert_eq!(Duration::from_secs(30), default.timeout)
assert_eq!(Duration::from_secs(0), default.timeout)
}
#[tokio::test]

View File

@@ -32,7 +32,6 @@ use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use pipeline::error::PipelineTransformSnafu;
use pipeline::util::to_pipeline_version;
use pipeline::{GreptimePipelineParams, GreptimeTransformer, PipelineDefinition, PipelineVersion};
use serde::{Deserialize, Serialize};
@@ -284,9 +283,7 @@ async fn dryrun_pipeline_inner(
&pipeline_handler,
PipelineDefinition::Resolved(pipeline),
&params,
pipeline::json_array_to_intermediate_state(value)
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?,
pipeline::json_array_to_intermediate_state(value).context(PipelineSnafu)?,
"dry_run".to_owned(),
query_ctx,
true,
@@ -636,9 +633,7 @@ pub(crate) async fn ingest_logs_inner(
&state,
PipelineDefinition::from_name(&pipeline_name, version),
&pipeline_params,
pipeline::json_array_to_intermediate_state(request.values)
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?,
pipeline::json_array_to_intermediate_state(request.values).context(PipelineSnafu)?,
request.table,
&query_ctx,
true,

View File

@@ -23,7 +23,8 @@ use pipeline::{GreptimePipelineParams, SelectInfo};
use crate::http::header::constants::{
GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
GREPTIME_PIPELINE_PARAMS_HEADER, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
GREPTIME_PIPELINE_NAME_HEADER_NAME, GREPTIME_PIPELINE_PARAMS_HEADER,
GREPTIME_PIPELINE_VERSION_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
};
/// Axum extractor for optional target log table name from HTTP header
@@ -38,7 +39,7 @@ where
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
let headers = &parts.headers;
string_value_from_header(headers, GREPTIME_LOG_TABLE_NAME_HEADER_NAME).map(LogTableName)
string_value_from_header(headers, &[GREPTIME_LOG_TABLE_NAME_HEADER_NAME]).map(LogTableName)
}
}
@@ -54,7 +55,8 @@ where
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
let headers = &parts.headers;
string_value_from_header(headers, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME).map(TraceTableName)
string_value_from_header(headers, &[GREPTIME_TRACE_TABLE_NAME_HEADER_NAME])
.map(TraceTableName)
}
}
@@ -71,7 +73,7 @@ where
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
let select =
string_value_from_header(&parts.headers, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?;
string_value_from_header(&parts.headers, &[GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME])?;
match select {
Some(name) => {
@@ -102,12 +104,22 @@ where
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
let headers = &parts.headers;
let pipeline_name =
string_value_from_header(headers, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME)?;
let pipeline_version =
string_value_from_header(headers, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME)?;
let pipeline_name = string_value_from_header(
headers,
&[
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
GREPTIME_PIPELINE_NAME_HEADER_NAME,
],
)?;
let pipeline_version = string_value_from_header(
headers,
&[
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
GREPTIME_PIPELINE_VERSION_HEADER_NAME,
],
)?;
let pipeline_parameters =
string_value_from_header(headers, GREPTIME_PIPELINE_PARAMS_HEADER)?;
string_value_from_header(headers, &[GREPTIME_PIPELINE_PARAMS_HEADER])?;
Ok(PipelineInfo {
pipeline_name,
@@ -120,17 +132,19 @@ where
#[inline]
fn string_value_from_header(
headers: &HeaderMap,
header_key: &str,
header_keys: &[&str],
) -> Result<Option<String>, (StatusCode, String)> {
headers
.get(header_key)
.map(|value| {
String::from_utf8(value.as_bytes().to_vec()).map_err(|_| {
for header_key in header_keys {
if let Some(value) = headers.get(*header_key) {
return Some(String::from_utf8(value.as_bytes().to_vec()).map_err(|_| {
(
StatusCode::BAD_REQUEST,
format!("`{}` header is not valid UTF-8 string type.", header_key),
)
})
})
.transpose()
}))
.transpose();
}
}
Ok(None)
}
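For context, the refactored `string_value_from_header` above checks several header keys in order and returns the first match, so clients still sending the deprecated `x-greptime-log-pipeline-name` header keep working while new clients can switch to `x-greptime-pipeline-name`. Below is a minimal, standalone sketch of that fallback idea (not the PR's code; it only assumes the `http` crate for `HeaderMap`):

```rust
use http::{HeaderMap, HeaderValue};

fn main() {
    let mut headers = HeaderMap::new();
    // A new-style client sends only the generic pipeline-name header.
    headers.insert("x-greptime-pipeline-name", HeaderValue::from_static("my_pipeline"));

    // Check the deprecated key first, then the generic one, taking the first hit —
    // the same order the extractor uses for its header-key slice.
    let name = ["x-greptime-log-pipeline-name", "x-greptime-pipeline-name"]
        .iter()
        .find_map(|key| headers.get(*key))
        .and_then(|value| value.to_str().ok())
        .map(str::to_owned);

    assert_eq!(name.as_deref(), Some("my_pipeline"));
}
```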

View File

@@ -45,8 +45,15 @@ pub mod constants {
pub const GREPTIME_DB_HEADER_NAME: &str = "x-greptime-db-name";
pub const GREPTIME_TIMEZONE_HEADER_NAME: &str = "x-greptime-timezone";
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = common_error::GREPTIME_DB_HEADER_ERROR_CODE;
// Deprecated: pipelines are also used with traces, so `log` was dropped from the header names.
pub const GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-log-pipeline-name";
pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version";
// More generic pipeline header names
pub const GREPTIME_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-pipeline-name";
pub const GREPTIME_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-pipeline-version";
pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name";
pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys";
pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name";

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;
use std::sync::Arc;
@@ -26,8 +25,6 @@ use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::util;
use common_telemetry::{debug, error, tracing, warn};
use datafusion_expr::{col, Expr};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use session::context::{Channel, QueryContext};
@@ -36,6 +33,7 @@ use snafu::{OptionExt, ResultExt};
use crate::error::{
status_code_to_http_status, CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result,
};
use crate::http::extractor::TraceTableName;
use crate::http::HttpRecordsOutput;
use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
use crate::otlp::trace::{
@@ -47,43 +45,9 @@ use crate::otlp::trace::{
};
use crate::query_handler::JaegerQueryHandlerRef;
lazy_static! {
pub static ref FIND_TRACES_COLS: Vec<Expr> = vec![
col(TRACE_ID_COLUMN),
col(TIMESTAMP_COLUMN),
col(DURATION_NANO_COLUMN),
col(SERVICE_NAME_COLUMN),
col(SPAN_NAME_COLUMN),
col(SPAN_ID_COLUMN),
col(SPAN_ATTRIBUTES_COLUMN),
col(RESOURCE_ATTRIBUTES_COLUMN),
col(PARENT_SPAN_ID_COLUMN),
col(SPAN_EVENTS_COLUMN),
col(SCOPE_NAME_COLUMN),
col(SCOPE_VERSION_COLUMN),
col(SPAN_KIND_COLUMN),
col(SPAN_STATUS_CODE),
];
static ref FIND_TRACES_SCHEMA: Vec<(&'static str, &'static str)> = vec![
(TRACE_ID_COLUMN, "String"),
(TIMESTAMP_COLUMN, "TimestampNanosecond"),
(DURATION_NANO_COLUMN, "UInt64"),
(SERVICE_NAME_COLUMN, "String"),
(SPAN_NAME_COLUMN, "String"),
(SPAN_ID_COLUMN, "String"),
(SPAN_ATTRIBUTES_COLUMN, "Json"),
(RESOURCE_ATTRIBUTES_COLUMN, "Json"),
(PARENT_SPAN_ID_COLUMN, "String"),
(SPAN_EVENTS_COLUMN, "Json"),
(SCOPE_NAME_COLUMN, "String"),
(SCOPE_VERSION_COLUMN, "String"),
(SPAN_KIND_COLUMN, "String"),
(SPAN_STATUS_CODE, "String"),
];
}
pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";
const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];
/// JaegerAPIResponse is the response of Jaeger HTTP API.
@@ -240,9 +204,6 @@ pub enum ValueType {
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JaegerQueryParams {
/// Database that the trace data stored in.
pub db: Option<String>,
/// Service name of the trace.
#[serde(rename = "service")]
pub service_name: Option<String>,
@@ -275,26 +236,27 @@ pub struct JaegerQueryParams {
pub span_kind: Option<String>,
}
fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
// db should be already handled by middlewares
query_ctx.set_channel(Channel::Jaeger);
if let Some(table) = table_name {
query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
}
}
impl QueryTraceParams {
fn from_jaeger_query_params(db: &str, query_params: JaegerQueryParams) -> Result<Self> {
fn from_jaeger_query_params(query_params: JaegerQueryParams) -> Result<Self> {
let mut internal_query_params: QueryTraceParams = QueryTraceParams {
db: db.to_string(),
service_name: query_params.service_name.context(InvalidJaegerQuerySnafu {
reason: "service_name is required".to_string(),
})?,
operation_name: query_params.operation_name,
// Convert start time from microseconds to nanoseconds.
start_time: query_params.start.map(|start| start * 1000),
end_time: query_params.end.map(|end| end * 1000),
..Default::default()
};
internal_query_params.service_name =
query_params.service_name.context(InvalidJaegerQuerySnafu {
reason: "service_name is required".to_string(),
})?;
internal_query_params.operation_name = query_params.operation_name;
// Convert start time from microseconds to nanoseconds.
internal_query_params.start_time = query_params.start.map(|start| start * 1000);
// Convert end time from microseconds to nanoseconds.
internal_query_params.end_time = query_params.end.map(|end| end * 1000);
if let Some(max_duration) = query_params.max_duration {
let duration = humantime::parse_duration(&max_duration).map_err(|e| {
InvalidJaegerQuerySnafu {
@@ -343,7 +305,6 @@ impl QueryTraceParams {
#[derive(Debug, Default, PartialEq)]
pub struct QueryTraceParams {
pub db: String,
pub service_name: String,
pub operation_name: Option<String>,
@@ -367,12 +328,14 @@ pub async fn handle_get_services(
State(handler): State<JaegerQueryHandlerRef>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}",
query_params, query_ctx
);
query_ctx.set_channel(Channel::Jaeger);
update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();
@@ -418,12 +381,14 @@ pub async fn handle_get_trace(
Path(trace_id): Path<String>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}",
trace_id, query_params, query_ctx
);
query_ctx.set_channel(Channel::Jaeger);
update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();
@@ -472,12 +437,14 @@ pub async fn handle_find_traces(
State(handler): State<JaegerQueryHandlerRef>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
query_params, query_ctx
);
query_ctx.set_channel(Channel::Jaeger);
update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();
@@ -486,7 +453,7 @@ pub async fn handle_find_traces(
.with_label_values(&[&db, "/api/traces"])
.start_timer();
match QueryTraceParams::from_jaeger_query_params(&db, query_params) {
match QueryTraceParams::from_jaeger_query_params(query_params) {
Ok(query_params) => {
let output = handler.find_traces(query_ctx, query_params).await;
match output {
@@ -521,13 +488,14 @@ pub async fn handle_get_operations(
State(handler): State<JaegerQueryHandlerRef>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}",
query_params, query_ctx
);
if let Some(service_name) = query_params.service_name {
query_ctx.set_channel(Channel::Jaeger);
if let Some(service_name) = &query_params.service_name {
update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();
@@ -537,7 +505,7 @@ pub async fn handle_get_operations(
.start_timer();
match handler
.get_operations(query_ctx, &service_name, query_params.span_kind.as_deref())
.get_operations(query_ctx, service_name, query_params.span_kind.as_deref())
.await
{
Ok(output) => match covert_to_records(output).await {
@@ -593,12 +561,14 @@ pub async fn handle_get_operations_by_service(
Path(service_name): Path<String>,
Query(query_params): Query<JaegerQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TraceTableName(table_name): TraceTableName,
) -> impl IntoResponse {
debug!(
"Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}",
service_name, query_params, query_ctx
);
query_ctx.set_channel(Channel::Jaeger);
update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
let db = query_ctx.get_db_string();
@@ -690,11 +660,8 @@ fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>)
}),
)
}
// Construct Jaeger traces from records.
fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
let expected_schema = FIND_TRACES_SCHEMA.clone();
check_schema(&records, &expected_schema)?;
fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
// maintain the mapping: trace_id -> (process_id -> service_name).
let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
// maintain the mapping: trace_id -> spans.
@@ -702,38 +669,202 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
// maintain the mapping: service.name -> resource.attributes.
let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
let is_span_attributes_flatten = !records
.schema
.column_schemas
.iter()
.any(|c| c.name == SPAN_ATTRIBUTES_COLUMN);
for row in records.rows.into_iter() {
let mut span = Span::default();
let mut row_iter = row.into_iter();
let mut service_name = None;
let mut resource_tags = vec![];
// Set trace id.
if let Some(JsonValue::String(trace_id)) = row_iter.next() {
span.trace_id = trace_id.clone();
trace_id_to_processes.entry(trace_id).or_default();
}
for (idx, cell) in row.into_iter().enumerate() {
// safe to use index here
let column_name = &records.schema.column_schemas[idx].name;
// Convert timestamp from nanoseconds to microseconds.
if let Some(JsonValue::Number(timestamp)) = row_iter.next() {
span.start_time = timestamp.as_u64().ok_or_else(|| {
InvalidJaegerQuerySnafu {
reason: "Failed to convert timestamp to u64".to_string(),
match column_name.as_str() {
TRACE_ID_COLUMN => {
if let JsonValue::String(trace_id) = cell {
span.trace_id = trace_id.clone();
trace_id_to_processes.entry(trace_id).or_default();
}
}
.build()
})? / 1000;
}
// Convert duration from nanoseconds to microseconds.
if let Some(JsonValue::Number(duration)) = row_iter.next() {
span.duration = duration.as_u64().ok_or_else(|| {
InvalidJaegerQuerySnafu {
reason: "Failed to convert duration to u64".to_string(),
TIMESTAMP_COLUMN => {
span.start_time = cell.as_u64().context(InvalidJaegerQuerySnafu {
reason: "Failed to convert timestamp to u64".to_string(),
})? / 1000;
}
.build()
})? / 1000;
DURATION_NANO_COLUMN => {
span.duration = cell.as_u64().context(InvalidJaegerQuerySnafu {
reason: "Failed to convert duration to u64".to_string(),
})? / 1000;
}
SERVICE_NAME_COLUMN => {
if let JsonValue::String(name) = cell {
service_name = Some(name);
}
}
SPAN_NAME_COLUMN => {
if let JsonValue::String(span_name) = cell {
span.operation_name = span_name;
}
}
SPAN_ID_COLUMN => {
if let JsonValue::String(span_id) = cell {
span.span_id = span_id;
}
}
SPAN_ATTRIBUTES_COLUMN => {
// for v0 data model, span_attributes are nested as a json
// data structure
if let JsonValue::Object(span_attrs) = cell {
span.tags.extend(object_to_tags(span_attrs));
}
}
RESOURCE_ATTRIBUTES_COLUMN => {
// for v0 data model, resource_attributes are nested as a json
// data structure
if let JsonValue::Object(mut resource_attrs) = cell {
resource_attrs.remove(KEY_SERVICE_NAME);
resource_tags = object_to_tags(resource_attrs);
}
}
PARENT_SPAN_ID_COLUMN => {
if let JsonValue::String(parent_span_id) = cell {
if !parent_span_id.is_empty() {
span.references.push(Reference {
trace_id: span.trace_id.clone(),
span_id: parent_span_id,
ref_type: REF_TYPE_CHILD_OF.to_string(),
});
}
}
}
SPAN_EVENTS_COLUMN => {
if let JsonValue::Array(events) = cell {
for event in events {
if let JsonValue::Object(mut obj) = event {
let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
continue;
};
let Some(t) =
obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
SPAN_KIND_TIME_FMTS
.iter()
.find_map(|fmt| {
chrono::DateTime::parse_from_str(s, fmt).ok()
})
.map(|dt| dt.timestamp_micros() as u64)
})
else {
continue;
};
let mut fields = vec![KeyValue {
key: "event".to_string(),
value_type: ValueType::String,
value: Value::String(action.to_string()),
}];
// Add event attributes as fields
if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
fields.extend(object_to_tags(attrs));
}
span.logs.push(Log {
timestamp: t,
fields,
});
}
}
}
}
SCOPE_NAME_COLUMN => {
if let JsonValue::String(scope_name) = cell {
if !scope_name.is_empty() {
span.tags.push(KeyValue {
key: KEY_OTEL_SCOPE_NAME.to_string(),
value_type: ValueType::String,
value: Value::String(scope_name),
});
}
}
}
SCOPE_VERSION_COLUMN => {
if let JsonValue::String(scope_version) = cell {
if !scope_version.is_empty() {
span.tags.push(KeyValue {
key: KEY_OTEL_SCOPE_VERSION.to_string(),
value_type: ValueType::String,
value: Value::String(scope_version),
});
}
}
}
SPAN_KIND_COLUMN => {
if let JsonValue::String(span_kind) = cell {
if !span_kind.is_empty() {
span.tags.push(KeyValue {
key: KEY_SPAN_KIND.to_string(),
value_type: ValueType::String,
value: Value::String(normalize_span_kind(&span_kind)),
});
}
}
}
SPAN_STATUS_CODE => {
if let JsonValue::String(span_status) = cell {
if span_status != SPAN_STATUS_UNSET && !span_status.is_empty() {
span.tags.push(KeyValue {
key: KEY_OTEL_STATUS_CODE.to_string(),
value_type: ValueType::String,
value: Value::String(normalize_status_code(&span_status)),
});
}
}
}
_ => {
// this is the v1 data model
if is_span_attributes_flatten {
const SPAN_ATTR_PREFIX: &str = "span_attributes.";
const RESOURCE_ATTR_PREFIX: &str = "resource_attributes.";
// a span attributes column
if column_name.starts_with(SPAN_ATTR_PREFIX) {
if let Some(keyvalue) = to_keyvalue(
column_name
.strip_prefix(SPAN_ATTR_PREFIX)
.unwrap_or_default()
.to_string(),
cell,
) {
span.tags.push(keyvalue);
}
} else if column_name.starts_with(RESOURCE_ATTR_PREFIX) {
if let Some(keyvalue) = to_keyvalue(
column_name
.strip_prefix(RESOURCE_ATTR_PREFIX)
.unwrap_or_default()
.to_string(),
cell,
) {
resource_tags.push(keyvalue);
}
}
}
}
}
}
// Collect services to construct processes.
if let Some(JsonValue::String(service_name)) = row_iter.next() {
if let Some(service_name) = service_name {
if !service_to_resource_attributes.contains_key(&service_name) {
service_to_resource_attributes.insert(service_name.clone(), resource_tags);
}
if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) {
if let Some(process_id) = process.get(&service_name) {
span.process_id = process_id.clone();
@@ -746,127 +877,8 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
}
}
// Set operation name. In Jaeger, the operation name is the span name.
if let Some(JsonValue::String(span_name)) = row_iter.next() {
span.operation_name = span_name;
}
// Set span id.
if let Some(JsonValue::String(span_id)) = row_iter.next() {
span.span_id = span_id;
}
// Convert span attributes to tags.
if let Some(JsonValue::Object(object)) = row_iter.next() {
span.tags = object_to_tags(object);
}
// Save resource attributes with service name.
if let Some(JsonValue::Object(mut object)) = row_iter.next() {
if let Some(service_name) = object
.remove(KEY_SERVICE_NAME)
.and_then(|v| v.as_str().map(|s| s.to_string()))
{
match service_to_resource_attributes.entry(service_name) {
Occupied(_) => {}
Vacant(vacant) => {
let _ = vacant.insert(object_to_tags(object));
}
}
}
}
// Set parent span id.
if let Some(JsonValue::String(parent_span_id)) = row_iter.next()
&& !parent_span_id.is_empty()
{
span.references.push(Reference {
trace_id: span.trace_id.clone(),
span_id: parent_span_id,
ref_type: REF_TYPE_CHILD_OF.to_string(),
});
}
// Set span events to logs.
if let Some(JsonValue::Array(events)) = row_iter.next() {
for event in events {
if let JsonValue::Object(mut obj) = event {
let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
continue;
};
let Some(t) = obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
SPAN_KIND_TIME_FMTS
.iter()
.find_map(|fmt| chrono::DateTime::parse_from_str(s, fmt).ok())
.map(|dt| dt.timestamp_micros() as u64)
}) else {
continue;
};
let mut fields = vec![KeyValue {
key: "event".to_string(),
value_type: ValueType::String,
value: Value::String(action.to_string()),
}];
// Add event attributes as fields
if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
fields.extend(object_to_tags(attrs));
}
span.logs.push(Log {
timestamp: t,
fields,
});
}
}
}
// Set scope name.
if let Some(JsonValue::String(scope_name)) = row_iter.next()
&& !scope_name.is_empty()
{
span.tags.push(KeyValue {
key: KEY_OTEL_SCOPE_NAME.to_string(),
value_type: ValueType::String,
value: Value::String(scope_name),
});
}
// Set scope version.
if let Some(JsonValue::String(scope_version)) = row_iter.next()
&& !scope_version.is_empty()
{
span.tags.push(KeyValue {
key: KEY_OTEL_SCOPE_VERSION.to_string(),
value_type: ValueType::String,
value: Value::String(scope_version),
});
}
// Set span kind.
if let Some(JsonValue::String(span_kind)) = row_iter.next()
&& !span_kind.is_empty()
{
span.tags.push(KeyValue {
key: KEY_SPAN_KIND.to_string(),
value_type: ValueType::String,
value: Value::String(normalize_span_kind(&span_kind)),
});
}
// Set span status code.
if let Some(JsonValue::String(span_status_code)) = row_iter.next()
&& span_status_code != SPAN_STATUS_UNSET
&& !span_status_code.is_empty()
{
span.tags.push(KeyValue {
key: KEY_OTEL_STATUS_CODE.to_string(),
value_type: ValueType::String,
value: Value::String(normalize_status_code(&span_status_code)),
});
}
// ensure span tags order
span.tags.sort_by(|a, b| a.key.cmp(&b.key));
if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
spans.push(span);
@@ -899,42 +911,41 @@ fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
Ok(traces)
}
#[inline]
fn to_keyvalue(key: String, value: JsonValue) -> Option<KeyValue> {
match value {
JsonValue::String(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(value.to_string()),
}),
JsonValue::Number(value) => Some(KeyValue {
key,
value_type: ValueType::Int64,
value: Value::Int64(value.as_i64().unwrap_or(0)),
}),
JsonValue::Bool(value) => Some(KeyValue {
key,
value_type: ValueType::Boolean,
value: Value::Boolean(value),
}),
JsonValue::Array(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(serde_json::to_string(&value).unwrap()),
}),
JsonValue::Object(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(serde_json::to_string(&value).unwrap()),
}),
JsonValue::Null => None,
}
}
fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
object
.into_iter()
.filter_map(|(key, value)| match value {
JsonValue::String(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(value.to_string()),
}),
JsonValue::Number(value) => Some(KeyValue {
key,
value_type: ValueType::Int64,
value: Value::Int64(value.as_i64().unwrap_or(0)),
}),
JsonValue::Bool(value) => Some(KeyValue {
key,
value_type: ValueType::Boolean,
value: Value::Boolean(value),
}),
JsonValue::Array(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(serde_json::to_string(&value).unwrap()),
}),
JsonValue::Object(value) => Some(KeyValue {
key,
value_type: ValueType::String,
value: Value::String(serde_json::to_string(&value).unwrap()),
}),
// FIXME(zyy17): Do we need to support other types?
_ => {
warn!("Unsupported value type: {:?}", value);
None
}
})
.filter_map(|(key, value)| to_keyvalue(key, value))
.collect()
}
@@ -1055,7 +1066,6 @@ fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Va
#[cfg(test)]
mod tests {
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use serde_json::{json, Number, Value as JsonValue};
use super::*;
@@ -1301,6 +1311,151 @@ mod tests {
}
}
#[test]
fn test_traces_from_v1_records() {
// Each test is a tuple of `(test_records, expected)`.
let tests = vec![(
HttpRecordsOutput {
schema: OutputSchema {
column_schemas: vec![
ColumnSchema {
name: "trace_id".to_string(),
data_type: "String".to_string(),
},
ColumnSchema {
name: "timestamp".to_string(),
data_type: "TimestampNanosecond".to_string(),
},
ColumnSchema {
name: "duration_nano".to_string(),
data_type: "UInt64".to_string(),
},
ColumnSchema {
name: "service_name".to_string(),
data_type: "String".to_string(),
},
ColumnSchema {
name: "span_name".to_string(),
data_type: "String".to_string(),
},
ColumnSchema {
name: "span_id".to_string(),
data_type: "String".to_string(),
},
ColumnSchema {
name: "span_attributes.http.request.method".to_string(),
data_type: "String".to_string(),
},
ColumnSchema {
name: "span_attributes.http.request.url".to_string(),
data_type: "String".to_string(),
},
ColumnSchema {
name: "span_attributes.http.status_code".to_string(),
data_type: "UInt64".to_string(),
},
],
},
rows: vec![
vec![
JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
JsonValue::Number(Number::from_u128(100000000).unwrap()),
JsonValue::String("test-service-0".to_string()),
JsonValue::String("access-mysql".to_string()),
JsonValue::String("008421dbbd33a3e9".to_string()),
JsonValue::String("GET".to_string()),
JsonValue::String("/data".to_string()),
JsonValue::Number(Number::from_u128(200).unwrap()),
],
vec![
JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
JsonValue::Number(Number::from_u128(100000000).unwrap()),
JsonValue::String("test-service-0".to_string()),
JsonValue::String("access-redis".to_string()),
JsonValue::String("ffa03416a7b9ea48".to_string()),
JsonValue::String("POST".to_string()),
JsonValue::String("/create".to_string()),
JsonValue::Number(Number::from_u128(400).unwrap()),
],
],
total_rows: 2,
metrics: HashMap::new(),
},
vec![Trace {
trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
spans: vec![
Span {
trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
span_id: "008421dbbd33a3e9".to_string(),
operation_name: "access-mysql".to_string(),
start_time: 1738726754492422,
duration: 100000,
tags: vec![
KeyValue {
key: "http.request.method".to_string(),
value_type: ValueType::String,
value: Value::String("GET".to_string()),
},
KeyValue {
key: "http.request.url".to_string(),
value_type: ValueType::String,
value: Value::String("/data".to_string()),
},
KeyValue {
key: "http.status_code".to_string(),
value_type: ValueType::Int64,
value: Value::Int64(200),
},
],
process_id: "p1".to_string(),
..Default::default()
},
Span {
trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
span_id: "ffa03416a7b9ea48".to_string(),
operation_name: "access-redis".to_string(),
start_time: 1738726754642422,
duration: 100000,
tags: vec![
KeyValue {
key: "http.request.method".to_string(),
value_type: ValueType::String,
value: Value::String("POST".to_string()),
},
KeyValue {
key: "http.request.url".to_string(),
value_type: ValueType::String,
value: Value::String("/create".to_string()),
},
KeyValue {
key: "http.status_code".to_string(),
value_type: ValueType::Int64,
value: Value::Int64(400),
},
],
process_id: "p1".to_string(),
..Default::default()
},
],
processes: HashMap::from([(
"p1".to_string(),
Process {
service_name: "test-service-0".to_string(),
tags: vec![],
},
)]),
..Default::default()
}],
)];
for (records, expected) in tests {
let traces = traces_from_records(records).unwrap();
assert_eq!(traces, expected);
}
}
#[test]
fn test_from_jaeger_query_params() {
// Each test is a tuple of `(test_query_params, expected)`.
@@ -1311,7 +1466,6 @@ mod tests {
..Default::default()
},
QueryTraceParams {
db: DEFAULT_SCHEMA_NAME.to_string(),
service_name: "test-service-0".to_string(),
..Default::default()
},
@@ -1329,7 +1483,6 @@ mod tests {
..Default::default()
},
QueryTraceParams {
db: DEFAULT_SCHEMA_NAME.to_string(),
service_name: "test-service-0".to_string(),
operation_name: Some("access-mysql".to_string()),
start_time: Some(1738726754492422000),
@@ -1349,9 +1502,7 @@ mod tests {
];
for (query_params, expected) in tests {
let query_params =
QueryTraceParams::from_jaeger_query_params(DEFAULT_SCHEMA_NAME, query_params)
.unwrap();
let query_params = QueryTraceParams::from_jaeger_query_params(query_params).unwrap();
assert_eq!(query_params, expected);
}
}

View File

@@ -47,7 +47,7 @@ use tokio::io::AsyncWrite;
use crate::error::{self, DataFrameSnafu, InvalidPrepareStatementSnafu, Result};
use crate::metrics::METRIC_AUTH_FAILURE;
use crate::mysql::helper::{
self, format_placeholder, replace_placeholders, transform_placeholders,
self, fix_placeholder_types, format_placeholder, replace_placeholders, transform_placeholders,
};
use crate::mysql::writer;
use crate::mysql::writer::{create_mysql_column, handle_err};
@@ -183,7 +183,7 @@ impl MysqlInstanceShim {
let describe_result = self
.do_describe(statement.clone(), query_ctx.clone())
.await?;
let (plan, schema) = if let Some(DescribeResult {
let (mut plan, schema) = if let Some(DescribeResult {
logical_plan,
schema,
}) = describe_result
@@ -193,7 +193,8 @@ impl MysqlInstanceShim {
(None, None)
};
let params = if let Some(plan) = &plan {
let params = if let Some(plan) = &mut plan {
fix_placeholder_types(plan)?;
prepared_params(
&plan
.get_parameter_types()
@@ -258,7 +259,8 @@ impl MysqlInstanceShim {
};
let outputs = match sql_plan.plan {
Some(plan) => {
Some(mut plan) => {
fix_placeholder_types(&mut plan)?;
let param_types = plan
.get_parameter_types()
.context(DataFrameSnafu)?
@@ -295,6 +297,10 @@ impl MysqlInstanceShim {
}
Params::CliParams(params) => params.iter().map(|x| x.to_string()).collect(),
};
debug!(
"do_execute Replacing with Params: {:?}, Original Query: {}",
param_strs, sql_plan.query
);
let query = replace_params(param_strs, sql_plan.query);
debug!("Mysql execute replaced query: {}", query);
self.do_query(&query, query_ctx.clone()).await
@@ -412,6 +418,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
let (params, columns) = self
.do_prepare(raw_query, query_ctx.clone(), stmt_key)
.await?;
debug!("on_prepare: Params: {:?}, Columns: {:?}", params, columns);
w.reply(stmt_id, &params, &columns).await?;
crate::metrics::METRIC_MYSQL_PREPARED_COUNT
.with_label_values(&[query_ctx.get_db_string().as_str()])
@@ -641,12 +648,13 @@ fn replace_params_with_values(
debug_assert_eq!(param_types.len(), params.len());
debug!(
"replace_params_with_values(param_types: {:#?}, params: {:#?})",
"replace_params_with_values(param_types: {:#?}, params: {:#?}, plan: {:#?})",
param_types,
params
.iter()
.map(|x| format!("({:?}, {:?})", x.value, x.coltype))
.join(", ")
.join(", "),
plan
);
let mut values = Vec::with_capacity(params.len());
@@ -672,9 +680,10 @@ fn replace_params_with_exprs(
debug_assert_eq!(param_types.len(), params.len());
debug!(
"replace_params_with_exprs(param_types: {:#?}, params: {:#?})",
"replace_params_with_exprs(param_types: {:#?}, params: {:#?}, plan: {:#?})",
param_types,
params.iter().map(|x| format!("({:?})", x)).join(", ")
params.iter().map(|x| format!("({:?})", x)).join(", "),
plan
);
let mut values = Vec::with_capacity(params.len());

View File

@@ -18,6 +18,8 @@ use std::time::Duration;
use chrono::NaiveDate;
use common_query::prelude::ScalarValue;
use common_time::Timestamp;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::TimestampType;
use datatypes::value::{self, Value};
@@ -28,7 +30,7 @@ use sql::ast::{visit_expressions_mut, Expr, Value as ValueExpr, VisitMut};
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use crate::error::{self, Result};
use crate::error::{self, DataFusionSnafu, Result};
/// Returns the placeholder string "$i".
pub fn format_placeholder(i: usize) -> String {
@@ -77,6 +79,47 @@ pub fn transform_placeholders(stmt: Statement) -> Statement {
}
}
/// Give placeholders in the skip and limit positions an `int64` data type if none is specified.
///
/// DataFusion does not seem to assign a data type to a placeholder in the limit/skip position (it is still unknown whether this is a feature or a bug), and a placeholder expression without a data type cannot be extracted by `LogicalPlan::get_parameter_types`.
pub fn fix_placeholder_types(plan: &mut LogicalPlan) -> Result<()> {
let give_placeholder_types = |mut e| {
if let datafusion_expr::Expr::Placeholder(ph) = &mut e {
if ph.data_type.is_none() {
ph.data_type = Some(arrow_schema::DataType::Int64);
Ok(Transformed::yes(e))
} else {
Ok(Transformed::no(e))
}
} else {
Ok(Transformed::no(e))
}
};
*plan = std::mem::take(plan)
.transform(|p| {
let LogicalPlan::Limit(mut limit) = p else {
return Ok(Transformed::no(p));
};
if let Some(fetch) = &mut limit.fetch {
*fetch = Box::new(
std::mem::take(fetch)
.transform(give_placeholder_types)?
.data,
);
}
if let Some(skip) = &mut limit.skip {
*skip = Box::new(std::mem::take(skip).transform(give_placeholder_types)?.data);
}
Ok(Transformed::yes(LogicalPlan::Limit(limit)))
})
.context(DataFusionSnafu)?
.data;
Ok(())
}
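To make the intent of `give_placeholder_types` concrete, here is a standalone sketch (not part of the PR; it assumes the same `arrow_schema`, `datafusion_common`, and `datafusion_expr` APIs imported above) that takes an untyped placeholder, as it would appear in `LIMIT $1`, and gives it an explicit `Int64` type so `LogicalPlan::get_parameter_types` can report it:

```rust
use arrow_schema::DataType;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::expr::Placeholder;
use datafusion_expr::Expr;

fn main() -> datafusion_common::Result<()> {
    // "$1" as parsed from a LIMIT clause: no data type attached yet.
    let expr = Expr::Placeholder(Placeholder::new("$1".to_string(), None));

    // Same transform as above: fill in Int64 when the placeholder has no type.
    let fixed = expr
        .transform(|e| {
            if let Expr::Placeholder(ph) = &e {
                if ph.data_type.is_none() {
                    let mut ph = ph.clone();
                    ph.data_type = Some(DataType::Int64);
                    return Ok(Transformed::yes(Expr::Placeholder(ph)));
                }
            }
            Ok(Transformed::no(e))
        })?
        .data;

    // The placeholder now carries a concrete type.
    assert!(matches!(
        fixed,
        Expr::Placeholder(Placeholder {
            data_type: Some(DataType::Int64),
            ..
        })
    ));
    Ok(())
}
```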
fn visit_placeholders<V>(v: &mut V)
where
V: VisitMut,
@@ -106,6 +149,7 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarV
ConcreteDataType::UInt64(_) => Ok(ScalarValue::UInt64(Some(i as u64))),
ConcreteDataType::Float32(_) => Ok(ScalarValue::Float32(Some(i as f32))),
ConcreteDataType::Float64(_) => Ok(ScalarValue::Float64(Some(i as f64))),
ConcreteDataType::Boolean(_) => Ok(ScalarValue::Boolean(Some(i != 0))),
ConcreteDataType::Timestamp(ts_type) => Value::Timestamp(ts_type.create_timestamp(i))
.try_to_scalar_value(t)
.context(error::ConvertScalarValueSnafu),
@@ -127,6 +171,7 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarV
ConcreteDataType::UInt64(_) => Ok(ScalarValue::UInt64(Some(u))),
ConcreteDataType::Float32(_) => Ok(ScalarValue::Float32(Some(u as f32))),
ConcreteDataType::Float64(_) => Ok(ScalarValue::Float64(Some(u as f64))),
ConcreteDataType::Boolean(_) => Ok(ScalarValue::Boolean(Some(u != 0))),
ConcreteDataType::Timestamp(ts_type) => {
Value::Timestamp(ts_type.create_timestamp(u as i64))
.try_to_scalar_value(t)

View File

@@ -32,7 +32,7 @@ use snafu::{ensure, ResultExt};
use super::trace::attributes::OtlpAnyValue;
use super::utils::{bytes_to_hex_string, key_value_to_jsonb};
use crate::error::{
IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineTransformSnafu, Result,
IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineSnafu, Result,
UnsupportedJsonDataTypeForTagSnafu,
};
use crate::pipeline::run_pipeline;
@@ -72,8 +72,7 @@ pub async fn to_grpc_insert_requests(
}
PipelineWay::Pipeline(pipeline_def) => {
let data = parse_export_logs_service_request(request);
let array =
pipeline::json_array_to_intermediate_state(data).context(PipelineTransformSnafu)?;
let array = pipeline::json_array_to_intermediate_state(data).context(PipelineSnafu)?;
let inserts = run_pipeline(
&pipeline_handler,

View File

@@ -87,21 +87,14 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
row_writer::write_tag(writer, SERVICE_NAME_COLUMN, service_name, &mut row)?;
}
// tags
let iter = vec![
(TRACE_ID_COLUMN, span.trace_id),
(SPAN_ID_COLUMN, span.span_id),
(
// write fields
let fields = vec![
make_string_column_data(TRACE_ID_COLUMN, span.trace_id),
make_string_column_data(SPAN_ID_COLUMN, span.span_id),
make_string_column_data(
PARENT_SPAN_ID_COLUMN,
span.parent_span_id.unwrap_or_default(),
),
]
.into_iter()
.map(|(col, val)| (col.to_string(), val));
row_writer::write_tags(writer, iter, &mut row)?;
// write fields
let fields = vec![
make_string_column_data(SPAN_KIND_COLUMN, span.span_kind),
make_string_column_data(SPAN_NAME_COLUMN, span.span_name),
make_string_column_data("span_status_code", span.span_status_code),

@@ -95,13 +95,6 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
// tags
let tags = vec![
(TRACE_ID_COLUMN.to_string(), span.trace_id),
(SPAN_ID_COLUMN.to_string(), span.span_id),
];
row_writer::write_tags(writer, tags.into_iter(), &mut row)?;
// write fields
if let Some(parent_span_id) = span.parent_span_id {
row_writer::write_fields(
@@ -115,6 +108,8 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
}
let fields = vec![
make_string_column_data(TRACE_ID_COLUMN, span.trace_id),
make_string_column_data(SPAN_ID_COLUMN, span.span_id),
make_string_column_data(SPAN_KIND_COLUMN, span.span_kind),
make_string_column_data(SPAN_NAME_COLUMN, span.span_name),
make_string_column_data("span_status_code", span.span_status_code),
@@ -126,9 +121,9 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
if let Some(service_name) = span.service_name {
row_writer::write_fields(
row_writer::write_tags(
writer,
std::iter::once(make_string_column_data(SERVICE_NAME_COLUMN, service_name)),
std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
&mut row,
)?;
}

@@ -23,7 +23,7 @@ use pipeline::{
use session::context::QueryContextRef;
use snafu::ResultExt;
use crate::error::{CatalogSnafu, PipelineTransformSnafu, Result};
use crate::error::{CatalogSnafu, PipelineSnafu, Result};
use crate::metrics::{
METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
};
@@ -74,7 +74,7 @@ pub(crate) async fn run_pipeline(
table_name,
}]
})
.context(PipelineTransformSnafu)
.context(PipelineSnafu)
} else {
let pipeline = get_pipeline(pipeline_definition, state, query_ctx).await?;
@@ -91,7 +91,7 @@ pub(crate) async fn run_pipeline(
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
})
.context(PipelineTransformSnafu)?;
.context(PipelineSnafu)?;
match r {
PipelineExecOutput::Transformed(row) => {

@@ -32,6 +32,7 @@ use std::sync::Arc;
use api::prom_store::remote::ReadRequest;
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use catalog::CatalogManager;
use common_query::Output;
use headers::HeaderValue;
use log_query::LogQuery;
@@ -172,7 +173,11 @@ pub trait PipelineHandler {
/// Handle log query requests.
#[async_trait]
pub trait LogQueryHandler {
/// Execute a log query.
async fn query(&self, query: LogQuery, ctx: QueryContextRef) -> Result<Output>;
/// Get catalog manager.
fn catalog_manager(&self, ctx: &QueryContext) -> Result<&dyn CatalogManager>;
}
/// Handle Jaeger query requests.

@@ -386,7 +386,7 @@ async fn test_config() {
[http]
addr = "127.0.0.1:4000"
timeout = "30s"
timeout = "0s"
body_limit = "2GB"
[logging]

@@ -43,6 +43,9 @@ pub const FILE_TABLE_LOCATION_KEY: &str = "location";
pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
pub const FILE_TABLE_FORMAT_KEY: &str = "format";
pub const TABLE_DATA_MODEL: &str = "table_data_model";
pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";
/// Returns true if the `key` is a valid key for any engine or storage.
pub fn validate_table_option(key: &str) -> bool {
if is_supported_in_s3(key) {
@@ -70,6 +73,8 @@ pub fn validate_table_option(key: &str) -> bool {
// metric engine keys:
PHYSICAL_TABLE_METADATA_KEY,
LOGICAL_TABLE_METADATA_KEY,
// table model info
TABLE_DATA_MODEL,
]
.contains(&key)
}

@@ -53,12 +53,7 @@ serde_yaml = "0.9"
snafu = { workspace = true }
sql = { workspace = true }
sqlparser.workspace = true
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"mysql",
"postgres",
"chrono",
] }
sqlx.workspace = true
store-api = { workspace = true }
strum.workspace = true
tinytemplate = "1.2"

@@ -37,16 +37,16 @@ common-telemetry.workspace = true
common-test-util.workspace = true
common-time.workspace = true
common-wal.workspace = true
datanode.workspace = true
datanode = { workspace = true }
datatypes.workspace = true
dotenv.workspace = true
flate2.workspace = true
flate2 = "1.0"
flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }
log-query.workspace = true
log-query = { workspace = true }
loki-proto.workspace = true
meta-client.workspace = true
meta-srv = { workspace = true, features = ["mock"] }
@@ -92,9 +92,10 @@ itertools.workspace = true
opentelemetry-proto.workspace = true
partition.workspace = true
paste.workspace = true
pipeline.workspace = true
prost.workspace = true
rand.workspace = true
session = { workspace = true, features = ["testing"] }
store-api.workspace = true
tokio-postgres.workspace = true
tokio-postgres = { workspace = true }
url = "2.3"

@@ -28,6 +28,7 @@ use loki_proto::prost_types::Timestamp;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME;
use prost::Message;
use serde_json::{json, Value};
use servers::http::handler::HealthResponse;
@@ -105,6 +106,7 @@ macro_rules! http_tests {
test_elasticsearch_logs_with_index,
test_log_query,
test_jaeger_query_api,
test_jaeger_query_api_for_trace_v1,
);
)*
};
@@ -941,7 +943,7 @@ init_regions_parallelism = 16
[http]
addr = "127.0.0.1:4000"
timeout = "30s"
timeout = "0s"
body_limit = "64MiB"
is_strict_mode = false
cors_allowed_origins = []
@@ -2114,7 +2116,7 @@ pub async fn test_otlp_traces_v0(store_type: StorageType) {
assert_eq!(StatusCode::OK, res.status());
// select traces data
let expected = r#"[[1736480942444376000,1736480942444499000,123000,"telemetrygen","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444376000,1736480942444499000,123000,"telemetrygen","c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"telemetrygen","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"telemetrygen","cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}]]"#;
let expected = r#"[[1736480942444376000,1736480942444499000,123000,"telemetrygen","c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444376000,1736480942444499000,123000,"telemetrygen","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"telemetrygen","cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"telemetrygen","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}]]"#;
validate_data(
"otlp_traces",
&client,
@@ -2160,7 +2162,6 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
// init
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await;
const TRACE_V1: &str = "greptime_trace_v1";
let content = r#"
{"resourceSpans":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"telemetrygen"}}],"droppedAttributesCount":0},"scopeSpans":[{"scope":{"name":"telemetrygen","version":"","attributes":[],"droppedAttributesCount":0},"spans":[{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"9630f2916e2f7909","traceState":"","parentSpanId":"d24f921c75f68e23","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"d24f921c75f68e23","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"8f847259b0f6e1ab","traceState":"","parentSpanId":"eba7be77e3558179","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"eba7be77e3558179","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]}
@@ -2181,8 +2182,8 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-log-pipeline-name"),
HeaderValue::from_static(TRACE_V1),
HeaderName::from_static("x-greptime-pipeline-name"),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
@@ -2197,10 +2198,10 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
assert_eq!(StatusCode::OK, res.status());
// select traces data
let expected = r#"[[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]]]"#;
let expected = r#"[[1736480942444376000,1736480942444499000,123000,null,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444376000,1736480942444499000,123000,"d24f921c75f68e23","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,null,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"eba7be77e3558179","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]]]"#;
validate_data("otlp_traces", &client, "select * from mytable;", expected).await;
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL,\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"trace_id\", \"span_id\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'A',\n trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true'\n)"]]"#;
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'A',\n trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
validate_data(
"otlp_traces",
&client,
@@ -2222,8 +2223,8 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-log-pipeline-name"),
HeaderValue::from_static(TRACE_V1),
HeaderName::from_static("x-greptime-pipeline-name"),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
@@ -2729,8 +2730,8 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"spanId": "008421dbbd33a3e9",
"name": "access-mysql",
"kind": 2,
"startTimeUnixNano": "1738726754492422000",
"endTimeUnixNano": "1738726754592422000",
"startTimeUnixNano": "1738726754492421000",
"endTimeUnixNano": "1738726754592421000",
"attributes": [
{
"key": "operation.type",
@@ -2807,6 +2808,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
let body = req.encode_to_vec();
// write traces data.
let res = send_req(
&client,
@@ -2906,7 +2908,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492422,
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
@@ -2919,11 +2921,6 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "access-mysql"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.name",
"type": "string",
@@ -2934,6 +2931,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
@@ -2961,11 +2963,6 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "access-redis"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.name",
"type": "string",
@@ -2976,6 +2973,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
@@ -3008,7 +3010,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
// Test `/api/traces` API.
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492422&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492421&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
@@ -3023,7 +3025,7 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492422,
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
@@ -3036,11 +3038,6 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "access-mysql"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.name",
"type": "string",
@@ -3051,6 +3048,11 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
@@ -3084,6 +3086,429 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_jaeger_query_api_v1").await;
let client = TestClient::new(app).await;
// Test empty response for `/api/services` API before writing any traces.
let res = client
.get("/v1/jaeger/api/services")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": null,
"total": 0,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
let content = r#"
{
"resourceSpans": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "test-jaeger-query-api"
}
}
]
},
"scopeSpans": [
{
"scope": {
"name": "test-jaeger-query-api",
"version": "1.0.0"
},
"spans": [
{
"traceId": "5611dce1bc9ebed65352d99a027b08ea",
"spanId": "008421dbbd33a3e9",
"name": "access-mysql",
"kind": 2,
"startTimeUnixNano": "1738726754492421000",
"endTimeUnixNano": "1738726754592421000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-mysql"
}
},
{
"key": "net.peer.ip",
"value": {
"stringValue": "1.2.3.4"
}
},
{
"key": "peer.service",
"value": {
"stringValue": "test-jaeger-query-api"
}
}
],
"status": {
"message": "success",
"code": 0
}
}
]
},
{
"scope": {
"name": "test-jaeger-query-api",
"version": "1.0.0"
},
"spans": [
{
"traceId": "5611dce1bc9ebed65352d99a027b08ea",
"spanId": "ffa03416a7b9ea48",
"name": "access-redis",
"kind": 2,
"startTimeUnixNano": "1738726754492422000",
"endTimeUnixNano": "1738726754592422000",
"attributes": [
{
"key": "operation.type",
"value": {
"stringValue": "access-redis"
}
},
{
"key": "net.peer.ip",
"value": {
"stringValue": "1.2.3.4"
}
},
{
"key": "peer.service",
"value": {
"stringValue": "test-jaeger-query-api"
}
}
],
"status": {
"message": "success",
"code": 0
}
}
]
}
],
"schemaUrl": "https://opentelemetry.io/schemas/1.4.0"
}
]
}
"#;
let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
let body = req.encode_to_vec();
// write traces data.
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-log-pipeline-name"),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
HeaderValue::from_static("mytable"),
),
],
"/v1/otlp/v1/traces",
body.clone(),
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
// Test `/api/services` API.
let res = client
.get("/v1/jaeger/api/services")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
"test-jaeger-query-api"
],
"total": 1,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/operations` API.
let res = client
.get("/v1/jaeger/api/operations?service=test-jaeger-query-api")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
{
"name": "access-mysql",
"spanKind": "server"
},
{
"name": "access-redis",
"spanKind": "server"
}
],
"total": 2,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/services/{service_name}/operations` API.
let res = client
.get("/v1/jaeger/api/services/test-jaeger-query-api/operations")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
"access-mysql",
"access-redis"
],
"total": 2,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces/{trace_id}` API.
let res = client
.get("/v1/jaeger/api/traces/5611dce1bc9ebed65352d99a027b08ea")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"{
"data": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spans": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-mysql"
},
{
"key": "otel.scope.name",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.version",
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
"value": "server"
}
],
"logs": [],
"processID": "p1"
},
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "ffa03416a7b9ea48",
"operationName": "access-redis",
"references": [],
"startTime": 1738726754492422,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-redis"
},
{
"key": "otel.scope.name",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.version",
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
"value": "server"
}
],
"logs": [],
"processID": "p1"
}
],
"processes": {
"p1": {
"serviceName": "test-jaeger-query-api",
"tags": []
}
}
}
],
"total": 0,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces` API.
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492421&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")
.header("x-greptime-trace-table-name", "mytable")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spans": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-mysql"
},
{
"key": "otel.scope.name",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.version",
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
"value": "server"
}
],
"logs": [],
"processID": "p1"
}
],
"processes": {
"p1": {
"serviceName": "test-jaeger-query-api",
"tags": []
}
}
}
],
"total": 0,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
guard.remove_all().await;
}
async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())

@@ -326,6 +326,31 @@ FROM (
| [7.0, 8.0, 9.0, 10.0] | 4          |
+-----------------------+------------+
SELECT vec_kth_elem('[1.0, 2.0, 3.0]', 2);
+------------------------------------------------+
| vec_kth_elem(Utf8("[1.0, 2.0, 3.0]"),Int64(2)) |
+------------------------------------------------+
| 3.0                                            |
+------------------------------------------------+
SELECT v, vec_kth_elem(v, 0) AS first_elem
FROM (
SELECT '[1.0, 2.0, 3.0]' AS v
UNION ALL
SELECT '[4.0, 5.0, 6.0, 7.0]' AS v
UNION ALL
SELECT '[8.0]' AS v
)
WHERE vec_kth_elem(v, 0) > 2.0;
+----------------------+------------+
| v                    | first_elem |
+----------------------+------------+
| [4.0, 5.0, 6.0, 7.0] | 4.0        |
| [8.0]                | 8.0        |
+----------------------+------------+
SELECT vec_to_string(vec_subvector('[1.0,2.0,3.0,4.0,5.0]', 0, 3));
+-------------------------------------------------------------------------------+

@@ -100,6 +100,18 @@ FROM (
SELECT '[7.0, 8.0, 9.0, 10.0]' AS v
) Order By vec_dim(v) ASC;
SELECT vec_kth_elem('[1.0, 2.0, 3.0]', 2);
SELECT v, vec_kth_elem(v, 0) AS first_elem
FROM (
SELECT '[1.0, 2.0, 3.0]' AS v
UNION ALL
SELECT '[4.0, 5.0, 6.0, 7.0]' AS v
UNION ALL
SELECT '[8.0]' AS v
)
WHERE vec_kth_elem(v, 0) > 2.0;
SELECT vec_to_string(vec_subvector('[1.0,2.0,3.0,4.0,5.0]', 0, 3));
SELECT vec_to_string(vec_subvector('[1.0,2.0,3.0,4.0,5.0]', 5, 5));
@@ -121,4 +133,3 @@ FROM (
UNION ALL
SELECT '[4.0, 5.0, 6.0, 10, -8, 100]' AS v
) ORDER BY v;

Some files were not shown because too many files have changed in this diff.