mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 05:42:57 +00:00
Compare commits
10 Commits
flow/add_a
...
flow/admin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0c392da638 | ||
|
|
9722482043 | ||
|
|
b5c185ed59 | ||
|
|
9df5f94662 | ||
|
|
838d3ab04e | ||
|
|
12648f388a | ||
|
|
2979aa048e | ||
|
|
74222c3070 | ||
|
|
0311db3089 | ||
|
|
e434294a0c |
@@ -59,7 +59,7 @@ runs:
|
||||
--set base.podTemplate.main.resources.requests.cpu=50m \
|
||||
--set base.podTemplate.main.resources.requests.memory=256Mi \
|
||||
--set base.podTemplate.main.resources.limits.cpu=2000m \
|
||||
--set base.podTemplate.main.resources.limits.memory=2Gi \
|
||||
--set base.podTemplate.main.resources.limits.memory=3Gi \
|
||||
--set frontend.replicas=${{ inputs.frontend-replicas }} \
|
||||
--set datanode.replicas=${{ inputs.datanode-replicas }} \
|
||||
--set meta.replicas=${{ inputs.meta-replicas }} \
|
||||
|
||||
15
.github/workflows/develop.yml
vendored
15
.github/workflows/develop.yml
vendored
@@ -250,6 +250,11 @@ jobs:
|
||||
name: unstable-fuzz-logs
|
||||
path: /tmp/unstable-greptime/
|
||||
retention-days: 3
|
||||
- name: Describe pods
|
||||
if: failure()
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe pod -n my-greptimedb
|
||||
|
||||
build-greptime-ci:
|
||||
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
|
||||
@@ -405,6 +410,11 @@ jobs:
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe nodes
|
||||
- name: Describe pod
|
||||
if: failure()
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe pod -n my-greptimedb
|
||||
- name: Export kind logs
|
||||
if: failure()
|
||||
shell: bash
|
||||
@@ -554,6 +564,11 @@ jobs:
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe nodes
|
||||
- name: Describe pods
|
||||
if: failure()
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe pod -n my-greptimedb
|
||||
- name: Export kind logs
|
||||
if: failure()
|
||||
shell: bash
|
||||
|
||||
22
Cargo.lock
generated
22
Cargo.lock
generated
@@ -4511,9 +4511,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.1.1"
|
||||
version = "1.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece"
|
||||
checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d"
|
||||
dependencies = [
|
||||
"crc32fast",
|
||||
"libz-rs-sys",
|
||||
@@ -5133,7 +5133,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=454c52634c3bac27de10bf0d85d5533eed1cf03f#454c52634c3bac27de10bf0d85d5533eed1cf03f"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2dca1dc67862d7b410838aef81232274c019b3f6#2dca1dc67862d7b410838aef81232274c019b3f6"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"serde",
|
||||
@@ -5146,9 +5146,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "grok"
|
||||
version = "2.0.0"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "273797968160270573071022613fc4aa28b91fe68f3eef6c96a1b2a1947ddfbd"
|
||||
checksum = "6c52724b609896f661a3f4641dd3a44dc602958ef615857c12d00756b4e9355b"
|
||||
dependencies = [
|
||||
"glob",
|
||||
"onig",
|
||||
@@ -6716,9 +6716,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libz-rs-sys"
|
||||
version = "0.5.0"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6489ca9bd760fe9642d7644e827b0c9add07df89857b0416ee15c1cc1a3b8c5a"
|
||||
checksum = "172a788537a2221661b480fee8dc5f96c580eb34fa88764d3205dc356c7e4221"
|
||||
dependencies = [
|
||||
"zlib-rs",
|
||||
]
|
||||
@@ -9664,9 +9664,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "psl"
|
||||
version = "2.1.112"
|
||||
version = "2.1.119"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1c6b4c497a0c6bfb466f75167c728b1a861b0cdc39de9c35b877208a270a9590"
|
||||
checksum = "d0e49aa528239f2ca13ad87387977c208e59c3fb8c437609f95f1b3898ec6ef1"
|
||||
dependencies = [
|
||||
"psl-types",
|
||||
]
|
||||
@@ -14701,9 +14701,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "zlib-rs"
|
||||
version = "0.5.0"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "868b928d7949e09af2f6086dfc1e01936064cc7a819253bce650d4e2a2d63ba8"
|
||||
checksum = "626bd9fa9734751fc50d6060752170984d7053f5a39061f524cda68023d4db8a"
|
||||
|
||||
[[package]]
|
||||
name = "zstd"
|
||||
|
||||
@@ -133,7 +133,7 @@ etcd-client = "0.14"
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
|
||||
@@ -195,13 +195,13 @@
|
||||
| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. |
|
||||
| `slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics` | -- | -- | The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommended to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | Unset | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
@@ -298,13 +298,11 @@
|
||||
| `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. |
|
||||
| `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
|
||||
| `slow_query.ttl` | String | `30d` | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics` | -- | -- | The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | Unset | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
@@ -315,11 +313,9 @@
|
||||
| Key | Type | Default | Descriptions |
|
||||
| --- | -----| ------- | ----------- |
|
||||
| `data_home` | String | `./greptimedb_data` | The working home directory. |
|
||||
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
|
||||
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
|
||||
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
|
||||
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
|
||||
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store` |
|
||||
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store`<br/>- `mysql_store` |
|
||||
| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.<br/>**Only used when backend is `postgres_store`.** |
|
||||
| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend<br/>Only used when backend is `postgres_store`. |
|
||||
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
|
||||
@@ -331,6 +327,12 @@
|
||||
| `runtime` | -- | -- | The runtime options. |
|
||||
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
|
||||
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
|
||||
| `grpc` | -- | -- | The gRPC server options. |
|
||||
| `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. |
|
||||
| `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
|
||||
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
@@ -372,13 +374,11 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics` | -- | -- | The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | Unset | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
@@ -536,13 +536,11 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | Unset | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
|
||||
@@ -635,24 +635,16 @@ max_log_files = 720
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## @toml2docs:none-default
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -247,24 +247,16 @@ sample_ratio = 1.0
|
||||
## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
|
||||
ttl = "30d"
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## @toml2docs:none-default
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -1,14 +1,6 @@
|
||||
## The working home directory.
|
||||
data_home = "./greptimedb_data"
|
||||
|
||||
## The bind address of metasrv.
|
||||
bind_addr = "127.0.0.1:3002"
|
||||
|
||||
## The communication server address for the frontend and datanode to connect to metasrv.
|
||||
## If left empty or unset, the server will automatically use the IP address of the first network interface
|
||||
## on the host, with the same port number as the one specified in `bind_addr`.
|
||||
server_addr = "127.0.0.1:3002"
|
||||
|
||||
## Store server address default to etcd store.
|
||||
## For postgres store, the format is:
|
||||
## "password=password dbname=postgres user=postgres host=localhost port=5432"
|
||||
@@ -24,6 +16,7 @@ store_key_prefix = ""
|
||||
## - `etcd_store` (default value)
|
||||
## - `memory_store`
|
||||
## - `postgres_store`
|
||||
## - `mysql_store`
|
||||
backend = "etcd_store"
|
||||
|
||||
## Table name in RDS to store metadata. Effect when using a RDS kvbackend.
|
||||
@@ -67,6 +60,21 @@ node_max_idle_time = "24hours"
|
||||
## The number of threads to execute the runtime for global write operations.
|
||||
#+ compact_rt_size = 4
|
||||
|
||||
## The gRPC server options.
|
||||
[grpc]
|
||||
## The address to bind the gRPC server.
|
||||
bind_addr = "127.0.0.1:3002"
|
||||
## The communication server address for the frontend and datanode to connect to metasrv.
|
||||
## If left empty or unset, the server will automatically use the IP address of the first network interface
|
||||
## on the host, with the same port number as the one specified in `bind_addr`.
|
||||
server_addr = "127.0.0.1:3002"
|
||||
## The number of server worker threads.
|
||||
runtime_size = 8
|
||||
## The maximum receive message size for gRPC server.
|
||||
max_recv_message_size = "512MB"
|
||||
## The maximum send message size for gRPC server.
|
||||
max_send_message_size = "512MB"
|
||||
|
||||
## The HTTP server options.
|
||||
[http]
|
||||
## The address to bind the HTTP server.
|
||||
@@ -229,24 +237,16 @@ max_log_files = 720
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## @toml2docs:none-default
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -750,13 +750,11 @@ default_ratio = 1.0
|
||||
## @toml2docs:none-default
|
||||
#+ sample_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
@@ -767,7 +765,7 @@ write_interval = "30s"
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -237,12 +237,20 @@ impl StartCommand {
|
||||
tokio_console_addr: global_options.tokio_console_addr.clone(),
|
||||
};
|
||||
|
||||
#[allow(deprecated)]
|
||||
if let Some(addr) = &self.rpc_bind_addr {
|
||||
opts.bind_addr.clone_from(addr);
|
||||
opts.grpc.bind_addr.clone_from(addr);
|
||||
} else if !opts.bind_addr.is_empty() {
|
||||
opts.grpc.bind_addr.clone_from(&opts.bind_addr);
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
if let Some(addr) = &self.rpc_server_addr {
|
||||
opts.server_addr.clone_from(addr);
|
||||
opts.grpc.server_addr.clone_from(addr);
|
||||
} else if !opts.server_addr.is_empty() {
|
||||
opts.grpc.server_addr.clone_from(&opts.server_addr);
|
||||
}
|
||||
|
||||
if let Some(addrs) = &self.store_addrs {
|
||||
@@ -319,7 +327,7 @@ impl StartCommand {
|
||||
|
||||
let plugin_opts = opts.plugins;
|
||||
let mut opts = opts.component;
|
||||
opts.detect_server_addr();
|
||||
opts.grpc.detect_server_addr();
|
||||
|
||||
info!("Metasrv options: {:#?}", opts);
|
||||
|
||||
@@ -363,7 +371,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let options = cmd.load_options(&Default::default()).unwrap().component;
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
|
||||
assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
|
||||
assert_eq!(SelectorType::LoadBased, options.selector);
|
||||
}
|
||||
@@ -396,8 +404,8 @@ mod tests {
|
||||
};
|
||||
|
||||
let options = cmd.load_options(&Default::default()).unwrap().component;
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
|
||||
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
|
||||
assert_eq!(SelectorType::LeaseBased, options.selector);
|
||||
assert_eq!("debug", options.logging.level.as_ref().unwrap());
|
||||
@@ -509,10 +517,10 @@ mod tests {
|
||||
let opts = command.load_options(&Default::default()).unwrap().component;
|
||||
|
||||
// Should be read from env, env > default values.
|
||||
assert_eq!(opts.bind_addr, "127.0.0.1:14002");
|
||||
assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");
|
||||
|
||||
// Should be read from config file, config file > env > default values.
|
||||
assert_eq!(opts.server_addr, "127.0.0.1:3002");
|
||||
assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");
|
||||
|
||||
// Should be read from cli, cli > config file > env > default values.
|
||||
assert_eq!(opts.http.addr, "127.0.0.1:14000");
|
||||
|
||||
@@ -95,7 +95,7 @@ fn test_load_datanode_example_config() {
|
||||
..Default::default()
|
||||
},
|
||||
export_metrics: ExportMetricsOption {
|
||||
self_import: Some(Default::default()),
|
||||
self_import: None,
|
||||
remote_write: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
@@ -148,7 +148,7 @@ fn test_load_frontend_example_config() {
|
||||
},
|
||||
},
|
||||
export_metrics: ExportMetricsOption {
|
||||
self_import: Some(Default::default()),
|
||||
self_import: None,
|
||||
remote_write: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
@@ -176,7 +176,11 @@ fn test_load_metasrv_example_config() {
|
||||
component: MetasrvOptions {
|
||||
selector: SelectorType::default(),
|
||||
data_home: DEFAULT_DATA_HOME.to_string(),
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
grpc: GrpcOptions {
|
||||
bind_addr: "127.0.0.1:3002".to_string(),
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
logging: LoggingOptions {
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
@@ -195,7 +199,7 @@ fn test_load_metasrv_example_config() {
|
||||
},
|
||||
},
|
||||
export_metrics: ExportMetricsOption {
|
||||
self_import: Some(Default::default()),
|
||||
self_import: None,
|
||||
remote_write: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
|
||||
90
src/common/function/src/adjust_flow.rs
Normal file
90
src/common/function/src/adjust_flow.rs
Normal file
@@ -0,0 +1,90 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_macro::admin_fn;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::Signature;
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ensure;
|
||||
use store_api::storage::ConcreteDataType;
|
||||
|
||||
use crate::handlers::FlowServiceHandlerRef;
|
||||
use crate::helper::parse_catalog_flow;
|
||||
|
||||
fn adjust_signature() -> Signature {
|
||||
Signature::exact(
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(), // flow name
|
||||
ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
|
||||
ConcreteDataType::uint64_datatype(), // max filter number per query
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
#[admin_fn(
|
||||
name = AdjustFlowFunction,
|
||||
display_name = adjust_flow,
|
||||
sig_fn = adjust_signature,
|
||||
ret = uint64
|
||||
)]
|
||||
pub(crate) async fn adjust_flow(
|
||||
flow_service_handler: &FlowServiceHandlerRef,
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
ensure!(
|
||||
params.len() == 3,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 3, have: {}",
|
||||
params.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
|
||||
(
|
||||
ValueRef::String(flow_name),
|
||||
ValueRef::UInt64(min_run_interval),
|
||||
ValueRef::UInt64(max_filter_num),
|
||||
) => (flow_name, min_run_interval, max_filter_num),
|
||||
_ => {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "adjust_flow",
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
|
||||
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||
|
||||
let res = flow_service_handler
|
||||
.adjust(
|
||||
&catalog_name,
|
||||
&flow_name,
|
||||
min_run_interval,
|
||||
max_filter_num as usize,
|
||||
query_ctx.clone(),
|
||||
)
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
}
|
||||
@@ -26,6 +26,7 @@ use flush_compact_table::{CompactTableFunction, FlushTableFunction};
|
||||
use migrate_region::MigrateRegionFunction;
|
||||
use remove_region_follower::RemoveRegionFollowerFunction;
|
||||
|
||||
use crate::adjust_flow::AdjustFlowFunction;
|
||||
use crate::flush_flow::FlushFlowFunction;
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
@@ -43,5 +44,6 @@ impl AdminFunction {
|
||||
registry.register_async(Arc::new(FlushTableFunction));
|
||||
registry.register_async(Arc::new(CompactTableFunction));
|
||||
registry.register_async(Arc::new(FlushFlowFunction));
|
||||
registry.register_async(Arc::new(AdjustFlowFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,21 +12,19 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_macro::admin_fn;
|
||||
use common_query::error::{
|
||||
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
|
||||
UnsupportedInputDataTypeSnafu,
|
||||
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::Signature;
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use sql::parser::ParserContext;
|
||||
use snafu::ensure;
|
||||
use store_api::storage::ConcreteDataType;
|
||||
|
||||
use crate::handlers::FlowServiceHandlerRef;
|
||||
use crate::helper::parse_catalog_flow;
|
||||
|
||||
fn flush_signature() -> Signature {
|
||||
Signature::uniform(
|
||||
@@ -47,20 +45,6 @@ pub(crate) async fn flush_flow(
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
|
||||
|
||||
let res = flow_service_handler
|
||||
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
}
|
||||
|
||||
fn parse_flush_flow(
|
||||
params: &[ValueRef<'_>],
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(String, String)> {
|
||||
ensure!(
|
||||
params.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
@@ -70,7 +54,6 @@ fn parse_flush_flow(
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let ValueRef::String(flow_name) = params[0] else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "flush_flow",
|
||||
@@ -78,27 +61,14 @@ fn parse_flush_flow(
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExecuteSnafu)?;
|
||||
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||
|
||||
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
||||
[flow_name] => (
|
||||
query_ctx.current_catalog().to_string(),
|
||||
flow_name.value.clone(),
|
||||
),
|
||||
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
||||
_ => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
||||
obj_name
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
};
|
||||
Ok((catalog_name, flow_name))
|
||||
let res = flow_service_handler
|
||||
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
||||
.await?;
|
||||
let affected_rows = res.affected_rows;
|
||||
|
||||
Ok(Value::from(affected_rows))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -154,10 +124,7 @@ mod test {
|
||||
("catalog.flow_name", ("catalog", "flow_name")),
|
||||
];
|
||||
for (input, expected) in testcases.iter() {
|
||||
let args = vec![*input];
|
||||
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
|
||||
|
||||
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
|
||||
let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap();
|
||||
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,6 +87,15 @@ pub trait FlowServiceHandler: Send + Sync {
|
||||
flow: &str,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse>;
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse>;
|
||||
}
|
||||
|
||||
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
|
||||
|
||||
@@ -12,12 +12,15 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_query::error::{InvalidInputTypeSnafu, Result};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
|
||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::types::cast::cast;
|
||||
use datatypes::value::ValueRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
use sql::parser::ParserContext;
|
||||
|
||||
/// Create a function signature with oneof signatures of interleaving two arguments.
|
||||
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
|
||||
@@ -43,3 +46,30 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
|
||||
})
|
||||
.map(|v| v.as_u64())
|
||||
}
|
||||
|
||||
pub fn parse_catalog_flow(
|
||||
flow_name: &str,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(String, String)> {
|
||||
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExecuteSnafu)?;
|
||||
|
||||
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
||||
[flow_name] => (
|
||||
query_ctx.current_catalog().to_string(),
|
||||
flow_name.value.clone(),
|
||||
),
|
||||
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
||||
_ => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
||||
obj_name
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
};
|
||||
Ok((catalog_name, flow_name))
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#![feature(let_chains)]
|
||||
#![feature(try_blocks)]
|
||||
|
||||
mod adjust_flow;
|
||||
mod admin;
|
||||
mod flush_flow;
|
||||
mod macros;
|
||||
|
||||
@@ -148,6 +148,17 @@ impl FunctionState {
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
_catalog: &str,
|
||||
_flow: &str,
|
||||
_min_run_interval_secs: u64,
|
||||
_max_filter_num_per_query: usize,
|
||||
_ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
|
||||
@@ -64,18 +64,6 @@ impl Default for FlightEncoder {
|
||||
}
|
||||
|
||||
impl FlightEncoder {
|
||||
pub fn with_compression_disabled() -> Self {
|
||||
let write_options = writer::IpcWriteOptions::default()
|
||||
.try_with_compression(None)
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
write_options,
|
||||
data_gen: writer::IpcDataGenerator::default(),
|
||||
dictionary_tracker: writer::DictionaryTracker::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData {
|
||||
match flight_message {
|
||||
FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(),
|
||||
|
||||
@@ -35,6 +35,9 @@ pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
|
||||
/// The lease seconds of metasrv leader.
|
||||
pub const META_LEASE_SECS: u64 = 5;
|
||||
|
||||
/// The keep-alive interval of the Postgres connection.
|
||||
pub const POSTGRES_KEEP_ALIVE_SECS: u64 = 30;
|
||||
|
||||
/// In a lease, there are two opportunities for renewal.
|
||||
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;
|
||||
|
||||
|
||||
@@ -61,6 +61,7 @@ prost.workspace = true
|
||||
query.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
servers.workspace = true
|
||||
session.workspace = true
|
||||
smallvec.workspace = true
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::{
|
||||
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
||||
flow_request, AdjustFlow, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
||||
};
|
||||
use api::v1::region::InsertRequests;
|
||||
use catalog::CatalogManager;
|
||||
@@ -32,6 +32,7 @@ use common_telemetry::{error, info, trace, warn};
|
||||
use datatypes::value::Value;
|
||||
use futures::TryStreamExt;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session::context::QueryContextBuilder;
|
||||
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
@@ -809,6 +810,25 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
Some(flow_request::Body::Adjust(AdjustFlow { flow_id, options })) => {
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct Options {
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
}
|
||||
let options: Options = serde_json::from_str(&options).with_context(|_| {
|
||||
common_meta::error::DeserializeFromJsonSnafu { input: options }
|
||||
})?;
|
||||
self.batching_engine
|
||||
.adjust_flow(
|
||||
flow_id.unwrap().id as u64,
|
||||
options.min_run_interval_secs,
|
||||
options.max_filter_num_per_query,
|
||||
)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
Ok(Default::default())
|
||||
}
|
||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
||||
}
|
||||
}
|
||||
@@ -841,93 +861,6 @@ fn to_meta_err(
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl common_meta::node_manager::Flownode for StreamingEngine {
|
||||
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
|
||||
let query_ctx = request
|
||||
.header
|
||||
.and_then(|h| h.query_context)
|
||||
.map(|ctx| ctx.into());
|
||||
match request.body {
|
||||
Some(flow_request::Body::Create(CreateRequest {
|
||||
flow_id: Some(task_id),
|
||||
source_table_ids,
|
||||
sink_table_name: Some(sink_table_name),
|
||||
create_if_not_exists,
|
||||
expire_after,
|
||||
comment,
|
||||
sql,
|
||||
flow_options,
|
||||
or_replace,
|
||||
})) => {
|
||||
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
|
||||
let sink_table_name = [
|
||||
sink_table_name.catalog_name,
|
||||
sink_table_name.schema_name,
|
||||
sink_table_name.table_name,
|
||||
];
|
||||
let expire_after = expire_after.map(|e| e.value);
|
||||
let args = CreateFlowArgs {
|
||||
flow_id: task_id.id as u64,
|
||||
sink_table_name,
|
||||
source_table_ids,
|
||||
create_if_not_exists,
|
||||
or_replace,
|
||||
expire_after,
|
||||
comment: Some(comment),
|
||||
sql: sql.clone(),
|
||||
flow_options,
|
||||
query_ctx,
|
||||
};
|
||||
let ret = self
|
||||
.create_flow(args)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
METRIC_FLOW_TASK_COUNT.inc();
|
||||
Ok(FlowResponse {
|
||||
affected_flows: ret
|
||||
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
|
||||
.into_iter()
|
||||
.collect_vec(),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
Some(flow_request::Body::Drop(DropRequest {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
self.remove_flow(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
METRIC_FLOW_TASK_COUNT.dec();
|
||||
Ok(Default::default())
|
||||
}
|
||||
Some(flow_request::Body::Flush(FlushFlow {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
let row = self
|
||||
.flush_flow_inner(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err(snafu::location!()))?;
|
||||
Ok(FlowResponse {
|
||||
affected_flows: vec![flow_id],
|
||||
affected_rows: row as u64,
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
|
||||
self.handle_inserts_inner(request)
|
||||
.await
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for StreamingEngine {
|
||||
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||
self.create_flow_inner(args).await
|
||||
|
||||
@@ -388,6 +388,20 @@ impl BatchingEngine {
|
||||
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
|
||||
self.tasks.read().await.contains_key(&flow_id)
|
||||
}
|
||||
|
||||
pub async fn adjust_flow(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
) -> Result<(), Error> {
|
||||
let task = self.tasks.read().await.get(&flow_id).cloned();
|
||||
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
|
||||
debug!("Adjusting flow {flow_id} with min_run_interval_secs={} and max_filter_num_per_query={}", min_run_interval_secs, max_filter_num_per_query);
|
||||
task.adjust(min_run_interval_secs, max_filter_num_per_query);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for BatchingEngine {
|
||||
|
||||
@@ -14,8 +14,9 @@
|
||||
|
||||
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
|
||||
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::SystemTime;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::CreateTableExpr;
|
||||
@@ -26,20 +27,21 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::store::RangeRequest;
|
||||
use common_query::Output;
|
||||
use common_telemetry::warn;
|
||||
use common_telemetry::{debug, warn};
|
||||
use itertools::Itertools;
|
||||
use meta_client::client::MetaClient;
|
||||
use rand::rng;
|
||||
use rand::seq::SliceRandom;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::batching_mode::task::BatchingTask;
|
||||
use crate::batching_mode::{
|
||||
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
|
||||
GRPC_MAX_RETRIES,
|
||||
};
|
||||
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
|
||||
use crate::{Error, FlowAuthHeader};
|
||||
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
|
||||
use crate::{Error, FlowAuthHeader, FlowId};
|
||||
|
||||
/// Just like [`GrpcQueryHandler`] but use BoxedError
|
||||
///
|
||||
@@ -74,6 +76,105 @@ impl<
|
||||
|
||||
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
|
||||
|
||||
/// Statistics about running query on this frontend from flownode
|
||||
#[derive(Debug, Default, Clone)]
|
||||
struct FrontendStat {
|
||||
/// The query for flow id has been running since this timestamp
|
||||
since: HashMap<FlowId, Instant>,
|
||||
/// The average query time for each flow id
|
||||
/// This is used to calculate the average query time for each flow id
|
||||
past_query_avg: HashMap<FlowId, (usize, Duration)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct FrontendStats {
|
||||
/// The statistics for each flow id
|
||||
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
||||
}
|
||||
|
||||
impl FrontendStats {
|
||||
pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
|
||||
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
let stat = stats.entry(frontend_addr.to_string()).or_default();
|
||||
stat.since.insert(flow_id, Instant::now());
|
||||
|
||||
FrontendStatsGuard {
|
||||
stats: self.stats.clone(),
|
||||
frontend_addr: frontend_addr.to_string(),
|
||||
cur: flow_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// return frontend addrs sorted by load, from lightest to heaviest
|
||||
/// The load is calculated as the total average query time for each flow id plus running query's total running time elapsed
|
||||
pub fn sort_by_load(&self) -> Vec<String> {
|
||||
let stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
let fe_load_factor = stats
|
||||
.iter()
|
||||
.map(|(node_addr, stat)| {
|
||||
// total expected avg running time for all currently running queries
|
||||
let total_expect_avg_run_time = stat
|
||||
.since
|
||||
.keys()
|
||||
.map(|f| {
|
||||
let (count, total_duration) =
|
||||
stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
|
||||
if *count == 0 {
|
||||
0.0
|
||||
} else {
|
||||
total_duration.as_secs_f64() / *count as f64
|
||||
}
|
||||
})
|
||||
.sum::<f64>();
|
||||
let total_cur_running_time = stat
|
||||
.since
|
||||
.values()
|
||||
.map(|since| since.elapsed().as_secs_f64())
|
||||
.sum::<f64>();
|
||||
(
|
||||
node_addr.to_string(),
|
||||
total_expect_avg_run_time + total_cur_running_time,
|
||||
)
|
||||
})
|
||||
.sorted_by(|(_, load_a), (_, load_b)| {
|
||||
load_a
|
||||
.partial_cmp(load_b)
|
||||
.unwrap_or(std::cmp::Ordering::Equal)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
debug!("Frontend load factor: {:?}", fe_load_factor);
|
||||
for (node_addr, load) in &fe_load_factor {
|
||||
METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
|
||||
.with_label_values(&[&node_addr.to_string()])
|
||||
.observe(*load);
|
||||
}
|
||||
fe_load_factor
|
||||
.into_iter()
|
||||
.map(|(addr, _)| addr)
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FrontendStatsGuard {
|
||||
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
||||
frontend_addr: String,
|
||||
cur: FlowId,
|
||||
}
|
||||
|
||||
impl Drop for FrontendStatsGuard {
|
||||
fn drop(&mut self) {
|
||||
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||
if let Some(stat) = stats.get_mut(&self.frontend_addr) {
|
||||
if let Some(since) = stat.since.remove(&self.cur) {
|
||||
let elapsed = since.elapsed();
|
||||
let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
|
||||
*count += 1;
|
||||
*total_duration += elapsed;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A simple frontend client able to execute sql using grpc protocol
|
||||
///
|
||||
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
|
||||
@@ -83,6 +184,7 @@ pub enum FrontendClient {
|
||||
meta_client: Arc<MetaClient>,
|
||||
chnl_mgr: ChannelManager,
|
||||
auth: Option<FlowAuthHeader>,
|
||||
fe_stats: FrontendStats,
|
||||
},
|
||||
Standalone {
|
||||
/// for the sake of simplicity still use grpc even in standalone mode
|
||||
@@ -114,6 +216,7 @@ impl FrontendClient {
|
||||
ChannelManager::with_config(cfg)
|
||||
},
|
||||
auth,
|
||||
fe_stats: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -183,7 +286,7 @@ impl FrontendClient {
|
||||
|
||||
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
|
||||
/// and is able to process query
|
||||
async fn get_random_active_frontend(
|
||||
pub(crate) async fn get_random_active_frontend(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
@@ -192,6 +295,7 @@ impl FrontendClient {
|
||||
meta_client: _,
|
||||
chnl_mgr,
|
||||
auth,
|
||||
fe_stats,
|
||||
} = self
|
||||
else {
|
||||
return UnexpectedSnafu {
|
||||
@@ -208,8 +312,21 @@ impl FrontendClient {
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis() as i64;
|
||||
// shuffle the frontends to avoid always pick the same one
|
||||
frontends.shuffle(&mut rng());
|
||||
let node_addrs_by_load = fe_stats.sort_by_load();
|
||||
// index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0
|
||||
let addr2load = node_addrs_by_load
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, id)| (id.clone(), i + 1))
|
||||
.collect::<HashMap<_, _>>();
|
||||
// sort frontends by load, from lightest to heaviest
|
||||
frontends.sort_by(|(_, a), (_, b)| {
|
||||
// if not even in stats, treat as 0 load since never been queried
|
||||
let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
|
||||
let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
|
||||
load_a.cmp(load_b)
|
||||
});
|
||||
debug!("Frontend nodes sorted by load: {:?}", frontends);
|
||||
|
||||
// found node with maximum last_activity_ts
|
||||
for (_, node_info) in frontends
|
||||
@@ -257,6 +374,7 @@ impl FrontendClient {
|
||||
create: CreateTableExpr,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
task: Option<&BatchingTask>,
|
||||
) -> Result<u32, Error> {
|
||||
self.handle(
|
||||
Request::Ddl(api::v1::DdlRequest {
|
||||
@@ -264,7 +382,8 @@ impl FrontendClient {
|
||||
}),
|
||||
catalog,
|
||||
schema,
|
||||
&mut None,
|
||||
None,
|
||||
task,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -275,15 +394,31 @@ impl FrontendClient {
|
||||
req: api::v1::greptime_request::Request,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
peer_desc: &mut Option<PeerDesc>,
|
||||
use_peer: Option<Peer>,
|
||||
task: Option<&BatchingTask>,
|
||||
) -> Result<u32, Error> {
|
||||
match self {
|
||||
FrontendClient::Distributed { .. } => {
|
||||
let db = self.get_random_active_frontend(catalog, schema).await?;
|
||||
FrontendClient::Distributed {
|
||||
fe_stats, chnl_mgr, ..
|
||||
} => {
|
||||
let db = if let Some(peer) = use_peer {
|
||||
DatabaseWithPeer::new(
|
||||
Database::new(
|
||||
catalog,
|
||||
schema,
|
||||
Client::with_manager_and_urls(
|
||||
chnl_mgr.clone(),
|
||||
vec![peer.addr.clone()],
|
||||
),
|
||||
),
|
||||
peer,
|
||||
)
|
||||
} else {
|
||||
self.get_random_active_frontend(catalog, schema).await?
|
||||
};
|
||||
|
||||
*peer_desc = Some(PeerDesc::Dist {
|
||||
peer: db.peer.clone(),
|
||||
});
|
||||
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
|
||||
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
|
||||
|
||||
db.database
|
||||
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
|
||||
|
||||
@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
|
||||
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
|
||||
};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -52,6 +53,13 @@ pub struct TaskState {
|
||||
pub(crate) shutdown_rx: oneshot::Receiver<()>,
|
||||
/// Task handle
|
||||
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
/// Slow Query metrics update task handle
|
||||
pub(crate) slow_query_metric_task: Option<tokio::task::JoinHandle<()>>,
|
||||
|
||||
/// min run interval in seconds
|
||||
pub(crate) min_run_interval: Option<u64>,
|
||||
/// max filter number per query
|
||||
pub(crate) max_filter_num: Option<usize>,
|
||||
}
|
||||
impl TaskState {
|
||||
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
||||
@@ -63,6 +71,9 @@ impl TaskState {
|
||||
exec_state: ExecState::Idle,
|
||||
shutdown_rx,
|
||||
task_handle: None,
|
||||
slow_query_metric_task: None,
|
||||
min_run_interval: None,
|
||||
max_filter_num: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,20 +98,17 @@ impl TaskState {
|
||||
pub fn get_next_start_query_time(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
time_window_size: &Option<Duration>,
|
||||
_time_window_size: &Option<Duration>,
|
||||
max_timeout: Option<Duration>,
|
||||
) -> Instant {
|
||||
let last_duration = max_timeout
|
||||
let next_duration = max_timeout
|
||||
.unwrap_or(self.last_query_duration)
|
||||
.min(self.last_query_duration)
|
||||
.max(MIN_REFRESH_DURATION);
|
||||
|
||||
let next_duration = time_window_size
|
||||
.map(|t| {
|
||||
let half = t / 2;
|
||||
half.max(last_duration)
|
||||
})
|
||||
.unwrap_or(last_duration);
|
||||
.max(
|
||||
self.min_run_interval
|
||||
.map(Duration::from_secs)
|
||||
.unwrap_or(MIN_REFRESH_DURATION),
|
||||
);
|
||||
|
||||
// if have dirty time window, execute immediately to clean dirty time window
|
||||
if self.dirty_time_windows.windows.is_empty() {
|
||||
@@ -243,9 +251,19 @@ impl DirtyTimeWindows {
|
||||
};
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
format!("{}", window_size).as_str(),
|
||||
])
|
||||
.observe(first_nth.len() as f64);
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
format!("{}", window_size).as_str(),
|
||||
])
|
||||
.observe(self.windows.len() as f64);
|
||||
|
||||
let full_time_range = first_nth
|
||||
.iter()
|
||||
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
||||
@@ -257,7 +275,10 @@ impl DirtyTimeWindows {
|
||||
})
|
||||
.num_seconds() as f64;
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
format!("{}", window_size).as_str(),
|
||||
])
|
||||
.observe(full_time_range);
|
||||
|
||||
let mut expr_lst = vec![];
|
||||
|
||||
@@ -61,7 +61,8 @@ use crate::error::{
|
||||
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
||||
};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -81,6 +82,14 @@ pub struct TaskConfig {
|
||||
query_type: QueryType,
|
||||
}
|
||||
|
||||
impl TaskConfig {
|
||||
pub fn time_window_size(&self) -> Option<Duration> {
|
||||
self.time_window_expr
|
||||
.as_ref()
|
||||
.and_then(|expr| *expr.time_window_size())
|
||||
}
|
||||
}
|
||||
|
||||
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
|
||||
let stmts =
|
||||
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
|
||||
@@ -144,6 +153,12 @@ impl BatchingTask {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn adjust(&self, min_run_interval_secs: u64, max_filter_num_per_query: usize) {
|
||||
let mut state = self.state.write().unwrap();
|
||||
state.min_run_interval = Some(min_run_interval_secs);
|
||||
state.max_filter_num = Some(max_filter_num_per_query);
|
||||
}
|
||||
|
||||
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
|
||||
///
|
||||
/// useful for flush_flow to flush dirty time windows range
|
||||
@@ -280,7 +295,7 @@ impl BatchingTask {
|
||||
let catalog = &self.config.sink_table_name[0];
|
||||
let schema = &self.config.sink_table_name[1];
|
||||
frontend_client
|
||||
.create(expr.clone(), catalog, schema)
|
||||
.create(expr.clone(), catalog, schema, Some(self))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -328,11 +343,53 @@ impl BatchingTask {
|
||||
})?;
|
||||
|
||||
let plan = expanded_plan;
|
||||
let mut peer_desc = None;
|
||||
|
||||
let db = frontend_client
|
||||
.get_random_active_frontend(catalog, schema)
|
||||
.await?;
|
||||
let peer_desc = db.peer.clone();
|
||||
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
let peer_inner = peer_desc.clone();
|
||||
let window_size_pretty = format!(
|
||||
"{}s",
|
||||
self.config.time_window_size().unwrap_or_default().as_secs()
|
||||
);
|
||||
let inner_window_size_pretty = window_size_pretty.clone();
|
||||
let flow_id = self.config.flow_id;
|
||||
let slow_query_metric_task = tokio::task::spawn(async move {
|
||||
tokio::time::sleep(SLOW_QUERY_THRESHOLD).await;
|
||||
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
&peer_inner.to_string(),
|
||||
inner_window_size_pretty.as_str(),
|
||||
])
|
||||
.add(1.0);
|
||||
while rx.try_recv() == Err(TryRecvError::Empty) {
|
||||
// sleep for a while before next update
|
||||
tokio::time::sleep(MIN_REFRESH_DURATION).await;
|
||||
}
|
||||
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
&peer_inner.to_string(),
|
||||
inner_window_size_pretty.as_str(),
|
||||
])
|
||||
.sub(1.0);
|
||||
});
|
||||
self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task);
|
||||
|
||||
let res = {
|
||||
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
|
||||
.with_label_values(&[flow_id.to_string().as_str()])
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
format!(
|
||||
"{}s",
|
||||
self.config.time_window_size().unwrap_or_default().as_secs()
|
||||
)
|
||||
.as_str(),
|
||||
])
|
||||
.start_timer();
|
||||
|
||||
// hack and special handling the insert logical plan
|
||||
@@ -361,10 +418,12 @@ impl BatchingTask {
|
||||
};
|
||||
|
||||
frontend_client
|
||||
.handle(req, catalog, schema, &mut peer_desc)
|
||||
.handle(req, catalog, schema, Some(db.peer), Some(self))
|
||||
.await
|
||||
};
|
||||
|
||||
// signaling the slow query metric task to stop
|
||||
let _ = tx.send(());
|
||||
let elapsed = instant.elapsed();
|
||||
if let Ok(affected_rows) = &res {
|
||||
debug!(
|
||||
@@ -387,7 +446,12 @@ impl BatchingTask {
|
||||
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
&peer_desc.unwrap_or_default().to_string(),
|
||||
&peer_desc.to_string(),
|
||||
format!(
|
||||
"{}s",
|
||||
self.config.time_window_size().unwrap_or_default().as_secs()
|
||||
)
|
||||
.as_str(),
|
||||
])
|
||||
.observe(elapsed.as_secs_f64());
|
||||
}
|
||||
@@ -580,19 +644,20 @@ impl BatchingTask {
|
||||
),
|
||||
})?;
|
||||
|
||||
let expr = self
|
||||
.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.dirty_time_windows
|
||||
.gen_filter_exprs(
|
||||
let expr = {
|
||||
let mut state = self.state.write().unwrap();
|
||||
let max_window_cnt = state
|
||||
.max_filter_num
|
||||
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
|
||||
state.dirty_time_windows.gen_filter_exprs(
|
||||
&col_name,
|
||||
Some(l),
|
||||
window_size,
|
||||
DirtyTimeWindows::MAX_FILTER_NUM,
|
||||
max_window_cnt,
|
||||
self.config.flow_id,
|
||||
Some(self),
|
||||
)?;
|
||||
)?
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Flow id={:?}, Generated filter expr: {:?}",
|
||||
|
||||
@@ -31,22 +31,37 @@ lazy_static! {
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_query_time_secs",
|
||||
"flow batching engine query time(seconds)",
|
||||
&["flow_id"],
|
||||
&["flow_id", "time_window_granularity"],
|
||||
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_slow_query_secs",
|
||||
"flow batching engine slow query(seconds)",
|
||||
&["flow_id", "peer"],
|
||||
"flow batching engine slow query(seconds), updated after query finished",
|
||||
&["flow_id", "peer", "time_window_granularity"],
|
||||
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec =
|
||||
register_gauge_vec!(
|
||||
"greptime_flow_batching_engine_real_time_slow_query_number",
|
||||
"flow batching engine real time slow query number, updated in real time",
|
||||
&["flow_id", "peer", "time_window_granularity"],
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_stalled_query_window_cnt",
|
||||
"flow batching engine stalled query time window count",
|
||||
&["flow_id", "time_window_granularity"],
|
||||
vec![0.0, 5., 10., 20., 40.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_query_window_cnt",
|
||||
"flow batching engine query time window count",
|
||||
&["flow_id"],
|
||||
&["flow_id", "time_window_granularity"],
|
||||
vec![0.0, 5., 10., 20., 40.]
|
||||
)
|
||||
.unwrap();
|
||||
@@ -54,7 +69,15 @@ lazy_static! {
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_query_time_range_secs",
|
||||
"flow batching engine query time range(seconds)",
|
||||
&["flow_id"],
|
||||
&["flow_id", "time_window_granularity"],
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_guess_fe_load",
|
||||
"flow batching engine guessed frontend load",
|
||||
&["fe_addr"],
|
||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -130,7 +130,13 @@ impl JaegerQueryHandler for Instance {
|
||||
.await?)
|
||||
}
|
||||
|
||||
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
|
||||
async fn get_trace(
|
||||
&self,
|
||||
ctx: QueryContextRef,
|
||||
trace_id: &str,
|
||||
start_time: Option<i64>,
|
||||
end_time: Option<i64>,
|
||||
) -> ServerResult<Output> {
|
||||
// It's equivalent to
|
||||
//
|
||||
// ```
|
||||
@@ -139,13 +145,25 @@ impl JaegerQueryHandler for Instance {
|
||||
// FROM
|
||||
// {db}.{trace_table}
|
||||
// WHERE
|
||||
// trace_id = '{trace_id}'
|
||||
// trace_id = '{trace_id}' AND
|
||||
// timestamp >= {start_time} AND
|
||||
// timestamp <= {end_time}
|
||||
// ORDER BY
|
||||
// timestamp DESC
|
||||
// ```.
|
||||
let selects = vec![wildcard()];
|
||||
|
||||
let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
|
||||
let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
|
||||
|
||||
if let Some(start_time) = start_time {
|
||||
// Microseconds to nanoseconds.
|
||||
filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
|
||||
}
|
||||
|
||||
if let Some(end_time) = end_time {
|
||||
// Microseconds to nanoseconds.
|
||||
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
|
||||
}
|
||||
|
||||
Ok(query_trace_table(
|
||||
ctx,
|
||||
|
||||
@@ -31,8 +31,6 @@ use common_meta::kv_backend::rds::MySqlStore;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use common_meta::kv_backend::rds::PgStore;
|
||||
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use common_telemetry::error;
|
||||
use common_telemetry::info;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use deadpool_postgres::{Config, Runtime};
|
||||
@@ -144,7 +142,8 @@ impl MetasrvInstance {
|
||||
let (serve_state_tx, serve_state_rx) = oneshot::channel();
|
||||
|
||||
let socket_addr =
|
||||
bootstrap_metasrv_with_router(&self.opts.bind_addr, router, serve_state_tx, rx).await?;
|
||||
bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
|
||||
.await?;
|
||||
self.bind_addr = Some(socket_addr);
|
||||
|
||||
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
|
||||
@@ -260,7 +259,7 @@ pub async fn metasrv_builder(
|
||||
let etcd_client = create_etcd_client(&opts.store_addrs).await?;
|
||||
let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
|
||||
let election = EtcdElection::with_etcd_client(
|
||||
&opts.server_addr,
|
||||
&opts.grpc.server_addr,
|
||||
etcd_client,
|
||||
opts.store_key_prefix.clone(),
|
||||
)
|
||||
@@ -270,22 +269,41 @@ pub async fn metasrv_builder(
|
||||
}
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
(None, BackendImpl::PostgresStore) => {
|
||||
let pool = create_postgres_pool(&opts.store_addrs).await?;
|
||||
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
|
||||
.await
|
||||
.context(error::KvBackendSnafu)?;
|
||||
// Client for election should be created separately since we need a different session keep-alive idle time.
|
||||
let election_client = create_postgres_client(opts).await?;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
|
||||
|
||||
use crate::election::rds::postgres::ElectionPgClient;
|
||||
|
||||
let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
|
||||
let execution_timeout = Duration::from_secs(META_LEASE_SECS);
|
||||
let statement_timeout = Duration::from_secs(META_LEASE_SECS);
|
||||
let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
|
||||
|
||||
let mut cfg = Config::new();
|
||||
cfg.keepalives = Some(true);
|
||||
cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
|
||||
// We use a separate pool for election since we need a different session keep-alive idle time.
|
||||
let pool = create_postgres_pool_with(&opts.store_addrs, cfg).await?;
|
||||
|
||||
let election_client =
|
||||
ElectionPgClient::new(pool, execution_timeout, meta_lease_ttl, statement_timeout)?;
|
||||
let election = PgElection::with_pg_client(
|
||||
opts.server_addr.clone(),
|
||||
opts.grpc.server_addr.clone(),
|
||||
election_client,
|
||||
opts.store_key_prefix.clone(),
|
||||
CANDIDATE_LEASE_SECS,
|
||||
META_LEASE_SECS,
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
&opts.meta_table_name,
|
||||
opts.meta_election_lock_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let pool = create_postgres_pool(&opts.store_addrs).await?;
|
||||
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
|
||||
.await
|
||||
.context(error::KvBackendSnafu)?;
|
||||
|
||||
(kv_backend, Some(election))
|
||||
}
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
@@ -299,7 +317,7 @@ pub async fn metasrv_builder(
|
||||
let election_table_name = opts.meta_table_name.clone() + "_election";
|
||||
let election_client = create_mysql_client(opts).await?;
|
||||
let election = MySqlElection::with_mysql_client(
|
||||
opts.server_addr.clone(),
|
||||
opts.grpc.server_addr.clone(),
|
||||
election_client,
|
||||
opts.store_key_prefix.clone(),
|
||||
CANDIDATE_LEASE_SECS,
|
||||
@@ -372,31 +390,24 @@ pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
|
||||
}
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
|
||||
let postgres_url = opts
|
||||
.store_addrs
|
||||
.first()
|
||||
.context(error::InvalidArgumentsSnafu {
|
||||
err_msg: "empty store addrs",
|
||||
})?;
|
||||
let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
|
||||
.await
|
||||
.context(error::ConnectPostgresSnafu)?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
error!(e; "connection error");
|
||||
}
|
||||
});
|
||||
Ok(client)
|
||||
/// Creates a pool for the Postgres backend.
|
||||
///
|
||||
/// It only use first store addr to create a pool.
|
||||
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
|
||||
create_postgres_pool_with(store_addrs, Config::new()).await
|
||||
}
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
|
||||
/// Creates a pool for the Postgres backend.
|
||||
///
|
||||
/// It only use first store addr to create a pool, and use the given config to create a pool.
|
||||
pub async fn create_postgres_pool_with(
|
||||
store_addrs: &[String],
|
||||
mut cfg: Config,
|
||||
) -> Result<deadpool_postgres::Pool> {
|
||||
let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
|
||||
err_msg: "empty store addrs",
|
||||
})?;
|
||||
let mut cfg = Config::new();
|
||||
cfg.url = Some(postgres_url.to_string());
|
||||
let pool = cfg
|
||||
.create_pool(Some(Runtime::Tokio1), NoTls)
|
||||
|
||||
@@ -157,6 +157,11 @@ pub trait Election: Send + Sync {
|
||||
/// but only one can be the leader at a time.
|
||||
async fn campaign(&self) -> Result<()>;
|
||||
|
||||
/// Resets the campaign.
|
||||
///
|
||||
/// Reset the client and the leader flag if needed.
|
||||
async fn reset_campaign(&self) {}
|
||||
|
||||
/// Returns the leader value for the current election.
|
||||
async fn leader(&self) -> Result<Self::Leader>;
|
||||
|
||||
|
||||
@@ -18,11 +18,12 @@ use std::time::Duration;
|
||||
|
||||
use common_telemetry::{error, warn};
|
||||
use common_time::Timestamp;
|
||||
use deadpool_postgres::{Manager, Pool};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::{broadcast, RwLock};
|
||||
use tokio::time::MissedTickBehavior;
|
||||
use tokio_postgres::types::ToSql;
|
||||
use tokio_postgres::Client;
|
||||
use tokio_postgres::Row;
|
||||
|
||||
use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
|
||||
use crate::election::{
|
||||
@@ -30,15 +31,14 @@ use crate::election::{
|
||||
CANDIDATES_ROOT, ELECTION_KEY,
|
||||
};
|
||||
use crate::error::{
|
||||
DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu,
|
||||
UnexpectedSnafu,
|
||||
DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
|
||||
Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
|
||||
|
||||
struct ElectionSqlFactory<'a> {
|
||||
lock_id: u64,
|
||||
table_name: &'a str,
|
||||
meta_lease_ttl_secs: u64,
|
||||
}
|
||||
|
||||
struct ElectionSqlSet {
|
||||
@@ -88,11 +88,10 @@ struct ElectionSqlSet {
|
||||
}
|
||||
|
||||
impl<'a> ElectionSqlFactory<'a> {
|
||||
fn new(lock_id: u64, table_name: &'a str, meta_lease_ttl_secs: u64) -> Self {
|
||||
fn new(lock_id: u64, table_name: &'a str) -> Self {
|
||||
Self {
|
||||
lock_id,
|
||||
table_name,
|
||||
meta_lease_ttl_secs,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,15 +107,6 @@ impl<'a> ElectionSqlFactory<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
// Currently the session timeout is longer than the leader lease time.
|
||||
// So the leader will renew the lease twice before the session timeout if everything goes well.
|
||||
fn set_idle_session_timeout_sql(&self) -> String {
|
||||
format!(
|
||||
"SET idle_session_timeout = '{}s';",
|
||||
self.meta_lease_ttl_secs + 1
|
||||
)
|
||||
}
|
||||
|
||||
fn campaign_sql(&self) -> String {
|
||||
format!("SELECT pg_try_advisory_lock({})", self.lock_id)
|
||||
}
|
||||
@@ -171,46 +161,165 @@ impl<'a> ElectionSqlFactory<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// PgClient for election.
|
||||
pub struct ElectionPgClient {
|
||||
current: Option<deadpool::managed::Object<Manager>>,
|
||||
pool: Pool,
|
||||
/// The client-side timeout for statement execution.
|
||||
///
|
||||
/// This timeout is enforced by the client application and is independent of any server-side timeouts.
|
||||
/// If a statement takes longer than this duration to execute, the client will abort the operation.
|
||||
execution_timeout: Duration,
|
||||
|
||||
/// The idle session timeout.
|
||||
///
|
||||
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
|
||||
/// If a session remains idle for longer than this duration, the server will terminate it.
|
||||
idle_session_timeout: Duration,
|
||||
|
||||
/// The statement timeout.
|
||||
///
|
||||
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
|
||||
/// If a statement takes longer than this duration to execute, the server will abort it.
|
||||
statement_timeout: Duration,
|
||||
}
|
||||
|
||||
impl ElectionPgClient {
|
||||
pub fn new(
|
||||
pool: Pool,
|
||||
execution_timeout: Duration,
|
||||
idle_session_timeout: Duration,
|
||||
statement_timeout: Duration,
|
||||
) -> Result<ElectionPgClient> {
|
||||
Ok(ElectionPgClient {
|
||||
current: None,
|
||||
pool,
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
})
|
||||
}
|
||||
|
||||
fn set_idle_session_timeout_sql(&self) -> String {
|
||||
format!(
|
||||
"SET idle_session_timeout = '{}s';",
|
||||
self.idle_session_timeout.as_secs()
|
||||
)
|
||||
}
|
||||
|
||||
fn set_statement_timeout_sql(&self) -> String {
|
||||
format!(
|
||||
"SET statement_timeout = '{}s';",
|
||||
self.statement_timeout.as_secs()
|
||||
)
|
||||
}
|
||||
|
||||
async fn reset_client(&mut self) -> Result<()> {
|
||||
self.current = None;
|
||||
self.maybe_init_client().await
|
||||
}
|
||||
|
||||
async fn maybe_init_client(&mut self) -> Result<()> {
|
||||
if self.current.is_none() {
|
||||
let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
|
||||
|
||||
self.current = Some(client);
|
||||
// Set idle session timeout and statement timeout.
|
||||
let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
|
||||
self.execute(&idle_session_timeout_sql, &[]).await?;
|
||||
let statement_timeout_sql = self.set_statement_timeout_sql();
|
||||
self.execute(&statement_timeout_sql, &[]).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the result of the query.
|
||||
///
|
||||
/// # Panics
|
||||
/// if `current` is `None`.
|
||||
async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
|
||||
let result = tokio::time::timeout(
|
||||
self.execution_timeout,
|
||||
self.current.as_ref().unwrap().execute(sql, params),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
SqlExecutionTimeoutSnafu {
|
||||
sql: sql.to_string(),
|
||||
duration: self.execution_timeout,
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
result.context(PostgresExecutionSnafu { sql })
|
||||
}
|
||||
|
||||
/// Returns the result of the query.
|
||||
///
|
||||
/// # Panics
|
||||
/// if `current` is `None`.
|
||||
async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
|
||||
let result = tokio::time::timeout(
|
||||
self.execution_timeout,
|
||||
self.current.as_ref().unwrap().query(sql, params),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
SqlExecutionTimeoutSnafu {
|
||||
sql: sql.to_string(),
|
||||
duration: self.execution_timeout,
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
|
||||
result.context(PostgresExecutionSnafu { sql })
|
||||
}
|
||||
}
|
||||
|
||||
/// PostgreSql implementation of Election.
|
||||
pub struct PgElection {
|
||||
leader_value: String,
|
||||
client: Client,
|
||||
pg_client: RwLock<ElectionPgClient>,
|
||||
is_leader: AtomicBool,
|
||||
leader_infancy: AtomicBool,
|
||||
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
|
||||
store_key_prefix: String,
|
||||
candidate_lease_ttl_secs: u64,
|
||||
meta_lease_ttl_secs: u64,
|
||||
candidate_lease_ttl: Duration,
|
||||
meta_lease_ttl: Duration,
|
||||
sql_set: ElectionSqlSet,
|
||||
}
|
||||
|
||||
impl PgElection {
|
||||
async fn maybe_init_client(&self) -> Result<()> {
|
||||
if self.pg_client.read().await.current.is_none() {
|
||||
self.pg_client.write().await.maybe_init_client().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn with_pg_client(
|
||||
leader_value: String,
|
||||
client: Client,
|
||||
pg_client: ElectionPgClient,
|
||||
store_key_prefix: String,
|
||||
candidate_lease_ttl_secs: u64,
|
||||
meta_lease_ttl_secs: u64,
|
||||
candidate_lease_ttl: Duration,
|
||||
meta_lease_ttl: Duration,
|
||||
table_name: &str,
|
||||
lock_id: u64,
|
||||
) -> Result<ElectionRef> {
|
||||
let sql_factory = ElectionSqlFactory::new(lock_id, table_name, meta_lease_ttl_secs);
|
||||
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
|
||||
client
|
||||
.execute(&sql_factory.set_idle_session_timeout_sql(), &[])
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
|
||||
|
||||
let tx = listen_leader_change(leader_value.clone());
|
||||
Ok(Arc::new(Self {
|
||||
leader_value,
|
||||
client,
|
||||
pg_client: RwLock::new(pg_client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(false),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix,
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs,
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: sql_factory.build(),
|
||||
}))
|
||||
}
|
||||
@@ -249,18 +358,17 @@ impl Election for PgElection {
|
||||
input: format!("{node_info:?}"),
|
||||
})?;
|
||||
let res = self
|
||||
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
|
||||
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
|
||||
.await?;
|
||||
// May registered before, just update the lease.
|
||||
if !res {
|
||||
self.delete_value(&key).await?;
|
||||
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
|
||||
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Check if the current lease has expired and renew the lease.
|
||||
let mut keep_alive_interval =
|
||||
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
|
||||
let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
|
||||
loop {
|
||||
let _ = keep_alive_interval.tick().await;
|
||||
|
||||
@@ -282,13 +390,8 @@ impl Election for PgElection {
|
||||
);
|
||||
|
||||
// Safety: origin is Some since we are using `get_value_with_lease` with `true`.
|
||||
self.update_value_with_lease(
|
||||
&key,
|
||||
&lease.origin,
|
||||
&node_info,
|
||||
self.candidate_lease_ttl_secs,
|
||||
)
|
||||
.await?;
|
||||
self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -321,16 +424,17 @@ impl Election for PgElection {
|
||||
/// - If the lock is not acquired (result is false), it calls the `follower_action` method
|
||||
/// to perform actions as a follower.
|
||||
async fn campaign(&self) -> Result<()> {
|
||||
let mut keep_alive_interval =
|
||||
tokio::time::interval(Duration::from_secs(self.meta_lease_ttl_secs / 2));
|
||||
let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
|
||||
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||
|
||||
self.maybe_init_client().await?;
|
||||
loop {
|
||||
let res = self
|
||||
.client
|
||||
.query(&self.sql_set.campaign, &[])
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.query(&self.sql_set.campaign, &[])
|
||||
.await?;
|
||||
let row = res.first().context(UnexpectedSnafu {
|
||||
violated: "Failed to get the result of acquiring advisory lock",
|
||||
})?;
|
||||
@@ -349,6 +453,12 @@ impl Election for PgElection {
|
||||
}
|
||||
}
|
||||
|
||||
async fn reset_campaign(&self) {
|
||||
if let Err(err) = self.pg_client.write().await.reset_client().await {
|
||||
error!(err; "Failed to reset client");
|
||||
}
|
||||
}
|
||||
|
||||
async fn leader(&self) -> Result<Self::Leader> {
|
||||
if self.is_leader.load(Ordering::Relaxed) {
|
||||
Ok(self.leader_value.as_bytes().into())
|
||||
@@ -376,11 +486,13 @@ impl PgElection {
|
||||
/// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
|
||||
async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
|
||||
let key = key.as_bytes();
|
||||
self.maybe_init_client().await?;
|
||||
let res = self
|
||||
.client
|
||||
.query(&self.sql_set.get_value_with_lease, &[&key])
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.query(&self.sql_set.get_value_with_lease, &[&key])
|
||||
.await?;
|
||||
|
||||
if res.is_empty() {
|
||||
Ok(None)
|
||||
@@ -414,11 +526,13 @@ impl PgElection {
|
||||
key_prefix: &str,
|
||||
) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
|
||||
let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
|
||||
self.maybe_init_client().await?;
|
||||
let res = self
|
||||
.client
|
||||
.query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
|
||||
.await?;
|
||||
|
||||
let mut values_with_leases = vec![];
|
||||
let mut current = Timestamp::default();
|
||||
@@ -445,18 +559,21 @@ impl PgElection {
|
||||
key: &str,
|
||||
prev: &str,
|
||||
updated: &str,
|
||||
lease_ttl: u64,
|
||||
lease_ttl: Duration,
|
||||
) -> Result<()> {
|
||||
let key = key.as_bytes();
|
||||
let prev = prev.as_bytes();
|
||||
self.maybe_init_client().await?;
|
||||
let lease_ttl_secs = lease_ttl.as_secs() as f64;
|
||||
let res = self
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.execute(
|
||||
&self.sql_set.update_value_with_lease,
|
||||
&[&key, &prev, &updated, &(lease_ttl as f64)],
|
||||
&[&key, &prev, &updated, &lease_ttl_secs],
|
||||
)
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.await?;
|
||||
|
||||
ensure!(
|
||||
res == 1,
|
||||
@@ -473,16 +590,18 @@ impl PgElection {
|
||||
&self,
|
||||
key: &str,
|
||||
value: &str,
|
||||
lease_ttl_secs: u64,
|
||||
lease_ttl: Duration,
|
||||
) -> Result<bool> {
|
||||
let key = key.as_bytes();
|
||||
let lease_ttl_secs = lease_ttl_secs as f64;
|
||||
let lease_ttl_secs = lease_ttl.as_secs() as f64;
|
||||
let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
|
||||
self.maybe_init_client().await?;
|
||||
let res = self
|
||||
.client
|
||||
.query(&self.sql_set.put_value_with_lease, ¶ms)
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.query(&self.sql_set.put_value_with_lease, ¶ms)
|
||||
.await?;
|
||||
Ok(res.is_empty())
|
||||
}
|
||||
|
||||
@@ -490,11 +609,13 @@ impl PgElection {
|
||||
/// Caution: Should only delete the key if the lease is expired.
|
||||
async fn delete_value(&self, key: &str) -> Result<bool> {
|
||||
let key = key.as_bytes();
|
||||
self.maybe_init_client().await?;
|
||||
let res = self
|
||||
.client
|
||||
.query(&self.sql_set.delete_value, &[&key])
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.query(&self.sql_set.delete_value, &[&key])
|
||||
.await?;
|
||||
|
||||
Ok(res.len() == 1)
|
||||
}
|
||||
@@ -536,7 +657,7 @@ impl PgElection {
|
||||
&key,
|
||||
&lease.origin,
|
||||
&self.leader_value,
|
||||
self.meta_lease_ttl_secs,
|
||||
self.meta_lease_ttl,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -605,10 +726,12 @@ impl PgElection {
|
||||
..Default::default()
|
||||
};
|
||||
self.delete_value(&key).await?;
|
||||
self.client
|
||||
.query(&self.sql_set.step_down, &[])
|
||||
self.maybe_init_client().await?;
|
||||
self.pg_client
|
||||
.read()
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
.query(&self.sql_set.step_down, &[])
|
||||
.await?;
|
||||
send_leader_change_and_set_flags(
|
||||
&self.is_leader,
|
||||
&self.leader_infancy,
|
||||
@@ -651,7 +774,7 @@ impl PgElection {
|
||||
..Default::default()
|
||||
};
|
||||
self.delete_value(&key).await?;
|
||||
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs)
|
||||
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
|
||||
.await?;
|
||||
|
||||
if self
|
||||
@@ -674,15 +797,21 @@ impl PgElection {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::env;
|
||||
|
||||
use common_meta::maybe_skip_postgres_integration_test;
|
||||
use tokio_postgres::{Client, NoTls};
|
||||
|
||||
use super::*;
|
||||
use crate::error::PostgresExecutionSnafu;
|
||||
use crate::bootstrap::create_postgres_pool;
|
||||
use crate::error;
|
||||
|
||||
async fn create_postgres_client(table_name: Option<&str>) -> Result<Client> {
|
||||
async fn create_postgres_client(
|
||||
table_name: Option<&str>,
|
||||
execution_timeout: Duration,
|
||||
idle_session_timeout: Duration,
|
||||
statement_timeout: Duration,
|
||||
) -> Result<ElectionPgClient> {
|
||||
let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
|
||||
if endpoint.is_empty() {
|
||||
return UnexpectedSnafu {
|
||||
@@ -690,25 +819,34 @@ mod tests {
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
let (client, connection) = tokio_postgres::connect(&endpoint, NoTls)
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
tokio::spawn(async move {
|
||||
connection.await.context(PostgresExecutionSnafu).unwrap();
|
||||
});
|
||||
let pool = create_postgres_pool(&[endpoint]).await.unwrap();
|
||||
let mut pg_client = ElectionPgClient::new(
|
||||
pool,
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.unwrap();
|
||||
pg_client.maybe_init_client().await?;
|
||||
if let Some(table_name) = table_name {
|
||||
let create_table_sql = format!(
|
||||
"CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
|
||||
table_name
|
||||
);
|
||||
client.execute(&create_table_sql, &[]).await.unwrap();
|
||||
pg_client.execute(&create_table_sql, &[]).await?;
|
||||
}
|
||||
Ok(client)
|
||||
Ok(pg_client)
|
||||
}
|
||||
|
||||
async fn drop_table(client: &Client, table_name: &str) {
|
||||
async fn drop_table(pg_election: &PgElection, table_name: &str) {
|
||||
let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
|
||||
client.execute(&sql, &[]).await.unwrap();
|
||||
pg_election
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.execute(&sql, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -719,23 +857,35 @@ mod tests {
|
||||
|
||||
let uuid = uuid::Uuid::new_v4().to_string();
|
||||
let table_name = "test_postgres_crud_greptime_metakv";
|
||||
let client = create_postgres_client(Some(table_name)).await.unwrap();
|
||||
let candidate_lease_ttl = Duration::from_secs(10);
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let meta_lease_ttl = Duration::from_secs(2);
|
||||
let idle_session_timeout = Duration::from_secs(0);
|
||||
let client = create_postgres_client(
|
||||
Some(table_name),
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (tx, _) = broadcast::channel(100);
|
||||
let pg_election = PgElection {
|
||||
leader_value: "test_leader".to_string(),
|
||||
client,
|
||||
pg_client: RwLock::new(client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: uuid,
|
||||
candidate_lease_ttl_secs: 10,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
|
||||
};
|
||||
|
||||
let res = pg_election
|
||||
.put_value_with_lease(&key, &value, 10)
|
||||
.put_value_with_lease(&key, &value, candidate_lease_ttl)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(res);
|
||||
@@ -748,7 +898,7 @@ mod tests {
|
||||
assert_eq!(lease.leader_value, value);
|
||||
|
||||
pg_election
|
||||
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl_secs)
|
||||
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -762,7 +912,7 @@ mod tests {
|
||||
let key = format!("test_key_{}", i);
|
||||
let value = format!("test_value_{}", i);
|
||||
pg_election
|
||||
.put_value_with_lease(&key, &value, 10)
|
||||
.put_value_with_lease(&key, &value, candidate_lease_ttl)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
@@ -787,28 +937,39 @@ mod tests {
|
||||
assert!(res.is_empty());
|
||||
assert!(current == Timestamp::default());
|
||||
|
||||
drop_table(&pg_election.client, table_name).await;
|
||||
drop_table(&pg_election, table_name).await;
|
||||
}
|
||||
|
||||
async fn candidate(
|
||||
leader_value: String,
|
||||
candidate_lease_ttl_secs: u64,
|
||||
candidate_lease_ttl: Duration,
|
||||
store_key_prefix: String,
|
||||
table_name: String,
|
||||
) {
|
||||
let client = create_postgres_client(None).await.unwrap();
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let meta_lease_ttl = Duration::from_secs(2);
|
||||
let idle_session_timeout = Duration::from_secs(0);
|
||||
let client = create_postgres_client(
|
||||
None,
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (tx, _) = broadcast::channel(100);
|
||||
let pg_election = PgElection {
|
||||
leader_value,
|
||||
client,
|
||||
pg_client: RwLock::new(client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix,
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28319, &table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28319, &table_name).build(),
|
||||
};
|
||||
|
||||
let node_info = MetasrvNodeInfo {
|
||||
@@ -824,17 +985,28 @@ mod tests {
|
||||
async fn test_candidate_registration() {
|
||||
maybe_skip_postgres_integration_test!();
|
||||
let leader_value_prefix = "test_leader".to_string();
|
||||
let candidate_lease_ttl_secs = 5;
|
||||
let uuid = uuid::Uuid::new_v4().to_string();
|
||||
let table_name = "test_candidate_registration_greptime_metakv";
|
||||
let mut handles = vec![];
|
||||
let client = create_postgres_client(Some(table_name)).await.unwrap();
|
||||
let candidate_lease_ttl = Duration::from_secs(5);
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let meta_lease_ttl = Duration::from_secs(2);
|
||||
let idle_session_timeout = Duration::from_secs(0);
|
||||
let client = create_postgres_client(
|
||||
Some(table_name),
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
for i in 0..10 {
|
||||
let leader_value = format!("{}{}", leader_value_prefix, i);
|
||||
let handle = tokio::spawn(candidate(
|
||||
leader_value,
|
||||
candidate_lease_ttl_secs,
|
||||
candidate_lease_ttl,
|
||||
uuid.clone(),
|
||||
table_name.to_string(),
|
||||
));
|
||||
@@ -847,14 +1019,14 @@ mod tests {
|
||||
let leader_value = "test_leader".to_string();
|
||||
let pg_election = PgElection {
|
||||
leader_value,
|
||||
client,
|
||||
pg_client: RwLock::new(client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: uuid.clone(),
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
|
||||
};
|
||||
|
||||
let candidates = pg_election.all_candidates().await.unwrap();
|
||||
@@ -876,29 +1048,40 @@ mod tests {
|
||||
assert!(res);
|
||||
}
|
||||
|
||||
drop_table(&pg_election.client, table_name).await;
|
||||
drop_table(&pg_election, table_name).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_elected_and_step_down() {
|
||||
maybe_skip_postgres_integration_test!();
|
||||
let leader_value = "test_leader".to_string();
|
||||
let candidate_lease_ttl_secs = 5;
|
||||
let uuid = uuid::Uuid::new_v4().to_string();
|
||||
let table_name = "test_elected_and_step_down_greptime_metakv";
|
||||
let client = create_postgres_client(Some(table_name)).await.unwrap();
|
||||
let candidate_lease_ttl = Duration::from_secs(5);
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let meta_lease_ttl = Duration::from_secs(2);
|
||||
let idle_session_timeout = Duration::from_secs(0);
|
||||
let client = create_postgres_client(
|
||||
Some(table_name),
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (tx, mut rx) = broadcast::channel(100);
|
||||
let leader_pg_election = PgElection {
|
||||
leader_value: leader_value.clone(),
|
||||
client,
|
||||
pg_client: RwLock::new(client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: uuid,
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28320, table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28320, table_name).build(),
|
||||
};
|
||||
|
||||
leader_pg_election.elected().await.unwrap();
|
||||
@@ -990,7 +1173,7 @@ mod tests {
|
||||
_ => panic!("Expected LeaderChangeMessage::StepDown"),
|
||||
}
|
||||
|
||||
drop_table(&leader_pg_election.client, table_name).await;
|
||||
drop_table(&leader_pg_election, table_name).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -999,25 +1182,38 @@ mod tests {
|
||||
let leader_value = "test_leader".to_string();
|
||||
let uuid = uuid::Uuid::new_v4().to_string();
|
||||
let table_name = "test_leader_action_greptime_metakv";
|
||||
let candidate_lease_ttl_secs = 5;
|
||||
let client = create_postgres_client(Some(table_name)).await.unwrap();
|
||||
let candidate_lease_ttl = Duration::from_secs(5);
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let meta_lease_ttl = Duration::from_secs(2);
|
||||
let idle_session_timeout = Duration::from_secs(0);
|
||||
let client = create_postgres_client(
|
||||
Some(table_name),
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (tx, mut rx) = broadcast::channel(100);
|
||||
let leader_pg_election = PgElection {
|
||||
leader_value: leader_value.clone(),
|
||||
client,
|
||||
pg_client: RwLock::new(client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: uuid,
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28321, table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28321, table_name).build(),
|
||||
};
|
||||
|
||||
// Step 1: No leader exists, campaign and elected.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1048,7 +1244,9 @@ mod tests {
|
||||
|
||||
// Step 2: As a leader, renew the lease.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1070,7 +1268,9 @@ mod tests {
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1098,7 +1298,9 @@ mod tests {
|
||||
|
||||
// Step 4: Re-campaign and elected.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1155,7 +1357,9 @@ mod tests {
|
||||
|
||||
// Step 6: Re-campaign and elected.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1186,7 +1390,9 @@ mod tests {
|
||||
|
||||
// Step 7: Something wrong, the leader key changed by others.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1197,7 +1403,11 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
leader_pg_election
|
||||
.put_value_with_lease(&leader_pg_election.election_key(), "test", 10)
|
||||
.put_value_with_lease(
|
||||
&leader_pg_election.election_key(),
|
||||
"test",
|
||||
Duration::from_secs(10),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
@@ -1223,52 +1433,74 @@ mod tests {
|
||||
|
||||
// Clean up
|
||||
leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.step_down, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
drop_table(&leader_pg_election.client, table_name).await;
|
||||
drop_table(&leader_pg_election, table_name).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_follower_action() {
|
||||
maybe_skip_postgres_integration_test!();
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let candidate_lease_ttl_secs = 5;
|
||||
let uuid = uuid::Uuid::new_v4().to_string();
|
||||
let table_name = "test_follower_action_greptime_metakv";
|
||||
|
||||
let follower_client = create_postgres_client(Some(table_name)).await.unwrap();
|
||||
let candidate_lease_ttl = Duration::from_secs(5);
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let meta_lease_ttl = Duration::from_secs(2);
|
||||
let idle_session_timeout = Duration::from_secs(0);
|
||||
let follower_client = create_postgres_client(
|
||||
Some(table_name),
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let (tx, mut rx) = broadcast::channel(100);
|
||||
let follower_pg_election = PgElection {
|
||||
leader_value: "test_follower".to_string(),
|
||||
client: follower_client,
|
||||
pg_client: RwLock::new(follower_client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: uuid.clone(),
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
|
||||
};
|
||||
|
||||
let leader_client = create_postgres_client(Some(table_name)).await.unwrap();
|
||||
let leader_client = create_postgres_client(
|
||||
Some(table_name),
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let (tx, _) = broadcast::channel(100);
|
||||
let leader_pg_election = PgElection {
|
||||
leader_value: "test_leader".to_string(),
|
||||
client: leader_client,
|
||||
pg_client: RwLock::new(leader_client),
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: uuid,
|
||||
candidate_lease_ttl_secs,
|
||||
meta_lease_ttl_secs: 2,
|
||||
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
|
||||
candidate_lease_ttl,
|
||||
meta_lease_ttl,
|
||||
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
|
||||
};
|
||||
|
||||
leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.campaign, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1309,11 +1541,41 @@ mod tests {
|
||||
|
||||
// Clean up
|
||||
leader_pg_election
|
||||
.client
|
||||
.pg_client
|
||||
.read()
|
||||
.await
|
||||
.query(&leader_pg_election.sql_set.step_down, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
drop_table(&follower_pg_election.client, table_name).await;
|
||||
drop_table(&follower_pg_election, table_name).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_idle_session_timeout() {
|
||||
maybe_skip_postgres_integration_test!();
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let execution_timeout = Duration::from_secs(10);
|
||||
let statement_timeout = Duration::from_secs(10);
|
||||
let idle_session_timeout = Duration::from_secs(1);
|
||||
let mut client = create_postgres_client(
|
||||
None,
|
||||
execution_timeout,
|
||||
idle_session_timeout,
|
||||
statement_timeout,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(1100)).await;
|
||||
// Wait for the idle session timeout.
|
||||
let err = client.query("SELECT 1", &[]).await.unwrap_err();
|
||||
assert_matches!(err, error::Error::PostgresExecution { .. });
|
||||
let error::Error::PostgresExecution { error, .. } = err else {
|
||||
panic!("Expected PostgresExecution error");
|
||||
};
|
||||
assert!(error.is_closed());
|
||||
// Reset the client and try again.
|
||||
client.reset_client().await.unwrap();
|
||||
let _ = client.query("SELECT 1", &[]).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -748,21 +748,31 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[snafu(display("Failed to execute via postgres"))]
|
||||
#[snafu(display("Failed to execute via postgres, sql: {}", sql))]
|
||||
PostgresExecution {
|
||||
#[snafu(source)]
|
||||
error: tokio_postgres::Error,
|
||||
sql: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[snafu(display("Failed to connect to Postgres"))]
|
||||
ConnectPostgres {
|
||||
#[snafu(source)]
|
||||
error: tokio_postgres::Error,
|
||||
#[snafu(display("Failed to get Postgres client"))]
|
||||
GetPostgresClient {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: deadpool::managed::PoolError<tokio_postgres::Error>,
|
||||
},
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
|
||||
SqlExecutionTimeout {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
sql: String,
|
||||
duration: std::time::Duration,
|
||||
},
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
@@ -1005,9 +1015,10 @@ impl ErrorExt for Error {
|
||||
Error::LookupPeer { source, .. } => source.status_code(),
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
Error::CreatePostgresPool { .. }
|
||||
| Error::GetPostgresClient { .. }
|
||||
| Error::GetPostgresConnection { .. }
|
||||
| Error::PostgresExecution { .. }
|
||||
| Error::ConnectPostgres { .. } => StatusCode::Internal,
|
||||
| Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
Error::MySqlExecution { .. }
|
||||
| Error::CreateMySqlPool { .. }
|
||||
|
||||
@@ -46,6 +46,7 @@ use common_telemetry::{error, info, warn};
|
||||
use common_wal::config::MetasrvWalConfig;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::export_metrics::ExportMetricsOption;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::http::HttpOptions;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
@@ -96,8 +97,10 @@ pub enum BackendImpl {
|
||||
#[serde(default)]
|
||||
pub struct MetasrvOptions {
|
||||
/// The address the server listens on.
|
||||
#[deprecated(note = "Use grpc.bind_addr instead")]
|
||||
pub bind_addr: String,
|
||||
/// The address the server advertises to the clients.
|
||||
#[deprecated(note = "Use grpc.server_addr instead")]
|
||||
pub server_addr: String,
|
||||
/// The address of the store, e.g., etcd.
|
||||
pub store_addrs: Vec<String>,
|
||||
@@ -112,6 +115,7 @@ pub struct MetasrvOptions {
|
||||
/// If it's true, the region failover will be allowed even if the local WAL is used.
|
||||
/// Note that this option is not recommended to be set to true, because it may lead to data loss during failover.
|
||||
pub allow_region_failover_on_local_wal: bool,
|
||||
pub grpc: GrpcOptions,
|
||||
/// The HTTP server options.
|
||||
pub http: HttpOptions,
|
||||
/// The logging options.
|
||||
@@ -166,8 +170,6 @@ impl fmt::Debug for MetasrvOptions {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let mut debug_struct = f.debug_struct("MetasrvOptions");
|
||||
debug_struct
|
||||
.field("bind_addr", &self.bind_addr)
|
||||
.field("server_addr", &self.server_addr)
|
||||
.field("store_addrs", &self.sanitize_store_addrs())
|
||||
.field("selector", &self.selector)
|
||||
.field("use_memory_store", &self.use_memory_store)
|
||||
@@ -176,6 +178,7 @@ impl fmt::Debug for MetasrvOptions {
|
||||
"allow_region_failover_on_local_wal",
|
||||
&self.allow_region_failover_on_local_wal,
|
||||
)
|
||||
.field("grpc", &self.grpc)
|
||||
.field("http", &self.http)
|
||||
.field("logging", &self.logging)
|
||||
.field("procedure", &self.procedure)
|
||||
@@ -208,14 +211,19 @@ const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
|
||||
impl Default for MetasrvOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
|
||||
// If server_addr is not set, the server will use the local ip address as the server address.
|
||||
#[allow(deprecated)]
|
||||
bind_addr: String::new(),
|
||||
#[allow(deprecated)]
|
||||
server_addr: String::new(),
|
||||
store_addrs: vec!["127.0.0.1:2379".to_string()],
|
||||
selector: SelectorType::default(),
|
||||
use_memory_store: false,
|
||||
enable_region_failover: false,
|
||||
allow_region_failover_on_local_wal: false,
|
||||
grpc: GrpcOptions {
|
||||
bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
|
||||
..Default::default()
|
||||
},
|
||||
http: HttpOptions::default(),
|
||||
logging: LoggingOptions::default(),
|
||||
procedure: ProcedureConfig {
|
||||
@@ -253,37 +261,6 @@ impl Configurable for MetasrvOptions {
|
||||
}
|
||||
|
||||
impl MetasrvOptions {
|
||||
/// Detect server address.
|
||||
#[cfg(not(target_os = "android"))]
|
||||
pub fn detect_server_addr(&mut self) {
|
||||
if self.server_addr.is_empty() {
|
||||
match local_ip_address::local_ip() {
|
||||
Ok(ip) => {
|
||||
let detected_addr = format!(
|
||||
"{}:{}",
|
||||
ip,
|
||||
self.bind_addr
|
||||
.split(':')
|
||||
.nth(1)
|
||||
.unwrap_or(DEFAULT_METASRV_ADDR_PORT)
|
||||
);
|
||||
info!("Using detected: {} as server address", detected_addr);
|
||||
self.server_addr = detected_addr;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to detect local ip address: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "android")]
|
||||
pub fn detect_server_addr(&mut self) {
|
||||
if self.server_addr.is_empty() {
|
||||
common_telemetry::debug!("detect local IP is not supported on Android");
|
||||
}
|
||||
}
|
||||
|
||||
fn sanitize_store_addrs(&self) -> Vec<String> {
|
||||
self.store_addrs
|
||||
.iter()
|
||||
@@ -582,6 +559,7 @@ impl Metasrv {
|
||||
if let Err(e) = res {
|
||||
warn!(e; "Metasrv election error");
|
||||
}
|
||||
election.reset_campaign().await;
|
||||
info!("Metasrv re-initiate election");
|
||||
}
|
||||
info!("Metasrv stopped");
|
||||
@@ -638,7 +616,7 @@ impl Metasrv {
|
||||
pub fn node_info(&self) -> MetasrvNodeInfo {
|
||||
let build_info = common_version::build_info();
|
||||
MetasrvNodeInfo {
|
||||
addr: self.options().server_addr.clone(),
|
||||
addr: self.options().grpc.server_addr.clone(),
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms: self.start_time_ms(),
|
||||
@@ -730,7 +708,7 @@ impl Metasrv {
|
||||
|
||||
#[inline]
|
||||
pub fn new_ctx(&self) -> Context {
|
||||
let server_addr = self.options().server_addr.clone();
|
||||
let server_addr = self.options().grpc.server_addr.clone();
|
||||
let in_memory = self.in_memory.clone();
|
||||
let kv_backend = self.kv_backend.clone();
|
||||
let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
|
||||
|
||||
@@ -179,8 +179,8 @@ impl MetasrvBuilder {
|
||||
let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
|
||||
|
||||
let state = Arc::new(RwLock::new(match election {
|
||||
None => State::leader(options.server_addr.to_string(), true),
|
||||
Some(_) => State::follower(options.server_addr.to_string()),
|
||||
None => State::leader(options.grpc.server_addr.to_string(), true),
|
||||
Some(_) => State::follower(options.grpc.server_addr.to_string()),
|
||||
}));
|
||||
|
||||
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
|
||||
@@ -203,7 +203,7 @@ impl MetasrvBuilder {
|
||||
));
|
||||
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone()));
|
||||
let selector_ctx = SelectorContext {
|
||||
server_addr: options.server_addr.clone(),
|
||||
server_addr: options.grpc.server_addr.clone(),
|
||||
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
|
||||
flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
|
||||
kv_backend: kv_backend.clone(),
|
||||
@@ -272,7 +272,7 @@ impl MetasrvBuilder {
|
||||
let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
|
||||
mailbox.clone(),
|
||||
MetasrvInfo {
|
||||
server_addr: options.server_addr.clone(),
|
||||
server_addr: options.grpc.server_addr.clone(),
|
||||
},
|
||||
));
|
||||
let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
|
||||
@@ -315,7 +315,7 @@ impl MetasrvBuilder {
|
||||
memory_region_keeper.clone(),
|
||||
region_failure_detector_controller.clone(),
|
||||
mailbox.clone(),
|
||||
options.server_addr.clone(),
|
||||
options.grpc.server_addr.clone(),
|
||||
cache_invalidator.clone(),
|
||||
),
|
||||
));
|
||||
@@ -390,7 +390,7 @@ impl MetasrvBuilder {
|
||||
client: Arc::new(kafka_client),
|
||||
table_metadata_manager: table_metadata_manager.clone(),
|
||||
leader_region_registry: leader_region_registry.clone(),
|
||||
server_addr: options.server_addr.clone(),
|
||||
server_addr: options.grpc.server_addr.clone(),
|
||||
mailbox: mailbox.clone(),
|
||||
};
|
||||
let wal_prune_manager = WalPruneManager::new(
|
||||
|
||||
@@ -26,6 +26,7 @@ use common_meta::kv_backend::etcd::EtcdStore;
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
|
||||
use hyper_util::rt::TokioIo;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use tonic::codec::CompressionEncoding;
|
||||
use tower::service_fn;
|
||||
|
||||
@@ -47,7 +48,10 @@ pub async fn mock_with_memstore() -> MockInfo {
|
||||
let in_memory = Arc::new(MemoryKvBackend::new());
|
||||
mock(
|
||||
MetasrvOptions {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
grpc: GrpcOptions {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
},
|
||||
kv_backend,
|
||||
@@ -62,7 +66,10 @@ pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
|
||||
let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
|
||||
mock(
|
||||
MetasrvOptions {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
grpc: GrpcOptions {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
},
|
||||
kv_backend,
|
||||
@@ -80,7 +87,7 @@ pub async fn mock(
|
||||
datanode_clients: Option<Arc<NodeClients>>,
|
||||
in_memory: Option<ResettableKvBackendRef>,
|
||||
) -> MockInfo {
|
||||
let server_addr = opts.server_addr.clone();
|
||||
let server_addr = opts.grpc.server_addr.clone();
|
||||
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
|
||||
|
||||
table_metadata_manager.init().await.unwrap();
|
||||
|
||||
@@ -88,7 +88,7 @@ impl cluster_server::Cluster for Metasrv {
|
||||
return Ok(Response::new(resp));
|
||||
}
|
||||
|
||||
let leader_addr = &self.options().server_addr;
|
||||
let leader_addr = &self.options().grpc.server_addr;
|
||||
let (leader, followers) = match self.election() {
|
||||
Some(election) => {
|
||||
let nodes = election.all_candidates().await?;
|
||||
|
||||
@@ -190,6 +190,7 @@ mod tests {
|
||||
use api::v1::meta::*;
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_telemetry::tracing_context::W3cTrace;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use tonic::IntoRequest;
|
||||
|
||||
use super::get_node_id;
|
||||
@@ -203,7 +204,10 @@ mod tests {
|
||||
let metasrv = MetasrvBuilder::new()
|
||||
.kv_backend(kv_backend)
|
||||
.options(MetasrvOptions {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
grpc: GrpcOptions {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
})
|
||||
.build()
|
||||
@@ -216,7 +220,7 @@ mod tests {
|
||||
|
||||
let res = metasrv.ask_leader(req.into_request()).await.unwrap();
|
||||
let res = res.into_inner();
|
||||
assert_eq!(metasrv.options().bind_addr, res.leader.unwrap().addr);
|
||||
assert_eq!(metasrv.options().grpc.server_addr, res.leader.unwrap().addr);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::flow::FlowRequestHeader;
|
||||
use api::v1::flow::{AdjustFlow, FlowRequestHeader};
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::handlers::FlowServiceHandler;
|
||||
@@ -22,6 +22,7 @@ use common_query::error::Result;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use serde_json::json;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
@@ -57,9 +58,96 @@ impl FlowServiceHandler for FlowServiceOperator {
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
self.flush_inner(catalog, flow, ctx).await
|
||||
}
|
||||
|
||||
async fn adjust(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
self.adjust_inner(
|
||||
catalog,
|
||||
flow,
|
||||
min_run_interval_secs,
|
||||
max_filter_num_per_query,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowServiceOperator {
|
||||
async fn adjust_inner(
|
||||
&self,
|
||||
catalog: &str,
|
||||
flow: &str,
|
||||
min_run_interval_secs: u64,
|
||||
max_filter_num_per_query: usize,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<api::v1::flow::FlowResponse> {
|
||||
let id = self
|
||||
.flow_metadata_manager
|
||||
.flow_name_manager()
|
||||
.get(catalog, flow)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
.context(common_meta::error::FlowNotFoundSnafu {
|
||||
flow_name: format!("{}.{}", catalog, flow),
|
||||
})
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
.flow_id();
|
||||
|
||||
let all_flownode_peers = self
|
||||
.flow_metadata_manager
|
||||
.flow_route_manager()
|
||||
.routes(id)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?;
|
||||
|
||||
// order of flownodes doesn't matter here
|
||||
let all_flow_nodes = FuturesUnordered::from_iter(
|
||||
all_flownode_peers
|
||||
.iter()
|
||||
.map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
|
||||
)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
// TODO(discord9): use proper type for flow options
|
||||
let options = json!({
|
||||
"min_run_interval_secs": min_run_interval_secs,
|
||||
"max_filter_num_per_query": max_filter_num_per_query,
|
||||
});
|
||||
|
||||
for node in all_flow_nodes {
|
||||
let _res = {
|
||||
use api::v1::flow::{flow_request, FlowRequest};
|
||||
let flush_req = FlowRequest {
|
||||
header: Some(FlowRequestHeader {
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
query_context: Some(
|
||||
common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
|
||||
),
|
||||
}),
|
||||
body: Some(flow_request::Body::Adjust(AdjustFlow {
|
||||
flow_id: Some(api::v1::FlowId { id }),
|
||||
options: options.to_string(),
|
||||
})),
|
||||
};
|
||||
node.handle(flush_req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_query::error::ExecuteSnafu)?
|
||||
};
|
||||
}
|
||||
Ok(Default::default())
|
||||
}
|
||||
|
||||
/// Flush the flownodes according to the flow id.
|
||||
async fn flush_inner(
|
||||
&self,
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::debug;
|
||||
use datafusion::datasource::DefaultTableSource;
|
||||
use datafusion::error::Result as DfResult;
|
||||
use datafusion_common::config::ConfigOptions;
|
||||
@@ -154,6 +155,7 @@ struct PlanRewriter {
|
||||
/// Partition columns of the table in current pass
|
||||
partition_cols: Option<Vec<String>>,
|
||||
column_requirements: HashSet<Column>,
|
||||
expand_on_next_call: bool,
|
||||
}
|
||||
|
||||
impl PlanRewriter {
|
||||
@@ -174,6 +176,10 @@ impl PlanRewriter {
|
||||
{
|
||||
return true;
|
||||
}
|
||||
if self.expand_on_next_call {
|
||||
self.expand_on_next_call = false;
|
||||
return true;
|
||||
}
|
||||
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
|
||||
Commutativity::Commutative => {}
|
||||
Commutativity::PartialCommutative => {
|
||||
@@ -190,12 +196,17 @@ impl PlanRewriter {
|
||||
self.stage.push(plan)
|
||||
}
|
||||
}
|
||||
Commutativity::TransformedCommutative(transformer) => {
|
||||
Commutativity::TransformedCommutative {
|
||||
transformer,
|
||||
expand_on_parent,
|
||||
} => {
|
||||
if let Some(transformer) = transformer
|
||||
&& let Some(plan) = transformer(plan)
|
||||
&& let Some(changed_plan) = transformer(plan)
|
||||
{
|
||||
self.update_column_requirements(&plan);
|
||||
self.stage.push(plan)
|
||||
debug!("PlanRewriter: transformed plan: {changed_plan} from {plan}");
|
||||
self.update_column_requirements(&changed_plan);
|
||||
self.stage.push(changed_plan);
|
||||
self.expand_on_next_call = expand_on_parent;
|
||||
}
|
||||
}
|
||||
Commutativity::NonCommutative
|
||||
@@ -391,10 +402,21 @@ impl TreeNodeRewriter for PlanRewriter {
|
||||
return Ok(Transformed::yes(node));
|
||||
};
|
||||
|
||||
let parent = parent.clone();
|
||||
|
||||
// TODO(ruihang): avoid this clone
|
||||
if self.should_expand(&parent.clone()) {
|
||||
if self.should_expand(&parent) {
|
||||
// TODO(ruihang): does this work for nodes with multiple children?;
|
||||
let node = self.expand(node)?;
|
||||
debug!("PlanRewriter: should expand child:\n {node}\n Of Parent: {parent}");
|
||||
let node = self.expand(node);
|
||||
debug!(
|
||||
"PlanRewriter: expanded plan: {}",
|
||||
match &node {
|
||||
Ok(n) => n.to_string(),
|
||||
Err(e) => format!("Error expanding plan: {e}"),
|
||||
}
|
||||
);
|
||||
let node = node?;
|
||||
self.pop_stack();
|
||||
return Ok(Transformed::yes(node));
|
||||
}
|
||||
|
||||
@@ -15,6 +15,9 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_function::aggr::{HllState, UddSketchState, HLL_NAME, UDDSKETCH_STATE_NAME};
|
||||
use common_telemetry::debug;
|
||||
use datafusion::functions_aggregate::sum::sum_udaf;
|
||||
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
|
||||
use promql::extension_plan::{
|
||||
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
|
||||
@@ -23,12 +26,157 @@ use promql::extension_plan::{
|
||||
use crate::dist_plan::merge_sort::{merge_sort_transformer, MergeSortLogicalPlan};
|
||||
use crate::dist_plan::MergeScanLogicalPlan;
|
||||
|
||||
/// generate the upper aggregation plan that will execute on the frontend.
|
||||
pub fn step_aggr_to_upper_aggr(aggr_plan: &LogicalPlan) -> datafusion_common::Result<LogicalPlan> {
|
||||
let LogicalPlan::Aggregate(aggr) = aggr_plan else {
|
||||
return Err(datafusion_common::DataFusionError::Plan(
|
||||
"step_aggr_to_upper_aggr only accepts Aggregate plan".to_string(),
|
||||
));
|
||||
};
|
||||
if !is_all_aggr_exprs_steppable(&aggr.aggr_expr) {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(
|
||||
"Some aggregate expressions are not steppable".to_string(),
|
||||
));
|
||||
}
|
||||
let mut upper_aggr_expr = vec![];
|
||||
for aggr_expr in &aggr.aggr_expr {
|
||||
let Some(aggr_func) = get_aggr_func(aggr_expr) else {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(
|
||||
"Aggregate function not found".to_string(),
|
||||
));
|
||||
};
|
||||
let col_name = aggr_expr.name_for_alias()?;
|
||||
let input_column =
|
||||
Expr::Column(datafusion_common::Column::new_unqualified(col_name.clone()));
|
||||
let upper_func = match aggr_func.func.name() {
|
||||
"sum" | "min" | "max" | "last_value" | "first_value" => {
|
||||
// aggr_calc(aggr_merge(input_column))) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
"count" => {
|
||||
// sum(input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = sum_udaf();
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
UDDSKETCH_STATE_NAME => {
|
||||
// udd_merge(bucket_size, error_rate input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = Arc::new(UddSketchState::merge_udf_impl());
|
||||
new_aggr_func.args[2] = input_column.clone();
|
||||
new_aggr_func
|
||||
}
|
||||
HLL_NAME => {
|
||||
// hll_merge(input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = Arc::new(HllState::merge_udf_impl());
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
_ => {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(format!(
|
||||
"Aggregate function {} is not supported for Step aggregation",
|
||||
aggr_func.func.name()
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
// deal with nested alias case
|
||||
let mut new_aggr_expr = aggr_expr.clone();
|
||||
{
|
||||
let new_aggr_func = get_aggr_func_mut(&mut new_aggr_expr).unwrap();
|
||||
*new_aggr_func = upper_func;
|
||||
}
|
||||
|
||||
// make the column name the same, so parent can recognize it
|
||||
upper_aggr_expr.push(new_aggr_expr.alias(col_name));
|
||||
}
|
||||
let mut new_aggr = aggr.clone();
|
||||
new_aggr.aggr_expr = upper_aggr_expr;
|
||||
// group by expr also need alias to avoid duplicated computing
|
||||
|
||||
let mut new_group_expr = new_aggr.group_expr.clone();
|
||||
for expr in &mut new_group_expr {
|
||||
if let Expr::Column(_) = expr {
|
||||
// already a column, no need to change
|
||||
continue;
|
||||
}
|
||||
let col_name = expr.name_for_alias()?;
|
||||
let input_column =
|
||||
Expr::Column(datafusion_common::Column::new_unqualified(col_name.clone()));
|
||||
*expr = input_column.alias(col_name);
|
||||
}
|
||||
new_aggr.group_expr = new_group_expr;
|
||||
// return the new logical plan
|
||||
Ok(LogicalPlan::Aggregate(new_aggr))
|
||||
}
|
||||
|
||||
/// Check if the given aggregate expression is steppable.
|
||||
/// As in if it can be split into multiple steps:
|
||||
/// i.e. on datanode first call `state(input)` then
|
||||
/// on frontend call `calc(merge(state))` to get the final result.
|
||||
///
|
||||
pub fn is_all_aggr_exprs_steppable(aggr_exprs: &[Expr]) -> bool {
|
||||
let step_action = HashSet::from([
|
||||
"sum",
|
||||
"count",
|
||||
"min",
|
||||
"max",
|
||||
"first_value",
|
||||
"last_value",
|
||||
UDDSKETCH_STATE_NAME,
|
||||
HLL_NAME,
|
||||
]);
|
||||
aggr_exprs.iter().all(|expr| {
|
||||
if let Some(aggr_func) = get_aggr_func(expr) {
|
||||
if aggr_func.distinct {
|
||||
// Distinct aggregate functions are not steppable(yet).
|
||||
return false;
|
||||
}
|
||||
step_action.contains(aggr_func.func.name())
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_aggr_func(expr: &Expr) -> Option<&datafusion_expr::expr::AggregateFunction> {
|
||||
let mut expr_ref = expr;
|
||||
while let Expr::Alias(alias) = expr_ref {
|
||||
expr_ref = &alias.expr;
|
||||
}
|
||||
if let Expr::AggregateFunction(aggr_func) = expr_ref {
|
||||
Some(aggr_func)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_aggr_func_mut(expr: &mut Expr) -> Option<&mut datafusion_expr::expr::AggregateFunction> {
|
||||
let mut expr_ref = expr;
|
||||
while let Expr::Alias(alias) = expr_ref {
|
||||
expr_ref = &mut alias.expr;
|
||||
}
|
||||
if let Expr::AggregateFunction(aggr_func) = expr_ref {
|
||||
Some(aggr_func)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub enum Commutativity {
|
||||
Commutative,
|
||||
PartialCommutative,
|
||||
ConditionalCommutative(Option<Transformer>),
|
||||
TransformedCommutative(Option<Transformer>),
|
||||
TransformedCommutative {
|
||||
transformer: Option<Transformer>,
|
||||
/// whether the transformer changes the child to parent
|
||||
expand_on_parent: bool,
|
||||
},
|
||||
NonCommutative,
|
||||
Unimplemented,
|
||||
/// For unrelated plans like DDL
|
||||
@@ -55,7 +203,18 @@ impl Categorizer {
|
||||
LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
|
||||
LogicalPlan::Window(_) => Commutativity::Unimplemented,
|
||||
LogicalPlan::Aggregate(aggr) => {
|
||||
if !Self::check_partition(&aggr.group_expr, &partition_cols) {
|
||||
let is_all_steppable = is_all_aggr_exprs_steppable(&aggr.aggr_expr);
|
||||
let is_partition = Self::check_partition(&aggr.group_expr, &partition_cols);
|
||||
if !is_partition && is_all_steppable {
|
||||
debug!("Plan is steppable: {plan}");
|
||||
return Commutativity::TransformedCommutative {
|
||||
transformer: Some(Arc::new(|plan: &LogicalPlan| {
|
||||
step_aggr_to_upper_aggr(plan).ok()
|
||||
})),
|
||||
expand_on_parent: true,
|
||||
};
|
||||
}
|
||||
if !is_partition {
|
||||
return Commutativity::NonCommutative;
|
||||
}
|
||||
for expr in &aggr.aggr_expr {
|
||||
|
||||
@@ -52,7 +52,7 @@ impl FlightRecordBatchStream {
|
||||
rx,
|
||||
join_handle,
|
||||
done: false,
|
||||
encoder: FlightEncoder::with_compression_disabled(),
|
||||
encoder: FlightEncoder::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -403,7 +403,10 @@ pub async fn handle_get_trace(
|
||||
.with_label_values(&[&db, "/api/traces"])
|
||||
.start_timer();
|
||||
|
||||
let output = match handler.get_trace(query_ctx, &trace_id).await {
|
||||
let output = match handler
|
||||
.get_trace(query_ctx, &trace_id, query_params.start, query_params.end)
|
||||
.await
|
||||
{
|
||||
Ok(output) => output,
|
||||
Err(err) => {
|
||||
return handle_query_error(
|
||||
|
||||
@@ -203,7 +203,13 @@ pub trait JaegerQueryHandler {
|
||||
) -> Result<Output>;
|
||||
|
||||
/// Get trace by trace id. It's used for `/api/traces/{trace_id}` API.
|
||||
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> Result<Output>;
|
||||
async fn get_trace(
|
||||
&self,
|
||||
ctx: QueryContextRef,
|
||||
trace_id: &str,
|
||||
start_time: Option<i64>,
|
||||
end_time: Option<i64>,
|
||||
) -> Result<Output>;
|
||||
|
||||
/// Find traces by query params. It's used for `/api/traces` API.
|
||||
async fn find_traces(
|
||||
|
||||
@@ -56,6 +56,7 @@ use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
|
||||
use meta_srv::mocks::MockInfo;
|
||||
use servers::grpc::flight::FlightCraftWrapper;
|
||||
use servers::grpc::region_server::RegionServerRequestHandler;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use servers::server::ServerHandlers;
|
||||
use tempfile::TempDir;
|
||||
@@ -190,7 +191,10 @@ impl GreptimeDbClusterBuilder {
|
||||
max_running_procedures: 128,
|
||||
},
|
||||
wal: self.metasrv_wal_config.clone(),
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
grpc: GrpcOptions {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
||||
@@ -139,8 +139,7 @@ mod test {
|
||||
let schema = record_batches[0].schema.arrow_schema().clone();
|
||||
|
||||
let stream = futures::stream::once(async move {
|
||||
let mut schema_data =
|
||||
FlightEncoder::with_compression_disabled().encode(FlightMessage::Schema(schema));
|
||||
let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema));
|
||||
let metadata = DoPutMetadata::new(0);
|
||||
schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();
|
||||
// first message in "DoPut" stream should carry table name in flight descriptor
|
||||
@@ -155,7 +154,7 @@ mod test {
|
||||
tokio_stream::iter(record_batches)
|
||||
.enumerate()
|
||||
.map(|(i, x)| {
|
||||
let mut encoder = FlightEncoder::with_compression_disabled();
|
||||
let mut encoder = FlightEncoder::default();
|
||||
let message = FlightMessage::RecordBatch(x.into_df_record_batch());
|
||||
let mut data = encoder.encode(message);
|
||||
let metadata = DoPutMetadata::new((i + 1) as i64);
|
||||
|
||||
@@ -4542,7 +4542,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
|
||||
let expected: Value = serde_json::from_str(expected).unwrap();
|
||||
assert_eq!(resp, expected);
|
||||
|
||||
// Test `/api/traces/{trace_id}` API.
|
||||
// Test `/api/traces/{trace_id}` API without start and end.
|
||||
let res = client
|
||||
.get("/v1/jaeger/api/traces/5611dce1bc9ebed65352d99a027b08ea")
|
||||
.header("x-greptime-trace-table-name", trace_table_name)
|
||||
@@ -4658,6 +4658,122 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
|
||||
let expected: Value = serde_json::from_str(expected).unwrap();
|
||||
assert_eq!(resp, expected);
|
||||
|
||||
// Test `/api/traces/{trace_id}` API with start and end in microseconds.
|
||||
let res = client
|
||||
.get("/v1/jaeger/api/traces/5611dce1bc9ebed65352d99a027b08ea?start=1738726754492421&end=1738726754642422")
|
||||
.header("x-greptime-trace-table-name", trace_table_name)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
let expected = r#"{
|
||||
"data": [
|
||||
{
|
||||
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
|
||||
"spans": [
|
||||
{
|
||||
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
|
||||
"spanID": "ffa03416a7b9ea48",
|
||||
"operationName": "access-redis",
|
||||
"references": [],
|
||||
"startTime": 1738726754492422,
|
||||
"duration": 100000,
|
||||
"tags": [
|
||||
{
|
||||
"key": "net.peer.ip",
|
||||
"type": "string",
|
||||
"value": "1.2.3.4"
|
||||
},
|
||||
{
|
||||
"key": "operation.type",
|
||||
"type": "string",
|
||||
"value": "access-redis"
|
||||
},
|
||||
{
|
||||
"key": "otel.scope.name",
|
||||
"type": "string",
|
||||
"value": "test-jaeger-query-api"
|
||||
},
|
||||
{
|
||||
"key": "otel.scope.version",
|
||||
"type": "string",
|
||||
"value": "1.0.0"
|
||||
},
|
||||
{
|
||||
"key": "peer.service",
|
||||
"type": "string",
|
||||
"value": "test-jaeger-query-api"
|
||||
},
|
||||
{
|
||||
"key": "span.kind",
|
||||
"type": "string",
|
||||
"value": "server"
|
||||
}
|
||||
],
|
||||
"logs": [],
|
||||
"processID": "p1"
|
||||
},
|
||||
{
|
||||
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
|
||||
"spanID": "008421dbbd33a3e9",
|
||||
"operationName": "access-mysql",
|
||||
"references": [],
|
||||
"startTime": 1738726754492421,
|
||||
"duration": 100000,
|
||||
"tags": [
|
||||
{
|
||||
"key": "net.peer.ip",
|
||||
"type": "string",
|
||||
"value": "1.2.3.4"
|
||||
},
|
||||
{
|
||||
"key": "operation.type",
|
||||
"type": "string",
|
||||
"value": "access-mysql"
|
||||
},
|
||||
{
|
||||
"key": "otel.scope.name",
|
||||
"type": "string",
|
||||
"value": "test-jaeger-query-api"
|
||||
},
|
||||
{
|
||||
"key": "otel.scope.version",
|
||||
"type": "string",
|
||||
"value": "1.0.0"
|
||||
},
|
||||
{
|
||||
"key": "peer.service",
|
||||
"type": "string",
|
||||
"value": "test-jaeger-query-api"
|
||||
},
|
||||
{
|
||||
"key": "span.kind",
|
||||
"type": "string",
|
||||
"value": "server"
|
||||
}
|
||||
],
|
||||
"logs": [],
|
||||
"processID": "p1"
|
||||
}
|
||||
],
|
||||
"processes": {
|
||||
"p1": {
|
||||
"serviceName": "test-jaeger-query-api",
|
||||
"tags": []
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"total": 0,
|
||||
"limit": 0,
|
||||
"offset": 0,
|
||||
"errors": []
|
||||
}
|
||||
"#;
|
||||
|
||||
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
|
||||
let expected: Value = serde_json::from_str(expected).unwrap();
|
||||
assert_eq!(resp, expected);
|
||||
|
||||
// Test `/api/traces` API.
|
||||
let res = client
|
||||
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492421&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")
|
||||
|
||||
409
tests/cases/distributed/explain/step_aggr.result
Normal file
409
tests/cases/distributed/explain/step_aggr.result
Normal file
@@ -0,0 +1,409 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO integers (host, i, ts) VALUES
|
||||
('550-A', 1, '2023-01-01 00:00:00'),
|
||||
('550-A', 2, '2023-01-01 01:00:00'),
|
||||
('550-W', 3, '2023-01-01 02:00:00'),
|
||||
('550-W', 4, '2023-01-01 03:00:00');
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: count(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[count(integers.i)@1 as count(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- sum
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[sum(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[sum(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[sum(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: sum(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[sum(integers.i)@1 as sum(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- min
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[min(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[min(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[min(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: min(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[min(integers.i)@1 as min(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- max
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[max(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[max(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[max(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: max(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[max(integers.i)@1 as max(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- uddsketch_state
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: uddsketch_state(Int64(128),Float64(0.01),integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)@1 as uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- hll
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[hll(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[hll(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: hll(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[hll(integers.i)@1 as hll(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
DROP TABLE integers;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
144
tests/cases/distributed/explain/step_aggr.sql
Normal file
144
tests/cases/distributed/explain/step_aggr.sql
Normal file
@@ -0,0 +1,144 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
INSERT INTO integers (host, i, ts) VALUES
|
||||
('550-A', 1, '2023-01-01 00:00:00'),
|
||||
('550-A', 2, '2023-01-01 01:00:00'),
|
||||
('550-W', 3, '2023-01-01 02:00:00'),
|
||||
('550-W', 4, '2023-01-01 03:00:00');
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- sum
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- min
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- max
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- uddsketch_state
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- hll
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
DROP TABLE integers;
|
||||
80
tests/cases/distributed/explain/step_aggr_basic.result
Normal file
80
tests/cases/distributed/explain/step_aggr_basic.result
Normal file
@@ -0,0 +1,80 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
date_bin('1 hour'::INTERVAL, ts),
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
date_bin('1 hour'::INTERVAL, ts);
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[date_bin(Utf8("1 hour"),integers.ts) AS date_bin(Utf8("1 hour"),integers.ts)]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[date_bin(CAST(Utf8("1 hour") AS Interval(MonthDayNano)), integers.ts)]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([date_bin(Utf8("1 hour"),integers.ts)@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
DROP TABLE integers;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
34
tests/cases/distributed/explain/step_aggr_basic.sql
Normal file
34
tests/cases/distributed/explain/step_aggr_basic.sql
Normal file
@@ -0,0 +1,34 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
date_bin('1 hour'::INTERVAL, ts),
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
date_bin('1 hour'::INTERVAL, ts);
|
||||
|
||||
DROP TABLE integers;
|
||||
Reference in New Issue
Block a user