feat: implement otel-arrow protocol for GreptimeDB (#5840)

* [wip]: implement arrow service

* add service

* feat/otel-arrow:
 ### Add OpenTelemetry Arrow Support

 - **`Cargo.toml`, `Cargo.lock`**: Updated `otel-arrow-rust` dependency to use a local path and added `arrow-ipc` as a dependency.
 - **`src/servers/src/grpc.rs`, `src/servers/src/grpc/builder.rs`**: Integrated `ArrowMetricsServiceServer` with gRPC server, including support for custom header interception and message compression.
 - **`src/servers/src/otel_arrow.rs`**: Implemented `OtelArrowServiceHandler` for handling OpenTelemetry Arrow metrics and added `HeaderInterceptor` for custom header handling.

* feat/otel-arrow:
 Add error handling for OpenTelemetry Arrow requests

 - **`src/error.rs`**: Introduced a new error variant `HandleOtelArrowRequest` to handle failures in processing OpenTelemetry Arrow requests.
 - **`src/otel_arrow.rs`**: Implemented error handling for receiving and consuming batches from the OpenTelemetry Arrow client. Added logging for errors and updated the response status accordingly.

* feat/otel-arrow:
 Remove `otel_arrow` Module from gRPC Server

 - Deleted the `otel_arrow` module from the gRPC server implementation.
 - Removed the `otel_arrow` module import from `grpc.rs`.
 - Deleted the `otel_arrow.rs` file, which contained the `OtelArrowServer` struct and its implementation.

* feat/otel-arrow:
 ## Remove `Arc` Implementations for Protocol and Pipeline Handlers

 - **Removed `Arc` Implementations**: Deleted `Arc` implementations for `OpenTelemetryProtocolHandler` and `PipelineHandler` traits in `query_handler.rs`. This change simplifies the code by removing redundant async trait implementations for `Arc<T>`.
 - **File Affected**: `src/servers/src/query_handler.rs`

* feat/otel-arrow:
 Improve error handling and metadata processing in `otel_arrow.rs`

 - Updated error handling by ignoring the result of `sender.send` to prevent panic on failure.
 - Enhanced metadata processing in `HeaderInterceptor` by using `Ok` to safely handle `grpc-encoding` entry retrieval.

* fix dependency

* feat/otel-arrow:
 - **Update Dependencies**:
   - Moved `otel-arrow-rust` dependency in `Cargo.toml`.
   - Adjusted workspace dependencies in `src/frontend/Cargo.toml`.

 - **Error Handling**:
   - Removed `MissingQueryContext` error variant from `src/servers/src/error.rs`.

* fix: toml format

* remove useless code

* chore: resolve conflicts
This commit is contained in:
Lei, HUANG
2025-04-21 15:24:23 +08:00
committed by GitHub
parent 56f319a707
commit 90ffaa8a62
11 changed files with 566 additions and 229 deletions

504
Cargo.lock generated
View File

@@ -266,25 +266,61 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "arrow"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3a3ec4fe573f9d1f59d99c085197ef669b00b088ba1d7bb75224732d9357a74"
dependencies = [
"arrow-arith 53.4.1",
"arrow-array 53.4.1",
"arrow-buffer 53.4.1",
"arrow-cast 53.4.1",
"arrow-csv 53.4.1",
"arrow-data 53.4.1",
"arrow-ipc 53.4.1",
"arrow-json 53.4.1",
"arrow-ord 53.4.1",
"arrow-row 53.4.1",
"arrow-schema 53.4.1",
"arrow-select 53.4.1",
"arrow-string 53.4.1",
]
[[package]]
name = "arrow"
version = "54.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc208515aa0151028e464cc94a692156e945ce5126abd3537bb7fd6ba2143ed1"
dependencies = [
"arrow-arith",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-csv",
"arrow-data",
"arrow-ipc",
"arrow-json",
"arrow-ord",
"arrow-row",
"arrow-schema",
"arrow-select",
"arrow-string",
"arrow-arith 54.2.1",
"arrow-array 54.2.1",
"arrow-buffer 54.3.1",
"arrow-cast 54.2.1",
"arrow-csv 54.2.1",
"arrow-data 54.3.1",
"arrow-ipc 54.2.1",
"arrow-json 54.2.1",
"arrow-ord 54.2.1",
"arrow-row 54.2.1",
"arrow-schema 54.3.1",
"arrow-select 54.2.1",
"arrow-string 54.2.1",
]
[[package]]
name = "arrow-arith"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dcf19f07792d8c7f91086c67b574a79301e367029b17fcf63fb854332246a10"
dependencies = [
"arrow-array 53.4.1",
"arrow-buffer 53.4.1",
"arrow-data 53.4.1",
"arrow-schema 53.4.1",
"chrono",
"half",
"num",
]
[[package]]
@@ -293,14 +329,30 @@ version = "54.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e07e726e2b3f7816a85c6a45b6ec118eeeabf0b2a8c208122ad949437181f49a"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-array 54.2.1",
"arrow-buffer 54.3.1",
"arrow-data 54.3.1",
"arrow-schema 54.3.1",
"chrono",
"num",
]
[[package]]
name = "arrow-array"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7845c32b41f7053e37a075b3c2f29c6f5ea1b3ca6e5df7a2d325ee6e1b4a63cf"
dependencies = [
"ahash 0.8.11",
"arrow-buffer 53.4.1",
"arrow-data 53.4.1",
"arrow-schema 53.4.1",
"chrono",
"half",
"hashbrown 0.15.2",
"num",
]
[[package]]
name = "arrow-array"
version = "54.2.1"
@@ -308,9 +360,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2262eba4f16c78496adfd559a29fe4b24df6088efc9985a873d58e92be022d5"
dependencies = [
"ahash 0.8.11",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-buffer 54.3.1",
"arrow-data 54.3.1",
"arrow-schema 54.3.1",
"chrono",
"chrono-tz",
"half",
@@ -318,6 +370,17 @@ dependencies = [
"num",
]
[[package]]
name = "arrow-buffer"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b5c681a99606f3316f2a99d9c8b6fa3aad0b1d34d8f6d7a1b471893940219d8"
dependencies = [
"bytes",
"half",
"num",
]
[[package]]
name = "arrow-buffer"
version = "54.3.1"
@@ -329,17 +392,37 @@ dependencies = [
"num",
]
[[package]]
name = "arrow-cast"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6365f8527d4f87b133eeb862f9b8093c009d41a210b8f101f91aa2392f61daac"
dependencies = [
"arrow-array 53.4.1",
"arrow-buffer 53.4.1",
"arrow-data 53.4.1",
"arrow-schema 53.4.1",
"arrow-select 53.4.1",
"atoi",
"base64 0.22.1",
"chrono",
"half",
"lexical-core",
"num",
"ryu",
]
[[package]]
name = "arrow-cast"
version = "54.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4103d88c5b441525ed4ac23153be7458494c2b0c9a11115848fdb9b81f6f886a"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-select",
"arrow-array 54.2.1",
"arrow-buffer 54.3.1",
"arrow-data 54.3.1",
"arrow-schema 54.3.1",
"arrow-select 54.2.1",
"atoi",
"base64 0.22.1",
"chrono",
@@ -350,15 +433,34 @@ dependencies = [
"ryu",
]
[[package]]
name = "arrow-csv"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30dac4d23ac769300349197b845e0fd18c7f9f15d260d4659ae6b5a9ca06f586"
dependencies = [
"arrow-array 53.4.1",
"arrow-buffer 53.4.1",
"arrow-cast 53.4.1",
"arrow-data 53.4.1",
"arrow-schema 53.4.1",
"chrono",
"csv",
"csv-core",
"lazy_static",
"lexical-core",
"regex",
]
[[package]]
name = "arrow-csv"
version = "54.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43d3cb0914486a3cae19a5cad2598e44e225d53157926d0ada03c20521191a65"
dependencies = [
"arrow-array",
"arrow-cast",
"arrow-schema",
"arrow-array 54.2.1",
"arrow-cast 54.2.1",
"arrow-schema 54.3.1",
"chrono",
"csv",
"csv-core",
@@ -366,14 +468,26 @@ dependencies = [
"regex",
]
[[package]]
name = "arrow-data"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd962fc3bf7f60705b25bcaa8eb3318b2545aa1d528656525ebdd6a17a6cd6fb"
dependencies = [
"arrow-buffer 53.4.1",
"arrow-schema 53.4.1",
"half",
"num",
]
[[package]]
name = "arrow-data"
version = "54.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61cfdd7d99b4ff618f167e548b2411e5dd2c98c0ddebedd7df433d34c20a4429"
dependencies = [
"arrow-buffer",
"arrow-schema",
"arrow-buffer 54.3.1",
"arrow-schema 54.3.1",
"half",
"num",
]
@@ -384,11 +498,11 @@ version = "54.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7408f2bf3b978eddda272c7699f439760ebc4ac70feca25fefa82c5b8ce808d"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-ipc",
"arrow-schema",
"arrow-array 54.2.1",
"arrow-buffer 54.3.1",
"arrow-cast 54.2.1",
"arrow-ipc 54.2.1",
"arrow-schema 54.3.1",
"base64 0.22.1",
"bytes",
"futures",
@@ -397,32 +511,67 @@ dependencies = [
"tonic 0.12.3",
]
[[package]]
name = "arrow-ipc"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3527365b24372f9c948f16e53738eb098720eea2093ae73c7af04ac5e30a39b"
dependencies = [
"arrow-array 53.4.1",
"arrow-buffer 53.4.1",
"arrow-cast 53.4.1",
"arrow-data 53.4.1",
"arrow-schema 53.4.1",
"flatbuffers",
"zstd 0.13.2",
]
[[package]]
name = "arrow-ipc"
version = "54.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddecdeab02491b1ce88885986e25002a3da34dd349f682c7cfe67bab7cc17b86"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-array 54.2.1",
"arrow-buffer 54.3.1",
"arrow-data 54.3.1",
"arrow-schema 54.3.1",
"flatbuffers",
"lz4_flex",
"zstd 0.13.2",
]
[[package]]
name = "arrow-json"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acdec0024749fc0d95e025c0b0266d78613727b3b3a5d4cf8ea47eb6d38afdd1"
dependencies = [
"arrow-array 53.4.1",
"arrow-buffer 53.4.1",
"arrow-cast 53.4.1",
"arrow-data 53.4.1",
"arrow-schema 53.4.1",
"chrono",
"half",
"indexmap 2.9.0",
"lexical-core",
"num",
"serde",
"serde_json",
]
[[package]]
name = "arrow-json"
version = "54.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d03b9340013413eb84868682ace00a1098c81a5ebc96d279f7ebf9a4cac3c0fd"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-schema",
"arrow-array 54.2.1",
"arrow-buffer 54.3.1",
"arrow-cast 54.2.1",
"arrow-data 54.3.1",
"arrow-schema 54.3.1",
"chrono",
"half",
"indexmap 2.9.0",
@@ -432,17 +581,46 @@ dependencies = [
"serde_json",
]
[[package]]
name = "arrow-ord"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79af2db0e62a508d34ddf4f76bfd6109b6ecc845257c9cba6f939653668f89ac"
dependencies = [
"arrow-array 53.4.1",
"arrow-buffer 53.4.1",
"arrow-data 53.4.1",
"arrow-schema 53.4.1",
"arrow-select 53.4.1",
"half",
"num",
]
[[package]]
name = "arrow-ord"
version = "54.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f841bfcc1997ef6ac48ee0305c4dfceb1f7c786fe31e67c1186edf775e1f1160"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-select",
"arrow-array 54.2.1",
"arrow-buffer 54.3.1",
"arrow-data 54.3.1",
"arrow-schema 54.3.1",
"arrow-select 54.2.1",
]
[[package]]
name = "arrow-row"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da30e9d10e9c52f09ea0cf15086d6d785c11ae8dcc3ea5f16d402221b6ac7735"
dependencies = [
"ahash 0.8.11",
"arrow-array 53.4.1",
"arrow-buffer 53.4.1",
"arrow-data 53.4.1",
"arrow-schema 53.4.1",
"half",
]
[[package]]
@@ -451,13 +629,19 @@ version = "54.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1eeb55b0a0a83851aa01f2ca5ee5648f607e8506ba6802577afdda9d75cdedcd"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-array 54.2.1",
"arrow-buffer 54.3.1",
"arrow-data 54.3.1",
"arrow-schema 54.3.1",
"half",
]
[[package]]
name = "arrow-schema"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35b0f9c0c3582dd55db0f136d3b44bfa0189df07adcf7dc7f2f2e74db0f52eb8"
[[package]]
name = "arrow-schema"
version = "54.3.1"
@@ -467,6 +651,20 @@ dependencies = [
"serde",
]
[[package]]
name = "arrow-select"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92fc337f01635218493c23da81a364daf38c694b05fc20569c3193c11c561984"
dependencies = [
"ahash 0.8.11",
"arrow-array 53.4.1",
"arrow-buffer 53.4.1",
"arrow-data 53.4.1",
"arrow-schema 53.4.1",
"num",
]
[[package]]
name = "arrow-select"
version = "54.2.1"
@@ -474,24 +672,41 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e2932aece2d0c869dd2125feb9bd1709ef5c445daa3838ac4112dcfa0fda52c"
dependencies = [
"ahash 0.8.11",
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-array 54.2.1",
"arrow-buffer 54.3.1",
"arrow-data 54.3.1",
"arrow-schema 54.3.1",
"num",
]
[[package]]
name = "arrow-string"
version = "53.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d596a9fc25dae556672d5069b090331aca8acb93cae426d8b7dcdf1c558fa0ce"
dependencies = [
"arrow-array 53.4.1",
"arrow-buffer 53.4.1",
"arrow-data 53.4.1",
"arrow-schema 53.4.1",
"arrow-select 53.4.1",
"memchr",
"num",
"regex",
"regex-syntax 0.8.5",
]
[[package]]
name = "arrow-string"
version = "54.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "912e38bd6a7a7714c1d9b61df80315685553b7455e8a6045c27531d8ecd5b458"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-select",
"arrow-array 54.2.1",
"arrow-buffer 54.3.1",
"arrow-data 54.3.1",
"arrow-schema 54.3.1",
"arrow-select 54.2.1",
"memchr",
"num",
"regex",
@@ -1349,8 +1564,8 @@ name = "catalog"
version = "0.14.0"
dependencies = [
"api",
"arrow",
"arrow-schema",
"arrow 54.2.1",
"arrow-schema 54.3.1",
"async-stream",
"async-trait",
"bytes",
@@ -1940,8 +2155,8 @@ dependencies = [
name = "common-datasource"
version = "0.14.0"
dependencies = [
"arrow",
"arrow-schema",
"arrow 54.2.1",
"arrow-schema 54.3.1",
"async-compression 0.3.15",
"async-trait",
"bytes",
@@ -2403,7 +2618,7 @@ dependencies = [
name = "common-time"
version = "0.14.0"
dependencies = [
"arrow",
"arrow 54.2.1",
"chrono",
"chrono-tz",
"common-error",
@@ -2904,10 +3119,10 @@ name = "datafusion"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"arrow",
"arrow-array",
"arrow-ipc",
"arrow-schema",
"arrow 54.2.1",
"arrow-array 54.2.1",
"arrow-ipc 54.2.1",
"arrow-schema 54.3.1",
"async-compression 0.4.13",
"async-trait",
"bytes",
@@ -2955,7 +3170,7 @@ name = "datafusion-catalog"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"arrow",
"arrow 54.2.1",
"async-trait",
"dashmap",
"datafusion-common",
@@ -2975,8 +3190,8 @@ name = "datafusion-catalog-listing"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"arrow",
"arrow-schema",
"arrow 54.2.1",
"arrow-schema 54.3.1",
"chrono",
"datafusion-catalog",
"datafusion-common",
@@ -2999,10 +3214,10 @@ version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"ahash 0.8.11",
"arrow",
"arrow-array",
"arrow-ipc",
"arrow-schema",
"arrow 54.2.1",
"arrow-array 54.2.1",
"arrow-ipc 54.2.1",
"arrow-schema 54.3.1",
"base64 0.22.1",
"half",
"hashbrown 0.14.5",
@@ -3037,7 +3252,7 @@ name = "datafusion-execution"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"arrow",
"arrow 54.2.1",
"dashmap",
"datafusion-common",
"datafusion-expr",
@@ -3055,7 +3270,7 @@ name = "datafusion-expr"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"arrow",
"arrow 54.2.1",
"chrono",
"datafusion-common",
"datafusion-doc",
@@ -3075,7 +3290,7 @@ name = "datafusion-expr-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"arrow",
"arrow 54.2.1",
"datafusion-common",
"itertools 0.14.0",
"paste",
@@ -3086,8 +3301,8 @@ name = "datafusion-functions"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"arrow",
"arrow-buffer",
"arrow 54.2.1",
"arrow-buffer 54.3.1",
"base64 0.22.1",
"blake2",
"blake3",
@@ -3116,8 +3331,8 @@ version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"ahash 0.8.11",
"arrow",
"arrow-schema",
"arrow 54.2.1",
"arrow-schema 54.3.1",
"datafusion-common",
"datafusion-doc",
"datafusion-execution",
@@ -3137,7 +3352,7 @@ version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"ahash 0.8.11",
"arrow",
"arrow 54.2.1",
"datafusion-common",
"datafusion-expr-common",
"datafusion-physical-expr-common",
@@ -3148,10 +3363,10 @@ name = "datafusion-functions-nested"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"arrow",
"arrow-array",
"arrow-ord",
"arrow-schema",
"arrow 54.2.1",
"arrow-array 54.2.1",
"arrow-ord 54.2.1",
"arrow-schema 54.3.1",
"datafusion-common",
"datafusion-doc",
"datafusion-execution",
@@ -3170,7 +3385,7 @@ name = "datafusion-functions-table"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"arrow",
"arrow 54.2.1",
"async-trait",
"datafusion-catalog",
"datafusion-common",
@@ -3220,7 +3435,7 @@ name = "datafusion-optimizer"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"arrow",
"arrow 54.2.1",
"chrono",
"datafusion-common",
"datafusion-expr",
@@ -3239,9 +3454,9 @@ version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"ahash 0.8.11",
"arrow",
"arrow-array",
"arrow-schema",
"arrow 54.2.1",
"arrow-array 54.2.1",
"arrow-schema 54.3.1",
"datafusion-common",
"datafusion-expr",
"datafusion-expr-common",
@@ -3262,7 +3477,7 @@ version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"ahash 0.8.11",
"arrow",
"arrow 54.2.1",
"datafusion-common",
"datafusion-expr-common",
"hashbrown 0.14.5",
@@ -3274,8 +3489,8 @@ name = "datafusion-physical-optimizer"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"arrow",
"arrow-schema",
"arrow 54.2.1",
"arrow-schema 54.3.1",
"datafusion-common",
"datafusion-execution",
"datafusion-expr",
@@ -3296,10 +3511,10 @@ version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"ahash 0.8.11",
"arrow",
"arrow-array",
"arrow-ord",
"arrow-schema",
"arrow 54.2.1",
"arrow-array 54.2.1",
"arrow-ord 54.2.1",
"arrow-schema 54.3.1",
"async-trait",
"chrono",
"datafusion-common",
@@ -3325,9 +3540,9 @@ name = "datafusion-sql"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5bbedc6704162afb03478f56ffb629405a4e1220#5bbedc6704162afb03478f56ffb629405a4e1220"
dependencies = [
"arrow",
"arrow-array",
"arrow-schema",
"arrow 54.2.1",
"arrow-array 54.2.1",
"arrow-schema 54.3.1",
"bigdecimal 0.4.8",
"datafusion-common",
"datafusion-expr",
@@ -3420,9 +3635,9 @@ dependencies = [
name = "datatypes"
version = "0.14.0"
dependencies = [
"arrow",
"arrow-array",
"arrow-schema",
"arrow 54.2.1",
"arrow-array 54.2.1",
"arrow-schema 54.3.1",
"base64 0.22.1",
"common-base",
"common-decimal",
@@ -4170,8 +4385,8 @@ name = "flow"
version = "0.14.0"
dependencies = [
"api",
"arrow",
"arrow-schema",
"arrow 54.2.1",
"arrow-schema 54.3.1",
"async-recursion",
"async-trait",
"bytes",
@@ -4324,6 +4539,7 @@ dependencies = [
"num_cpus",
"opentelemetry-proto 0.27.0",
"operator",
"otel-arrow-rust",
"partition",
"pipeline",
"prometheus",
@@ -7546,6 +7762,27 @@ dependencies = [
"libc",
]
[[package]]
name = "num_enum"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179"
dependencies = [
"num_enum_derive",
]
[[package]]
name = "num_enum_derive"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56"
dependencies = [
"proc-macro-crate 1.3.1",
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "num_threads"
version = "0.1.7"
@@ -7940,7 +8177,7 @@ name = "orc-rust"
version = "0.6.0"
source = "git+https://github.com/datafusion-contrib/orc-rust?rev=3134cab581a8e91b942d6a23aca2916ea965f6bb#3134cab581a8e91b942d6a23aca2916ea965f6bb"
dependencies = [
"arrow",
"arrow 54.2.1",
"async-trait",
"bytemuck",
"bytes",
@@ -8026,6 +8263,24 @@ version = "6.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1"
[[package]]
name = "otel-arrow-rust"
version = "0.1.0"
source = "git+https://github.com/open-telemetry/otel-arrow?rev=5d551412d2a12e689cde4d84c14ef29e36784e51#5d551412d2a12e689cde4d84c14ef29e36784e51"
dependencies = [
"arrow 53.4.1",
"arrow-ipc 53.4.1",
"lazy_static",
"num_enum",
"opentelemetry-proto 0.27.0",
"paste",
"prost 0.13.5",
"serde",
"snafu 0.8.5",
"tonic 0.12.3",
"tonic-build 0.12.3",
]
[[package]]
name = "overload"
version = "0.1.1"
@@ -8124,13 +8379,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f88838dca3b84d41444a0341b19f347e8098a3898b0f21536654b8b799e11abd"
dependencies = [
"ahash 0.8.11",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-ipc",
"arrow-schema",
"arrow-select",
"arrow-array 54.2.1",
"arrow-buffer 54.3.1",
"arrow-cast 54.2.1",
"arrow-data 54.3.1",
"arrow-ipc 54.2.1",
"arrow-schema 54.3.1",
"arrow-select 54.2.1",
"base64 0.22.1",
"brotli",
"bytes",
@@ -8451,7 +8706,7 @@ version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"api",
"arrow",
"arrow 54.2.1",
"async-trait",
"catalog",
"chrono",
@@ -9161,8 +9416,8 @@ dependencies = [
"ahash 0.8.11",
"api",
"arc-swap",
"arrow",
"arrow-schema",
"arrow 54.2.1",
"arrow-schema 54.3.1",
"async-recursion",
"async-stream",
"async-trait",
@@ -10577,10 +10832,10 @@ version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"api",
"arrow",
"arrow 54.2.1",
"arrow-flight",
"arrow-ipc",
"arrow-schema",
"arrow-ipc 54.2.1",
"arrow-schema 54.3.1",
"async-trait",
"auth",
"axum 0.8.1",
@@ -10643,6 +10898,7 @@ dependencies = [
"openmetrics-parser",
"opensrv-mysql",
"opentelemetry-proto 0.27.0",
"otel-arrow-rust",
"parking_lot 0.12.3",
"permutation",
"pgwire",

View File

@@ -269,6 +269,9 @@ metric-engine = { path = "src/metric-engine" }
mito2 = { path = "src/mito2" }
object-store = { path = "src/object-store" }
operator = { path = "src/operator" }
otel-arrow-rust = { git = "https://github.com/open-telemetry/otel-arrow", rev = "5d551412d2a12e689cde4d84c14ef29e36784e51", features = [
"server",
] }
partition = { path = "src/partition" }
pipeline = { path = "src/pipeline" }
plugins = { path = "src/plugins" }

View File

@@ -39,6 +39,7 @@ datafusion.workspace = true
datafusion-expr.workspace = true
datanode.workspace = true
datatypes.workspace = true
futures.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
log-query.workspace = true
@@ -47,6 +48,7 @@ meta-client.workspace = true
num_cpus.workspace = true
opentelemetry-proto.workspace = true
operator.workspace = true
otel-arrow-rust.workspace = true
partition.workspace = true
pipeline.workspace = true
prometheus.workspace = true

View File

@@ -27,6 +27,7 @@ use servers::http::{HttpServer, HttpServerBuilder};
use servers::interceptor::LogIngestInterceptorRef;
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::otel_arrow::OtelArrowServiceHandler;
use servers::postgres::PostgresServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
@@ -162,6 +163,7 @@ where
let grpc_server = builder
.database_handler(greptime_request_handler.clone())
.prometheus_handler(self.instance.clone(), user_provider.clone())
.otel_arrow_handler(OtelArrowServiceHandler(self.instance.clone()))
.flight_handler(Arc::new(greptime_request_handler))
.build();
Ok(grpc_server)

View File

@@ -85,6 +85,7 @@ socket2 = "0.5"
# 2. Use ring, instead of aws-lc-rs in https://github.com/databendlabs/opensrv/pull/72
opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "a1fb4da215c8693c7e4f62be249a01b7fec52997" }
opentelemetry-proto.workspace = true
otel-arrow-rust.workspace = true
parking_lot.workspace = true
pgwire = { version = "0.28.0", default-features = false, features = ["server-api-ring"] }
pin-project = "1.0"

View File

@@ -540,12 +540,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Missing query context"))]
MissingQueryContext {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid table name"))]
InvalidTableName {
#[snafu(source)]
@@ -619,6 +613,13 @@ pub enum Error {
#[snafu(display("Overflow while casting `{:?}` to Interval", val))]
DurationOverflow { val: Duration },
#[snafu(display("Failed to handle otel-arrow request, error message: {}", err_msg))]
HandleOtelArrowRequest {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -677,7 +678,6 @@ impl ErrorExt for Error {
| TimePrecision { .. }
| UrlDecode { .. }
| IncompatibleSchema { .. }
| MissingQueryContext { .. }
| MysqlValueConversion { .. }
| ParseJson { .. }
| InvalidLokiLabels { .. }
@@ -738,7 +738,10 @@ impl ErrorExt for Error {
ConvertSqlValue { source, .. } => source.status_code(),
InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
DurationOverflow { .. } => StatusCode::InvalidArguments,
HandleOtelArrowRequest { .. } => StatusCode::Internal,
}
}

View File

@@ -32,11 +32,13 @@ use common_grpc::channel_manager::{
};
use common_telemetry::{error, info, warn};
use futures::FutureExt;
use otel_arrow_rust::opentelemetry::ArrowMetricsServiceServer;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::oneshot::{self, Receiver, Sender};
use tokio::sync::Mutex;
use tonic::service::interceptor::InterceptedService;
use tonic::service::Routes;
use tonic::transport::server::TcpIncoming;
use tonic::transport::ServerTlsConfig;
@@ -47,6 +49,8 @@ use crate::error::{
AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu,
};
use crate::metrics::MetricsMiddlewareLayer;
use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
use crate::server::Server;
use crate::tls::TlsOption;
@@ -138,6 +142,15 @@ pub struct GrpcServer {
routes: Mutex<Option<Routes>>,
// tls config
tls_config: Option<ServerTlsConfig>,
// Otel arrow service
otel_arrow_service: Mutex<
Option<
InterceptedService<
ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
HeaderInterceptor,
>,
>,
>,
}
/// Grpc Server configuration
@@ -264,11 +277,16 @@ impl Server for GrpcServer {
if let Some(tls_config) = self.tls_config.clone() {
builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
}
let builder = builder
let mut builder = builder
.add_routes(routes)
.add_service(self.create_healthcheck_service())
.add_service(self.create_reflection_service());
if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
builder = builder.add_service(otel_arrow_service);
}
let (serve_state_tx, serve_state_rx) = oneshot::channel();
let mut serve_state = self.serve_state.lock().await;
*serve_state = Some(serve_state_rx);

View File

@@ -19,8 +19,11 @@ use arrow_flight::flight_service_server::FlightServiceServer;
use auth::UserProviderRef;
use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
use common_runtime::Runtime;
use otel_arrow_rust::opentelemetry::ArrowMetricsServiceServer;
use snafu::ResultExt;
use tokio::sync::Mutex;
use tonic::codec::CompressionEncoding;
use tonic::service::interceptor::InterceptedService;
use tonic::service::RoutesBuilder;
use tonic::transport::{Identity, ServerTlsConfig};
@@ -30,7 +33,9 @@ use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::grpc::prom_query_gateway::PrometheusGatewayService;
use crate::grpc::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
use crate::grpc::{GrpcServer, GrpcServerConfig};
use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
use crate::tls::TlsOption;
/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
@@ -59,6 +64,12 @@ pub struct GrpcServerBuilder {
runtime: Runtime,
routes_builder: RoutesBuilder,
tls_config: Option<ServerTlsConfig>,
otel_arrow_service: Option<
InterceptedService<
ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
HeaderInterceptor,
>,
>,
}
impl GrpcServerBuilder {
@@ -68,6 +79,7 @@ impl GrpcServerBuilder {
runtime,
routes_builder: RoutesBuilder::default(),
tls_config: None,
otel_arrow_service: None,
}
}
@@ -113,6 +125,22 @@ impl GrpcServerBuilder {
self
}
/// Add handler for [OtelArrowService].
pub fn otel_arrow_handler(
mut self,
handler: OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>,
) -> Self {
let mut server = ArrowMetricsServiceServer::new(handler);
server = server
.max_decoding_message_size(self.config.max_recv_message_size)
.max_encoding_message_size(self.config.max_send_message_size)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);
let svc = InterceptedService::new(server, HeaderInterceptor {});
self.otel_arrow_service = Some(svc);
self
}
/// Add handler for [RegionServer].
pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone());
@@ -152,6 +180,7 @@ impl GrpcServerBuilder {
shutdown_tx: Mutex::new(None),
serve_state: Mutex::new(None),
tls_config: self.tls_config,
otel_arrow_service: Mutex::new(self.otel_arrow_service),
}
}
}

View File

@@ -1,97 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::result::Result as StdResult;
use std::sync::Arc;
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsService;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceService;
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use session::context::{Channel, QueryContext};
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response, Status};
use crate::error;
use crate::http::header::constants::GREPTIME_TRACE_TABLE_NAME_HEADER_NAME;
use crate::otlp::trace::TRACE_TABLE_NAME;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
pub struct OtlpService {
handler: OpenTelemetryProtocolHandlerRef,
}
impl OtlpService {
pub fn new(handler: OpenTelemetryProtocolHandlerRef) -> Self {
Self { handler }
}
}
#[async_trait::async_trait]
impl TraceService for OtlpService {
async fn export(
&self,
request: Request<ExportTraceServiceRequest>,
) -> StdResult<Response<ExportTraceServiceResponse>, Status> {
let (headers, extensions, req) = request.into_parts();
let table_name = match headers.get(GREPTIME_TRACE_TABLE_NAME_HEADER_NAME) {
Some(table_name) => table_name
.to_str()
.context(error::InvalidTableNameSnafu)?
.to_string(),
None => TRACE_TABLE_NAME.to_string(),
};
let mut ctx = extensions
.get::<QueryContext>()
.cloned()
.context(error::MissingQueryContextSnafu)?;
ctx.set_channel(Channel::Otlp);
let ctx = Arc::new(ctx);
let _ = self.handler.traces(req, table_name, ctx).await?;
Ok(Response::new(ExportTraceServiceResponse {
partial_success: None,
}))
}
}
#[async_trait::async_trait]
impl MetricsService for OtlpService {
async fn export(
&self,
request: Request<ExportMetricsServiceRequest>,
) -> StdResult<Response<ExportMetricsServiceResponse>, Status> {
let (_headers, extensions, req) = request.into_parts();
let mut ctx = extensions
.get::<QueryContext>()
.cloned()
.context(error::MissingQueryContextSnafu)?;
ctx.set_channel(Channel::Otlp);
let ctx = Arc::new(ctx);
let _ = self.handler.metrics(req, ctx).await?;
Ok(Response::new(ExportMetricsServiceResponse {
partial_success: None,
}))
}
}

View File

@@ -37,6 +37,7 @@ mod metrics;
pub mod metrics_handler;
pub mod mysql;
pub mod opentsdb;
pub mod otel_arrow;
pub mod otlp;
mod pipeline;
pub mod postgres;

View File

@@ -0,0 +1,119 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::ErrorExt;
use common_error::status_code::status_to_tonic_code;
use common_telemetry::error;
use futures::SinkExt;
use otel_arrow_rust::opentelemetry::{ArrowMetricsService, BatchArrowRecords, BatchStatus};
use otel_arrow_rust::Consumer;
use session::context::QueryContext;
use tonic::metadata::{Entry, MetadataValue};
use tonic::service::Interceptor;
use tonic::{Request, Response, Status, Streaming};
use crate::error;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
pub struct OtelArrowServiceHandler<T>(pub T);
impl<T> OtelArrowServiceHandler<T> {
pub fn new(handler: T) -> Self {
Self(handler)
}
}
#[async_trait::async_trait]
impl ArrowMetricsService for OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef> {
type ArrowMetricsStream = futures::channel::mpsc::Receiver<Result<BatchStatus, Status>>;
async fn arrow_metrics(
&self,
request: Request<Streaming<BatchArrowRecords>>,
) -> Result<Response<Self::ArrowMetricsStream>, Status> {
let (mut sender, receiver) = futures::channel::mpsc::channel(100);
let mut incoming_requests = request.into_inner();
let handler = self.0.clone();
let query_context = QueryContext::arc();
// handles incoming requests
common_runtime::spawn_global(async move {
let mut consumer = Consumer::default();
while let Some(batch_res) = incoming_requests.message().await.transpose() {
let mut batch = match batch_res {
Ok(batch) => batch,
Err(e) => {
error!(
"Failed to receive batch from otel-arrow client, error: {}",
e
);
let _ = sender.send(Err(e)).await;
return;
}
};
let batch_status = BatchStatus {
batch_id: batch.batch_id,
status_code: 0,
status_message: Default::default(),
};
let request = match consumer.consume_batches(&mut batch).map_err(|e| {
error::HandleOtelArrowRequestSnafu {
err_msg: e.to_string(),
}
.build()
}) {
Ok(request) => request,
Err(e) => {
let _ = sender
.send(Err(Status::new(
status_to_tonic_code(e.status_code()),
e.to_string(),
)))
.await;
error!(e;
"Failed to consume batch from otel-arrow client"
);
return;
}
};
if let Err(e) = handler.metrics(request, query_context.clone()).await {
let _ = sender
.send(Err(Status::new(
status_to_tonic_code(e.status_code()),
e.to_string(),
)))
.await;
error!(e; "Failed to ingest metrics from otel-arrow");
return;
}
let _ = sender.send(Ok(batch_status)).await;
}
});
Ok(Response::new(receiver))
}
}
/// This serves as a workaround for otel-arrow collector's custom header.
#[derive(Clone)]
pub struct HeaderInterceptor;
impl Interceptor for HeaderInterceptor {
fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
if let Ok(Entry::Occupied(mut e)) = request.metadata_mut().entry("grpc-encoding") {
// This works as a workaround to handle customized compression type (zstdarrow*) in otel-arrow.
if e.get().as_bytes().starts_with(b"zstdarrow") {
e.insert(MetadataValue::from_static("zstd"));
}
}
Ok(request)
}
}