Compare commits

..

5 Commits

Author SHA1 Message Date
Lei, HUANG
4534e4c31d chore/enable-flight-encoder:
### Add Flight Compression Support

 - **Configuration Updates**:
   - Added `grpc.flight_compression` option to `config/config.md`, `config/datanode.example.toml`, and `config/frontend.example.toml` to specify compression modes for Arrow IPC service.

 - **Code Enhancements**:
   - Updated `FlightEncoder` in `src/common/grpc/src/flight.rs` to support compression modes.
   - Modified `RegionServer` and `DatanodeBuilder` in `src/datanode/src/datanode.rs` and `src/datanode/src/region_server.rs` to handle `FlightCompression`.
   - Integrated `FlightCompression` in `src/servers/src/grpc.rs` and `src/servers/src/grpc/flight.rs` to manage compression settings.

 - **Testing and Integration**:
   - Updated test utilities and integration tests in `tests-integration/src/grpc/flight.rs` and `tests-integration/src/test_util.rs` to include `FlightCompression`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-10 23:15:01 +08:00
discord9
3d8278dc4c feat: truly limit time range by split window 2025-06-10 23:15:01 +08:00
discord9
e4328380b2 feat: better metrics 2025-06-10 23:15:01 +08:00
discord9
e962076207 feat(exp): adjust_flow admin function 2025-06-10 23:15:01 +08:00
discord9
9ef8ba6460 feat: flownode to frontend load balance with guess 2025-06-10 23:15:01 +08:00
52 changed files with 873 additions and 571 deletions

View File

@@ -30,7 +30,7 @@ update_helm_charts_version() {
# Commit the changes.
git add .
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
git commit -m "chore: Update GreptimeDB version to ${VERSION}"
git push origin $BRANCH_NAME
# Create a Pull Request.

View File

@@ -26,7 +26,7 @@ update_homebrew_greptime_version() {
# Commit the changes.
git add .
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
git commit -m "chore: Update GreptimeDB version to ${VERSION}"
git push origin $BRANCH_NAME
# Create a Pull Request.

51
Cargo.lock generated
View File

@@ -2336,7 +2336,6 @@ dependencies = [
"num-traits",
"once_cell",
"paste",
"promql",
"s2",
"serde",
"serde_json",
@@ -3253,7 +3252,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3304,7 +3303,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"async-trait",
@@ -3324,7 +3323,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog-listing"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -3347,7 +3346,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3372,7 +3371,7 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"log",
"tokio",
@@ -3381,12 +3380,12 @@ dependencies = [
[[package]]
name = "datafusion-doc"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
[[package]]
name = "datafusion-execution"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"dashmap",
@@ -3404,7 +3403,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -3424,7 +3423,7 @@ dependencies = [
[[package]]
name = "datafusion-expr-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"datafusion-common",
@@ -3435,7 +3434,7 @@ dependencies = [
[[package]]
name = "datafusion-functions"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-buffer 54.3.1",
@@ -3464,7 +3463,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3485,7 +3484,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3497,7 +3496,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-nested"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3519,7 +3518,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-table"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"async-trait",
@@ -3534,7 +3533,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-common",
"datafusion-doc",
@@ -3550,7 +3549,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-common",
"datafusion-physical-expr-common",
@@ -3559,7 +3558,7 @@ dependencies = [
[[package]]
name = "datafusion-macros"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"datafusion-expr",
"quote",
@@ -3569,7 +3568,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -3587,7 +3586,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3610,7 +3609,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-common"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3623,7 +3622,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-optimizer"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -3644,7 +3643,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"ahash 0.8.11",
"arrow 54.2.1",
@@ -3674,7 +3673,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -3692,7 +3691,7 @@ dependencies = [
[[package]]
name = "datafusion-substrait"
version = "45.0.0"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=12c0381babd52c681043957e9d6ee083a03f7646#12c0381babd52c681043957e9d6ee083a03f7646"
source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=e104c7cf62b11dd5fe41461b82514978234326b4#e104c7cf62b11dd5fe41461b82514978234326b4"
dependencies = [
"async-recursion",
"async-trait",
@@ -5134,7 +5133,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=454c52634c3bac27de10bf0d85d5533eed1cf03f#454c52634c3bac27de10bf0d85d5533eed1cf03f"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2dca1dc67862d7b410838aef81232274c019b3f6#2dca1dc67862d7b410838aef81232274c019b3f6"
dependencies = [
"prost 0.13.5",
"serde",

View File

@@ -116,15 +116,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
@@ -133,7 +133,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -232,6 +232,7 @@
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -404,6 +405,7 @@
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |

View File

@@ -44,6 +44,12 @@ runtime_size = 8
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
## Compression mode for datanode side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
## - `all`: enable all compression.
flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]

View File

@@ -54,6 +54,12 @@ bind_addr = "127.0.0.1:4001"
server_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
## Compression mode for frontend side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
## - `all`: enable all compression.
flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]

View File

@@ -58,7 +58,6 @@ where
info!("{desc}, average operation cost: {cost:.2} ms");
}
/// Command to benchmark table metadata operations.
#[derive(Debug, Default, Parser)]
pub struct BenchTableMetadataCommand {
#[clap(long)]

View File

@@ -244,18 +244,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported memory backend"))]
UnsupportedMemoryBackend {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("File path invalid: {}", msg))]
InvalidFilePath {
msg: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -274,8 +262,6 @@ impl ErrorExt for Error {
| Error::ConnectEtcd { .. }
| Error::CreateDir { .. }
| Error::EmptyResult { .. }
| Error::InvalidFilePath { .. }
| Error::UnsupportedMemoryBackend { .. }
| Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments,
Error::StartProcedureManager { source, .. }

View File

@@ -50,7 +50,6 @@ enum ExportTarget {
All,
}
/// Command for exporting data from the GreptimeDB.
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
/// Server address to connect

View File

@@ -40,7 +40,6 @@ enum ImportTarget {
All,
}
/// Command to import data from a directory into a GreptimeDB instance.
#[derive(Debug, Default, Parser)]
pub struct ImportCommand {
/// Server address to connect

View File

@@ -20,7 +20,7 @@ mod import;
mod meta_snapshot;
use async_trait::async_trait;
use clap::{Parser, Subcommand};
use clap::Parser;
use common_error::ext::BoxedError;
pub use database::DatabaseClient;
use error::Result;
@@ -28,7 +28,7 @@ use error::Result;
pub use crate::bench::BenchTableMetadataCommand;
pub use crate::export::ExportCommand;
pub use crate::import::ImportCommand;
pub use crate::meta_snapshot::{MetaCommand, MetaInfoCommand, MetaRestoreCommand, MetaSaveCommand};
pub use crate::meta_snapshot::{MetaRestoreCommand, MetaSnapshotCommand};
#[async_trait]
pub trait Tool: Send + Sync {
@@ -51,19 +51,3 @@ impl AttachCommand {
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
}
}
/// Subcommand for data operations like export and import.
#[derive(Subcommand)]
pub enum DataCommand {
Export(ExportCommand),
Import(ImportCommand),
}
impl DataCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
match self {
DataCommand::Export(cmd) => cmd.build().await,
DataCommand::Import(cmd) => cmd.build().await,
}
}
}

View File

@@ -12,11 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::sync::Arc;
use async_trait::async_trait;
use clap::{Parser, Subcommand};
use clap::Parser;
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use common_meta::kv_backend::chroot::ChrootKvBackend;
@@ -27,50 +26,10 @@ use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;
use object_store::services::{Fs, S3};
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use crate::error::{
InvalidFilePathSnafu, KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu,
UnsupportedMemoryBackendSnafu,
};
use crate::error::{KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
use crate::Tool;
/// Subcommand for metadata snapshot management.
#[derive(Subcommand)]
pub enum MetaCommand {
#[clap(subcommand)]
Snapshot(MetaSnapshotCommand),
}
impl MetaCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetaCommand::Snapshot(cmd) => cmd.build().await,
}
}
}
/// Subcommand for metadata snapshot operations. such as save, restore and info.
#[derive(Subcommand)]
pub enum MetaSnapshotCommand {
/// Export metadata snapshot tool.
Save(MetaSaveCommand),
/// Restore metadata snapshot tool.
Restore(MetaRestoreCommand),
/// Explore metadata from metadata snapshot.
Info(MetaInfoCommand),
}
impl MetaSnapshotCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetaSnapshotCommand::Save(cmd) => cmd.build().await,
MetaSnapshotCommand::Restore(cmd) => cmd.build().await,
MetaSnapshotCommand::Info(cmd) => cmd.build().await,
}
}
}
#[derive(Debug, Default, Parser)]
struct MetaConnection {
/// The endpoint of store. one of etcd, pg or mysql.
@@ -132,9 +91,6 @@ impl MetaConnection {
.await
.map_err(BoxedError::new)?)
}
Some(BackendImpl::MemoryStore) => UnsupportedMemoryBackendSnafu
.fail()
.map_err(BoxedError::new),
_ => KvBackendNotSetSnafu { backend: "all" }
.fail()
.map_err(BoxedError::new),
@@ -214,7 +170,7 @@ impl S3Config {
/// It will dump the metadata snapshot to local file or s3 bucket.
/// The snapshot file will be in binary format.
#[derive(Debug, Default, Parser)]
pub struct MetaSaveCommand {
pub struct MetaSnapshotCommand {
/// The connection to the metadata store.
#[clap(flatten)]
connection: MetaConnection,
@@ -240,7 +196,7 @@ fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError>
Ok(object_store)
}
impl MetaSaveCommand {
impl MetaSnapshotCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.connection.build().await?;
let output_dir = &self.output_dir;
@@ -371,89 +327,3 @@ impl Tool for MetaRestoreTool {
}
}
}
/// Explore metadata from metadata snapshot.
#[derive(Debug, Default, Parser)]
pub struct MetaInfoCommand {
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
/// The name of the target snapshot file. we will add the file extension automatically.
#[clap(long, default_value = "metadata_snapshot")]
file_name: String,
/// The query string to filter the metadata.
#[clap(long, default_value = "*")]
inspect_key: String,
/// The limit of the metadata to query.
#[clap(long)]
limit: Option<usize>,
}
pub struct MetaInfoTool {
inner: ObjectStore,
source_file: String,
inspect_key: String,
limit: Option<usize>,
}
#[async_trait]
impl Tool for MetaInfoTool {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
let result = MetadataSnapshotManager::info(
&self.inner,
&self.source_file,
&self.inspect_key,
self.limit,
)
.await
.map_err(BoxedError::new)?;
for item in result {
println!("{}", item);
}
Ok(())
}
}
impl MetaInfoCommand {
fn decide_object_store_root_for_local_store(
file_path: &str,
) -> Result<(&str, &str), BoxedError> {
let path = Path::new(file_path);
let parent = path
.parent()
.and_then(|p| p.to_str())
.context(InvalidFilePathSnafu { msg: file_path })
.map_err(BoxedError::new)?;
let file_name = path
.file_name()
.and_then(|f| f.to_str())
.context(InvalidFilePathSnafu { msg: file_path })
.map_err(BoxedError::new)?;
let root = if parent.is_empty() { "." } else { parent };
Ok((root, file_name))
}
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let object_store = self.s3_config.build("").map_err(BoxedError::new)?;
if let Some(store) = object_store {
let tool = MetaInfoTool {
inner: store,
source_file: self.file_name.clone(),
inspect_key: self.inspect_key.clone(),
limit: self.limit,
};
Ok(Box::new(tool))
} else {
let (root, file_name) =
Self::decide_object_store_root_for_local_store(&self.file_name)?;
let object_store = create_local_file_object_store(root)?;
let tool = MetaInfoTool {
inner: object_store,
source_file: file_name.to_string(),
inspect_key: self.inspect_key.clone(),
limit: self.limit,
};
Ok(Box::new(tool))
}
}
}

View File

@@ -146,7 +146,6 @@ mod tests {
let output_dir = tempfile::tempdir().unwrap();
let cli = cli::Command::parse_from([
"cli",
"data",
"export",
"--addr",
"127.0.0.1:4000",

View File

@@ -47,7 +47,6 @@ num = "0.4"
num-traits = "0.2"
once_cell.workspace = true
paste.workspace = true
promql.workspace = true
s2 = { version = "0.0.12", optional = true }
serde.workspace = true
serde_json.workspace = true

View File

@@ -0,0 +1,90 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_query::error::{
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use store_api::storage::ConcreteDataType;
use crate::handlers::FlowServiceHandlerRef;
use crate::helper::parse_catalog_flow;
fn adjust_signature() -> Signature {
Signature::exact(
vec![
ConcreteDataType::string_datatype(), // flow name
ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
ConcreteDataType::uint64_datatype(), // max filter number per query
],
Volatility::Immutable,
)
}
#[admin_fn(
name = AdjustFlowFunction,
display_name = adjust_flow,
sig_fn = adjust_signature,
ret = uint64
)]
pub(crate) async fn adjust_flow(
flow_service_handler: &FlowServiceHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 3, have: {}",
params.len()
),
}
);
let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
(
ValueRef::String(flow_name),
ValueRef::UInt64(min_run_interval),
ValueRef::UInt64(max_filter_num),
) => (flow_name, min_run_interval, max_filter_num),
_ => {
return UnsupportedInputDataTypeSnafu {
function: "adjust_flow",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
}
};
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
let res = flow_service_handler
.adjust(
&catalog_name,
&flow_name,
min_run_interval,
max_filter_num as usize,
query_ctx.clone(),
)
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}

View File

@@ -26,6 +26,7 @@ use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;
use remove_region_follower::RemoveRegionFollowerFunction;
use crate::adjust_flow::AdjustFlowFunction;
use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;
@@ -43,5 +44,6 @@ impl AdminFunction {
registry.register_async(Arc::new(FlushTableFunction));
registry.register_async(Arc::new(CompactTableFunction));
registry.register_async(Arc::new(FlushFlowFunction));
registry.register_async(Arc::new(AdjustFlowFunction));
}
}

View File

@@ -12,21 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::{
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use sql::parser::ParserContext;
use snafu::ensure;
use store_api::storage::ConcreteDataType;
use crate::handlers::FlowServiceHandlerRef;
use crate::helper::parse_catalog_flow;
fn flush_signature() -> Signature {
Signature::uniform(
@@ -47,20 +45,6 @@ pub(crate) async fn flush_flow(
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
let res = flow_service_handler
.flush(&catalog_name, &flow_name, query_ctx.clone())
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}
fn parse_flush_flow(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
@@ -70,7 +54,6 @@ fn parse_flush_flow(
),
}
);
let ValueRef::String(flow_name) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: "flush_flow",
@@ -78,27 +61,14 @@ fn parse_flush_flow(
}
.fail();
};
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[flow_name] => (
query_ctx.current_catalog().to_string(),
flow_name.value.clone(),
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
let res = flow_service_handler
.flush(&catalog_name, &flow_name, query_ctx.clone())
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}
#[cfg(test)]
@@ -154,10 +124,7 @@ mod test {
("catalog.flow_name", ("catalog", "flow_name")),
];
for (input, expected) in testcases.iter() {
let args = vec![*input];
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap();
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
}
}

View File

@@ -39,7 +39,7 @@ use crate::system::SystemFunction;
#[derive(Default)]
pub struct FunctionRegistry {
scalar_functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
aggregate_functions: RwLock<HashMap<String, AggregateUDF>>,
}
@@ -48,7 +48,7 @@ impl FunctionRegistry {
pub fn register(&self, func: impl Into<ScalarFunctionFactory>) {
let func = func.into();
let _ = self
.scalar_functions
.functions
.write()
.unwrap()
.insert(func.name().to_string(), func);
@@ -87,17 +87,13 @@ impl FunctionRegistry {
.collect()
}
pub fn get_scalar_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
self.scalar_functions.read().unwrap().get(name).cloned()
#[cfg(test)]
pub fn get_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
self.functions.read().unwrap().get(name).cloned()
}
pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> {
self.scalar_functions
.read()
.unwrap()
.values()
.cloned()
.collect()
self.functions.read().unwrap().values().cloned().collect()
}
pub fn aggregate_functions(&self) -> Vec<AggregateUDF> {
@@ -148,11 +144,6 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
// Approximate functions
ApproximateFunction::register(&function_registry);
// PromQL aggregate functions
for aggr in promql::functions::aggr_funcs() {
function_registry.register_aggr(aggr);
}
Arc::new(function_registry)
});
@@ -165,10 +156,10 @@ mod tests {
fn test_function_registry() {
let registry = FunctionRegistry::default();
assert!(registry.get_scalar_function("test_and").is_none());
assert!(registry.get_function("test_and").is_none());
assert!(registry.scalar_functions().is_empty());
registry.register_scalar(TestAndFunction);
let _ = registry.get_scalar_function("test_and").unwrap();
let _ = registry.get_function("test_and").unwrap();
assert_eq!(1, registry.scalar_functions().len());
}
}

View File

@@ -87,6 +87,15 @@ pub trait FlowServiceHandler: Send + Sync {
flow: &str,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>;
async fn adjust(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>;
}
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;

View File

@@ -12,12 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::error::{InvalidInputTypeSnafu, Result};
use common_error::ext::BoxedError;
use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::cast::cast;
use datatypes::value::ValueRef;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::parser::ParserContext;
/// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
@@ -43,3 +46,30 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
})
.map(|v| v.as_u64())
}
pub fn parse_catalog_flow(
flow_name: &str,
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[flow_name] => (
query_ctx.current_catalog().to_string(),
flow_name.value.clone(),
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
}

View File

@@ -15,6 +15,7 @@
#![feature(let_chains)]
#![feature(try_blocks)]
mod adjust_flow;
mod admin;
mod flush_flow;
mod macros;

View File

@@ -148,6 +148,17 @@ impl FunctionState {
) -> Result<api::v1::flow::FlowResponse> {
todo!()
}
async fn adjust(
&self,
_catalog: &str,
_flow: &str,
_min_run_interval_secs: u64,
_max_filter_num_per_query: usize,
_ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
todo!()
}
}
Self {

View File

@@ -64,6 +64,7 @@ impl Default for FlightEncoder {
}
impl FlightEncoder {
/// Creates new [FlightEncoder] with compression disabled.
pub fn with_compression_disabled() -> Self {
let write_options = writer::IpcWriteOptions::default()
.try_with_compression(None)

View File

@@ -14,7 +14,6 @@
pub mod file;
use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf};
use std::time::Instant;
@@ -272,49 +271,6 @@ impl MetadataSnapshotManager {
Ok((filename.to_string(), num_keyvalues as u64))
}
fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
format!("{} => {}", key, value)
}
pub async fn info(
object_store: &ObjectStore,
file_path: &str,
query_str: &str,
limit: Option<usize>,
) -> Result<Vec<String>> {
let path = Path::new(file_path);
let file_name = path
.file_name()
.and_then(|s| s.to_str())
.context(InvalidFilePathSnafu { file_path })?;
let filename = FileName::try_from(file_name)?;
let data = object_store
.read(file_path)
.await
.context(ReadObjectSnafu { file_path })?;
let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
let metadata_content = document.into_metadata_content()?.values();
let mut results = Vec::with_capacity(limit.unwrap_or(256));
for kv in metadata_content {
let key_str = String::from_utf8_lossy(&kv.key);
if let Some(prefix) = query_str.strip_suffix('*') {
if key_str.starts_with(prefix) {
let value_str = String::from_utf8_lossy(&kv.value);
results.push(Self::format_output(key_str, value_str));
}
} else if key_str == query_str {
let value_str = String::from_utf8_lossy(&kv.value);
results.push(Self::format_output(key_str, value_str));
}
if results.len() == limit.unwrap_or(usize::MAX) {
break;
}
}
Ok(results)
}
}
#[cfg(test)]

View File

@@ -111,11 +111,6 @@ impl MetadataContent {
pub fn into_iter(self) -> impl Iterator<Item = KeyValue> {
self.values.into_iter()
}
/// Returns the key-value pairs as a vector.
pub fn values(self) -> Vec<KeyValue> {
self.values
}
}
/// The key-value pair of the backup file.

View File

@@ -372,6 +372,7 @@ impl DatanodeBuilder {
opts.max_concurrent_queries,
//TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
Duration::from_millis(100),
opts.grpc.flight_compression,
);
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;

View File

@@ -50,6 +50,7 @@ use query::QueryEngineRef;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
use servers::grpc::FlightCompression;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
@@ -80,6 +81,7 @@ use crate::event_listener::RegionServerEventListenerRef;
#[derive(Clone)]
pub struct RegionServer {
inner: Arc<RegionServerInner>,
flight_compression: FlightCompression,
}
pub struct RegionStat {
@@ -93,6 +95,7 @@ impl RegionServer {
query_engine: QueryEngineRef,
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
flight_compression: FlightCompression,
) -> Self {
Self::with_table_provider(
query_engine,
@@ -101,6 +104,7 @@ impl RegionServer {
Arc::new(DummyTableProviderFactory),
0,
Duration::from_millis(0),
flight_compression,
)
}
@@ -111,6 +115,7 @@ impl RegionServer {
table_provider_factory: TableProviderFactoryRef,
max_concurrent_queries: usize,
concurrent_query_limiter_timeout: Duration,
flight_compression: FlightCompression,
) -> Self {
Self {
inner: Arc::new(RegionServerInner::new(
@@ -123,6 +128,7 @@ impl RegionServer {
concurrent_query_limiter_timeout,
),
)),
flight_compression,
}
}
@@ -536,7 +542,11 @@ impl FlightCraft for RegionServer {
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
.await?;
let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
let stream = Box::pin(FlightRecordBatchStream::new(
result,
tracing_context,
self.flight_compression,
));
Ok(Response::new(stream))
}
}

View File

@@ -28,6 +28,7 @@ use query::dataframe::DataFrame;
use query::planner::LogicalPlanner;
use query::query_engine::{DescribeResult, QueryEngineState};
use query::{QueryEngine, QueryEngineContext};
use servers::grpc::FlightCompression;
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
@@ -97,6 +98,7 @@ pub fn mock_region_server() -> RegionServer {
Arc::new(MockQueryEngine),
Runtime::builder().build().unwrap(),
Box::new(NoopRegionServerEventListener),
FlightCompression::default(),
)
}

View File

@@ -61,6 +61,7 @@ prost.workspace = true
query.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
servers.workspace = true
session.workspace = true
smallvec.workspace = true

View File

@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use api::v1::flow::{
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
flow_request, AdjustFlow, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
};
use api::v1::region::InsertRequests;
use catalog::CatalogManager;
@@ -32,6 +32,7 @@ use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
@@ -809,6 +810,25 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
..Default::default()
})
}
Some(flow_request::Body::Adjust(AdjustFlow { flow_id, options })) => {
#[derive(Debug, Serialize, Deserialize)]
struct Options {
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
}
let options: Options = serde_json::from_str(&options).with_context(|_| {
common_meta::error::DeserializeFromJsonSnafu { input: options }
})?;
self.batching_engine
.adjust_flow(
flow_id.unwrap().id as u64,
options.min_run_interval_secs,
options.max_filter_num_per_query,
)
.await
.map_err(to_meta_err(snafu::location!()))?;
Ok(Default::default())
}
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
}
}
@@ -841,93 +861,6 @@ fn to_meta_err(
}
}
#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for StreamingEngine {
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
let query_ctx = request
.header
.and_then(|h| h.query_context)
.map(|ctx| ctx.into());
match request.body {
Some(flow_request::Body::Create(CreateRequest {
flow_id: Some(task_id),
source_table_ids,
sink_table_name: Some(sink_table_name),
create_if_not_exists,
expire_after,
comment,
sql,
flow_options,
or_replace,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_name = [
sink_table_name.catalog_name,
sink_table_name.schema_name,
sink_table_name.table_name,
];
let expire_after = expire_after.map(|e| e.value);
let args = CreateFlowArgs {
flow_id: task_id.id as u64,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment: Some(comment),
sql: sql.clone(),
flow_options,
query_ctx,
};
let ret = self
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
.into_iter()
.collect_vec(),
..Default::default()
})
}
Some(flow_request::Body::Drop(DropRequest {
flow_id: Some(flow_id),
})) => {
self.remove_flow(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.dec();
Ok(Default::default())
}
Some(flow_request::Body::Flush(FlushFlow {
flow_id: Some(flow_id),
})) => {
let row = self
.flush_flow_inner(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
Ok(FlowResponse {
affected_flows: vec![flow_id],
affected_rows: row as u64,
..Default::default()
})
}
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
}
}
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
self.handle_inserts_inner(request)
.await
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}
}
impl FlowEngine for StreamingEngine {
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
self.create_flow_inner(args).await

View File

@@ -388,6 +388,20 @@ impl BatchingEngine {
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
self.tasks.read().await.contains_key(&flow_id)
}
pub async fn adjust_flow(
&self,
flow_id: FlowId,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
) -> Result<(), Error> {
let task = self.tasks.read().await.get(&flow_id).cloned();
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
debug!("Adjusting flow {flow_id} with min_run_interval_secs={} and max_filter_num_per_query={}", min_run_interval_secs, max_filter_num_per_query);
task.adjust(min_run_interval_secs, max_filter_num_per_query);
Ok(())
}
}
impl FlowEngine for BatchingEngine {

View File

@@ -14,8 +14,9 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::sync::{Arc, Weak};
use std::time::SystemTime;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant, SystemTime};
use api::v1::greptime_request::Request;
use api::v1::CreateTableExpr;
@@ -26,20 +27,21 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use itertools::Itertools;
use meta_client::client::MetaClient;
use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
GRPC_MAX_RETRIES,
};
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
use crate::{Error, FlowAuthHeader};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
use crate::{Error, FlowAuthHeader, FlowId};
/// Just like [`GrpcQueryHandler`] but use BoxedError
///
@@ -74,6 +76,105 @@ impl<
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
/// Statistics about running query on this frontend from flownode
#[derive(Debug, Default, Clone)]
struct FrontendStat {
/// The query for flow id has been running since this timestamp
since: HashMap<FlowId, Instant>,
/// The average query time for each flow id
/// This is used to calculate the average query time for each flow id
past_query_avg: HashMap<FlowId, (usize, Duration)>,
}
#[derive(Debug, Default, Clone)]
pub struct FrontendStats {
/// The statistics for each flow id
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
}
impl FrontendStats {
pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
let stat = stats.entry(frontend_addr.to_string()).or_default();
stat.since.insert(flow_id, Instant::now());
FrontendStatsGuard {
stats: self.stats.clone(),
frontend_addr: frontend_addr.to_string(),
cur: flow_id,
}
}
/// return frontend addrs sorted by load, from lightest to heaviest
/// The load is calculated as the total average query time for each flow id plus running query's total running time elapsed
pub fn sort_by_load(&self) -> Vec<String> {
let stats = self.stats.lock().expect("Failed to lock frontend stats");
let fe_load_factor = stats
.iter()
.map(|(node_addr, stat)| {
// total expected avg running time for all currently running queries
let total_expect_avg_run_time = stat
.since
.keys()
.map(|f| {
let (count, total_duration) =
stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
if *count == 0 {
0.0
} else {
total_duration.as_secs_f64() / *count as f64
}
})
.sum::<f64>();
let total_cur_running_time = stat
.since
.values()
.map(|since| since.elapsed().as_secs_f64())
.sum::<f64>();
(
node_addr.to_string(),
total_expect_avg_run_time + total_cur_running_time,
)
})
.sorted_by(|(_, load_a), (_, load_b)| {
load_a
.partial_cmp(load_b)
.unwrap_or(std::cmp::Ordering::Equal)
})
.collect::<Vec<_>>();
debug!("Frontend load factor: {:?}", fe_load_factor);
for (node_addr, load) in &fe_load_factor {
METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
.with_label_values(&[&node_addr.to_string()])
.observe(*load);
}
fe_load_factor
.into_iter()
.map(|(addr, _)| addr)
.collect::<Vec<_>>()
}
}
pub struct FrontendStatsGuard {
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
frontend_addr: String,
cur: FlowId,
}
impl Drop for FrontendStatsGuard {
fn drop(&mut self) {
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
if let Some(stat) = stats.get_mut(&self.frontend_addr) {
if let Some(since) = stat.since.remove(&self.cur) {
let elapsed = since.elapsed();
let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
*count += 1;
*total_duration += elapsed;
}
}
}
}
/// A simple frontend client able to execute sql using grpc protocol
///
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
@@ -83,6 +184,7 @@ pub enum FrontendClient {
meta_client: Arc<MetaClient>,
chnl_mgr: ChannelManager,
auth: Option<FlowAuthHeader>,
fe_stats: FrontendStats,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
@@ -114,6 +216,7 @@ impl FrontendClient {
ChannelManager::with_config(cfg)
},
auth,
fe_stats: Default::default(),
}
}
@@ -183,7 +286,7 @@ impl FrontendClient {
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
/// and is able to process query
async fn get_random_active_frontend(
pub(crate) async fn get_random_active_frontend(
&self,
catalog: &str,
schema: &str,
@@ -192,6 +295,7 @@ impl FrontendClient {
meta_client: _,
chnl_mgr,
auth,
fe_stats,
} = self
else {
return UnexpectedSnafu {
@@ -208,8 +312,21 @@ impl FrontendClient {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
// shuffle the frontends to avoid always pick the same one
frontends.shuffle(&mut rng());
let node_addrs_by_load = fe_stats.sort_by_load();
// index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0
let addr2load = node_addrs_by_load
.iter()
.enumerate()
.map(|(i, id)| (id.clone(), i + 1))
.collect::<HashMap<_, _>>();
// sort frontends by load, from lightest to heaviest
frontends.sort_by(|(_, a), (_, b)| {
// if not even in stats, treat as 0 load since never been queried
let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
load_a.cmp(load_b)
});
debug!("Frontend nodes sorted by load: {:?}", frontends);
// found node with maximum last_activity_ts
for (_, node_info) in frontends
@@ -257,6 +374,7 @@ impl FrontendClient {
create: CreateTableExpr,
catalog: &str,
schema: &str,
task: Option<&BatchingTask>,
) -> Result<u32, Error> {
self.handle(
Request::Ddl(api::v1::DdlRequest {
@@ -264,7 +382,8 @@ impl FrontendClient {
}),
catalog,
schema,
&mut None,
None,
task,
)
.await
}
@@ -275,15 +394,31 @@ impl FrontendClient {
req: api::v1::greptime_request::Request,
catalog: &str,
schema: &str,
peer_desc: &mut Option<PeerDesc>,
use_peer: Option<Peer>,
task: Option<&BatchingTask>,
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed { .. } => {
let db = self.get_random_active_frontend(catalog, schema).await?;
FrontendClient::Distributed {
fe_stats, chnl_mgr, ..
} => {
let db = if let Some(peer) = use_peer {
DatabaseWithPeer::new(
Database::new(
catalog,
schema,
Client::with_manager_and_urls(
chnl_mgr.clone(),
vec![peer.addr.clone()],
),
),
peer,
)
} else {
self.get_random_active_frontend(catalog, schema).await?
};
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),
});
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
db.database
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)

View File

@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
};
use crate::{Error, FlowId};
@@ -52,6 +53,13 @@ pub struct TaskState {
pub(crate) shutdown_rx: oneshot::Receiver<()>,
/// Task handle
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
/// Slow Query metrics update task handle
pub(crate) slow_query_metric_task: Option<tokio::task::JoinHandle<()>>,
/// min run interval in seconds
pub(crate) min_run_interval: Option<u64>,
/// max filter number per query
pub(crate) max_filter_num: Option<usize>,
}
impl TaskState {
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
@@ -63,6 +71,9 @@ impl TaskState {
exec_state: ExecState::Idle,
shutdown_rx,
task_handle: None,
slow_query_metric_task: None,
min_run_interval: None,
max_filter_num: None,
}
}
@@ -87,20 +98,17 @@ impl TaskState {
pub fn get_next_start_query_time(
&self,
flow_id: FlowId,
time_window_size: &Option<Duration>,
_time_window_size: &Option<Duration>,
max_timeout: Option<Duration>,
) -> Instant {
let last_duration = max_timeout
let next_duration = max_timeout
.unwrap_or(self.last_query_duration)
.min(self.last_query_duration)
.max(MIN_REFRESH_DURATION);
let next_duration = time_window_size
.map(|t| {
let half = t / 2;
half.max(last_duration)
})
.unwrap_or(last_duration);
.max(
self.min_run_interval
.map(Duration::from_secs)
.unwrap_or(MIN_REFRESH_DURATION),
);
// if have dirty time window, execute immediately to clean dirty time window
if self.dirty_time_windows.windows.is_empty() {
@@ -206,47 +214,69 @@ impl DirtyTimeWindows {
// get the first `window_cnt` time windows
let max_time_range = window_size * window_cnt as i32;
let nth = {
let mut cur_time_range = chrono::Duration::zero();
let mut nth_key = None;
for (idx, (start, end)) in self.windows.iter().enumerate() {
// if time range is too long, stop
if cur_time_range > max_time_range {
nth_key = Some(*start);
break;
}
// if we have enough time windows, stop
if idx >= window_cnt {
nth_key = Some(*start);
break;
}
let mut to_be_query = BTreeMap::new();
let mut new_windows = self.windows.clone();
let mut cur_time_range = chrono::Duration::zero();
for (idx, (start, end)) in self.windows.iter().enumerate() {
let first_end = start
.add_duration(window_size.to_std().unwrap())
.context(TimeSnafu)?;
let end = end.unwrap_or(first_end);
if let Some(end) = end {
if let Some(x) = end.sub(start) {
cur_time_range += x;
}
}
// if time range is too long, stop
if cur_time_range >= max_time_range {
break;
}
nth_key
};
let first_nth = {
if let Some(nth) = nth {
let mut after = self.windows.split_off(&nth);
std::mem::swap(&mut self.windows, &mut after);
after
} else {
std::mem::take(&mut self.windows)
// if we have enough time windows, stop
if idx >= window_cnt {
break;
}
};
if let Some(x) = end.sub(start) {
if cur_time_range + x <= max_time_range {
to_be_query.insert(*start, Some(end));
new_windows.remove(start);
cur_time_range += x;
} else {
// too large a window, split it
// split at window_size * times
let surplus = max_time_range - cur_time_range;
let times = surplus.num_seconds() / window_size.num_seconds();
let split_offset = window_size * times as i32;
let split_at = start
.add_duration(split_offset.to_std().unwrap())
.context(TimeSnafu)?;
to_be_query.insert(*start, Some(split_at));
// remove the original window
new_windows.remove(start);
new_windows.insert(split_at, Some(end));
cur_time_range += split_offset;
break;
}
}
}
self.windows = new_windows;
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()])
.observe(first_nth.len() as f64);
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(to_be_query.len() as f64);
let full_time_range = first_nth
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(self.windows.len() as f64);
let full_time_range = to_be_query
.iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| {
if let Some(end) = end {
@@ -257,11 +287,14 @@ impl DirtyTimeWindows {
})
.num_seconds() as f64;
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
.with_label_values(&[flow_id.to_string().as_str()])
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(full_time_range);
let mut expr_lst = vec![];
for (start, end) in first_nth.into_iter() {
for (start, end) in to_be_query.into_iter() {
// align using time window exprs
let (start, end) = if let Some(ctx) = task_ctx {
let Some(time_window_expr) = &ctx.config.time_window_expr else {
@@ -495,6 +528,64 @@ mod test {
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
)
),
// split range
(
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
60
)),
),
(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
Some(Timestamp::new_second(
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 2 min into 1 min
(
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
40 * 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 3s + 1min into 3s + 57s
(
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
3
)),
),(
Timestamp::new_second(20),
Some(Timestamp::new_second(
140
)),
)]),
Some(
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
)
),
// expired
(
vec![
@@ -511,6 +602,8 @@ mod test {
None
),
];
// let len = testcases.len();
// let testcases = testcases[(len - 2)..(len - 1)].to_vec();
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
testcases
{

View File

@@ -61,7 +61,8 @@ use crate::error::{
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
};
use crate::{Error, FlowId};
@@ -81,6 +82,14 @@ pub struct TaskConfig {
query_type: QueryType,
}
impl TaskConfig {
pub fn time_window_size(&self) -> Option<Duration> {
self.time_window_expr
.as_ref()
.and_then(|expr| *expr.time_window_size())
}
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
let stmts =
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
@@ -144,6 +153,12 @@ impl BatchingTask {
})
}
pub fn adjust(&self, min_run_interval_secs: u64, max_filter_num_per_query: usize) {
let mut state = self.state.write().unwrap();
state.min_run_interval = Some(min_run_interval_secs);
state.max_filter_num = Some(max_filter_num_per_query);
}
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
///
/// useful for flush_flow to flush dirty time windows range
@@ -280,7 +295,7 @@ impl BatchingTask {
let catalog = &self.config.sink_table_name[0];
let schema = &self.config.sink_table_name[1];
frontend_client
.create(expr.clone(), catalog, schema)
.create(expr.clone(), catalog, schema, Some(self))
.await?;
Ok(())
}
@@ -328,11 +343,53 @@ impl BatchingTask {
})?;
let plan = expanded_plan;
let mut peer_desc = None;
let db = frontend_client
.get_random_active_frontend(catalog, schema)
.await?;
let peer_desc = db.peer.clone();
let (tx, mut rx) = oneshot::channel();
let peer_inner = peer_desc.clone();
let window_size_pretty = format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
);
let inner_window_size_pretty = window_size_pretty.clone();
let flow_id = self.config.flow_id;
let slow_query_metric_task = tokio::task::spawn(async move {
tokio::time::sleep(SLOW_QUERY_THRESHOLD).await;
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.add(1.0);
while rx.try_recv() == Err(TryRecvError::Empty) {
// sleep for a while before next update
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.sub(1.0);
});
self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task);
let res = {
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()])
.with_label_values(&[
flow_id.to_string().as_str(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
])
.start_timer();
// hack and special handling the insert logical plan
@@ -361,10 +418,12 @@ impl BatchingTask {
};
frontend_client
.handle(req, catalog, schema, &mut peer_desc)
.handle(req, catalog, schema, Some(db.peer), Some(self))
.await
};
// signaling the slow query metric task to stop
let _ = tx.send(());
let elapsed = instant.elapsed();
if let Ok(affected_rows) = &res {
debug!(
@@ -387,7 +446,12 @@ impl BatchingTask {
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_desc.unwrap_or_default().to_string(),
&peer_desc.to_string(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
])
.observe(elapsed.as_secs_f64());
}
@@ -580,19 +644,20 @@ impl BatchingTask {
),
})?;
let expr = self
.state
.write()
.unwrap()
.dirty_time_windows
.gen_filter_exprs(
let expr = {
let mut state = self.state.write().unwrap();
let max_window_cnt = state
.max_filter_num
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
state.dirty_time_windows.gen_filter_exprs(
&col_name,
Some(l),
window_size,
DirtyTimeWindows::MAX_FILTER_NUM,
max_window_cnt,
self.config.flow_id,
Some(self),
)?;
)?
};
debug!(
"Flow id={:?}, Generated filter expr: {:?}",

View File

@@ -31,22 +31,37 @@ lazy_static! {
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_query_time_secs",
"flow batching engine query time(seconds)",
&["flow_id"],
&["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_slow_query_secs",
"flow batching engine slow query(seconds)",
&["flow_id", "peer"],
"flow batching engine slow query(seconds), updated after query finished",
&["flow_id", "peer", "time_window_granularity"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec =
register_gauge_vec!(
"greptime_flow_batching_engine_real_time_slow_query_number",
"flow batching engine real time slow query number, updated in real time",
&["flow_id", "peer", "time_window_granularity"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_stalled_query_window_cnt",
"flow batching engine stalled query time window count",
&["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_query_window_cnt",
"flow batching engine query time window count",
&["flow_id"],
&["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
@@ -54,7 +69,15 @@ lazy_static! {
register_histogram_vec!(
"greptime_flow_batching_engine_query_time_range_secs",
"flow batching engine query time range(seconds)",
&["flow_id"],
&["flow_id", "time_window_granularity"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_guess_fe_load",
"flow batching engine guessed frontend load",
&["fe_addr"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();

View File

@@ -154,6 +154,7 @@ where
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
user_provider.clone(),
runtime,
opts.grpc.flight_compression,
);
let grpc_server = builder

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::flow::FlowRequestHeader;
use api::v1::flow::{AdjustFlow, FlowRequestHeader};
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_function::handlers::FlowServiceHandler;
@@ -22,6 +22,7 @@ use common_query::error::Result;
use common_telemetry::tracing_context::TracingContext;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde_json::json;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
@@ -57,9 +58,96 @@ impl FlowServiceHandler for FlowServiceOperator {
) -> Result<api::v1::flow::FlowResponse> {
self.flush_inner(catalog, flow, ctx).await
}
async fn adjust(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
self.adjust_inner(
catalog,
flow,
min_run_interval_secs,
max_filter_num_per_query,
ctx,
)
.await
}
}
impl FlowServiceOperator {
async fn adjust_inner(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
let id = self
.flow_metadata_manager
.flow_name_manager()
.get(catalog, flow)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
.context(common_meta::error::FlowNotFoundSnafu {
flow_name: format!("{}.{}", catalog, flow),
})
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
.flow_id();
let all_flownode_peers = self
.flow_metadata_manager
.flow_route_manager()
.routes(id)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?;
// order of flownodes doesn't matter here
let all_flow_nodes = FuturesUnordered::from_iter(
all_flownode_peers
.iter()
.map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
)
.collect::<Vec<_>>()
.await;
// TODO(discord9): use proper type for flow options
let options = json!({
"min_run_interval_secs": min_run_interval_secs,
"max_filter_num_per_query": max_filter_num_per_query,
});
for node in all_flow_nodes {
let _res = {
use api::v1::flow::{flow_request, FlowRequest};
let flush_req = FlowRequest {
header: Some(FlowRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
query_context: Some(
common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
),
}),
body: Some(flow_request::Body::Adjust(AdjustFlow {
flow_id: Some(api::v1::FlowId { id }),
options: options.to_string(),
})),
};
node.handle(flush_req)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
};
}
Ok(Default::default())
}
/// Flush the flownodes according to the flow id.
async fn flush_inner(
&self,

View File

@@ -13,17 +13,20 @@
// limitations under the License.
use clap::Parser;
use cli::{BenchTableMetadataCommand, DataCommand, MetaCommand, Tool};
use cli::{
BenchTableMetadataCommand, ExportCommand, ImportCommand, MetaRestoreCommand,
MetaSnapshotCommand, Tool,
};
use common_error::ext::BoxedError;
#[derive(Parser)]
pub enum SubCommand {
// Attach(AttachCommand),
Bench(BenchTableMetadataCommand),
#[clap(subcommand)]
Data(DataCommand),
#[clap(subcommand)]
Meta(MetaCommand),
Export(ExportCommand),
Import(ImportCommand),
MetaSnapshot(MetaSnapshotCommand),
MetaRestore(MetaRestoreCommand),
}
impl SubCommand {
@@ -31,8 +34,10 @@ impl SubCommand {
match self {
// SubCommand::Attach(cmd) => cmd.build().await,
SubCommand::Bench(cmd) => cmd.build().await,
SubCommand::Data(cmd) => cmd.build().await,
SubCommand::Meta(cmd) => cmd.build().await,
SubCommand::Export(cmd) => cmd.build().await,
SubCommand::Import(cmd) => cmd.build().await,
SubCommand::MetaSnapshot(cmd) => cmd.build().await,
SubCommand::MetaRestore(cmd) => cmd.build().await,
}
}
}

View File

@@ -34,7 +34,6 @@ pub use changes::Changes;
use datafusion::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ColumnarValue;
use datafusion_expr::{AggregateUDF, ScalarUDF};
pub use deriv::Deriv;
pub use extrapolate_rate::{Delta, Increase, Rate};
pub use holt_winters::HoltWinters;
@@ -45,39 +44,6 @@ pub use quantile_aggr::{quantile_udaf, QUANTILE_NAME};
pub use resets::Resets;
pub use round::Round;
/// Range functions for PromQL.
pub fn range_funcs() -> Vec<ScalarUDF> {
vec![
IDelta::<false>::scalar_udf(),
IDelta::<true>::scalar_udf(),
Rate::scalar_udf(),
Increase::scalar_udf(),
Delta::scalar_udf(),
Resets::scalar_udf(),
Changes::scalar_udf(),
Deriv::scalar_udf(),
Round::scalar_udf(),
AvgOverTime::scalar_udf(),
MinOverTime::scalar_udf(),
MaxOverTime::scalar_udf(),
SumOverTime::scalar_udf(),
CountOverTime::scalar_udf(),
LastOverTime::scalar_udf(),
AbsentOverTime::scalar_udf(),
PresentOverTime::scalar_udf(),
StddevOverTime::scalar_udf(),
StdvarOverTime::scalar_udf(),
QuantileOverTime::scalar_udf(),
PredictLinear::scalar_udf(),
HoltWinters::scalar_udf(),
]
}
/// Aggregate functions for PromQL.
pub fn aggr_funcs() -> Vec<AggregateUDF> {
vec![quantile_udaf()]
}
/// Extracts an array from a `ColumnarValue`.
///
/// If the `ColumnarValue` is a scalar, it converts it to an array of size 1.

View File

@@ -40,8 +40,8 @@ pub struct QuantileAccumulator {
/// Create a quantile `AggregateUDF` for PromQL quantile operator,
/// which calculates φ-quantile (0 ≤ φ ≤ 1) over dimensions
pub fn quantile_udaf() -> AggregateUDF {
create_udaf(
pub fn quantile_udaf() -> Arc<AggregateUDF> {
Arc::new(create_udaf(
QUANTILE_NAME,
// Input type: (φ, values)
vec![DataType::Float64, DataType::Float64],
@@ -63,7 +63,7 @@ pub fn quantile_udaf() -> AggregateUDF {
)]
.into(),
)]),
)
))
}
impl QuantileAccumulator {

View File

@@ -1948,7 +1948,7 @@ impl PromPlanner {
token::T_QUANTILE => {
let q = Self::get_param_value_as_f64(op, param)?;
non_col_args.push(lit(q));
Arc::new(quantile_udaf())
quantile_udaf()
}
token::T_AVG => avg_udaf(),
token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
@@ -2444,7 +2444,7 @@ impl PromPlanner {
LogicalPlanBuilder::from(left)
.alias(left_table_ref)
.context(DataFusionPlanningSnafu)?
.join_detailed(
.join(
right,
JoinType::Inner,
(
@@ -2458,7 +2458,6 @@ impl PromPlanner {
.collect::<Vec<_>>(),
),
None,
true,
)
.context(DataFusionPlanningSnafu)?
.build()

View File

@@ -28,6 +28,11 @@ use datafusion::execution::{FunctionRegistry, SessionStateBuilder};
use datafusion::logical_expr::LogicalPlan;
use datafusion_expr::UserDefinedLogicalNode;
use greptime_proto::substrait_extension::MergeScan as PbMergeScan;
use promql::functions::{
quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, IDelta,
Increase, LastOverTime, MaxOverTime, MinOverTime, PresentOverTime, Rate, Resets, Round,
StddevOverTime, StdvarOverTime, SumOverTime,
};
use prost::Message;
use session::context::QueryContextRef;
use snafu::ResultExt;
@@ -112,15 +117,12 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
let mut session_state = SessionStateBuilder::new_from_existing(self.session_state.clone())
.with_catalog_list(catalog_list)
.build();
// Substrait decoder will look up the UDFs in SessionState, so we need to register them
// Note: the query context must be passed to set the timezone
// We MUST register the UDFs after we build the session state, otherwise the UDFs will be lost
// if they have the same name as the default UDFs or their alias.
// e.g. The default UDF `to_char()` has an alias `date_format()`, if we register a UDF with the name `date_format()`
// before we build the session state, the UDF will be lost.
// Scalar functions
for func in FUNCTION_REGISTRY.scalar_functions() {
let udf = func.provide(FunctionContext {
query_ctx: self.query_ctx.clone(),
@@ -131,15 +133,6 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
.context(RegisterUdfSnafu { name: func.name() })?;
}
// PromQL range functions
for func in promql::functions::range_funcs() {
let name = func.name().to_string();
session_state
.register_udf(Arc::new(func))
.context(RegisterUdfSnafu { name })?;
}
// Aggregate functions
for func in FUNCTION_REGISTRY.aggregate_functions() {
let name = func.name().to_string();
session_state
@@ -147,6 +140,29 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
.context(RegisterUdfSnafu { name })?;
}
let _ = session_state.register_udaf(quantile_udaf());
let _ = session_state.register_udf(Arc::new(IDelta::<false>::scalar_udf()));
let _ = session_state.register_udf(Arc::new(IDelta::<true>::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Rate::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Increase::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Delta::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Resets::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Changes::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Deriv::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Round::scalar_udf()));
let _ = session_state.register_udf(Arc::new(AvgOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(MinOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(MaxOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(SumOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(CountOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(LastOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(AbsentOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(PresentOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(StddevOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(StdvarOverTime::scalar_udf()));
// TODO(ruihang): add quantile_over_time, predict_linear, holt_winters, round
let logical_plan = DFLogicalSubstraitConvertor
.decode(message, session_state)
.await

View File

@@ -66,6 +66,8 @@ pub struct GrpcOptions {
pub max_recv_message_size: ReadableSize,
/// Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize,
/// Compression mode in Arrow Flight service.
pub flight_compression: FlightCompression,
pub runtime_size: usize,
#[serde(default = "Default::default")]
pub tls: TlsOption,
@@ -114,6 +116,7 @@ impl Default for GrpcOptions {
server_addr: String::new(),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
}
@@ -132,6 +135,30 @@ impl GrpcOptions {
}
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum FlightCompression {
/// Disable all compression in Arrow Flight service.
None,
/// Enable only transport layer compression (zstd).
Transport,
/// Enable only payload compression (lz4)
#[default]
ArrowIpc,
/// Enable all compression.
All,
}
impl FlightCompression {
pub fn transport_compression(&self) -> bool {
self == &FlightCompression::Transport || self == &FlightCompression::All
}
pub fn arrow_compression(&self) -> bool {
self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
}
}
pub struct GrpcServer {
// states
shutdown_tx: Mutex<Option<Sender<()>>>,

View File

@@ -45,7 +45,7 @@ use tonic::{Request, Response, Status, Streaming};
use crate::error::{InvalidParameterSnafu, ParseJsonSnafu, Result, ToJsonSnafu};
pub use crate::grpc::flight::stream::FlightRecordBatchStream;
use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler};
use crate::grpc::TonicResult;
use crate::grpc::{FlightCompression, TonicResult};
use crate::http::header::constants::GREPTIME_DB_HEADER_NAME;
use crate::http::AUTHORIZATION_HEADER;
use crate::{error, hint_headers};
@@ -195,9 +195,14 @@ impl FlightCraft for GreptimeRequestHandler {
protocol = "grpc",
request_type = get_request_type(&request)
);
let flight_compression = self.flight_compression;
async {
let output = self.handle_request(request, hints).await?;
let stream = to_flight_data_stream(output, TracingContext::from_current_span());
let stream = to_flight_data_stream(
output,
TracingContext::from_current_span(),
flight_compression,
);
Ok(Response::new(stream))
}
.trace(span)
@@ -365,14 +370,16 @@ impl Stream for PutRecordBatchRequestStream {
fn to_flight_data_stream(
output: Output,
tracing_context: TracingContext,
flight_compression: FlightCompression,
) -> TonicStream<FlightData> {
match output.data {
OutputData::Stream(stream) => {
let stream = FlightRecordBatchStream::new(stream, tracing_context);
let stream = FlightRecordBatchStream::new(stream, tracing_context, flight_compression);
Box::pin(stream) as _
}
OutputData::RecordBatches(x) => {
let stream = FlightRecordBatchStream::new(x.as_stream(), tracing_context);
let stream =
FlightRecordBatchStream::new(x.as_stream(), tracing_context, flight_compression);
Box::pin(stream) as _
}
OutputData::AffectedRows(rows) => {

View File

@@ -30,6 +30,7 @@ use tokio::task::JoinHandle;
use crate::error;
use crate::grpc::flight::TonicResult;
use crate::grpc::FlightCompression;
#[pin_project(PinnedDrop)]
pub struct FlightRecordBatchStream {
@@ -41,18 +42,27 @@ pub struct FlightRecordBatchStream {
}
impl FlightRecordBatchStream {
pub fn new(recordbatches: SendableRecordBatchStream, tracing_context: TracingContext) -> Self {
pub fn new(
recordbatches: SendableRecordBatchStream,
tracing_context: TracingContext,
compression: FlightCompression,
) -> Self {
let (tx, rx) = mpsc::channel::<TonicResult<FlightMessage>>(1);
let join_handle = common_runtime::spawn_global(async move {
Self::flight_data_stream(recordbatches, tx)
.trace(tracing_context.attach(info_span!("flight_data_stream")))
.await
});
let encoder = if compression.arrow_compression() {
FlightEncoder::default()
} else {
FlightEncoder::with_compression_disabled()
};
Self {
rx,
join_handle,
done: false,
encoder: FlightEncoder::with_compression_disabled(),
encoder,
}
}
@@ -161,7 +171,11 @@ mod test {
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()])
.unwrap()
.as_stream();
let mut stream = FlightRecordBatchStream::new(recordbatches, TracingContext::default());
let mut stream = FlightRecordBatchStream::new(
recordbatches,
TracingContext::default(),
FlightCompression::default(),
);
let mut raw_data = Vec::with_capacity(2);
raw_data.push(stream.next().await.unwrap().unwrap());

View File

@@ -49,7 +49,7 @@ use crate::error::{
JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result, UnknownHintSnafu,
};
use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
use crate::grpc::TonicResult;
use crate::grpc::{FlightCompression, TonicResult};
use crate::metrics;
use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER};
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
@@ -59,6 +59,7 @@ pub struct GreptimeRequestHandler {
handler: ServerGrpcQueryHandlerRef,
user_provider: Option<UserProviderRef>,
runtime: Option<Runtime>,
pub(crate) flight_compression: FlightCompression,
}
impl GreptimeRequestHandler {
@@ -66,11 +67,13 @@ impl GreptimeRequestHandler {
handler: ServerGrpcQueryHandlerRef,
user_provider: Option<UserProviderRef>,
runtime: Option<Runtime>,
flight_compression: FlightCompression,
) -> Self {
Self {
handler,
user_provider,
runtime,
flight_compression,
}
}

View File

@@ -34,7 +34,7 @@ mod test {
use itertools::Itertools;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::GrpcServerConfig;
use servers::grpc::{FlightCompression, GrpcServerConfig};
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::server::Server;
@@ -94,6 +94,7 @@ mod test {
)
.ok(),
Some(runtime.clone()),
FlightCompression::default(),
);
let mut grpc_server = GrpcServerBuilder::new(GrpcServerConfig::default(), runtime)
.flight_handler(Arc::new(greptime_request_handler))
@@ -139,8 +140,7 @@ mod test {
let schema = record_batches[0].schema.arrow_schema().clone();
let stream = futures::stream::once(async move {
let mut schema_data =
FlightEncoder::with_compression_disabled().encode(FlightMessage::Schema(schema));
let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema));
let metadata = DoPutMetadata::new(0);
schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();
// first message in "DoPut" stream should carry table name in flight descriptor
@@ -155,7 +155,7 @@ mod test {
tokio_stream::iter(record_batches)
.enumerate()
.map(|(i, x)| {
let mut encoder = FlightEncoder::with_compression_disabled();
let mut encoder = FlightEncoder::default();
let message = FlightMessage::RecordBatch(x.into_df_record_batch());
let mut data = encoder.encode(message);
let metadata = DoPutMetadata::new((i + 1) as i64);

View File

@@ -42,7 +42,7 @@ use object_store::test_util::TempFolder;
use object_store::ObjectStore;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
@@ -585,6 +585,7 @@ pub async fn setup_grpc_server_with(
ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()),
user_provider.clone(),
Some(runtime.clone()),
FlightCompression::default(),
);
let flight_handler = Arc::new(greptime_request_handler.clone());

View File

@@ -1025,6 +1025,7 @@ bind_addr = "127.0.0.1:4001"
server_addr = "127.0.0.1:4001"
max_recv_message_size = "512MiB"
max_send_message_size = "512MiB"
flight_compression = "arrow_ipc"
runtime_size = 8
[grpc.tls]

View File

@@ -675,16 +675,11 @@ insert into cache_miss_with_null_label values
Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1
-- null!=null, so it will returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
++
++
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);

View File

@@ -325,6 +325,7 @@ insert into cache_miss_with_null_label values
(4000, "write", null, 2.0);
-- SQLNESS SORT_RESULT 3 1
-- null!=null, so it will returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
-- SQLNESS SORT_RESULT 3 1