Compare commits

..

3 Commits

Author SHA1 Message Date
discord9
0443042f14 feat: better metrics 2025-06-09 20:28:29 +08:00
discord9
1786190235 feat(exp): adjust_flow admin function 2025-06-09 17:02:42 +08:00
discord9
705b2007cf feat: flownode to frontend load balance with guess 2025-06-09 16:34:19 +08:00
34 changed files with 344 additions and 1614 deletions

View File

@@ -59,7 +59,7 @@ runs:
--set base.podTemplate.main.resources.requests.cpu=50m \
--set base.podTemplate.main.resources.requests.memory=256Mi \
--set base.podTemplate.main.resources.limits.cpu=2000m \
--set base.podTemplate.main.resources.limits.memory=3Gi \
--set base.podTemplate.main.resources.limits.memory=2Gi \
--set frontend.replicas=${{ inputs.frontend-replicas }} \
--set datanode.replicas=${{ inputs.datanode-replicas }} \
--set meta.replicas=${{ inputs.meta-replicas }} \

View File

@@ -250,11 +250,6 @@ jobs:
name: unstable-fuzz-logs
path: /tmp/unstable-greptime/
retention-days: 3
- name: Describe pods
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
build-greptime-ci:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
@@ -410,11 +405,6 @@ jobs:
shell: bash
run: |
kubectl describe nodes
- name: Describe pod
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
- name: Export kind logs
if: failure()
shell: bash
@@ -564,11 +554,6 @@ jobs:
shell: bash
run: |
kubectl describe nodes
- name: Describe pods
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
- name: Export kind logs
if: failure()
shell: bash

20
Cargo.lock generated
View File

@@ -4511,9 +4511,9 @@ dependencies = [
[[package]]
name = "flate2"
version = "1.1.2"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d"
checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece"
dependencies = [
"crc32fast",
"libz-rs-sys",
@@ -5146,9 +5146,9 @@ dependencies = [
[[package]]
name = "grok"
version = "2.1.0"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c52724b609896f661a3f4641dd3a44dc602958ef615857c12d00756b4e9355b"
checksum = "273797968160270573071022613fc4aa28b91fe68f3eef6c96a1b2a1947ddfbd"
dependencies = [
"glob",
"onig",
@@ -6716,9 +6716,9 @@ dependencies = [
[[package]]
name = "libz-rs-sys"
version = "0.5.1"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "172a788537a2221661b480fee8dc5f96c580eb34fa88764d3205dc356c7e4221"
checksum = "6489ca9bd760fe9642d7644e827b0c9add07df89857b0416ee15c1cc1a3b8c5a"
dependencies = [
"zlib-rs",
]
@@ -9664,9 +9664,9 @@ dependencies = [
[[package]]
name = "psl"
version = "2.1.119"
version = "2.1.112"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0e49aa528239f2ca13ad87387977c208e59c3fb8c437609f95f1b3898ec6ef1"
checksum = "1c6b4c497a0c6bfb466f75167c728b1a861b0cdc39de9c35b877208a270a9590"
dependencies = [
"psl-types",
]
@@ -14701,9 +14701,9 @@ dependencies = [
[[package]]
name = "zlib-rs"
version = "0.5.1"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "626bd9fa9734751fc50d6060752170984d7053f5a39061f524cda68023d4db8a"
checksum = "868b928d7949e09af2f6086dfc1e01936064cc7a819253bce650d4e2a2d63ba8"
[[package]]
name = "zstd"

View File

@@ -195,13 +195,13 @@
| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. |
| `slow_query.threshold` | String | Unset | The threshold of slow query. |
| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
| `export_metrics` | -- | -- | The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommended to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -298,11 +298,13 @@
| `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. |
| `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
| `slow_query.ttl` | String | `30d` | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
| `export_metrics` | -- | -- | The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -313,9 +315,11 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `data_home` | String | `./greptimedb_data` | The working home directory. |
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store`<br/>- `mysql_store` |
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store` |
| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.<br/>**Only used when backend is `postgres_store`.** |
| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend<br/>Only used when backend is `postgres_store`. |
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
@@ -327,12 +331,6 @@
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
@@ -374,11 +372,13 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -536,11 +536,13 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |

View File

@@ -635,16 +635,24 @@ max_log_files = 720
[logging.tracing_sample_ratio]
default_ratio = 1.0
## The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## @toml2docs:none-default
db = "greptime_metrics"
[export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -247,16 +247,24 @@ sample_ratio = 1.0
## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
ttl = "30d"
## The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## @toml2docs:none-default
db = "greptime_metrics"
[export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -1,6 +1,14 @@
## The working home directory.
data_home = "./greptimedb_data"
## The bind address of metasrv.
bind_addr = "127.0.0.1:3002"
## The communication server address for the frontend and datanode to connect to metasrv.
## If left empty or unset, the server will automatically use the IP address of the first network interface
## on the host, with the same port number as the one specified in `bind_addr`.
server_addr = "127.0.0.1:3002"
## Store server address default to etcd store.
## For postgres store, the format is:
## "password=password dbname=postgres user=postgres host=localhost port=5432"
@@ -16,7 +24,6 @@ store_key_prefix = ""
## - `etcd_store` (default value)
## - `memory_store`
## - `postgres_store`
## - `mysql_store`
backend = "etcd_store"
## Table name in RDS to store metadata. Effect when using a RDS kvbackend.
@@ -60,21 +67,6 @@ node_max_idle_time = "24hours"
## The number of threads to execute the runtime for global write operations.
#+ compact_rt_size = 4
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
bind_addr = "127.0.0.1:3002"
## The communication server address for the frontend and datanode to connect to metasrv.
## If left empty or unset, the server will automatically use the IP address of the first network interface
## on the host, with the same port number as the one specified in `bind_addr`.
server_addr = "127.0.0.1:3002"
## The number of server worker threads.
runtime_size = 8
## The maximum receive message size for gRPC server.
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
## The HTTP server options.
[http]
## The address to bind the HTTP server.
@@ -237,16 +229,24 @@ max_log_files = 720
[logging.tracing_sample_ratio]
default_ratio = 1.0
## The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## @toml2docs:none-default
db = "greptime_metrics"
[export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -750,11 +750,13 @@ default_ratio = 1.0
## @toml2docs:none-default
#+ sample_ratio = 1.0
## The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
@@ -765,7 +767,7 @@ write_interval = "30s"
db = "greptime_metrics"
[export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -237,20 +237,12 @@ impl StartCommand {
tokio_console_addr: global_options.tokio_console_addr.clone(),
};
#[allow(deprecated)]
if let Some(addr) = &self.rpc_bind_addr {
opts.bind_addr.clone_from(addr);
opts.grpc.bind_addr.clone_from(addr);
} else if !opts.bind_addr.is_empty() {
opts.grpc.bind_addr.clone_from(&opts.bind_addr);
}
#[allow(deprecated)]
if let Some(addr) = &self.rpc_server_addr {
opts.server_addr.clone_from(addr);
opts.grpc.server_addr.clone_from(addr);
} else if !opts.server_addr.is_empty() {
opts.grpc.server_addr.clone_from(&opts.server_addr);
}
if let Some(addrs) = &self.store_addrs {
@@ -327,7 +319,7 @@ impl StartCommand {
let plugin_opts = opts.plugins;
let mut opts = opts.component;
opts.grpc.detect_server_addr();
opts.detect_server_addr();
info!("Metasrv options: {:#?}", opts);
@@ -371,7 +363,7 @@ mod tests {
};
let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
assert_eq!(SelectorType::LoadBased, options.selector);
}
@@ -404,8 +396,8 @@ mod tests {
};
let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
assert_eq!(SelectorType::LeaseBased, options.selector);
assert_eq!("debug", options.logging.level.as_ref().unwrap());
@@ -517,10 +509,10 @@ mod tests {
let opts = command.load_options(&Default::default()).unwrap().component;
// Should be read from env, env > default values.
assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");
assert_eq!(opts.bind_addr, "127.0.0.1:14002");
// Should be read from config file, config file > env > default values.
assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");
assert_eq!(opts.server_addr, "127.0.0.1:3002");
// Should be read from cli, cli > config file > env > default values.
assert_eq!(opts.http.addr, "127.0.0.1:14000");

View File

@@ -95,7 +95,7 @@ fn test_load_datanode_example_config() {
..Default::default()
},
export_metrics: ExportMetricsOption {
self_import: None,
self_import: Some(Default::default()),
remote_write: Some(Default::default()),
..Default::default()
},
@@ -148,7 +148,7 @@ fn test_load_frontend_example_config() {
},
},
export_metrics: ExportMetricsOption {
self_import: None,
self_import: Some(Default::default()),
remote_write: Some(Default::default()),
..Default::default()
},
@@ -176,11 +176,7 @@ fn test_load_metasrv_example_config() {
component: MetasrvOptions {
selector: SelectorType::default(),
data_home: DEFAULT_DATA_HOME.to_string(),
grpc: GrpcOptions {
bind_addr: "127.0.0.1:3002".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
server_addr: "127.0.0.1:3002".to_string(),
logging: LoggingOptions {
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
@@ -199,7 +195,7 @@ fn test_load_metasrv_example_config() {
},
},
export_metrics: ExportMetricsOption {
self_import: None,
self_import: Some(Default::default()),
remote_write: Some(Default::default()),
..Default::default()
},

View File

@@ -64,6 +64,18 @@ impl Default for FlightEncoder {
}
impl FlightEncoder {
pub fn with_compression_disabled() -> Self {
let write_options = writer::IpcWriteOptions::default()
.try_with_compression(None)
.unwrap();
Self {
write_options,
data_gen: writer::IpcDataGenerator::default(),
dictionary_tracker: writer::DictionaryTracker::new(false),
}
}
pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData {
match flight_message {
FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(),

View File

@@ -35,9 +35,6 @@ pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 5;
/// The keep-alive interval of the Postgres connection.
pub const POSTGRES_KEEP_ALIVE_SECS: u64 = 30;
/// In a lease, there are two opportunities for renewal.
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;

View File

@@ -130,13 +130,7 @@ impl JaegerQueryHandler for Instance {
.await?)
}
async fn get_trace(
&self,
ctx: QueryContextRef,
trace_id: &str,
start_time: Option<i64>,
end_time: Option<i64>,
) -> ServerResult<Output> {
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
// It's equivalent to
//
// ```
@@ -145,25 +139,13 @@ impl JaegerQueryHandler for Instance {
// FROM
// {db}.{trace_table}
// WHERE
// trace_id = '{trace_id}' AND
// timestamp >= {start_time} AND
// timestamp <= {end_time}
// trace_id = '{trace_id}'
// ORDER BY
// timestamp DESC
// ```.
let selects = vec![wildcard()];
let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
if let Some(start_time) = start_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
}
if let Some(end_time) = end_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
}
let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
Ok(query_trace_table(
ctx,

View File

@@ -31,6 +31,8 @@ use common_meta::kv_backend::rds::MySqlStore;
#[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::rds::PgStore;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
#[cfg(feature = "pg_kvbackend")]
use common_telemetry::error;
use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime};
@@ -142,8 +144,7 @@ impl MetasrvInstance {
let (serve_state_tx, serve_state_rx) = oneshot::channel();
let socket_addr =
bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
.await?;
bootstrap_metasrv_with_router(&self.opts.bind_addr, router, serve_state_tx, rx).await?;
self.bind_addr = Some(socket_addr);
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
@@ -259,7 +260,7 @@ pub async fn metasrv_builder(
let etcd_client = create_etcd_client(&opts.store_addrs).await?;
let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
let election = EtcdElection::with_etcd_client(
&opts.grpc.server_addr,
&opts.server_addr,
etcd_client,
opts.store_key_prefix.clone(),
)
@@ -269,41 +270,22 @@ pub async fn metasrv_builder(
}
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
use std::time::Duration;
use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
use crate::election::rds::postgres::ElectionPgClient;
let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
let execution_timeout = Duration::from_secs(META_LEASE_SECS);
let statement_timeout = Duration::from_secs(META_LEASE_SECS);
let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
let mut cfg = Config::new();
cfg.keepalives = Some(true);
cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
// We use a separate pool for election since we need a different session keep-alive idle time.
let pool = create_postgres_pool_with(&opts.store_addrs, cfg).await?;
let election_client =
ElectionPgClient::new(pool, execution_timeout, meta_lease_ttl, statement_timeout)?;
let election = PgElection::with_pg_client(
opts.grpc.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
candidate_lease_ttl,
meta_lease_ttl,
&opts.meta_table_name,
opts.meta_election_lock_id,
)
.await?;
let pool = create_postgres_pool(&opts.store_addrs).await?;
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
// Client for election should be created separately since we need a different session keep-alive idle time.
let election_client = create_postgres_client(opts).await?;
let election = PgElection::with_pg_client(
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
META_LEASE_SECS,
&opts.meta_table_name,
opts.meta_election_lock_id,
)
.await?;
(kv_backend, Some(election))
}
#[cfg(feature = "mysql_kvbackend")]
@@ -317,7 +299,7 @@ pub async fn metasrv_builder(
let election_table_name = opts.meta_table_name.clone() + "_election";
let election_client = create_mysql_client(opts).await?;
let election = MySqlElection::with_mysql_client(
opts.grpc.server_addr.clone(),
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
@@ -390,24 +372,31 @@ pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
}
#[cfg(feature = "pg_kvbackend")]
/// Creates a pool for the Postgres backend.
///
/// It only use first store addr to create a pool.
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
create_postgres_pool_with(store_addrs, Config::new()).await
async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
let postgres_url = opts
.store_addrs
.first()
.context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
.await
.context(error::ConnectPostgresSnafu)?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!(e; "connection error");
}
});
Ok(client)
}
#[cfg(feature = "pg_kvbackend")]
/// Creates a pool for the Postgres backend.
///
/// It only use first store addr to create a pool, and use the given config to create a pool.
pub async fn create_postgres_pool_with(
store_addrs: &[String],
mut cfg: Config,
) -> Result<deadpool_postgres::Pool> {
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let mut cfg = Config::new();
cfg.url = Some(postgres_url.to_string());
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)

View File

@@ -157,11 +157,6 @@ pub trait Election: Send + Sync {
/// but only one can be the leader at a time.
async fn campaign(&self) -> Result<()>;
/// Resets the campaign.
///
/// Reset the client and the leader flag if needed.
async fn reset_campaign(&self) {}
/// Returns the leader value for the current election.
async fn leader(&self) -> Result<Self::Leader>;

View File

@@ -18,12 +18,11 @@ use std::time::Duration;
use common_telemetry::{error, warn};
use common_time::Timestamp;
use deadpool_postgres::{Manager, Pool};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::{broadcast, RwLock};
use tokio::sync::broadcast;
use tokio::time::MissedTickBehavior;
use tokio_postgres::types::ToSql;
use tokio_postgres::Row;
use tokio_postgres::Client;
use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
use crate::election::{
@@ -31,14 +30,15 @@ use crate::election::{
CANDIDATES_ROOT, ELECTION_KEY,
};
use crate::error::{
DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu,
UnexpectedSnafu,
};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
struct ElectionSqlFactory<'a> {
lock_id: u64,
table_name: &'a str,
meta_lease_ttl_secs: u64,
}
struct ElectionSqlSet {
@@ -88,10 +88,11 @@ struct ElectionSqlSet {
}
impl<'a> ElectionSqlFactory<'a> {
fn new(lock_id: u64, table_name: &'a str) -> Self {
fn new(lock_id: u64, table_name: &'a str, meta_lease_ttl_secs: u64) -> Self {
Self {
lock_id,
table_name,
meta_lease_ttl_secs,
}
}
@@ -107,6 +108,15 @@ impl<'a> ElectionSqlFactory<'a> {
}
}
// Currently the session timeout is longer than the leader lease time.
// So the leader will renew the lease twice before the session timeout if everything goes well.
fn set_idle_session_timeout_sql(&self) -> String {
format!(
"SET idle_session_timeout = '{}s';",
self.meta_lease_ttl_secs + 1
)
}
fn campaign_sql(&self) -> String {
format!("SELECT pg_try_advisory_lock({})", self.lock_id)
}
@@ -161,165 +171,46 @@ impl<'a> ElectionSqlFactory<'a> {
}
}
/// PgClient for election.
pub struct ElectionPgClient {
current: Option<deadpool::managed::Object<Manager>>,
pool: Pool,
/// The client-side timeout for statement execution.
///
/// This timeout is enforced by the client application and is independent of any server-side timeouts.
/// If a statement takes longer than this duration to execute, the client will abort the operation.
execution_timeout: Duration,
/// The idle session timeout.
///
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
/// If a session remains idle for longer than this duration, the server will terminate it.
idle_session_timeout: Duration,
/// The statement timeout.
///
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
/// If a statement takes longer than this duration to execute, the server will abort it.
statement_timeout: Duration,
}
impl ElectionPgClient {
pub fn new(
pool: Pool,
execution_timeout: Duration,
idle_session_timeout: Duration,
statement_timeout: Duration,
) -> Result<ElectionPgClient> {
Ok(ElectionPgClient {
current: None,
pool,
execution_timeout,
idle_session_timeout,
statement_timeout,
})
}
fn set_idle_session_timeout_sql(&self) -> String {
format!(
"SET idle_session_timeout = '{}s';",
self.idle_session_timeout.as_secs()
)
}
fn set_statement_timeout_sql(&self) -> String {
format!(
"SET statement_timeout = '{}s';",
self.statement_timeout.as_secs()
)
}
async fn reset_client(&mut self) -> Result<()> {
self.current = None;
self.maybe_init_client().await
}
async fn maybe_init_client(&mut self) -> Result<()> {
if self.current.is_none() {
let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
self.current = Some(client);
// Set idle session timeout and statement timeout.
let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
self.execute(&idle_session_timeout_sql, &[]).await?;
let statement_timeout_sql = self.set_statement_timeout_sql();
self.execute(&statement_timeout_sql, &[]).await?;
}
Ok(())
}
/// Returns the result of the query.
///
/// # Panics
/// if `current` is `None`.
async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
let result = tokio::time::timeout(
self.execution_timeout,
self.current.as_ref().unwrap().execute(sql, params),
)
.await
.map_err(|_| {
SqlExecutionTimeoutSnafu {
sql: sql.to_string(),
duration: self.execution_timeout,
}
.build()
})?;
result.context(PostgresExecutionSnafu { sql })
}
/// Returns the result of the query.
///
/// # Panics
/// if `current` is `None`.
async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
let result = tokio::time::timeout(
self.execution_timeout,
self.current.as_ref().unwrap().query(sql, params),
)
.await
.map_err(|_| {
SqlExecutionTimeoutSnafu {
sql: sql.to_string(),
duration: self.execution_timeout,
}
.build()
})?;
result.context(PostgresExecutionSnafu { sql })
}
}
/// PostgreSql implementation of Election.
pub struct PgElection {
leader_value: String,
pg_client: RwLock<ElectionPgClient>,
client: Client,
is_leader: AtomicBool,
leader_infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String,
candidate_lease_ttl: Duration,
meta_lease_ttl: Duration,
candidate_lease_ttl_secs: u64,
meta_lease_ttl_secs: u64,
sql_set: ElectionSqlSet,
}
impl PgElection {
async fn maybe_init_client(&self) -> Result<()> {
if self.pg_client.read().await.current.is_none() {
self.pg_client.write().await.maybe_init_client().await?;
}
Ok(())
}
pub async fn with_pg_client(
leader_value: String,
pg_client: ElectionPgClient,
client: Client,
store_key_prefix: String,
candidate_lease_ttl: Duration,
meta_lease_ttl: Duration,
candidate_lease_ttl_secs: u64,
meta_lease_ttl_secs: u64,
table_name: &str,
lock_id: u64,
) -> Result<ElectionRef> {
let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
let sql_factory = ElectionSqlFactory::new(lock_id, table_name, meta_lease_ttl_secs);
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
client
.execute(&sql_factory.set_idle_session_timeout_sql(), &[])
.await
.context(PostgresExecutionSnafu)?;
let tx = listen_leader_change(leader_value.clone());
Ok(Arc::new(Self {
leader_value,
pg_client: RwLock::new(pg_client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(false),
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl,
meta_lease_ttl,
candidate_lease_ttl_secs,
meta_lease_ttl_secs,
sql_set: sql_factory.build(),
}))
}
@@ -358,17 +249,18 @@ impl Election for PgElection {
input: format!("{node_info:?}"),
})?;
let res = self
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?;
// May registered before, just update the lease.
if !res {
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?;
}
// Check if the current lease has expired and renew the lease.
let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
loop {
let _ = keep_alive_interval.tick().await;
@@ -390,8 +282,13 @@ impl Election for PgElection {
);
// Safety: origin is Some since we are using `get_value_with_lease` with `true`.
self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
.await?;
self.update_value_with_lease(
&key,
&lease.origin,
&node_info,
self.candidate_lease_ttl_secs,
)
.await?;
}
}
@@ -424,17 +321,16 @@ impl Election for PgElection {
/// - If the lock is not acquired (result is false), it calls the `follower_action` method
/// to perform actions as a follower.
async fn campaign(&self) -> Result<()> {
let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.meta_lease_ttl_secs / 2));
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
self.maybe_init_client().await?;
loop {
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.campaign, &[])
.await?;
.await
.context(PostgresExecutionSnafu)?;
let row = res.first().context(UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock",
})?;
@@ -453,12 +349,6 @@ impl Election for PgElection {
}
}
async fn reset_campaign(&self) {
if let Err(err) = self.pg_client.write().await.reset_client().await {
error!(err; "Failed to reset client");
}
}
async fn leader(&self) -> Result<Self::Leader> {
if self.is_leader.load(Ordering::Relaxed) {
Ok(self.leader_value.as_bytes().into())
@@ -486,13 +376,11 @@ impl PgElection {
/// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
let key = key.as_bytes();
self.maybe_init_client().await?;
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.get_value_with_lease, &[&key])
.await?;
.await
.context(PostgresExecutionSnafu)?;
if res.is_empty() {
Ok(None)
@@ -526,13 +414,11 @@ impl PgElection {
key_prefix: &str,
) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
self.maybe_init_client().await?;
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
.await?;
.await
.context(PostgresExecutionSnafu)?;
let mut values_with_leases = vec![];
let mut current = Timestamp::default();
@@ -559,21 +445,18 @@ impl PgElection {
key: &str,
prev: &str,
updated: &str,
lease_ttl: Duration,
lease_ttl: u64,
) -> Result<()> {
let key = key.as_bytes();
let prev = prev.as_bytes();
self.maybe_init_client().await?;
let lease_ttl_secs = lease_ttl.as_secs() as f64;
let res = self
.pg_client
.read()
.await
.client
.execute(
&self.sql_set.update_value_with_lease,
&[&key, &prev, &updated, &lease_ttl_secs],
&[&key, &prev, &updated, &(lease_ttl as f64)],
)
.await?;
.await
.context(PostgresExecutionSnafu)?;
ensure!(
res == 1,
@@ -590,18 +473,16 @@ impl PgElection {
&self,
key: &str,
value: &str,
lease_ttl: Duration,
lease_ttl_secs: u64,
) -> Result<bool> {
let key = key.as_bytes();
let lease_ttl_secs = lease_ttl.as_secs() as f64;
let lease_ttl_secs = lease_ttl_secs as f64;
let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
self.maybe_init_client().await?;
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.put_value_with_lease, &params)
.await?;
.await
.context(PostgresExecutionSnafu)?;
Ok(res.is_empty())
}
@@ -609,13 +490,11 @@ impl PgElection {
/// Caution: Should only delete the key if the lease is expired.
async fn delete_value(&self, key: &str) -> Result<bool> {
let key = key.as_bytes();
self.maybe_init_client().await?;
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.delete_value, &[&key])
.await?;
.await
.context(PostgresExecutionSnafu)?;
Ok(res.len() == 1)
}
@@ -657,7 +536,7 @@ impl PgElection {
&key,
&lease.origin,
&self.leader_value,
self.meta_lease_ttl,
self.meta_lease_ttl_secs,
)
.await?;
}
@@ -726,12 +605,10 @@ impl PgElection {
..Default::default()
};
self.delete_value(&key).await?;
self.maybe_init_client().await?;
self.pg_client
.read()
.await
self.client
.query(&self.sql_set.step_down, &[])
.await?;
.await
.context(PostgresExecutionSnafu)?;
send_leader_change_and_set_flags(
&self.is_leader,
&self.leader_infancy,
@@ -774,7 +651,7 @@ impl PgElection {
..Default::default()
};
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs)
.await?;
if self
@@ -797,21 +674,15 @@ impl PgElection {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::env;
use common_meta::maybe_skip_postgres_integration_test;
use tokio_postgres::{Client, NoTls};
use super::*;
use crate::bootstrap::create_postgres_pool;
use crate::error;
use crate::error::PostgresExecutionSnafu;
async fn create_postgres_client(
table_name: Option<&str>,
execution_timeout: Duration,
idle_session_timeout: Duration,
statement_timeout: Duration,
) -> Result<ElectionPgClient> {
async fn create_postgres_client(table_name: Option<&str>) -> Result<Client> {
let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
if endpoint.is_empty() {
return UnexpectedSnafu {
@@ -819,34 +690,25 @@ mod tests {
}
.fail();
}
let pool = create_postgres_pool(&[endpoint]).await.unwrap();
let mut pg_client = ElectionPgClient::new(
pool,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.unwrap();
pg_client.maybe_init_client().await?;
let (client, connection) = tokio_postgres::connect(&endpoint, NoTls)
.await
.context(PostgresExecutionSnafu)?;
tokio::spawn(async move {
connection.await.context(PostgresExecutionSnafu).unwrap();
});
if let Some(table_name) = table_name {
let create_table_sql = format!(
"CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
table_name
);
pg_client.execute(&create_table_sql, &[]).await?;
client.execute(&create_table_sql, &[]).await.unwrap();
}
Ok(pg_client)
Ok(client)
}
async fn drop_table(pg_election: &PgElection, table_name: &str) {
async fn drop_table(client: &Client, table_name: &str) {
let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
pg_election
.pg_client
.read()
.await
.execute(&sql, &[])
.await
.unwrap();
client.execute(&sql, &[]).await.unwrap();
}
#[tokio::test]
@@ -857,35 +719,23 @@ mod tests {
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_postgres_crud_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(10);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, _) = broadcast::channel(100);
let pg_election = PgElection {
leader_value: "test_leader".to_string(),
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
candidate_lease_ttl_secs: 10,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
};
let res = pg_election
.put_value_with_lease(&key, &value, candidate_lease_ttl)
.put_value_with_lease(&key, &value, 10)
.await
.unwrap();
assert!(res);
@@ -898,7 +748,7 @@ mod tests {
assert_eq!(lease.leader_value, value);
pg_election
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl_secs)
.await
.unwrap();
@@ -912,7 +762,7 @@ mod tests {
let key = format!("test_key_{}", i);
let value = format!("test_value_{}", i);
pg_election
.put_value_with_lease(&key, &value, candidate_lease_ttl)
.put_value_with_lease(&key, &value, 10)
.await
.unwrap();
}
@@ -937,39 +787,28 @@ mod tests {
assert!(res.is_empty());
assert!(current == Timestamp::default());
drop_table(&pg_election, table_name).await;
drop_table(&pg_election.client, table_name).await;
}
async fn candidate(
leader_value: String,
candidate_lease_ttl: Duration,
candidate_lease_ttl_secs: u64,
store_key_prefix: String,
table_name: String,
) {
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
None,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let client = create_postgres_client(None).await.unwrap();
let (tx, _) = broadcast::channel(100);
let pg_election = PgElection {
leader_value,
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28319, &table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, &table_name, 2).build(),
};
let node_info = MetasrvNodeInfo {
@@ -985,28 +824,17 @@ mod tests {
async fn test_candidate_registration() {
maybe_skip_postgres_integration_test!();
let leader_value_prefix = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_candidate_registration_greptime_metakv";
let mut handles = vec![];
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let client = create_postgres_client(Some(table_name)).await.unwrap();
for i in 0..10 {
let leader_value = format!("{}{}", leader_value_prefix, i);
let handle = tokio::spawn(candidate(
leader_value,
candidate_lease_ttl,
candidate_lease_ttl_secs,
uuid.clone(),
table_name.to_string(),
));
@@ -1019,14 +847,14 @@ mod tests {
let leader_value = "test_leader".to_string();
let pg_election = PgElection {
leader_value,
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid.clone(),
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
};
let candidates = pg_election.all_candidates().await.unwrap();
@@ -1048,40 +876,29 @@ mod tests {
assert!(res);
}
drop_table(&pg_election, table_name).await;
drop_table(&pg_election.client, table_name).await;
}
#[tokio::test]
async fn test_elected_and_step_down() {
maybe_skip_postgres_integration_test!();
let leader_value = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_elected_and_step_down_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28320, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28320, table_name, 2).build(),
};
leader_pg_election.elected().await.unwrap();
@@ -1173,7 +990,7 @@ mod tests {
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
drop_table(&leader_pg_election, table_name).await;
drop_table(&leader_pg_election.client, table_name).await;
}
#[tokio::test]
@@ -1182,38 +999,25 @@ mod tests {
let leader_value = "test_leader".to_string();
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_leader_action_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let candidate_lease_ttl_secs = 5;
let client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28321, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28321, table_name, 2).build(),
};
// Step 1: No leader exists, campaign and elected.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1244,9 +1048,7 @@ mod tests {
// Step 2: As a leader, renew the lease.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1268,9 +1070,7 @@ mod tests {
tokio::time::sleep(Duration::from_secs(2)).await;
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1298,9 +1098,7 @@ mod tests {
// Step 4: Re-campaign and elected.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1357,9 +1155,7 @@ mod tests {
// Step 6: Re-campaign and elected.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1390,9 +1186,7 @@ mod tests {
// Step 7: Something wrong, the leader key changed by others.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1403,11 +1197,7 @@ mod tests {
.await
.unwrap();
leader_pg_election
.put_value_with_lease(
&leader_pg_election.election_key(),
"test",
Duration::from_secs(10),
)
.put_value_with_lease(&leader_pg_election.election_key(), "test", 10)
.await
.unwrap();
leader_pg_election.leader_action().await.unwrap();
@@ -1433,74 +1223,52 @@ mod tests {
// Clean up
leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.step_down, &[])
.await
.unwrap();
drop_table(&leader_pg_election, table_name).await;
drop_table(&leader_pg_election.client, table_name).await;
}
#[tokio::test]
async fn test_follower_action() {
maybe_skip_postgres_integration_test!();
common_telemetry::init_default_ut_logging();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_follower_action_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let follower_client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let follower_client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let follower_pg_election = PgElection {
leader_value: "test_follower".to_string(),
pg_client: RwLock::new(follower_client),
client: follower_client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid.clone(),
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
};
let leader_client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let leader_client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, _) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: "test_leader".to_string(),
pg_client: RwLock::new(leader_client),
client: leader_client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
};
leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1541,41 +1309,11 @@ mod tests {
// Clean up
leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.step_down, &[])
.await
.unwrap();
drop_table(&follower_pg_election, table_name).await;
}
#[tokio::test]
async fn test_idle_session_timeout() {
maybe_skip_postgres_integration_test!();
common_telemetry::init_default_ut_logging();
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let idle_session_timeout = Duration::from_secs(1);
let mut client = create_postgres_client(
None,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(1100)).await;
// Wait for the idle session timeout.
let err = client.query("SELECT 1", &[]).await.unwrap_err();
assert_matches!(err, error::Error::PostgresExecution { .. });
let error::Error::PostgresExecution { error, .. } = err else {
panic!("Expected PostgresExecution error");
};
assert!(error.is_closed());
// Reset the client and try again.
client.reset_client().await.unwrap();
let _ = client.query("SELECT 1", &[]).await.unwrap();
drop_table(&follower_pg_election.client, table_name).await;
}
}

View File

@@ -748,31 +748,21 @@ pub enum Error {
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to execute via postgres, sql: {}", sql))]
#[snafu(display("Failed to execute via postgres"))]
PostgresExecution {
#[snafu(source)]
error: tokio_postgres::Error,
sql: String,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to get Postgres client"))]
GetPostgresClient {
#[snafu(implicit)]
location: Location,
#[snafu(display("Failed to connect to Postgres"))]
ConnectPostgres {
#[snafu(source)]
error: deadpool::managed::PoolError<tokio_postgres::Error>,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
SqlExecutionTimeout {
error: tokio_postgres::Error,
#[snafu(implicit)]
location: Location,
sql: String,
duration: std::time::Duration,
},
#[cfg(feature = "pg_kvbackend")]
@@ -1015,10 +1005,9 @@ impl ErrorExt for Error {
Error::LookupPeer { source, .. } => source.status_code(),
#[cfg(feature = "pg_kvbackend")]
Error::CreatePostgresPool { .. }
| Error::GetPostgresClient { .. }
| Error::GetPostgresConnection { .. }
| Error::PostgresExecution { .. }
| Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
| Error::ConnectPostgres { .. } => StatusCode::Internal,
#[cfg(feature = "mysql_kvbackend")]
Error::MySqlExecution { .. }
| Error::CreateMySqlPool { .. }

View File

@@ -46,7 +46,6 @@ use common_telemetry::{error, info, warn};
use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -97,10 +96,8 @@ pub enum BackendImpl {
#[serde(default)]
pub struct MetasrvOptions {
/// The address the server listens on.
#[deprecated(note = "Use grpc.bind_addr instead")]
pub bind_addr: String,
/// The address the server advertises to the clients.
#[deprecated(note = "Use grpc.server_addr instead")]
pub server_addr: String,
/// The address of the store, e.g., etcd.
pub store_addrs: Vec<String>,
@@ -115,7 +112,6 @@ pub struct MetasrvOptions {
/// If it's true, the region failover will be allowed even if the local WAL is used.
/// Note that this option is not recommended to be set to true, because it may lead to data loss during failover.
pub allow_region_failover_on_local_wal: bool,
pub grpc: GrpcOptions,
/// The HTTP server options.
pub http: HttpOptions,
/// The logging options.
@@ -170,6 +166,8 @@ impl fmt::Debug for MetasrvOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug_struct = f.debug_struct("MetasrvOptions");
debug_struct
.field("bind_addr", &self.bind_addr)
.field("server_addr", &self.server_addr)
.field("store_addrs", &self.sanitize_store_addrs())
.field("selector", &self.selector)
.field("use_memory_store", &self.use_memory_store)
@@ -178,7 +176,6 @@ impl fmt::Debug for MetasrvOptions {
"allow_region_failover_on_local_wal",
&self.allow_region_failover_on_local_wal,
)
.field("grpc", &self.grpc)
.field("http", &self.http)
.field("logging", &self.logging)
.field("procedure", &self.procedure)
@@ -211,19 +208,14 @@ const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
impl Default for MetasrvOptions {
fn default() -> Self {
Self {
#[allow(deprecated)]
bind_addr: String::new(),
#[allow(deprecated)]
bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
// If server_addr is not set, the server will use the local ip address as the server address.
server_addr: String::new(),
store_addrs: vec!["127.0.0.1:2379".to_string()],
selector: SelectorType::default(),
use_memory_store: false,
enable_region_failover: false,
allow_region_failover_on_local_wal: false,
grpc: GrpcOptions {
bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
..Default::default()
},
http: HttpOptions::default(),
logging: LoggingOptions::default(),
procedure: ProcedureConfig {
@@ -261,6 +253,37 @@ impl Configurable for MetasrvOptions {
}
impl MetasrvOptions {
/// Detect server address.
#[cfg(not(target_os = "android"))]
pub fn detect_server_addr(&mut self) {
if self.server_addr.is_empty() {
match local_ip_address::local_ip() {
Ok(ip) => {
let detected_addr = format!(
"{}:{}",
ip,
self.bind_addr
.split(':')
.nth(1)
.unwrap_or(DEFAULT_METASRV_ADDR_PORT)
);
info!("Using detected: {} as server address", detected_addr);
self.server_addr = detected_addr;
}
Err(e) => {
error!("Failed to detect local ip address: {}", e);
}
}
}
}
#[cfg(target_os = "android")]
pub fn detect_server_addr(&mut self) {
if self.server_addr.is_empty() {
common_telemetry::debug!("detect local IP is not supported on Android");
}
}
fn sanitize_store_addrs(&self) -> Vec<String> {
self.store_addrs
.iter()
@@ -559,7 +582,6 @@ impl Metasrv {
if let Err(e) = res {
warn!(e; "Metasrv election error");
}
election.reset_campaign().await;
info!("Metasrv re-initiate election");
}
info!("Metasrv stopped");
@@ -616,7 +638,7 @@ impl Metasrv {
pub fn node_info(&self) -> MetasrvNodeInfo {
let build_info = common_version::build_info();
MetasrvNodeInfo {
addr: self.options().grpc.server_addr.clone(),
addr: self.options().server_addr.clone(),
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
start_time_ms: self.start_time_ms(),
@@ -708,7 +730,7 @@ impl Metasrv {
#[inline]
pub fn new_ctx(&self) -> Context {
let server_addr = self.options().grpc.server_addr.clone();
let server_addr = self.options().server_addr.clone();
let in_memory = self.in_memory.clone();
let kv_backend = self.kv_backend.clone();
let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();

View File

@@ -179,8 +179,8 @@ impl MetasrvBuilder {
let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
let state = Arc::new(RwLock::new(match election {
None => State::leader(options.grpc.server_addr.to_string(), true),
Some(_) => State::follower(options.grpc.server_addr.to_string()),
None => State::leader(options.server_addr.to_string(), true),
Some(_) => State::follower(options.server_addr.to_string()),
}));
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
@@ -203,7 +203,7 @@ impl MetasrvBuilder {
));
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone()));
let selector_ctx = SelectorContext {
server_addr: options.grpc.server_addr.clone(),
server_addr: options.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
kv_backend: kv_backend.clone(),
@@ -272,7 +272,7 @@ impl MetasrvBuilder {
let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
mailbox.clone(),
MetasrvInfo {
server_addr: options.grpc.server_addr.clone(),
server_addr: options.server_addr.clone(),
},
));
let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
@@ -315,7 +315,7 @@ impl MetasrvBuilder {
memory_region_keeper.clone(),
region_failure_detector_controller.clone(),
mailbox.clone(),
options.grpc.server_addr.clone(),
options.server_addr.clone(),
cache_invalidator.clone(),
),
));
@@ -390,7 +390,7 @@ impl MetasrvBuilder {
client: Arc::new(kafka_client),
table_metadata_manager: table_metadata_manager.clone(),
leader_region_registry: leader_region_registry.clone(),
server_addr: options.grpc.server_addr.clone(),
server_addr: options.server_addr.clone(),
mailbox: mailbox.clone(),
};
let wal_prune_manager = WalPruneManager::new(

View File

@@ -26,7 +26,6 @@ use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use hyper_util::rt::TokioIo;
use servers::grpc::GrpcOptions;
use tonic::codec::CompressionEncoding;
use tower::service_fn;
@@ -48,10 +47,7 @@ pub async fn mock_with_memstore() -> MockInfo {
let in_memory = Arc::new(MemoryKvBackend::new());
mock(
MetasrvOptions {
grpc: GrpcOptions {
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
kv_backend,
@@ -66,10 +62,7 @@ pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
mock(
MetasrvOptions {
grpc: GrpcOptions {
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
kv_backend,
@@ -87,7 +80,7 @@ pub async fn mock(
datanode_clients: Option<Arc<NodeClients>>,
in_memory: Option<ResettableKvBackendRef>,
) -> MockInfo {
let server_addr = opts.grpc.server_addr.clone();
let server_addr = opts.server_addr.clone();
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
table_metadata_manager.init().await.unwrap();

View File

@@ -88,7 +88,7 @@ impl cluster_server::Cluster for Metasrv {
return Ok(Response::new(resp));
}
let leader_addr = &self.options().grpc.server_addr;
let leader_addr = &self.options().server_addr;
let (leader, followers) = match self.election() {
Some(election) => {
let nodes = election.all_candidates().await?;

View File

@@ -190,7 +190,6 @@ mod tests {
use api::v1::meta::*;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_telemetry::tracing_context::W3cTrace;
use servers::grpc::GrpcOptions;
use tonic::IntoRequest;
use super::get_node_id;
@@ -204,10 +203,7 @@ mod tests {
let metasrv = MetasrvBuilder::new()
.kv_backend(kv_backend)
.options(MetasrvOptions {
grpc: GrpcOptions {
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
})
.build()
@@ -220,7 +216,7 @@ mod tests {
let res = metasrv.ask_leader(req.into_request()).await.unwrap();
let res = res.into_inner();
assert_eq!(metasrv.options().grpc.server_addr, res.leader.unwrap().addr);
assert_eq!(metasrv.options().bind_addr, res.leader.unwrap().addr);
}
#[test]

View File

@@ -15,7 +15,6 @@
use std::collections::HashSet;
use std::sync::Arc;
use common_telemetry::debug;
use datafusion::datasource::DefaultTableSource;
use datafusion::error::Result as DfResult;
use datafusion_common::config::ConfigOptions;
@@ -155,7 +154,6 @@ struct PlanRewriter {
/// Partition columns of the table in current pass
partition_cols: Option<Vec<String>>,
column_requirements: HashSet<Column>,
expand_on_next_call: bool,
}
impl PlanRewriter {
@@ -176,10 +174,6 @@ impl PlanRewriter {
{
return true;
}
if self.expand_on_next_call {
self.expand_on_next_call = false;
return true;
}
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
Commutativity::Commutative => {}
Commutativity::PartialCommutative => {
@@ -196,17 +190,12 @@ impl PlanRewriter {
self.stage.push(plan)
}
}
Commutativity::TransformedCommutative {
transformer,
expand_on_parent,
} => {
Commutativity::TransformedCommutative(transformer) => {
if let Some(transformer) = transformer
&& let Some(changed_plan) = transformer(plan)
&& let Some(plan) = transformer(plan)
{
debug!("PlanRewriter: transformed plan: {changed_plan} from {plan}");
self.update_column_requirements(&changed_plan);
self.stage.push(changed_plan);
self.expand_on_next_call = expand_on_parent;
self.update_column_requirements(&plan);
self.stage.push(plan)
}
}
Commutativity::NonCommutative
@@ -402,21 +391,10 @@ impl TreeNodeRewriter for PlanRewriter {
return Ok(Transformed::yes(node));
};
let parent = parent.clone();
// TODO(ruihang): avoid this clone
if self.should_expand(&parent) {
if self.should_expand(&parent.clone()) {
// TODO(ruihang): does this work for nodes with multiple children?;
debug!("PlanRewriter: should expand child:\n {node}\n Of Parent: {parent}");
let node = self.expand(node);
debug!(
"PlanRewriter: expanded plan: {}",
match &node {
Ok(n) => n.to_string(),
Err(e) => format!("Error expanding plan: {e}"),
}
);
let node = node?;
let node = self.expand(node)?;
self.pop_stack();
return Ok(Transformed::yes(node));
}

View File

@@ -15,9 +15,6 @@
use std::collections::HashSet;
use std::sync::Arc;
use common_function::aggr::{HllState, UddSketchState, HLL_NAME, UDDSKETCH_STATE_NAME};
use common_telemetry::debug;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
use promql::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
@@ -26,157 +23,12 @@ use promql::extension_plan::{
use crate::dist_plan::merge_sort::{merge_sort_transformer, MergeSortLogicalPlan};
use crate::dist_plan::MergeScanLogicalPlan;
/// generate the upper aggregation plan that will execute on the frontend.
pub fn step_aggr_to_upper_aggr(aggr_plan: &LogicalPlan) -> datafusion_common::Result<LogicalPlan> {
let LogicalPlan::Aggregate(aggr) = aggr_plan else {
return Err(datafusion_common::DataFusionError::Plan(
"step_aggr_to_upper_aggr only accepts Aggregate plan".to_string(),
));
};
if !is_all_aggr_exprs_steppable(&aggr.aggr_expr) {
return Err(datafusion_common::DataFusionError::NotImplemented(
"Some aggregate expressions are not steppable".to_string(),
));
}
let mut upper_aggr_expr = vec![];
for aggr_expr in &aggr.aggr_expr {
let Some(aggr_func) = get_aggr_func(aggr_expr) else {
return Err(datafusion_common::DataFusionError::NotImplemented(
"Aggregate function not found".to_string(),
));
};
let col_name = aggr_expr.name_for_alias()?;
let input_column =
Expr::Column(datafusion_common::Column::new_unqualified(col_name.clone()));
let upper_func = match aggr_func.func.name() {
"sum" | "min" | "max" | "last_value" | "first_value" => {
// aggr_calc(aggr_merge(input_column))) as col_name
let mut new_aggr_func = aggr_func.clone();
new_aggr_func.args = vec![input_column.clone()];
new_aggr_func
}
"count" => {
// sum(input_column) as col_name
let mut new_aggr_func = aggr_func.clone();
new_aggr_func.func = sum_udaf();
new_aggr_func.args = vec![input_column.clone()];
new_aggr_func
}
UDDSKETCH_STATE_NAME => {
// udd_merge(bucket_size, error_rate input_column) as col_name
let mut new_aggr_func = aggr_func.clone();
new_aggr_func.func = Arc::new(UddSketchState::merge_udf_impl());
new_aggr_func.args[2] = input_column.clone();
new_aggr_func
}
HLL_NAME => {
// hll_merge(input_column) as col_name
let mut new_aggr_func = aggr_func.clone();
new_aggr_func.func = Arc::new(HllState::merge_udf_impl());
new_aggr_func.args = vec![input_column.clone()];
new_aggr_func
}
_ => {
return Err(datafusion_common::DataFusionError::NotImplemented(format!(
"Aggregate function {} is not supported for Step aggregation",
aggr_func.func.name()
)))
}
};
// deal with nested alias case
let mut new_aggr_expr = aggr_expr.clone();
{
let new_aggr_func = get_aggr_func_mut(&mut new_aggr_expr).unwrap();
*new_aggr_func = upper_func;
}
// make the column name the same, so parent can recognize it
upper_aggr_expr.push(new_aggr_expr.alias(col_name));
}
let mut new_aggr = aggr.clone();
new_aggr.aggr_expr = upper_aggr_expr;
// group by expr also need alias to avoid duplicated computing
let mut new_group_expr = new_aggr.group_expr.clone();
for expr in &mut new_group_expr {
if let Expr::Column(_) = expr {
// already a column, no need to change
continue;
}
let col_name = expr.name_for_alias()?;
let input_column =
Expr::Column(datafusion_common::Column::new_unqualified(col_name.clone()));
*expr = input_column.alias(col_name);
}
new_aggr.group_expr = new_group_expr;
// return the new logical plan
Ok(LogicalPlan::Aggregate(new_aggr))
}
/// Check if the given aggregate expression is steppable.
/// As in if it can be split into multiple steps:
/// i.e. on datanode first call `state(input)` then
/// on frontend call `calc(merge(state))` to get the final result.
///
pub fn is_all_aggr_exprs_steppable(aggr_exprs: &[Expr]) -> bool {
let step_action = HashSet::from([
"sum",
"count",
"min",
"max",
"first_value",
"last_value",
UDDSKETCH_STATE_NAME,
HLL_NAME,
]);
aggr_exprs.iter().all(|expr| {
if let Some(aggr_func) = get_aggr_func(expr) {
if aggr_func.distinct {
// Distinct aggregate functions are not steppable(yet).
return false;
}
step_action.contains(aggr_func.func.name())
} else {
false
}
})
}
pub fn get_aggr_func(expr: &Expr) -> Option<&datafusion_expr::expr::AggregateFunction> {
let mut expr_ref = expr;
while let Expr::Alias(alias) = expr_ref {
expr_ref = &alias.expr;
}
if let Expr::AggregateFunction(aggr_func) = expr_ref {
Some(aggr_func)
} else {
None
}
}
pub fn get_aggr_func_mut(expr: &mut Expr) -> Option<&mut datafusion_expr::expr::AggregateFunction> {
let mut expr_ref = expr;
while let Expr::Alias(alias) = expr_ref {
expr_ref = &mut alias.expr;
}
if let Expr::AggregateFunction(aggr_func) = expr_ref {
Some(aggr_func)
} else {
None
}
}
#[allow(dead_code)]
pub enum Commutativity {
Commutative,
PartialCommutative,
ConditionalCommutative(Option<Transformer>),
TransformedCommutative {
transformer: Option<Transformer>,
/// whether the transformer changes the child to parent
expand_on_parent: bool,
},
TransformedCommutative(Option<Transformer>),
NonCommutative,
Unimplemented,
/// For unrelated plans like DDL
@@ -203,18 +55,7 @@ impl Categorizer {
LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
LogicalPlan::Window(_) => Commutativity::Unimplemented,
LogicalPlan::Aggregate(aggr) => {
let is_all_steppable = is_all_aggr_exprs_steppable(&aggr.aggr_expr);
let is_partition = Self::check_partition(&aggr.group_expr, &partition_cols);
if !is_partition && is_all_steppable {
debug!("Plan is steppable: {plan}");
return Commutativity::TransformedCommutative {
transformer: Some(Arc::new(|plan: &LogicalPlan| {
step_aggr_to_upper_aggr(plan).ok()
})),
expand_on_parent: true,
};
}
if !is_partition {
if !Self::check_partition(&aggr.group_expr, &partition_cols) {
return Commutativity::NonCommutative;
}
for expr in &aggr.aggr_expr {

View File

@@ -52,7 +52,7 @@ impl FlightRecordBatchStream {
rx,
join_handle,
done: false,
encoder: FlightEncoder::default(),
encoder: FlightEncoder::with_compression_disabled(),
}
}

View File

@@ -403,10 +403,7 @@ pub async fn handle_get_trace(
.with_label_values(&[&db, "/api/traces"])
.start_timer();
let output = match handler
.get_trace(query_ctx, &trace_id, query_params.start, query_params.end)
.await
{
let output = match handler.get_trace(query_ctx, &trace_id).await {
Ok(output) => output,
Err(err) => {
return handle_query_error(

View File

@@ -203,13 +203,7 @@ pub trait JaegerQueryHandler {
) -> Result<Output>;
/// Get trace by trace id. It's used for `/api/traces/{trace_id}` API.
async fn get_trace(
&self,
ctx: QueryContextRef,
trace_id: &str,
start_time: Option<i64>,
end_time: Option<i64>,
) -> Result<Output>;
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> Result<Output>;
/// Find traces by query params. It's used for `/api/traces` API.
async fn find_traces(

View File

@@ -56,7 +56,6 @@ use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use servers::grpc::flight::FlightCraftWrapper;
use servers::grpc::region_server::RegionServerRequestHandler;
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::server::ServerHandlers;
use tempfile::TempDir;
@@ -191,10 +190,7 @@ impl GreptimeDbClusterBuilder {
max_running_procedures: 128,
},
wal: self.metasrv_wal_config.clone(),
grpc: GrpcOptions {
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
};

View File

@@ -139,7 +139,8 @@ mod test {
let schema = record_batches[0].schema.arrow_schema().clone();
let stream = futures::stream::once(async move {
let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema));
let mut schema_data =
FlightEncoder::with_compression_disabled().encode(FlightMessage::Schema(schema));
let metadata = DoPutMetadata::new(0);
schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();
// first message in "DoPut" stream should carry table name in flight descriptor
@@ -154,7 +155,7 @@ mod test {
tokio_stream::iter(record_batches)
.enumerate()
.map(|(i, x)| {
let mut encoder = FlightEncoder::default();
let mut encoder = FlightEncoder::with_compression_disabled();
let message = FlightMessage::RecordBatch(x.into_df_record_batch());
let mut data = encoder.encode(message);
let metadata = DoPutMetadata::new((i + 1) as i64);

View File

@@ -4542,7 +4542,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces/{trace_id}` API without start and end.
// Test `/api/traces/{trace_id}` API.
let res = client
.get("/v1/jaeger/api/traces/5611dce1bc9ebed65352d99a027b08ea")
.header("x-greptime-trace-table-name", trace_table_name)
@@ -4658,122 +4658,6 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces/{trace_id}` API with start and end in microseconds.
let res = client
.get("/v1/jaeger/api/traces/5611dce1bc9ebed65352d99a027b08ea?start=1738726754492421&end=1738726754642422")
.header("x-greptime-trace-table-name", trace_table_name)
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"{
"data": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spans": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "ffa03416a7b9ea48",
"operationName": "access-redis",
"references": [],
"startTime": 1738726754492422,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-redis"
},
{
"key": "otel.scope.name",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.version",
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
"value": "server"
}
],
"logs": [],
"processID": "p1"
},
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-mysql"
},
{
"key": "otel.scope.name",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.version",
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
"value": "server"
}
],
"logs": [],
"processID": "p1"
}
],
"processes": {
"p1": {
"serviceName": "test-jaeger-query-api",
"tags": []
}
}
}
],
"total": 0,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces` API.
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492421&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")

View File

@@ -1,409 +0,0 @@
CREATE TABLE integers(
host STRING,
i BIGINT,
ts TIMESTAMP TIME INDEX
) PARTITION ON COLUMNS (host) (
host < '550-A',
host >= '550-A'
AND host < '550-W',
host >= '550-W'
);
Affected Rows: 0
INSERT INTO integers (host, i, ts) VALUES
('550-A', 1, '2023-01-01 00:00:00'),
('550-A', 2, '2023-01-01 01:00:00'),
('550-W', 3, '2023-01-01 02:00:00'),
('550-W', 4, '2023-01-01 03:00:00');
Affected Rows: 4
-- count
EXPLAIN SELECT
count(i)
FROM
integers;
+---------------+-------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)] |
| | CoalescePartitionsExec |
| | AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
ts,
count(i)
FROM
integers
GROUP BY
ts;
+---------------+---------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
count(i)
FROM
integers
GROUP BY
ts;
+---------------+-----------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: count(integers.i) |
| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
| | TableScan: integers] |
| physical_plan | ProjectionExec: expr=[count(integers.i)@1 as count(integers.i)] |
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------+
-- sum
EXPLAIN SELECT
sum(i)
FROM
integers;
+---------------+-------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[sum(integers.i)]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[sum(integers.i)] |
| | CoalescePartitionsExec |
| | AggregateExec: mode=Partial, gby=[], aggr=[sum(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
ts,
sum(i)
FROM
integers
GROUP BY
ts;
+---------------+---------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
sum(i)
FROM
integers
GROUP BY
ts;
+---------------+-----------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: sum(integers.i) |
| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] |
| | TableScan: integers] |
| physical_plan | ProjectionExec: expr=[sum(integers.i)@1 as sum(integers.i)] |
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------+
-- min
EXPLAIN SELECT
min(i)
FROM
integers;
+---------------+-------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[min(integers.i)]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[min(integers.i)] |
| | CoalescePartitionsExec |
| | AggregateExec: mode=Partial, gby=[], aggr=[min(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
ts,
min(i)
FROM
integers
GROUP BY
ts;
+---------------+---------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
min(i)
FROM
integers
GROUP BY
ts;
+---------------+-----------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: min(integers.i) |
| | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] |
| | TableScan: integers] |
| physical_plan | ProjectionExec: expr=[min(integers.i)@1 as min(integers.i)] |
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------+
-- max
EXPLAIN SELECT
max(i)
FROM
integers;
+---------------+-------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[max(integers.i)]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[max(integers.i)] |
| | CoalescePartitionsExec |
| | AggregateExec: mode=Partial, gby=[], aggr=[max(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
ts,
max(i)
FROM
integers
GROUP BY
ts;
+---------------+---------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
max(i)
FROM
integers
GROUP BY
ts;
+---------------+-----------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: max(integers.i) |
| | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] |
| | TableScan: integers] |
| physical_plan | ProjectionExec: expr=[max(integers.i)@1 as max(integers.i)] |
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------+
-- uddsketch_state
EXPLAIN SELECT
uddsketch_state(128, 0.01, i)
FROM
integers;
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
| | CoalescePartitionsExec |
| | AggregateExec: mode=Partial, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
ts,
uddsketch_state(128, 0.01, i)
FROM
integers
GROUP BY
ts;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
uddsketch_state(128, 0.01, i)
FROM
integers
GROUP BY
ts;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: uddsketch_state(Int64(128),Float64(0.01),integers.i) |
| | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
| | TableScan: integers] |
| physical_plan | ProjectionExec: expr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)@1 as uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- hll
EXPLAIN SELECT
hll(i)
FROM
integers;
+---------------+----------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[hll(integers.i)] |
| | CoalescePartitionsExec |
| | AggregateExec: mode=Partial, gby=[], aggr=[hll(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
ts,
hll(i)
FROM
integers
GROUP BY
ts;
+---------------+---------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
hll(i)
FROM
integers
GROUP BY
ts;
+---------------+-----------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: hll(integers.i) |
| | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
| | TableScan: integers] |
| physical_plan | ProjectionExec: expr=[hll(integers.i)@1 as hll(integers.i)] |
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------+
DROP TABLE integers;
Affected Rows: 0

View File

@@ -1,144 +0,0 @@
CREATE TABLE integers(
host STRING,
i BIGINT,
ts TIMESTAMP TIME INDEX
) PARTITION ON COLUMNS (host) (
host < '550-A',
host >= '550-A'
AND host < '550-W',
host >= '550-W'
);
INSERT INTO integers (host, i, ts) VALUES
('550-A', 1, '2023-01-01 00:00:00'),
('550-A', 2, '2023-01-01 01:00:00'),
('550-W', 3, '2023-01-01 02:00:00'),
('550-W', 4, '2023-01-01 03:00:00');
-- count
EXPLAIN SELECT
count(i)
FROM
integers;
EXPLAIN SELECT
ts,
count(i)
FROM
integers
GROUP BY
ts;
EXPLAIN SELECT
count(i)
FROM
integers
GROUP BY
ts;
-- sum
EXPLAIN SELECT
sum(i)
FROM
integers;
EXPLAIN SELECT
ts,
sum(i)
FROM
integers
GROUP BY
ts;
EXPLAIN SELECT
sum(i)
FROM
integers
GROUP BY
ts;
-- min
EXPLAIN SELECT
min(i)
FROM
integers;
EXPLAIN SELECT
ts,
min(i)
FROM
integers
GROUP BY
ts;
EXPLAIN SELECT
min(i)
FROM
integers
GROUP BY
ts;
-- max
EXPLAIN SELECT
max(i)
FROM
integers;
EXPLAIN SELECT
ts,
max(i)
FROM
integers
GROUP BY
ts;
EXPLAIN SELECT
max(i)
FROM
integers
GROUP BY
ts;
-- uddsketch_state
EXPLAIN SELECT
uddsketch_state(128, 0.01, i)
FROM
integers;
EXPLAIN SELECT
ts,
uddsketch_state(128, 0.01, i)
FROM
integers
GROUP BY
ts;
EXPLAIN SELECT
uddsketch_state(128, 0.01, i)
FROM
integers
GROUP BY
ts;
-- hll
EXPLAIN SELECT
hll(i)
FROM
integers;
EXPLAIN SELECT
ts,
hll(i)
FROM
integers
GROUP BY
ts;
EXPLAIN SELECT
hll(i)
FROM
integers
GROUP BY
ts;
DROP TABLE integers;

View File

@@ -1,80 +0,0 @@
CREATE TABLE integers(
host STRING,
i BIGINT,
ts TIMESTAMP TIME INDEX
) PARTITION ON COLUMNS (host) (
host < '550-A',
host >= '550-A'
AND host < '550-W',
host >= '550-W'
);
Affected Rows: 0
-- count
EXPLAIN SELECT
count(i)
FROM
integers;
+---------------+-------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)] |
| | CoalescePartitionsExec |
| | AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)] |
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
ts,
count(i)
FROM
integers
GROUP BY
ts;
+---------------+---------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT
date_bin('1 hour'::INTERVAL, ts),
count(i)
FROM
integers
GROUP BY
date_bin('1 hour'::INTERVAL, ts);
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[date_bin(Utf8("1 hour"),integers.ts) AS date_bin(Utf8("1 hour"),integers.ts)]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[date_bin(CAST(Utf8("1 hour") AS Interval(MonthDayNano)), integers.ts)]], aggr=[[count(integers.i)]] |
| | TableScan: integers] |
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([date_bin(Utf8("1 hour"),integers.ts)@0], 20), input_partitions=20 |
| | AggregateExec: mode=Partial, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] |
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
DROP TABLE integers;
Affected Rows: 0

View File

@@ -1,34 +0,0 @@
CREATE TABLE integers(
host STRING,
i BIGINT,
ts TIMESTAMP TIME INDEX
) PARTITION ON COLUMNS (host) (
host < '550-A',
host >= '550-A'
AND host < '550-W',
host >= '550-W'
);
-- count
EXPLAIN SELECT
count(i)
FROM
integers;
EXPLAIN SELECT
ts,
count(i)
FROM
integers
GROUP BY
ts;
EXPLAIN SELECT
date_bin('1 hour'::INTERVAL, ts),
count(i)
FROM
integers
GROUP BY
date_bin('1 hour'::INTERVAL, ts);
DROP TABLE integers;