Compare commits

..

2 Commits

Author SHA1 Message Date
discord9
1786190235 feat(exp): adjust_flow admin function 2025-06-09 17:02:42 +08:00
discord9
705b2007cf feat: flownode to frontend load balance with guess 2025-06-09 16:34:19 +08:00
78 changed files with 798 additions and 1554 deletions

View File

@@ -59,7 +59,7 @@ runs:
--set base.podTemplate.main.resources.requests.cpu=50m \
--set base.podTemplate.main.resources.requests.memory=256Mi \
--set base.podTemplate.main.resources.limits.cpu=2000m \
--set base.podTemplate.main.resources.limits.memory=3Gi \
--set base.podTemplate.main.resources.limits.memory=2Gi \
--set frontend.replicas=${{ inputs.frontend-replicas }} \
--set datanode.replicas=${{ inputs.datanode-replicas }} \
--set meta.replicas=${{ inputs.meta-replicas }} \

View File

@@ -250,11 +250,6 @@ jobs:
name: unstable-fuzz-logs
path: /tmp/unstable-greptime/
retention-days: 3
- name: Describe pods
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
build-greptime-ci:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
@@ -410,11 +405,6 @@ jobs:
shell: bash
run: |
kubectl describe nodes
- name: Describe pod
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
- name: Export kind logs
if: failure()
shell: bash
@@ -564,11 +554,6 @@ jobs:
shell: bash
run: |
kubectl describe nodes
- name: Describe pods
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
- name: Export kind logs
if: failure()
shell: bash

20
Cargo.lock generated
View File

@@ -4511,9 +4511,9 @@ dependencies = [
[[package]]
name = "flate2"
version = "1.1.2"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d"
checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece"
dependencies = [
"crc32fast",
"libz-rs-sys",
@@ -5146,9 +5146,9 @@ dependencies = [
[[package]]
name = "grok"
version = "2.1.0"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c52724b609896f661a3f4641dd3a44dc602958ef615857c12d00756b4e9355b"
checksum = "273797968160270573071022613fc4aa28b91fe68f3eef6c96a1b2a1947ddfbd"
dependencies = [
"glob",
"onig",
@@ -6716,9 +6716,9 @@ dependencies = [
[[package]]
name = "libz-rs-sys"
version = "0.5.1"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "172a788537a2221661b480fee8dc5f96c580eb34fa88764d3205dc356c7e4221"
checksum = "6489ca9bd760fe9642d7644e827b0c9add07df89857b0416ee15c1cc1a3b8c5a"
dependencies = [
"zlib-rs",
]
@@ -9664,9 +9664,9 @@ dependencies = [
[[package]]
name = "psl"
version = "2.1.119"
version = "2.1.112"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0e49aa528239f2ca13ad87387977c208e59c3fb8c437609f95f1b3898ec6ef1"
checksum = "1c6b4c497a0c6bfb466f75167c728b1a861b0cdc39de9c35b877208a270a9590"
dependencies = [
"psl-types",
]
@@ -14701,9 +14701,9 @@ dependencies = [
[[package]]
name = "zlib-rs"
version = "0.5.1"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "626bd9fa9734751fc50d6060752170984d7053f5a39061f524cda68023d4db8a"
checksum = "868b928d7949e09af2f6086dfc1e01936064cc7a819253bce650d4e2a2d63ba8"
[[package]]
name = "zstd"

View File

@@ -195,13 +195,13 @@
| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. |
| `slow_query.threshold` | String | Unset | The threshold of slow query. |
| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
| `export_metrics` | -- | -- | The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommended to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -232,7 +232,6 @@
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -299,11 +298,13 @@
| `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. |
| `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
| `slow_query.ttl` | String | `30d` | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
| `export_metrics` | -- | -- | The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -314,9 +315,11 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `data_home` | String | `./greptimedb_data` | The working home directory. |
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store`<br/>- `mysql_store` |
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store` |
| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.<br/>**Only used when backend is `postgres_store`.** |
| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend<br/>Only used when backend is `postgres_store`. |
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
@@ -328,12 +331,6 @@
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
@@ -375,11 +372,13 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -405,7 +404,6 @@
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -538,11 +536,13 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |

View File

@@ -44,12 +44,6 @@ runtime_size = 8
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
## Compression mode for datanode side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
## - `all`: enable all compression.
flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]
@@ -641,16 +635,24 @@ max_log_files = 720
[logging.tracing_sample_ratio]
default_ratio = 1.0
## The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## @toml2docs:none-default
db = "greptime_metrics"
[export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -54,12 +54,6 @@ bind_addr = "127.0.0.1:4001"
server_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
## Compression mode for frontend side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
## - `all`: enable all compression.
flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]
@@ -253,16 +247,24 @@ sample_ratio = 1.0
## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
ttl = "30d"
## The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## @toml2docs:none-default
db = "greptime_metrics"
[export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -1,6 +1,14 @@
## The working home directory.
data_home = "./greptimedb_data"
## The bind address of metasrv.
bind_addr = "127.0.0.1:3002"
## The communication server address for the frontend and datanode to connect to metasrv.
## If left empty or unset, the server will automatically use the IP address of the first network interface
## on the host, with the same port number as the one specified in `bind_addr`.
server_addr = "127.0.0.1:3002"
## Store server address default to etcd store.
## For postgres store, the format is:
## "password=password dbname=postgres user=postgres host=localhost port=5432"
@@ -16,7 +24,6 @@ store_key_prefix = ""
## - `etcd_store` (default value)
## - `memory_store`
## - `postgres_store`
## - `mysql_store`
backend = "etcd_store"
## Table name in RDS to store metadata. Effect when using a RDS kvbackend.
@@ -60,21 +67,6 @@ node_max_idle_time = "24hours"
## The number of threads to execute the runtime for global write operations.
#+ compact_rt_size = 4
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
bind_addr = "127.0.0.1:3002"
## The communication server address for the frontend and datanode to connect to metasrv.
## If left empty or unset, the server will automatically use the IP address of the first network interface
## on the host, with the same port number as the one specified in `bind_addr`.
server_addr = "127.0.0.1:3002"
## The number of server worker threads.
runtime_size = 8
## The maximum receive message size for gRPC server.
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
## The HTTP server options.
[http]
## The address to bind the HTTP server.
@@ -237,16 +229,24 @@ max_log_files = 720
[logging.tracing_sample_ratio]
default_ratio = 1.0
## The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## @toml2docs:none-default
db = "greptime_metrics"
[export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -750,11 +750,13 @@ default_ratio = 1.0
## @toml2docs:none-default
#+ sample_ratio = 1.0
## The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
@@ -765,7 +767,7 @@ write_interval = "30s"
db = "greptime_metrics"
[export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -237,20 +237,12 @@ impl StartCommand {
tokio_console_addr: global_options.tokio_console_addr.clone(),
};
#[allow(deprecated)]
if let Some(addr) = &self.rpc_bind_addr {
opts.bind_addr.clone_from(addr);
opts.grpc.bind_addr.clone_from(addr);
} else if !opts.bind_addr.is_empty() {
opts.grpc.bind_addr.clone_from(&opts.bind_addr);
}
#[allow(deprecated)]
if let Some(addr) = &self.rpc_server_addr {
opts.server_addr.clone_from(addr);
opts.grpc.server_addr.clone_from(addr);
} else if !opts.server_addr.is_empty() {
opts.grpc.server_addr.clone_from(&opts.server_addr);
}
if let Some(addrs) = &self.store_addrs {
@@ -327,7 +319,7 @@ impl StartCommand {
let plugin_opts = opts.plugins;
let mut opts = opts.component;
opts.grpc.detect_server_addr();
opts.detect_server_addr();
info!("Metasrv options: {:#?}", opts);
@@ -371,7 +363,7 @@ mod tests {
};
let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
assert_eq!(SelectorType::LoadBased, options.selector);
}
@@ -404,8 +396,8 @@ mod tests {
};
let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
assert_eq!(SelectorType::LeaseBased, options.selector);
assert_eq!("debug", options.logging.level.as_ref().unwrap());
@@ -517,10 +509,10 @@ mod tests {
let opts = command.load_options(&Default::default()).unwrap().component;
// Should be read from env, env > default values.
assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");
assert_eq!(opts.bind_addr, "127.0.0.1:14002");
// Should be read from config file, config file > env > default values.
assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");
assert_eq!(opts.server_addr, "127.0.0.1:3002");
// Should be read from cli, cli > config file > env > default values.
assert_eq!(opts.http.addr, "127.0.0.1:14000");

View File

@@ -95,7 +95,7 @@ fn test_load_datanode_example_config() {
..Default::default()
},
export_metrics: ExportMetricsOption {
self_import: None,
self_import: Some(Default::default()),
remote_write: Some(Default::default()),
..Default::default()
},
@@ -148,7 +148,7 @@ fn test_load_frontend_example_config() {
},
},
export_metrics: ExportMetricsOption {
self_import: None,
self_import: Some(Default::default()),
remote_write: Some(Default::default()),
..Default::default()
},
@@ -176,11 +176,7 @@ fn test_load_metasrv_example_config() {
component: MetasrvOptions {
selector: SelectorType::default(),
data_home: DEFAULT_DATA_HOME.to_string(),
grpc: GrpcOptions {
bind_addr: "127.0.0.1:3002".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
server_addr: "127.0.0.1:3002".to_string(),
logging: LoggingOptions {
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
@@ -199,7 +195,7 @@ fn test_load_metasrv_example_config() {
},
},
export_metrics: ExportMetricsOption {
self_import: None,
self_import: Some(Default::default()),
remote_write: Some(Default::default()),
..Default::default()
},

View File

@@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod approximate;
#[cfg(feature = "geo")]
pub mod geo;
pub mod vector;
mod geo_path;
mod hll;
mod uddsketch_state;
pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
pub(crate) use hll::HllStateType;
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME};

View File

@@ -47,7 +47,7 @@ impl GeoPathAccumulator {
Self::default()
}
pub fn uadf_impl() -> AggregateUDF {
pub fn udf_impl() -> AggregateUDF {
create_udaf(
GEO_PATH_NAME,
// Input types: lat, lng, timestamp

View File

@@ -1,32 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::function_registry::FunctionRegistry;
pub(crate) mod hll;
mod uddsketch;
pub(crate) struct ApproximateFunction;
impl ApproximateFunction {
pub fn register(registry: &FunctionRegistry) {
// uddsketch
registry.register_aggr(uddsketch::UddSketchState::state_udf_impl());
registry.register_aggr(uddsketch::UddSketchState::merge_udf_impl());
// hll
registry.register_aggr(hll::HllState::state_udf_impl());
registry.register_aggr(hll::HllState::merge_udf_impl());
}
}

View File

@@ -1,27 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::function_registry::FunctionRegistry;
mod encoding;
mod geo_path;
pub(crate) struct GeoFunction;
impl GeoFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(geo_path::GeoPathAccumulator::uadf_impl());
registry.register_aggr(encoding::JsonPathAccumulator::uadf_impl());
}
}

View File

@@ -1,29 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::aggrs::vector::product::VectorProduct;
use crate::aggrs::vector::sum::VectorSum;
use crate::function_registry::FunctionRegistry;
mod product;
mod sum;
pub(crate) struct VectorFunction;
impl VectorFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(VectorSum::uadf_impl());
registry.register_aggr(VectorProduct::uadf_impl());
}
}

View File

@@ -1,63 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion_expr::ScalarUDF;
use crate::function::{FunctionContext, FunctionRef};
use crate::scalars::udf::create_udf;
/// A factory for creating `ScalarUDF` that require a function context.
#[derive(Clone)]
pub struct ScalarFunctionFactory {
name: String,
factory: Arc<dyn Fn(FunctionContext) -> ScalarUDF + Send + Sync>,
}
impl ScalarFunctionFactory {
/// Returns the name of the function.
pub fn name(&self) -> &str {
&self.name
}
/// Returns a `ScalarUDF` when given a function context.
pub fn provide(&self, ctx: FunctionContext) -> ScalarUDF {
(self.factory)(ctx)
}
}
impl From<ScalarUDF> for ScalarFunctionFactory {
fn from(df_udf: ScalarUDF) -> Self {
let name = df_udf.name().to_string();
let func = Arc::new(move |_ctx| df_udf.clone());
Self {
name,
factory: func,
}
}
}
impl From<FunctionRef> for ScalarFunctionFactory {
fn from(func: FunctionRef) -> Self {
let name = func.name().to_string();
let func = Arc::new(move |ctx: FunctionContext| {
create_udf(func.clone(), ctx.query_ctx, ctx.state)
});
Self {
name,
factory: func,
}
}
}

View File

@@ -16,14 +16,11 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use datafusion_expr::AggregateUDF;
use once_cell::sync::Lazy;
use crate::admin::AdminFunction;
use crate::aggrs::approximate::ApproximateFunction;
use crate::aggrs::vector::VectorFunction as VectorAggrFunction;
use crate::function::{AsyncFunctionRef, Function, FunctionRef};
use crate::function_factory::ScalarFunctionFactory;
use crate::function::{AsyncFunctionRef, FunctionRef};
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
use crate::scalars::date::DateFunction;
use crate::scalars::expression::ExpressionFunction;
use crate::scalars::hll_count::HllCalcFunction;
@@ -34,19 +31,18 @@ use crate::scalars::matches_term::MatchesTermFunction;
use crate::scalars::math::MathFunction;
use crate::scalars::timestamp::TimestampFunction;
use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
use crate::scalars::vector::VectorFunction as VectorScalarFunction;
use crate::scalars::vector::VectorFunction;
use crate::system::SystemFunction;
#[derive(Default)]
pub struct FunctionRegistry {
functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
functions: RwLock<HashMap<String, FunctionRef>>,
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
aggregate_functions: RwLock<HashMap<String, AggregateUDF>>,
aggregate_functions: RwLock<HashMap<String, AggregateFunctionMetaRef>>,
}
impl FunctionRegistry {
pub fn register(&self, func: impl Into<ScalarFunctionFactory>) {
let func = func.into();
pub fn register(&self, func: FunctionRef) {
let _ = self
.functions
.write()
@@ -54,10 +50,6 @@ impl FunctionRegistry {
.insert(func.name().to_string(), func);
}
pub fn register_scalar(&self, func: impl Function + 'static) {
self.register(Arc::new(func) as FunctionRef);
}
pub fn register_async(&self, func: AsyncFunctionRef) {
let _ = self
.async_functions
@@ -66,14 +58,6 @@ impl FunctionRegistry {
.insert(func.name().to_string(), func);
}
pub fn register_aggr(&self, func: AggregateUDF) {
let _ = self
.aggregate_functions
.write()
.unwrap()
.insert(func.name().to_string(), func);
}
pub fn get_async_function(&self, name: &str) -> Option<AsyncFunctionRef> {
self.async_functions.read().unwrap().get(name).cloned()
}
@@ -87,16 +71,27 @@ impl FunctionRegistry {
.collect()
}
#[cfg(test)]
pub fn get_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
let _ = self
.aggregate_functions
.write()
.unwrap()
.insert(func.name(), func);
}
pub fn get_aggr_function(&self, name: &str) -> Option<AggregateFunctionMetaRef> {
self.aggregate_functions.read().unwrap().get(name).cloned()
}
pub fn get_function(&self, name: &str) -> Option<FunctionRef> {
self.functions.read().unwrap().get(name).cloned()
}
pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> {
pub fn functions(&self) -> Vec<FunctionRef> {
self.functions.read().unwrap().values().cloned().collect()
}
pub fn aggregate_functions(&self) -> Vec<AggregateUDF> {
pub fn aggregate_functions(&self) -> Vec<AggregateFunctionMetaRef> {
self.aggregate_functions
.read()
.unwrap()
@@ -117,6 +112,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
UddSketchCalcFunction::register(&function_registry);
HllCalcFunction::register(&function_registry);
// Aggregate functions
AggregateFunctions::register(&function_registry);
// Full text search function
MatchesFunction::register(&function_registry);
MatchesTermFunction::register(&function_registry);
@@ -129,21 +127,15 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
JsonFunction::register(&function_registry);
// Vector related functions
VectorScalarFunction::register(&function_registry);
VectorAggrFunction::register(&function_registry);
VectorFunction::register(&function_registry);
// Geo functions
#[cfg(feature = "geo")]
crate::scalars::geo::GeoFunctions::register(&function_registry);
#[cfg(feature = "geo")]
crate::aggrs::geo::GeoFunction::register(&function_registry);
// Ip functions
IpFunctions::register(&function_registry);
// Approximate functions
ApproximateFunction::register(&function_registry);
Arc::new(function_registry)
});
@@ -155,11 +147,12 @@ mod tests {
#[test]
fn test_function_registry() {
let registry = FunctionRegistry::default();
let func = Arc::new(TestAndFunction);
assert!(registry.get_function("test_and").is_none());
assert!(registry.scalar_functions().is_empty());
registry.register_scalar(TestAndFunction);
assert!(registry.functions().is_empty());
registry.register(func);
let _ = registry.get_function("test_and").unwrap();
assert_eq!(1, registry.scalar_functions().len());
assert_eq!(1, registry.functions().len());
}
}

View File

@@ -19,14 +19,13 @@ mod adjust_flow;
mod admin;
mod flush_flow;
mod macros;
pub mod scalars;
mod system;
pub mod aggrs;
pub mod aggr;
pub mod function;
pub mod function_factory;
pub mod function_registry;
pub mod handlers;
pub mod helper;
pub mod scalars;
pub mod state;
pub mod utils;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod aggregate;
pub(crate) mod date;
pub mod expression;
#[cfg(feature = "geo")]

View File

@@ -0,0 +1,89 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! # Deprecate Warning:
//!
//! This module is deprecated and will be removed in the future.
//! All UDAF implementation here are not maintained and should
//! not be used before they are refactored into the `src/aggr`
//! version.
use std::sync::Arc;
use common_query::logical_plan::AggregateFunctionCreatorRef;
use crate::function_registry::FunctionRegistry;
use crate::scalars::vector::product::VectorProductCreator;
use crate::scalars::vector::sum::VectorSumCreator;
/// A function creates `AggregateFunctionCreator`.
/// "Aggregator" *is* AggregatorFunction. Since the later one is long, we named an short alias for it.
/// The two names might be used interchangeably.
type AggregatorCreatorFunction = Arc<dyn Fn() -> AggregateFunctionCreatorRef + Send + Sync>;
/// `AggregateFunctionMeta` dynamically creates AggregateFunctionCreator.
#[derive(Clone)]
pub struct AggregateFunctionMeta {
name: String,
args_count: u8,
creator: AggregatorCreatorFunction,
}
pub type AggregateFunctionMetaRef = Arc<AggregateFunctionMeta>;
impl AggregateFunctionMeta {
pub fn new(name: &str, args_count: u8, creator: AggregatorCreatorFunction) -> Self {
Self {
name: name.to_string(),
args_count,
creator,
}
}
pub fn name(&self) -> String {
self.name.to_string()
}
pub fn args_count(&self) -> u8 {
self.args_count
}
pub fn create(&self) -> AggregateFunctionCreatorRef {
(self.creator)()
}
}
pub(crate) struct AggregateFunctions;
impl AggregateFunctions {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"vec_sum",
1,
Arc::new(|| Arc::new(VectorSumCreator::default())),
)));
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"vec_product",
1,
Arc::new(|| Arc::new(VectorProductCreator::default())),
)));
#[cfg(feature = "geo")]
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"json_encode_path",
3,
Arc::new(|| Arc::new(super::geo::encoding::JsonPathEncodeFunctionCreator::default())),
)));
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
mod date_add;
mod date_format;
mod date_sub;
@@ -26,8 +27,8 @@ pub(crate) struct DateFunction;
impl DateFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(DateAddFunction);
registry.register_scalar(DateSubFunction);
registry.register_scalar(DateFormatFunction);
registry.register(Arc::new(DateAddFunction));
registry.register(Arc::new(DateSubFunction));
registry.register(Arc::new(DateFormatFunction));
}
}

View File

@@ -17,6 +17,8 @@ mod ctx;
mod is_null;
mod unary;
use std::sync::Arc;
pub use binary::scalar_binary_op;
pub use ctx::EvalContext;
pub use unary::scalar_unary_op;
@@ -28,6 +30,6 @@ pub(crate) struct ExpressionFunction;
impl ExpressionFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(IsNullFunction);
registry.register(Arc::new(IsNullFunction));
}
}

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
pub(crate) mod encoding;
mod geohash;
mod h3;
pub(crate) mod helpers;
mod helpers;
mod measure;
mod relation;
mod s2;
@@ -27,57 +29,57 @@ pub(crate) struct GeoFunctions;
impl GeoFunctions {
pub fn register(registry: &FunctionRegistry) {
// geohash
registry.register_scalar(geohash::GeohashFunction);
registry.register_scalar(geohash::GeohashNeighboursFunction);
registry.register(Arc::new(geohash::GeohashFunction));
registry.register(Arc::new(geohash::GeohashNeighboursFunction));
// h3 index
registry.register_scalar(h3::H3LatLngToCell);
registry.register_scalar(h3::H3LatLngToCellString);
registry.register(Arc::new(h3::H3LatLngToCell));
registry.register(Arc::new(h3::H3LatLngToCellString));
// h3 index inspection
registry.register_scalar(h3::H3CellBase);
registry.register_scalar(h3::H3CellIsPentagon);
registry.register_scalar(h3::H3StringToCell);
registry.register_scalar(h3::H3CellToString);
registry.register_scalar(h3::H3CellCenterLatLng);
registry.register_scalar(h3::H3CellResolution);
registry.register(Arc::new(h3::H3CellBase));
registry.register(Arc::new(h3::H3CellIsPentagon));
registry.register(Arc::new(h3::H3StringToCell));
registry.register(Arc::new(h3::H3CellToString));
registry.register(Arc::new(h3::H3CellCenterLatLng));
registry.register(Arc::new(h3::H3CellResolution));
// h3 hierarchical grid
registry.register_scalar(h3::H3CellCenterChild);
registry.register_scalar(h3::H3CellParent);
registry.register_scalar(h3::H3CellToChildren);
registry.register_scalar(h3::H3CellToChildrenSize);
registry.register_scalar(h3::H3CellToChildPos);
registry.register_scalar(h3::H3ChildPosToCell);
registry.register_scalar(h3::H3CellContains);
registry.register(Arc::new(h3::H3CellCenterChild));
registry.register(Arc::new(h3::H3CellParent));
registry.register(Arc::new(h3::H3CellToChildren));
registry.register(Arc::new(h3::H3CellToChildrenSize));
registry.register(Arc::new(h3::H3CellToChildPos));
registry.register(Arc::new(h3::H3ChildPosToCell));
registry.register(Arc::new(h3::H3CellContains));
// h3 grid traversal
registry.register_scalar(h3::H3GridDisk);
registry.register_scalar(h3::H3GridDiskDistances);
registry.register_scalar(h3::H3GridDistance);
registry.register_scalar(h3::H3GridPathCells);
registry.register(Arc::new(h3::H3GridDisk));
registry.register(Arc::new(h3::H3GridDiskDistances));
registry.register(Arc::new(h3::H3GridDistance));
registry.register(Arc::new(h3::H3GridPathCells));
// h3 measurement
registry.register_scalar(h3::H3CellDistanceSphereKm);
registry.register_scalar(h3::H3CellDistanceEuclideanDegree);
registry.register(Arc::new(h3::H3CellDistanceSphereKm));
registry.register(Arc::new(h3::H3CellDistanceEuclideanDegree));
// s2
registry.register_scalar(s2::S2LatLngToCell);
registry.register_scalar(s2::S2CellLevel);
registry.register_scalar(s2::S2CellToToken);
registry.register_scalar(s2::S2CellParent);
registry.register(Arc::new(s2::S2LatLngToCell));
registry.register(Arc::new(s2::S2CellLevel));
registry.register(Arc::new(s2::S2CellToToken));
registry.register(Arc::new(s2::S2CellParent));
// spatial data type
registry.register_scalar(wkt::LatLngToPointWkt);
registry.register(Arc::new(wkt::LatLngToPointWkt));
// spatial relation
registry.register_scalar(relation::STContains);
registry.register_scalar(relation::STWithin);
registry.register_scalar(relation::STIntersects);
registry.register(Arc::new(relation::STContains));
registry.register(Arc::new(relation::STWithin));
registry.register(Arc::new(relation::STIntersects));
// spatial measure
registry.register_scalar(measure::STDistance);
registry.register_scalar(measure::STDistanceSphere);
registry.register_scalar(measure::STArea);
registry.register(Arc::new(measure::STDistance));
registry.register(Arc::new(measure::STDistanceSphere));
registry.register(Arc::new(measure::STArea));
}
}

View File

@@ -19,12 +19,9 @@ use common_error::status_code::StatusCode;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{self, InvalidInputStateSnafu, Result};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::AccumulatorCreatorFunction;
use common_time::Timestamp;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{ListValue, Value};
use datatypes::vectors::VectorRef;
@@ -50,16 +47,6 @@ impl JsonPathAccumulator {
timestamp_type,
}
}
/// Create a new `AggregateUDF` for the `json_encode_path` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"json_encode_path".to_string(),
3,
Arc::new(JsonPathEncodeFunctionCreator::default()),
)
.into()
}
}
impl Accumulator for JsonPathAccumulator {

View File

@@ -37,7 +37,7 @@ macro_rules! ensure_columns_len {
};
}
pub(crate) use ensure_columns_len;
pub(super) use ensure_columns_len;
macro_rules! ensure_columns_n {
($columns:ident, $n:literal) => {
@@ -58,7 +58,7 @@ macro_rules! ensure_columns_n {
};
}
pub(crate) use ensure_columns_n;
pub(super) use ensure_columns_n;
macro_rules! ensure_and_coerce {
($compare:expr, $coerce:expr) => {{
@@ -72,4 +72,4 @@ macro_rules! ensure_and_coerce {
}};
}
pub(crate) use ensure_and_coerce;
pub(super) use ensure_and_coerce;

View File

@@ -16,6 +16,7 @@
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
@@ -26,7 +27,7 @@ use datatypes::vectors::{BinaryVector, MutableVector, UInt64VectorBuilder, Vecto
use hyperloglogplus::HyperLogLog;
use snafu::OptionExt;
use crate::aggrs::approximate::hll::HllStateType;
use crate::aggr::HllStateType;
use crate::function::{Function, FunctionContext};
use crate::function_registry::FunctionRegistry;
@@ -43,7 +44,7 @@ pub struct HllCalcFunction;
impl HllCalcFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(HllCalcFunction);
registry.register(Arc::new(HllCalcFunction));
}
}
@@ -116,8 +117,6 @@ impl Function for HllCalcFunction {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::BinaryVector;
use super::*;

View File

@@ -17,6 +17,8 @@ mod ipv4;
mod ipv6;
mod range;
use std::sync::Arc;
use cidr::{Ipv4ToCidr, Ipv6ToCidr};
use ipv4::{Ipv4NumToString, Ipv4StringToNum};
use ipv6::{Ipv6NumToString, Ipv6StringToNum};
@@ -29,15 +31,15 @@ pub(crate) struct IpFunctions;
impl IpFunctions {
pub fn register(registry: &FunctionRegistry) {
// Register IPv4 functions
registry.register_scalar(Ipv4NumToString);
registry.register_scalar(Ipv4StringToNum);
registry.register_scalar(Ipv4ToCidr);
registry.register_scalar(Ipv4InRange);
registry.register(Arc::new(Ipv4NumToString));
registry.register(Arc::new(Ipv4StringToNum));
registry.register(Arc::new(Ipv4ToCidr));
registry.register(Arc::new(Ipv4InRange));
// Register IPv6 functions
registry.register_scalar(Ipv6NumToString);
registry.register_scalar(Ipv6StringToNum);
registry.register_scalar(Ipv6ToCidr);
registry.register_scalar(Ipv6InRange);
registry.register(Arc::new(Ipv6NumToString));
registry.register(Arc::new(Ipv6StringToNum));
registry.register(Arc::new(Ipv6ToCidr));
registry.register(Arc::new(Ipv6InRange));
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
pub mod json_get;
mod json_is;
mod json_path_exists;
@@ -32,23 +33,23 @@ pub(crate) struct JsonFunction;
impl JsonFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(JsonToStringFunction);
registry.register_scalar(ParseJsonFunction);
registry.register(Arc::new(JsonToStringFunction));
registry.register(Arc::new(ParseJsonFunction));
registry.register_scalar(JsonGetInt);
registry.register_scalar(JsonGetFloat);
registry.register_scalar(JsonGetString);
registry.register_scalar(JsonGetBool);
registry.register(Arc::new(JsonGetInt));
registry.register(Arc::new(JsonGetFloat));
registry.register(Arc::new(JsonGetString));
registry.register(Arc::new(JsonGetBool));
registry.register_scalar(JsonIsNull);
registry.register_scalar(JsonIsInt);
registry.register_scalar(JsonIsFloat);
registry.register_scalar(JsonIsString);
registry.register_scalar(JsonIsBool);
registry.register_scalar(JsonIsArray);
registry.register_scalar(JsonIsObject);
registry.register(Arc::new(JsonIsNull));
registry.register(Arc::new(JsonIsInt));
registry.register(Arc::new(JsonIsFloat));
registry.register(Arc::new(JsonIsString));
registry.register(Arc::new(JsonIsBool));
registry.register(Arc::new(JsonIsArray));
registry.register(Arc::new(JsonIsObject));
registry.register_scalar(json_path_exists::JsonPathExistsFunction);
registry.register_scalar(json_path_match::JsonPathMatchFunction);
registry.register(Arc::new(json_path_exists::JsonPathExistsFunction));
registry.register(Arc::new(json_path_match::JsonPathMatchFunction));
}
}

View File

@@ -38,11 +38,11 @@ use crate::function_registry::FunctionRegistry;
///
/// Usage: matches(`<col>`, `<pattern>`) -> boolean
#[derive(Clone, Debug, Default)]
pub struct MatchesFunction;
pub(crate) struct MatchesFunction;
impl MatchesFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(MatchesFunction);
registry.register(Arc::new(MatchesFunction));
}
}

View File

@@ -77,7 +77,7 @@ pub struct MatchesTermFunction;
impl MatchesTermFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(MatchesTermFunction);
registry.register(Arc::new(MatchesTermFunction));
}
}

View File

@@ -18,6 +18,7 @@ mod pow;
mod rate;
use std::fmt;
use std::sync::Arc;
pub use clamp::{ClampFunction, ClampMaxFunction, ClampMinFunction};
use common_query::error::{GeneralDataFusionSnafu, Result};
@@ -38,13 +39,13 @@ pub(crate) struct MathFunction;
impl MathFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(ModuloFunction);
registry.register_scalar(PowFunction);
registry.register_scalar(RateFunction);
registry.register_scalar(RangeFunction);
registry.register_scalar(ClampFunction);
registry.register_scalar(ClampMinFunction);
registry.register_scalar(ClampMaxFunction);
registry.register(Arc::new(ModuloFunction));
registry.register(Arc::new(PowFunction));
registry.register(Arc::new(RateFunction));
registry.register(Arc::new(RangeFunction));
registry.register(Arc::new(ClampFunction));
registry.register(Arc::new(ClampMinFunction));
registry.register(Arc::new(ClampMaxFunction));
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
mod to_unixtime;
use to_unixtime::ToUnixtimeFunction;
@@ -22,6 +23,6 @@ pub(crate) struct TimestampFunction;
impl TimestampFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(ToUnixtimeFunction);
registry.register(Arc::new(ToUnixtimeFunction));
}
}

View File

@@ -16,6 +16,7 @@
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
@@ -43,7 +44,7 @@ pub struct UddSketchCalcFunction;
impl UddSketchCalcFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(UddSketchCalcFunction);
registry.register(Arc::new(UddSketchCalcFunction));
}
}

View File

@@ -17,8 +17,10 @@ mod distance;
mod elem_product;
mod elem_sum;
pub mod impl_conv;
pub(crate) mod product;
mod scalar_add;
mod scalar_mul;
pub(crate) mod sum;
mod vector_add;
mod vector_dim;
mod vector_div;
@@ -28,34 +30,37 @@ mod vector_norm;
mod vector_sub;
mod vector_subvector;
use std::sync::Arc;
use crate::function_registry::FunctionRegistry;
pub(crate) struct VectorFunction;
impl VectorFunction {
pub fn register(registry: &FunctionRegistry) {
// conversion
registry.register_scalar(convert::ParseVectorFunction);
registry.register_scalar(convert::VectorToStringFunction);
registry.register(Arc::new(convert::ParseVectorFunction));
registry.register(Arc::new(convert::VectorToStringFunction));
// distance
registry.register_scalar(distance::CosDistanceFunction);
registry.register_scalar(distance::DotProductFunction);
registry.register_scalar(distance::L2SqDistanceFunction);
registry.register(Arc::new(distance::CosDistanceFunction));
registry.register(Arc::new(distance::DotProductFunction));
registry.register(Arc::new(distance::L2SqDistanceFunction));
// scalar calculation
registry.register_scalar(scalar_add::ScalarAddFunction);
registry.register_scalar(scalar_mul::ScalarMulFunction);
registry.register(Arc::new(scalar_add::ScalarAddFunction));
registry.register(Arc::new(scalar_mul::ScalarMulFunction));
// vector calculation
registry.register_scalar(vector_add::VectorAddFunction);
registry.register_scalar(vector_sub::VectorSubFunction);
registry.register_scalar(vector_mul::VectorMulFunction);
registry.register_scalar(vector_div::VectorDivFunction);
registry.register_scalar(vector_norm::VectorNormFunction);
registry.register_scalar(vector_dim::VectorDimFunction);
registry.register_scalar(vector_kth_elem::VectorKthElemFunction);
registry.register_scalar(vector_subvector::VectorSubvectorFunction);
registry.register_scalar(elem_sum::ElemSumFunction);
registry.register_scalar(elem_product::ElemProductFunction);
registry.register(Arc::new(vector_add::VectorAddFunction));
registry.register(Arc::new(vector_sub::VectorSubFunction));
registry.register(Arc::new(vector_mul::VectorMulFunction));
registry.register(Arc::new(vector_div::VectorDivFunction));
registry.register(Arc::new(vector_norm::VectorNormFunction));
registry.register(Arc::new(vector_dim::VectorDimFunction));
registry.register(Arc::new(vector_kth_elem::VectorKthElemFunction));
registry.register(Arc::new(vector_subvector::VectorSubvectorFunction));
registry.register(Arc::new(elem_sum::ElemSumFunction));
registry.register(Arc::new(elem_product::ElemProductFunction));
}
}

View File

@@ -16,11 +16,8 @@ use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
use common_query::logical_plan::{
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::AccumulatorCreatorFunction;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::{ConcreteDataType, Value, *};
use datatypes::vectors::VectorRef;
use nalgebra::{Const, DVectorView, Dyn, OVector};
@@ -78,16 +75,6 @@ impl AggregateFunctionCreator for VectorProductCreator {
}
impl VectorProduct {
/// Create a new `AggregateUDF` for the `vec_product` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"vec_product".to_string(),
1,
Arc::new(VectorProductCreator::default()),
)
.into()
}
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
self.product.get_or_insert_with(|| {
OVector::from_iterator_generic(Dyn(len), Const::<1>, (0..len).map(|_| 1.0))

View File

@@ -16,11 +16,8 @@ use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
use common_query::logical_plan::{
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::AccumulatorCreatorFunction;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::{ConcreteDataType, Value, *};
use datatypes::vectors::VectorRef;
use nalgebra::{Const, DVectorView, Dyn, OVector};
@@ -28,7 +25,6 @@ use snafu::ensure;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
/// The accumulator for the `vec_sum` aggregate function.
#[derive(Debug, Default)]
pub struct VectorSum {
sum: Option<OVector<f32, Dyn>>,
@@ -78,16 +74,6 @@ impl AggregateFunctionCreator for VectorSumCreator {
}
impl VectorSum {
/// Create a new `AggregateUDF` for the `vec_sum` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"vec_sum".to_string(),
1,
Arc::new(VectorSumCreator::default()),
)
.into()
}
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
self.sum
.get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>))

View File

@@ -36,13 +36,13 @@ pub(crate) struct SystemFunction;
impl SystemFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(BuildFunction);
registry.register_scalar(VersionFunction);
registry.register_scalar(CurrentSchemaFunction);
registry.register_scalar(DatabaseFunction);
registry.register_scalar(SessionUserFunction);
registry.register_scalar(ReadPreferenceFunction);
registry.register_scalar(TimezoneFunction);
registry.register(Arc::new(BuildFunction));
registry.register(Arc::new(VersionFunction));
registry.register(Arc::new(CurrentSchemaFunction));
registry.register(Arc::new(DatabaseFunction));
registry.register(Arc::new(SessionUserFunction));
registry.register(Arc::new(ReadPreferenceFunction));
registry.register(Arc::new(TimezoneFunction));
registry.register_async(Arc::new(ProcedureStateFunction));
PGCatalogFunction::register(registry);
}

View File

@@ -16,6 +16,8 @@ mod pg_get_userbyid;
mod table_is_visible;
mod version;
use std::sync::Arc;
use pg_get_userbyid::PGGetUserByIdFunction;
use table_is_visible::PGTableIsVisibleFunction;
use version::PGVersionFunction;
@@ -33,8 +35,8 @@ pub(super) struct PGCatalogFunction;
impl PGCatalogFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(PGTableIsVisibleFunction);
registry.register_scalar(PGGetUserByIdFunction);
registry.register_scalar(PGVersionFunction);
registry.register(Arc::new(PGTableIsVisibleFunction));
registry.register(Arc::new(PGGetUserByIdFunction));
registry.register(Arc::new(PGVersionFunction));
}
}

View File

@@ -64,7 +64,6 @@ impl Default for FlightEncoder {
}
impl FlightEncoder {
/// Creates new [FlightEncoder] with compression disabled.
pub fn with_compression_disabled() -> Self {
let write_options = writer::IpcWriteOptions::default()
.try_with_compression(None)

View File

@@ -35,9 +35,6 @@ pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 5;
/// The keep-alive interval of the Postgres connection.
pub const POSTGRES_KEEP_ALIVE_SECS: u64 = 30;
/// In a lease, there are two opportunities for renewal.
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;

View File

@@ -372,7 +372,6 @@ impl DatanodeBuilder {
opts.max_concurrent_queries,
//TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
Duration::from_millis(100),
opts.grpc.flight_compression,
);
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;

View File

@@ -50,7 +50,6 @@ use query::QueryEngineRef;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
use servers::grpc::FlightCompression;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
@@ -81,7 +80,6 @@ use crate::event_listener::RegionServerEventListenerRef;
#[derive(Clone)]
pub struct RegionServer {
inner: Arc<RegionServerInner>,
flight_compression: FlightCompression,
}
pub struct RegionStat {
@@ -95,7 +93,6 @@ impl RegionServer {
query_engine: QueryEngineRef,
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
flight_compression: FlightCompression,
) -> Self {
Self::with_table_provider(
query_engine,
@@ -104,7 +101,6 @@ impl RegionServer {
Arc::new(DummyTableProviderFactory),
0,
Duration::from_millis(0),
flight_compression,
)
}
@@ -115,7 +111,6 @@ impl RegionServer {
table_provider_factory: TableProviderFactoryRef,
max_concurrent_queries: usize,
concurrent_query_limiter_timeout: Duration,
flight_compression: FlightCompression,
) -> Self {
Self {
inner: Arc::new(RegionServerInner::new(
@@ -128,7 +123,6 @@ impl RegionServer {
concurrent_query_limiter_timeout,
),
)),
flight_compression,
}
}
@@ -542,11 +536,7 @@ impl FlightCraft for RegionServer {
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
.await?;
let stream = Box::pin(FlightRecordBatchStream::new(
result,
tracing_context,
self.flight_compression,
));
let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
Ok(Response::new(stream))
}
}

View File

@@ -19,16 +19,16 @@ use std::time::Duration;
use api::region::RegionResponse;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_function::function_factory::ScalarFunctionFactory;
use common_function::function::FunctionRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::Output;
use common_runtime::runtime::{BuilderBuild, RuntimeTrait};
use common_runtime::Runtime;
use datafusion_expr::{AggregateUDF, LogicalPlan};
use datafusion_expr::LogicalPlan;
use query::dataframe::DataFrame;
use query::planner::LogicalPlanner;
use query::query_engine::{DescribeResult, QueryEngineState};
use query::{QueryEngine, QueryEngineContext};
use servers::grpc::FlightCompression;
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
@@ -76,9 +76,9 @@ impl QueryEngine for MockQueryEngine {
unimplemented!()
}
fn register_aggregate_function(&self, _func: AggregateUDF) {}
fn register_aggregate_function(&self, _func: AggregateFunctionMetaRef) {}
fn register_scalar_function(&self, _func: ScalarFunctionFactory) {}
fn register_function(&self, _func: FunctionRef) {}
fn read_table(&self, _table: TableRef) -> query::error::Result<DataFrame> {
unimplemented!()
@@ -98,7 +98,6 @@ pub fn mock_region_server() -> RegionServer {
Arc::new(MockQueryEngine),
Runtime::builder().build().unwrap(),
Box::new(NoopRegionServerEventListener),
FlightCompression::default(),
)
}

View File

@@ -286,7 +286,7 @@ impl FrontendClient {
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
/// and is able to process query
pub(crate) async fn get_random_active_frontend(
async fn get_random_active_frontend(
&self,
catalog: &str,
schema: &str,
@@ -382,7 +382,7 @@ impl FrontendClient {
}),
catalog,
schema,
None,
&mut None,
task,
)
.await
@@ -394,28 +394,16 @@ impl FrontendClient {
req: api::v1::greptime_request::Request,
catalog: &str,
schema: &str,
use_peer: Option<Peer>,
peer_desc: &mut Option<PeerDesc>,
task: Option<&BatchingTask>,
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed {
fe_stats, chnl_mgr, ..
} => {
let db = if let Some(peer) = use_peer {
DatabaseWithPeer::new(
Database::new(
catalog,
schema,
Client::with_manager_and_urls(
chnl_mgr.clone(),
vec![peer.addr.clone()],
),
),
peer,
)
} else {
self.get_random_active_frontend(catalog, schema).await?
};
FrontendClient::Distributed { fe_stats, .. } => {
let db = self.get_random_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),
});
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
let _guard = fe_stats.observe(&db.peer.addr, flow_id);

View File

@@ -53,8 +53,6 @@ pub struct TaskState {
pub(crate) shutdown_rx: oneshot::Receiver<()>,
/// Task handle
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
/// Slow Query metrics update task handle
pub(crate) slow_query_metric_task: Option<tokio::task::JoinHandle<()>>,
/// min run interval in seconds
pub(crate) min_run_interval: Option<u64>,
@@ -71,7 +69,6 @@ impl TaskState {
exec_state: ExecState::Idle,
shutdown_rx,
task_handle: None,
slow_query_metric_task: None,
min_run_interval: None,
max_filter_num: None,
}
@@ -214,69 +211,51 @@ impl DirtyTimeWindows {
// get the first `window_cnt` time windows
let max_time_range = window_size * window_cnt as i32;
let mut to_be_query = BTreeMap::new();
let mut new_windows = self.windows.clone();
let mut cur_time_range = chrono::Duration::zero();
for (idx, (start, end)) in self.windows.iter().enumerate() {
let first_end = start
.add_duration(window_size.to_std().unwrap())
.context(TimeSnafu)?;
let end = end.unwrap_or(first_end);
// if time range is too long, stop
if cur_time_range >= max_time_range {
break;
}
// if we have enough time windows, stop
if idx >= window_cnt {
break;
}
if let Some(x) = end.sub(start) {
if cur_time_range + x <= max_time_range {
to_be_query.insert(*start, Some(end));
new_windows.remove(start);
cur_time_range += x;
} else {
// too large a window, split it
// split at window_size * times
let surplus = max_time_range - cur_time_range;
let times = surplus.num_seconds() / window_size.num_seconds();
let split_offset = window_size * times as i32;
let split_at = start
.add_duration(split_offset.to_std().unwrap())
.context(TimeSnafu)?;
to_be_query.insert(*start, Some(split_at));
// remove the original window
new_windows.remove(start);
new_windows.insert(split_at, Some(end));
cur_time_range += split_offset;
let nth = {
let mut cur_time_range = chrono::Duration::zero();
let mut nth_key = None;
for (idx, (start, end)) in self.windows.iter().enumerate() {
// if time range is too long, stop
if cur_time_range > max_time_range {
nth_key = Some(*start);
break;
}
}
}
self.windows = new_windows;
// if we have enough time windows, stop
if idx >= window_cnt {
nth_key = Some(*start);
break;
}
if let Some(end) = end {
if let Some(x) = end.sub(start) {
cur_time_range += x;
}
}
}
nth_key
};
let first_nth = {
if let Some(nth) = nth {
let mut after = self.windows.split_off(&nth);
std::mem::swap(&mut self.windows, &mut after);
after
} else {
std::mem::take(&mut self.windows)
}
};
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(to_be_query.len() as f64);
.with_label_values(&[flow_id.to_string().as_str()])
.observe(first_nth.len() as f64);
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.with_label_values(&[flow_id.to_string().as_str()])
.observe(self.windows.len() as f64);
let full_time_range = to_be_query
let full_time_range = first_nth
.iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| {
if let Some(end) = end {
@@ -287,14 +266,11 @@ impl DirtyTimeWindows {
})
.num_seconds() as f64;
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.with_label_values(&[flow_id.to_string().as_str()])
.observe(full_time_range);
let mut expr_lst = vec![];
for (start, end) in to_be_query.into_iter() {
for (start, end) in first_nth.into_iter() {
// align using time window exprs
let (start, end) = if let Some(ctx) = task_ctx {
let Some(time_window_expr) = &ctx.config.time_window_expr else {
@@ -528,64 +504,6 @@ mod test {
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
)
),
// split range
(
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
60
)),
),
(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
Some(Timestamp::new_second(
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 2 min into 1 min
(
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
40 * 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 3s + 1min into 3s + 57s
(
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
3
)),
),(
Timestamp::new_second(20),
Some(Timestamp::new_second(
140
)),
)]),
Some(
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
)
),
// expired
(
vec![
@@ -602,8 +520,6 @@ mod test {
None
),
];
// let len = testcases.len();
// let testcases = testcases[(len - 2)..(len - 1)].to_vec();
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
testcases
{

View File

@@ -61,8 +61,7 @@ use crate::error::{
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
};
use crate::{Error, FlowId};
@@ -82,14 +81,6 @@ pub struct TaskConfig {
query_type: QueryType,
}
impl TaskConfig {
pub fn time_window_size(&self) -> Option<Duration> {
self.time_window_expr
.as_ref()
.and_then(|expr| *expr.time_window_size())
}
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
let stmts =
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
@@ -343,53 +334,11 @@ impl BatchingTask {
})?;
let plan = expanded_plan;
let db = frontend_client
.get_random_active_frontend(catalog, schema)
.await?;
let peer_desc = db.peer.clone();
let (tx, mut rx) = oneshot::channel();
let peer_inner = peer_desc.clone();
let window_size_pretty = format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
);
let inner_window_size_pretty = window_size_pretty.clone();
let flow_id = self.config.flow_id;
let slow_query_metric_task = tokio::task::spawn(async move {
tokio::time::sleep(SLOW_QUERY_THRESHOLD).await;
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.add(1.0);
while rx.try_recv() == Err(TryRecvError::Empty) {
// sleep for a while before next update
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.sub(1.0);
});
self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task);
let mut peer_desc = None;
let res = {
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[
flow_id.to_string().as_str(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
])
.with_label_values(&[flow_id.to_string().as_str()])
.start_timer();
// hack and special handling the insert logical plan
@@ -418,12 +367,10 @@ impl BatchingTask {
};
frontend_client
.handle(req, catalog, schema, Some(db.peer), Some(self))
.handle(req, catalog, schema, &mut peer_desc, Some(self))
.await
};
// signaling the slow query metric task to stop
let _ = tx.send(());
let elapsed = instant.elapsed();
if let Ok(affected_rows) = &res {
debug!(
@@ -446,12 +393,7 @@ impl BatchingTask {
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_desc.to_string(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
&peer_desc.unwrap_or_default().to_string(),
])
.observe(elapsed.as_secs_f64());
}

View File

@@ -31,29 +31,22 @@ lazy_static! {
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_query_time_secs",
"flow batching engine query time(seconds)",
&["flow_id", "time_window_granularity"],
&["flow_id"],
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_slow_query_secs",
"flow batching engine slow query(seconds), updated after query finished",
&["flow_id", "peer", "time_window_granularity"],
"flow batching engine slow query(seconds)",
&["flow_id", "peer"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec =
register_gauge_vec!(
"greptime_flow_batching_engine_real_time_slow_query_number",
"flow batching engine real time slow query number, updated in real time",
&["flow_id", "peer", "time_window_granularity"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_stalled_query_window_cnt",
"flow batching engine stalled query time window count",
&["flow_id", "time_window_granularity"],
&["flow_id"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
@@ -61,7 +54,7 @@ lazy_static! {
register_histogram_vec!(
"greptime_flow_batching_engine_query_window_cnt",
"flow batching engine query time window count",
&["flow_id", "time_window_granularity"],
&["flow_id"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
@@ -69,7 +62,7 @@ lazy_static! {
register_histogram_vec!(
"greptime_flow_batching_engine_query_time_range_secs",
"flow batching engine query time range(seconds)",
&["flow_id", "time_window_granularity"],
&["flow_id"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();

View File

@@ -17,7 +17,7 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_function::function::{FunctionContext, FunctionRef};
use common_function::function::FunctionContext;
use datafusion_substrait::extensions::Extensions;
use datatypes::data_type::ConcreteDataType as CDT;
use query::QueryEngine;
@@ -108,13 +108,9 @@ impl FunctionExtensions {
/// register flow-specific functions to the query engine
pub fn register_function_to_query_engine(engine: &Arc<dyn QueryEngine>) {
let tumble_fn = Arc::new(TumbleFunction::new("tumble")) as FunctionRef;
let tumble_start_fn = Arc::new(TumbleFunction::new(TUMBLE_START)) as FunctionRef;
let tumble_end_fn = Arc::new(TumbleFunction::new(TUMBLE_END)) as FunctionRef;
engine.register_scalar_function(tumble_fn.into());
engine.register_scalar_function(tumble_start_fn.into());
engine.register_scalar_function(tumble_end_fn.into());
engine.register_function(Arc::new(TumbleFunction::new("tumble")));
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_START)));
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_END)));
}
#[derive(Debug)]

View File

@@ -130,13 +130,7 @@ impl JaegerQueryHandler for Instance {
.await?)
}
async fn get_trace(
&self,
ctx: QueryContextRef,
trace_id: &str,
start_time: Option<i64>,
end_time: Option<i64>,
) -> ServerResult<Output> {
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
// It's equivalent to
//
// ```
@@ -145,25 +139,13 @@ impl JaegerQueryHandler for Instance {
// FROM
// {db}.{trace_table}
// WHERE
// trace_id = '{trace_id}' AND
// timestamp >= {start_time} AND
// timestamp <= {end_time}
// trace_id = '{trace_id}'
// ORDER BY
// timestamp DESC
// ```.
let selects = vec![wildcard()];
let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
if let Some(start_time) = start_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
}
if let Some(end_time) = end_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
}
let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
Ok(query_trace_table(
ctx,

View File

@@ -154,7 +154,6 @@ where
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
user_provider.clone(),
runtime,
opts.grpc.flight_compression,
);
let grpc_server = builder

View File

@@ -31,6 +31,8 @@ use common_meta::kv_backend::rds::MySqlStore;
#[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::rds::PgStore;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
#[cfg(feature = "pg_kvbackend")]
use common_telemetry::error;
use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime};
@@ -142,8 +144,7 @@ impl MetasrvInstance {
let (serve_state_tx, serve_state_rx) = oneshot::channel();
let socket_addr =
bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
.await?;
bootstrap_metasrv_with_router(&self.opts.bind_addr, router, serve_state_tx, rx).await?;
self.bind_addr = Some(socket_addr);
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
@@ -259,7 +260,7 @@ pub async fn metasrv_builder(
let etcd_client = create_etcd_client(&opts.store_addrs).await?;
let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
let election = EtcdElection::with_etcd_client(
&opts.grpc.server_addr,
&opts.server_addr,
etcd_client,
opts.store_key_prefix.clone(),
)
@@ -269,41 +270,22 @@ pub async fn metasrv_builder(
}
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
use std::time::Duration;
use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
use crate::election::rds::postgres::ElectionPgClient;
let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
let execution_timeout = Duration::from_secs(META_LEASE_SECS);
let statement_timeout = Duration::from_secs(META_LEASE_SECS);
let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
let mut cfg = Config::new();
cfg.keepalives = Some(true);
cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
// We use a separate pool for election since we need a different session keep-alive idle time.
let pool = create_postgres_pool_with(&opts.store_addrs, cfg).await?;
let election_client =
ElectionPgClient::new(pool, execution_timeout, meta_lease_ttl, statement_timeout)?;
let election = PgElection::with_pg_client(
opts.grpc.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
candidate_lease_ttl,
meta_lease_ttl,
&opts.meta_table_name,
opts.meta_election_lock_id,
)
.await?;
let pool = create_postgres_pool(&opts.store_addrs).await?;
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
// Client for election should be created separately since we need a different session keep-alive idle time.
let election_client = create_postgres_client(opts).await?;
let election = PgElection::with_pg_client(
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
META_LEASE_SECS,
&opts.meta_table_name,
opts.meta_election_lock_id,
)
.await?;
(kv_backend, Some(election))
}
#[cfg(feature = "mysql_kvbackend")]
@@ -317,7 +299,7 @@ pub async fn metasrv_builder(
let election_table_name = opts.meta_table_name.clone() + "_election";
let election_client = create_mysql_client(opts).await?;
let election = MySqlElection::with_mysql_client(
opts.grpc.server_addr.clone(),
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
@@ -390,24 +372,31 @@ pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
}
#[cfg(feature = "pg_kvbackend")]
/// Creates a pool for the Postgres backend.
///
/// It only use first store addr to create a pool.
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
create_postgres_pool_with(store_addrs, Config::new()).await
async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
let postgres_url = opts
.store_addrs
.first()
.context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
.await
.context(error::ConnectPostgresSnafu)?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!(e; "connection error");
}
});
Ok(client)
}
#[cfg(feature = "pg_kvbackend")]
/// Creates a pool for the Postgres backend.
///
/// It only use first store addr to create a pool, and use the given config to create a pool.
pub async fn create_postgres_pool_with(
store_addrs: &[String],
mut cfg: Config,
) -> Result<deadpool_postgres::Pool> {
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let mut cfg = Config::new();
cfg.url = Some(postgres_url.to_string());
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)

View File

@@ -157,11 +157,6 @@ pub trait Election: Send + Sync {
/// but only one can be the leader at a time.
async fn campaign(&self) -> Result<()>;
/// Resets the campaign.
///
/// Reset the client and the leader flag if needed.
async fn reset_campaign(&self) {}
/// Returns the leader value for the current election.
async fn leader(&self) -> Result<Self::Leader>;

View File

@@ -18,12 +18,11 @@ use std::time::Duration;
use common_telemetry::{error, warn};
use common_time::Timestamp;
use deadpool_postgres::{Manager, Pool};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::{broadcast, RwLock};
use tokio::sync::broadcast;
use tokio::time::MissedTickBehavior;
use tokio_postgres::types::ToSql;
use tokio_postgres::Row;
use tokio_postgres::Client;
use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
use crate::election::{
@@ -31,14 +30,15 @@ use crate::election::{
CANDIDATES_ROOT, ELECTION_KEY,
};
use crate::error::{
DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu,
UnexpectedSnafu,
};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
struct ElectionSqlFactory<'a> {
lock_id: u64,
table_name: &'a str,
meta_lease_ttl_secs: u64,
}
struct ElectionSqlSet {
@@ -88,10 +88,11 @@ struct ElectionSqlSet {
}
impl<'a> ElectionSqlFactory<'a> {
fn new(lock_id: u64, table_name: &'a str) -> Self {
fn new(lock_id: u64, table_name: &'a str, meta_lease_ttl_secs: u64) -> Self {
Self {
lock_id,
table_name,
meta_lease_ttl_secs,
}
}
@@ -107,6 +108,15 @@ impl<'a> ElectionSqlFactory<'a> {
}
}
// Currently the session timeout is longer than the leader lease time.
// So the leader will renew the lease twice before the session timeout if everything goes well.
fn set_idle_session_timeout_sql(&self) -> String {
format!(
"SET idle_session_timeout = '{}s';",
self.meta_lease_ttl_secs + 1
)
}
fn campaign_sql(&self) -> String {
format!("SELECT pg_try_advisory_lock({})", self.lock_id)
}
@@ -161,165 +171,46 @@ impl<'a> ElectionSqlFactory<'a> {
}
}
/// PgClient for election.
pub struct ElectionPgClient {
current: Option<deadpool::managed::Object<Manager>>,
pool: Pool,
/// The client-side timeout for statement execution.
///
/// This timeout is enforced by the client application and is independent of any server-side timeouts.
/// If a statement takes longer than this duration to execute, the client will abort the operation.
execution_timeout: Duration,
/// The idle session timeout.
///
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
/// If a session remains idle for longer than this duration, the server will terminate it.
idle_session_timeout: Duration,
/// The statement timeout.
///
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
/// If a statement takes longer than this duration to execute, the server will abort it.
statement_timeout: Duration,
}
impl ElectionPgClient {
pub fn new(
pool: Pool,
execution_timeout: Duration,
idle_session_timeout: Duration,
statement_timeout: Duration,
) -> Result<ElectionPgClient> {
Ok(ElectionPgClient {
current: None,
pool,
execution_timeout,
idle_session_timeout,
statement_timeout,
})
}
fn set_idle_session_timeout_sql(&self) -> String {
format!(
"SET idle_session_timeout = '{}s';",
self.idle_session_timeout.as_secs()
)
}
fn set_statement_timeout_sql(&self) -> String {
format!(
"SET statement_timeout = '{}s';",
self.statement_timeout.as_secs()
)
}
async fn reset_client(&mut self) -> Result<()> {
self.current = None;
self.maybe_init_client().await
}
async fn maybe_init_client(&mut self) -> Result<()> {
if self.current.is_none() {
let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
self.current = Some(client);
// Set idle session timeout and statement timeout.
let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
self.execute(&idle_session_timeout_sql, &[]).await?;
let statement_timeout_sql = self.set_statement_timeout_sql();
self.execute(&statement_timeout_sql, &[]).await?;
}
Ok(())
}
/// Returns the result of the query.
///
/// # Panics
/// if `current` is `None`.
async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
let result = tokio::time::timeout(
self.execution_timeout,
self.current.as_ref().unwrap().execute(sql, params),
)
.await
.map_err(|_| {
SqlExecutionTimeoutSnafu {
sql: sql.to_string(),
duration: self.execution_timeout,
}
.build()
})?;
result.context(PostgresExecutionSnafu { sql })
}
/// Returns the result of the query.
///
/// # Panics
/// if `current` is `None`.
async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
let result = tokio::time::timeout(
self.execution_timeout,
self.current.as_ref().unwrap().query(sql, params),
)
.await
.map_err(|_| {
SqlExecutionTimeoutSnafu {
sql: sql.to_string(),
duration: self.execution_timeout,
}
.build()
})?;
result.context(PostgresExecutionSnafu { sql })
}
}
/// PostgreSql implementation of Election.
pub struct PgElection {
leader_value: String,
pg_client: RwLock<ElectionPgClient>,
client: Client,
is_leader: AtomicBool,
leader_infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String,
candidate_lease_ttl: Duration,
meta_lease_ttl: Duration,
candidate_lease_ttl_secs: u64,
meta_lease_ttl_secs: u64,
sql_set: ElectionSqlSet,
}
impl PgElection {
async fn maybe_init_client(&self) -> Result<()> {
if self.pg_client.read().await.current.is_none() {
self.pg_client.write().await.maybe_init_client().await?;
}
Ok(())
}
pub async fn with_pg_client(
leader_value: String,
pg_client: ElectionPgClient,
client: Client,
store_key_prefix: String,
candidate_lease_ttl: Duration,
meta_lease_ttl: Duration,
candidate_lease_ttl_secs: u64,
meta_lease_ttl_secs: u64,
table_name: &str,
lock_id: u64,
) -> Result<ElectionRef> {
let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
let sql_factory = ElectionSqlFactory::new(lock_id, table_name, meta_lease_ttl_secs);
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
client
.execute(&sql_factory.set_idle_session_timeout_sql(), &[])
.await
.context(PostgresExecutionSnafu)?;
let tx = listen_leader_change(leader_value.clone());
Ok(Arc::new(Self {
leader_value,
pg_client: RwLock::new(pg_client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(false),
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl,
meta_lease_ttl,
candidate_lease_ttl_secs,
meta_lease_ttl_secs,
sql_set: sql_factory.build(),
}))
}
@@ -358,17 +249,18 @@ impl Election for PgElection {
input: format!("{node_info:?}"),
})?;
let res = self
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?;
// May registered before, just update the lease.
if !res {
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?;
}
// Check if the current lease has expired and renew the lease.
let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
loop {
let _ = keep_alive_interval.tick().await;
@@ -390,8 +282,13 @@ impl Election for PgElection {
);
// Safety: origin is Some since we are using `get_value_with_lease` with `true`.
self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
.await?;
self.update_value_with_lease(
&key,
&lease.origin,
&node_info,
self.candidate_lease_ttl_secs,
)
.await?;
}
}
@@ -424,17 +321,16 @@ impl Election for PgElection {
/// - If the lock is not acquired (result is false), it calls the `follower_action` method
/// to perform actions as a follower.
async fn campaign(&self) -> Result<()> {
let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.meta_lease_ttl_secs / 2));
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
self.maybe_init_client().await?;
loop {
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.campaign, &[])
.await?;
.await
.context(PostgresExecutionSnafu)?;
let row = res.first().context(UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock",
})?;
@@ -453,12 +349,6 @@ impl Election for PgElection {
}
}
async fn reset_campaign(&self) {
if let Err(err) = self.pg_client.write().await.reset_client().await {
error!(err; "Failed to reset client");
}
}
async fn leader(&self) -> Result<Self::Leader> {
if self.is_leader.load(Ordering::Relaxed) {
Ok(self.leader_value.as_bytes().into())
@@ -486,13 +376,11 @@ impl PgElection {
/// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
let key = key.as_bytes();
self.maybe_init_client().await?;
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.get_value_with_lease, &[&key])
.await?;
.await
.context(PostgresExecutionSnafu)?;
if res.is_empty() {
Ok(None)
@@ -526,13 +414,11 @@ impl PgElection {
key_prefix: &str,
) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
self.maybe_init_client().await?;
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
.await?;
.await
.context(PostgresExecutionSnafu)?;
let mut values_with_leases = vec![];
let mut current = Timestamp::default();
@@ -559,21 +445,18 @@ impl PgElection {
key: &str,
prev: &str,
updated: &str,
lease_ttl: Duration,
lease_ttl: u64,
) -> Result<()> {
let key = key.as_bytes();
let prev = prev.as_bytes();
self.maybe_init_client().await?;
let lease_ttl_secs = lease_ttl.as_secs() as f64;
let res = self
.pg_client
.read()
.await
.client
.execute(
&self.sql_set.update_value_with_lease,
&[&key, &prev, &updated, &lease_ttl_secs],
&[&key, &prev, &updated, &(lease_ttl as f64)],
)
.await?;
.await
.context(PostgresExecutionSnafu)?;
ensure!(
res == 1,
@@ -590,18 +473,16 @@ impl PgElection {
&self,
key: &str,
value: &str,
lease_ttl: Duration,
lease_ttl_secs: u64,
) -> Result<bool> {
let key = key.as_bytes();
let lease_ttl_secs = lease_ttl.as_secs() as f64;
let lease_ttl_secs = lease_ttl_secs as f64;
let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
self.maybe_init_client().await?;
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.put_value_with_lease, &params)
.await?;
.await
.context(PostgresExecutionSnafu)?;
Ok(res.is_empty())
}
@@ -609,13 +490,11 @@ impl PgElection {
/// Caution: Should only delete the key if the lease is expired.
async fn delete_value(&self, key: &str) -> Result<bool> {
let key = key.as_bytes();
self.maybe_init_client().await?;
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.delete_value, &[&key])
.await?;
.await
.context(PostgresExecutionSnafu)?;
Ok(res.len() == 1)
}
@@ -657,7 +536,7 @@ impl PgElection {
&key,
&lease.origin,
&self.leader_value,
self.meta_lease_ttl,
self.meta_lease_ttl_secs,
)
.await?;
}
@@ -726,12 +605,10 @@ impl PgElection {
..Default::default()
};
self.delete_value(&key).await?;
self.maybe_init_client().await?;
self.pg_client
.read()
.await
self.client
.query(&self.sql_set.step_down, &[])
.await?;
.await
.context(PostgresExecutionSnafu)?;
send_leader_change_and_set_flags(
&self.is_leader,
&self.leader_infancy,
@@ -774,7 +651,7 @@ impl PgElection {
..Default::default()
};
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs)
.await?;
if self
@@ -797,21 +674,15 @@ impl PgElection {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::env;
use common_meta::maybe_skip_postgres_integration_test;
use tokio_postgres::{Client, NoTls};
use super::*;
use crate::bootstrap::create_postgres_pool;
use crate::error;
use crate::error::PostgresExecutionSnafu;
async fn create_postgres_client(
table_name: Option<&str>,
execution_timeout: Duration,
idle_session_timeout: Duration,
statement_timeout: Duration,
) -> Result<ElectionPgClient> {
async fn create_postgres_client(table_name: Option<&str>) -> Result<Client> {
let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
if endpoint.is_empty() {
return UnexpectedSnafu {
@@ -819,34 +690,25 @@ mod tests {
}
.fail();
}
let pool = create_postgres_pool(&[endpoint]).await.unwrap();
let mut pg_client = ElectionPgClient::new(
pool,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.unwrap();
pg_client.maybe_init_client().await?;
let (client, connection) = tokio_postgres::connect(&endpoint, NoTls)
.await
.context(PostgresExecutionSnafu)?;
tokio::spawn(async move {
connection.await.context(PostgresExecutionSnafu).unwrap();
});
if let Some(table_name) = table_name {
let create_table_sql = format!(
"CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
table_name
);
pg_client.execute(&create_table_sql, &[]).await?;
client.execute(&create_table_sql, &[]).await.unwrap();
}
Ok(pg_client)
Ok(client)
}
async fn drop_table(pg_election: &PgElection, table_name: &str) {
async fn drop_table(client: &Client, table_name: &str) {
let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
pg_election
.pg_client
.read()
.await
.execute(&sql, &[])
.await
.unwrap();
client.execute(&sql, &[]).await.unwrap();
}
#[tokio::test]
@@ -857,35 +719,23 @@ mod tests {
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_postgres_crud_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(10);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, _) = broadcast::channel(100);
let pg_election = PgElection {
leader_value: "test_leader".to_string(),
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
candidate_lease_ttl_secs: 10,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
};
let res = pg_election
.put_value_with_lease(&key, &value, candidate_lease_ttl)
.put_value_with_lease(&key, &value, 10)
.await
.unwrap();
assert!(res);
@@ -898,7 +748,7 @@ mod tests {
assert_eq!(lease.leader_value, value);
pg_election
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl_secs)
.await
.unwrap();
@@ -912,7 +762,7 @@ mod tests {
let key = format!("test_key_{}", i);
let value = format!("test_value_{}", i);
pg_election
.put_value_with_lease(&key, &value, candidate_lease_ttl)
.put_value_with_lease(&key, &value, 10)
.await
.unwrap();
}
@@ -937,39 +787,28 @@ mod tests {
assert!(res.is_empty());
assert!(current == Timestamp::default());
drop_table(&pg_election, table_name).await;
drop_table(&pg_election.client, table_name).await;
}
async fn candidate(
leader_value: String,
candidate_lease_ttl: Duration,
candidate_lease_ttl_secs: u64,
store_key_prefix: String,
table_name: String,
) {
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
None,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let client = create_postgres_client(None).await.unwrap();
let (tx, _) = broadcast::channel(100);
let pg_election = PgElection {
leader_value,
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28319, &table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, &table_name, 2).build(),
};
let node_info = MetasrvNodeInfo {
@@ -985,28 +824,17 @@ mod tests {
async fn test_candidate_registration() {
maybe_skip_postgres_integration_test!();
let leader_value_prefix = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_candidate_registration_greptime_metakv";
let mut handles = vec![];
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let client = create_postgres_client(Some(table_name)).await.unwrap();
for i in 0..10 {
let leader_value = format!("{}{}", leader_value_prefix, i);
let handle = tokio::spawn(candidate(
leader_value,
candidate_lease_ttl,
candidate_lease_ttl_secs,
uuid.clone(),
table_name.to_string(),
));
@@ -1019,14 +847,14 @@ mod tests {
let leader_value = "test_leader".to_string();
let pg_election = PgElection {
leader_value,
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid.clone(),
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
};
let candidates = pg_election.all_candidates().await.unwrap();
@@ -1048,40 +876,29 @@ mod tests {
assert!(res);
}
drop_table(&pg_election, table_name).await;
drop_table(&pg_election.client, table_name).await;
}
#[tokio::test]
async fn test_elected_and_step_down() {
maybe_skip_postgres_integration_test!();
let leader_value = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_elected_and_step_down_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28320, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28320, table_name, 2).build(),
};
leader_pg_election.elected().await.unwrap();
@@ -1173,7 +990,7 @@ mod tests {
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
drop_table(&leader_pg_election, table_name).await;
drop_table(&leader_pg_election.client, table_name).await;
}
#[tokio::test]
@@ -1182,38 +999,25 @@ mod tests {
let leader_value = "test_leader".to_string();
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_leader_action_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let candidate_lease_ttl_secs = 5;
let client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28321, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28321, table_name, 2).build(),
};
// Step 1: No leader exists, campaign and elected.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1244,9 +1048,7 @@ mod tests {
// Step 2: As a leader, renew the lease.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1268,9 +1070,7 @@ mod tests {
tokio::time::sleep(Duration::from_secs(2)).await;
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1298,9 +1098,7 @@ mod tests {
// Step 4: Re-campaign and elected.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1357,9 +1155,7 @@ mod tests {
// Step 6: Re-campaign and elected.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1390,9 +1186,7 @@ mod tests {
// Step 7: Something wrong, the leader key changed by others.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1403,11 +1197,7 @@ mod tests {
.await
.unwrap();
leader_pg_election
.put_value_with_lease(
&leader_pg_election.election_key(),
"test",
Duration::from_secs(10),
)
.put_value_with_lease(&leader_pg_election.election_key(), "test", 10)
.await
.unwrap();
leader_pg_election.leader_action().await.unwrap();
@@ -1433,74 +1223,52 @@ mod tests {
// Clean up
leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.step_down, &[])
.await
.unwrap();
drop_table(&leader_pg_election, table_name).await;
drop_table(&leader_pg_election.client, table_name).await;
}
#[tokio::test]
async fn test_follower_action() {
maybe_skip_postgres_integration_test!();
common_telemetry::init_default_ut_logging();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_follower_action_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let follower_client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let follower_client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let follower_pg_election = PgElection {
leader_value: "test_follower".to_string(),
pg_client: RwLock::new(follower_client),
client: follower_client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid.clone(),
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
};
let leader_client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let leader_client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, _) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: "test_leader".to_string(),
pg_client: RwLock::new(leader_client),
client: leader_client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
};
leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1541,41 +1309,11 @@ mod tests {
// Clean up
leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.step_down, &[])
.await
.unwrap();
drop_table(&follower_pg_election, table_name).await;
}
#[tokio::test]
async fn test_idle_session_timeout() {
maybe_skip_postgres_integration_test!();
common_telemetry::init_default_ut_logging();
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let idle_session_timeout = Duration::from_secs(1);
let mut client = create_postgres_client(
None,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(1100)).await;
// Wait for the idle session timeout.
let err = client.query("SELECT 1", &[]).await.unwrap_err();
assert_matches!(err, error::Error::PostgresExecution { .. });
let error::Error::PostgresExecution { error, .. } = err else {
panic!("Expected PostgresExecution error");
};
assert!(error.is_closed());
// Reset the client and try again.
client.reset_client().await.unwrap();
let _ = client.query("SELECT 1", &[]).await.unwrap();
drop_table(&follower_pg_election.client, table_name).await;
}
}

View File

@@ -748,31 +748,21 @@ pub enum Error {
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to execute via postgres, sql: {}", sql))]
#[snafu(display("Failed to execute via postgres"))]
PostgresExecution {
#[snafu(source)]
error: tokio_postgres::Error,
sql: String,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to get Postgres client"))]
GetPostgresClient {
#[snafu(implicit)]
location: Location,
#[snafu(display("Failed to connect to Postgres"))]
ConnectPostgres {
#[snafu(source)]
error: deadpool::managed::PoolError<tokio_postgres::Error>,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
SqlExecutionTimeout {
error: tokio_postgres::Error,
#[snafu(implicit)]
location: Location,
sql: String,
duration: std::time::Duration,
},
#[cfg(feature = "pg_kvbackend")]
@@ -1015,10 +1005,9 @@ impl ErrorExt for Error {
Error::LookupPeer { source, .. } => source.status_code(),
#[cfg(feature = "pg_kvbackend")]
Error::CreatePostgresPool { .. }
| Error::GetPostgresClient { .. }
| Error::GetPostgresConnection { .. }
| Error::PostgresExecution { .. }
| Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
| Error::ConnectPostgres { .. } => StatusCode::Internal,
#[cfg(feature = "mysql_kvbackend")]
Error::MySqlExecution { .. }
| Error::CreateMySqlPool { .. }

View File

@@ -46,7 +46,6 @@ use common_telemetry::{error, info, warn};
use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -97,10 +96,8 @@ pub enum BackendImpl {
#[serde(default)]
pub struct MetasrvOptions {
/// The address the server listens on.
#[deprecated(note = "Use grpc.bind_addr instead")]
pub bind_addr: String,
/// The address the server advertises to the clients.
#[deprecated(note = "Use grpc.server_addr instead")]
pub server_addr: String,
/// The address of the store, e.g., etcd.
pub store_addrs: Vec<String>,
@@ -115,7 +112,6 @@ pub struct MetasrvOptions {
/// If it's true, the region failover will be allowed even if the local WAL is used.
/// Note that this option is not recommended to be set to true, because it may lead to data loss during failover.
pub allow_region_failover_on_local_wal: bool,
pub grpc: GrpcOptions,
/// The HTTP server options.
pub http: HttpOptions,
/// The logging options.
@@ -170,6 +166,8 @@ impl fmt::Debug for MetasrvOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug_struct = f.debug_struct("MetasrvOptions");
debug_struct
.field("bind_addr", &self.bind_addr)
.field("server_addr", &self.server_addr)
.field("store_addrs", &self.sanitize_store_addrs())
.field("selector", &self.selector)
.field("use_memory_store", &self.use_memory_store)
@@ -178,7 +176,6 @@ impl fmt::Debug for MetasrvOptions {
"allow_region_failover_on_local_wal",
&self.allow_region_failover_on_local_wal,
)
.field("grpc", &self.grpc)
.field("http", &self.http)
.field("logging", &self.logging)
.field("procedure", &self.procedure)
@@ -211,19 +208,14 @@ const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
impl Default for MetasrvOptions {
fn default() -> Self {
Self {
#[allow(deprecated)]
bind_addr: String::new(),
#[allow(deprecated)]
bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
// If server_addr is not set, the server will use the local ip address as the server address.
server_addr: String::new(),
store_addrs: vec!["127.0.0.1:2379".to_string()],
selector: SelectorType::default(),
use_memory_store: false,
enable_region_failover: false,
allow_region_failover_on_local_wal: false,
grpc: GrpcOptions {
bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
..Default::default()
},
http: HttpOptions::default(),
logging: LoggingOptions::default(),
procedure: ProcedureConfig {
@@ -261,6 +253,37 @@ impl Configurable for MetasrvOptions {
}
impl MetasrvOptions {
/// Detect server address.
#[cfg(not(target_os = "android"))]
pub fn detect_server_addr(&mut self) {
if self.server_addr.is_empty() {
match local_ip_address::local_ip() {
Ok(ip) => {
let detected_addr = format!(
"{}:{}",
ip,
self.bind_addr
.split(':')
.nth(1)
.unwrap_or(DEFAULT_METASRV_ADDR_PORT)
);
info!("Using detected: {} as server address", detected_addr);
self.server_addr = detected_addr;
}
Err(e) => {
error!("Failed to detect local ip address: {}", e);
}
}
}
}
#[cfg(target_os = "android")]
pub fn detect_server_addr(&mut self) {
if self.server_addr.is_empty() {
common_telemetry::debug!("detect local IP is not supported on Android");
}
}
fn sanitize_store_addrs(&self) -> Vec<String> {
self.store_addrs
.iter()
@@ -559,7 +582,6 @@ impl Metasrv {
if let Err(e) = res {
warn!(e; "Metasrv election error");
}
election.reset_campaign().await;
info!("Metasrv re-initiate election");
}
info!("Metasrv stopped");
@@ -616,7 +638,7 @@ impl Metasrv {
pub fn node_info(&self) -> MetasrvNodeInfo {
let build_info = common_version::build_info();
MetasrvNodeInfo {
addr: self.options().grpc.server_addr.clone(),
addr: self.options().server_addr.clone(),
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
start_time_ms: self.start_time_ms(),
@@ -708,7 +730,7 @@ impl Metasrv {
#[inline]
pub fn new_ctx(&self) -> Context {
let server_addr = self.options().grpc.server_addr.clone();
let server_addr = self.options().server_addr.clone();
let in_memory = self.in_memory.clone();
let kv_backend = self.kv_backend.clone();
let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();

View File

@@ -179,8 +179,8 @@ impl MetasrvBuilder {
let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
let state = Arc::new(RwLock::new(match election {
None => State::leader(options.grpc.server_addr.to_string(), true),
Some(_) => State::follower(options.grpc.server_addr.to_string()),
None => State::leader(options.server_addr.to_string(), true),
Some(_) => State::follower(options.server_addr.to_string()),
}));
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
@@ -203,7 +203,7 @@ impl MetasrvBuilder {
));
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone()));
let selector_ctx = SelectorContext {
server_addr: options.grpc.server_addr.clone(),
server_addr: options.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
kv_backend: kv_backend.clone(),
@@ -272,7 +272,7 @@ impl MetasrvBuilder {
let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
mailbox.clone(),
MetasrvInfo {
server_addr: options.grpc.server_addr.clone(),
server_addr: options.server_addr.clone(),
},
));
let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
@@ -315,7 +315,7 @@ impl MetasrvBuilder {
memory_region_keeper.clone(),
region_failure_detector_controller.clone(),
mailbox.clone(),
options.grpc.server_addr.clone(),
options.server_addr.clone(),
cache_invalidator.clone(),
),
));
@@ -390,7 +390,7 @@ impl MetasrvBuilder {
client: Arc::new(kafka_client),
table_metadata_manager: table_metadata_manager.clone(),
leader_region_registry: leader_region_registry.clone(),
server_addr: options.grpc.server_addr.clone(),
server_addr: options.server_addr.clone(),
mailbox: mailbox.clone(),
};
let wal_prune_manager = WalPruneManager::new(

View File

@@ -26,7 +26,6 @@ use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use hyper_util::rt::TokioIo;
use servers::grpc::GrpcOptions;
use tonic::codec::CompressionEncoding;
use tower::service_fn;
@@ -48,10 +47,7 @@ pub async fn mock_with_memstore() -> MockInfo {
let in_memory = Arc::new(MemoryKvBackend::new());
mock(
MetasrvOptions {
grpc: GrpcOptions {
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
kv_backend,
@@ -66,10 +62,7 @@ pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
mock(
MetasrvOptions {
grpc: GrpcOptions {
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
kv_backend,
@@ -87,7 +80,7 @@ pub async fn mock(
datanode_clients: Option<Arc<NodeClients>>,
in_memory: Option<ResettableKvBackendRef>,
) -> MockInfo {
let server_addr = opts.grpc.server_addr.clone();
let server_addr = opts.server_addr.clone();
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
table_metadata_manager.init().await.unwrap();

View File

@@ -88,7 +88,7 @@ impl cluster_server::Cluster for Metasrv {
return Ok(Response::new(resp));
}
let leader_addr = &self.options().grpc.server_addr;
let leader_addr = &self.options().server_addr;
let (leader, followers) = match self.election() {
Some(election) => {
let nodes = election.all_candidates().await?;

View File

@@ -190,7 +190,6 @@ mod tests {
use api::v1::meta::*;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_telemetry::tracing_context::W3cTrace;
use servers::grpc::GrpcOptions;
use tonic::IntoRequest;
use super::get_node_id;
@@ -204,10 +203,7 @@ mod tests {
let metasrv = MetasrvBuilder::new()
.kv_backend(kv_backend)
.options(MetasrvOptions {
grpc: GrpcOptions {
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
})
.build()
@@ -220,7 +216,7 @@ mod tests {
let res = metasrv.ask_leader(req.into_request()).await.unwrap();
let res = res.into_inner();
assert_eq!(metasrv.options().grpc.server_addr, res.leader.unwrap().addr);
assert_eq!(metasrv.options().bind_addr, res.leader.unwrap().addr);
}
#[test]

View File

@@ -282,15 +282,14 @@ mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use common_function::function::FunctionRef;
use common_function::function_factory::ScalarFunctionFactory;
use common_function::scalars::matches::MatchesFunction;
use common_function::scalars::matches_term::MatchesTermFunction;
use common_function::function_registry::FUNCTION_REGISTRY;
use common_function::scalars::udf::create_udf;
use datafusion::functions::string::lower;
use datafusion_common::Column;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::ScalarUDF;
use datatypes::schema::ColumnSchema;
use session::context::QueryContext;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
@@ -318,17 +317,19 @@ mod tests {
}
fn matches_func() -> Arc<ScalarUDF> {
Arc::new(
ScalarFunctionFactory::from(Arc::new(MatchesFunction) as FunctionRef)
.provide(Default::default()),
)
Arc::new(create_udf(
FUNCTION_REGISTRY.get_function("matches").unwrap(),
QueryContext::arc(),
Default::default(),
))
}
fn matches_term_func() -> Arc<ScalarUDF> {
Arc::new(
ScalarFunctionFactory::from(Arc::new(MatchesTermFunction) as FunctionRef)
.provide(Default::default()),
)
Arc::new(create_udf(
FUNCTION_REGISTRY.get_function("matches_term").unwrap(),
QueryContext::arc(),
Default::default(),
))
}
#[test]

View File

@@ -25,7 +25,8 @@ use async_trait::async_trait;
use common_base::Plugins;
use common_catalog::consts::is_readonly_schema;
use common_error::ext::BoxedError;
use common_function::function_factory::ScalarFunctionFactory;
use common_function::function::FunctionRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::{Output, OutputData, OutputMeta};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
@@ -34,9 +35,7 @@ use datafusion::physical_plan::analyze::AnalyzeExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::ResolvedTableReference;
use datafusion_expr::{
AggregateUDF, DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp,
};
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp};
use datatypes::prelude::VectorRef;
use datatypes::schema::Schema;
use futures_util::StreamExt;
@@ -455,14 +454,14 @@ impl QueryEngine for DatafusionQueryEngine {
/// `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"`
///
/// So it's better to make UDAF name lowercase when creating one.
fn register_aggregate_function(&self, func: AggregateUDF) {
self.state.register_aggr_function(func);
fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
self.state.register_aggregate_function(func);
}
/// Register an scalar function.
/// Register an UDF function.
/// Will override if the function with same name is already registered.
fn register_scalar_function(&self, func: ScalarFunctionFactory) {
self.state.register_scalar_function(func);
fn register_function(&self, func: FunctionRef) {
self.state.register_function(func);
}
fn read_table(&self, table: TableRef) -> Result<DataFrame> {

View File

@@ -18,7 +18,12 @@ use std::sync::Arc;
use arrow_schema::DataType;
use catalog::table_source::DfTableSourceProvider;
use common_function::function::FunctionContext;
use common_function::aggr::{
GeoPathAccumulator, HllState, UddSketchState, GEO_PATH_NAME, HLL_MERGE_NAME, HLL_NAME,
UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME,
};
use common_function::scalars::udf::create_udf;
use common_query::logical_plan::create_aggregate_function;
use datafusion::common::TableReference;
use datafusion::datasource::cte_worktable::CteWorkTable;
use datafusion::datasource::file_format::{format_as_file_type, FileFormatFactory};
@@ -146,21 +151,38 @@ impl ContextProvider for DfContextProviderAdapter {
}
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
self.engine_state.scalar_function(name).map_or_else(
self.engine_state.udf_function(name).map_or_else(
|| self.session_state.scalar_functions().get(name).cloned(),
|func| {
Some(Arc::new(func.provide(FunctionContext {
query_ctx: self.query_ctx.clone(),
state: self.engine_state.function_state(),
})))
Some(Arc::new(create_udf(
func,
self.query_ctx.clone(),
self.engine_state.function_state(),
)))
},
)
}
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
self.engine_state.aggr_function(name).map_or_else(
if name == UDDSKETCH_STATE_NAME {
return Some(Arc::new(UddSketchState::state_udf_impl()));
} else if name == UDDSKETCH_MERGE_NAME {
return Some(Arc::new(UddSketchState::merge_udf_impl()));
} else if name == HLL_NAME {
return Some(Arc::new(HllState::state_udf_impl()));
} else if name == HLL_MERGE_NAME {
return Some(Arc::new(HllState::merge_udf_impl()));
} else if name == GEO_PATH_NAME {
return Some(Arc::new(GeoPathAccumulator::udf_impl()));
}
self.engine_state.aggregate_function(name).map_or_else(
|| self.session_state.aggregate_functions().get(name).cloned(),
|func| Some(Arc::new(func)),
|func| {
Some(Arc::new(
create_aggregate_function(func.name(), func.args_count(), func.create()).into(),
))
},
)
}
@@ -191,13 +213,13 @@ impl ContextProvider for DfContextProviderAdapter {
}
fn udf_names(&self) -> Vec<String> {
let mut names = self.engine_state.scalar_names();
let mut names = self.engine_state.udf_names();
names.extend(self.session_state.scalar_functions().keys().cloned());
names
}
fn udaf_names(&self) -> Vec<String> {
let mut names = self.engine_state.aggr_names();
let mut names = self.engine_state.udaf_names();
names.extend(self.session_state.aggregate_functions().keys().cloned());
names
}

View File

@@ -22,13 +22,14 @@ use std::sync::Arc;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::function_factory::ScalarFunctionFactory;
use common_function::function::FunctionRef;
use common_function::function_registry::FUNCTION_REGISTRY;
use common_function::handlers::{
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
};
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::Output;
use datafusion_expr::{AggregateUDF, LogicalPlan};
use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;
pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
use session::context::QueryContextRef;
@@ -78,11 +79,11 @@ pub trait QueryEngine: Send + Sync {
///
/// # Panics
/// Will panic if the function with same name is already registered.
fn register_aggregate_function(&self, func: AggregateUDF);
fn register_aggregate_function(&self, func: AggregateFunctionMetaRef);
/// Register a scalar function.
/// Register a SQL function.
/// Will override if the function with same name is already registered.
fn register_scalar_function(&self, func: ScalarFunctionFactory);
fn register_function(&self, func: FunctionRef);
/// Create a DataFrame from a table.
fn read_table(&self, table: TableRef) -> Result<DataFrame>;
@@ -153,8 +154,8 @@ impl QueryEngineFactory {
/// Register all functions implemented by GreptimeDB
fn register_functions(query_engine: &Arc<DatafusionQueryEngine>) {
for func in FUNCTION_REGISTRY.scalar_functions() {
query_engine.register_scalar_function(func);
for func in FUNCTION_REGISTRY.functions() {
query_engine.register_function(func);
}
for accumulator in FUNCTION_REGISTRY.aggregate_functions() {

View File

@@ -15,8 +15,9 @@
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_function::function::FunctionContext;
use common_function::aggr::{GeoPathAccumulator, HllState, UddSketchState};
use common_function::function_registry::FUNCTION_REGISTRY;
use common_function::scalars::udf::create_udf;
use common_query::error::RegisterUdfSnafu;
use common_query::logical_plan::SubstraitPlanDecoder;
use datafusion::catalog::CatalogProviderList;
@@ -123,46 +124,43 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
// if they have the same name as the default UDFs or their alias.
// e.g. The default UDF `to_char()` has an alias `date_format()`, if we register a UDF with the name `date_format()`
// before we build the session state, the UDF will be lost.
for func in FUNCTION_REGISTRY.scalar_functions() {
let udf = func.provide(FunctionContext {
query_ctx: self.query_ctx.clone(),
state: Default::default(),
});
for func in FUNCTION_REGISTRY.functions() {
let udf = Arc::new(create_udf(
func.clone(),
self.query_ctx.clone(),
Default::default(),
));
session_state
.register_udf(Arc::new(udf))
.register_udf(udf)
.context(RegisterUdfSnafu { name: func.name() })?;
let _ = session_state.register_udaf(Arc::new(UddSketchState::state_udf_impl()));
let _ = session_state.register_udaf(Arc::new(UddSketchState::merge_udf_impl()));
let _ = session_state.register_udaf(Arc::new(HllState::state_udf_impl()));
let _ = session_state.register_udaf(Arc::new(HllState::merge_udf_impl()));
let _ = session_state.register_udaf(Arc::new(GeoPathAccumulator::udf_impl()));
let _ = session_state.register_udaf(quantile_udaf());
let _ = session_state.register_udf(Arc::new(IDelta::<false>::scalar_udf()));
let _ = session_state.register_udf(Arc::new(IDelta::<true>::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Rate::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Increase::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Delta::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Resets::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Changes::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Deriv::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Round::scalar_udf()));
let _ = session_state.register_udf(Arc::new(AvgOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(MinOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(MaxOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(SumOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(CountOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(LastOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(AbsentOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(PresentOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(StddevOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(StdvarOverTime::scalar_udf()));
// TODO(ruihang): add quantile_over_time, predict_linear, holt_winters, round
}
for func in FUNCTION_REGISTRY.aggregate_functions() {
let name = func.name().to_string();
session_state
.register_udaf(Arc::new(func))
.context(RegisterUdfSnafu { name })?;
}
let _ = session_state.register_udaf(quantile_udaf());
let _ = session_state.register_udf(Arc::new(IDelta::<false>::scalar_udf()));
let _ = session_state.register_udf(Arc::new(IDelta::<true>::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Rate::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Increase::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Delta::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Resets::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Changes::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Deriv::scalar_udf()));
let _ = session_state.register_udf(Arc::new(Round::scalar_udf()));
let _ = session_state.register_udf(Arc::new(AvgOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(MinOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(MaxOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(SumOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(CountOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(LastOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(AbsentOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(PresentOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(StddevOverTime::scalar_udf()));
let _ = session_state.register_udf(Arc::new(StdvarOverTime::scalar_udf()));
// TODO(ruihang): add quantile_over_time, predict_linear, holt_winters, round
let logical_plan = DFLogicalSubstraitConvertor
.decode(message, session_state)
.await

View File

@@ -19,10 +19,11 @@ use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::function_factory::ScalarFunctionFactory;
use common_function::function::FunctionRef;
use common_function::handlers::{
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
};
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::state::FunctionState;
use common_telemetry::warn;
use datafusion::dataframe::DataFrame;
@@ -36,7 +37,7 @@ use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan};
use datafusion_expr::LogicalPlan as DfLogicalPlan;
use datafusion_optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
use datafusion_optimizer::optimizer::Optimizer;
@@ -69,8 +70,8 @@ pub struct QueryEngineState {
df_context: SessionContext,
catalog_manager: CatalogManagerRef,
function_state: Arc<FunctionState>,
scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
udf_functions: Arc<RwLock<HashMap<String, FunctionRef>>>,
aggregate_functions: Arc<RwLock<HashMap<String, AggregateFunctionMetaRef>>>,
extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
plugins: Plugins,
}
@@ -185,10 +186,10 @@ impl QueryEngineState {
procedure_service_handler,
flow_service_handler,
}),
aggr_functions: Arc::new(RwLock::new(HashMap::new())),
aggregate_functions: Arc::new(RwLock::new(HashMap::new())),
extension_rules,
plugins,
scalar_functions: Arc::new(RwLock::new(HashMap::new())),
udf_functions: Arc::new(RwLock::new(HashMap::new())),
}
}
@@ -221,28 +222,38 @@ impl QueryEngineState {
self.session_state().optimize(&plan)
}
/// Retrieve the scalar function by name
pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
self.scalar_functions
/// Register an udf function.
/// Will override if the function with same name is already registered.
pub fn register_function(&self, func: FunctionRef) {
let name = func.name().to_string();
let x = self
.udf_functions
.write()
.unwrap()
.insert(name.clone(), func);
if x.is_some() {
warn!("Already registered udf function '{name}'");
}
}
/// Retrieve the udf function by name
pub fn udf_function(&self, function_name: &str) -> Option<FunctionRef> {
self.udf_functions
.read()
.unwrap()
.get(function_name)
.cloned()
}
/// Retrieve scalar function names.
pub fn scalar_names(&self) -> Vec<String> {
self.scalar_functions
.read()
.unwrap()
.keys()
.cloned()
.collect()
/// Retrieve udf function names.
pub fn udf_names(&self) -> Vec<String> {
self.udf_functions.read().unwrap().keys().cloned().collect()
}
/// Retrieve the aggregate function by name
pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
self.aggr_functions
pub fn aggregate_function(&self, function_name: &str) -> Option<AggregateFunctionMetaRef> {
self.aggregate_functions
.read()
.unwrap()
.get(function_name)
@@ -250,8 +261,8 @@ impl QueryEngineState {
}
/// Retrieve aggregate function names.
pub fn aggr_names(&self) -> Vec<String> {
self.aggr_functions
pub fn udaf_names(&self) -> Vec<String> {
self.aggregate_functions
.read()
.unwrap()
.keys()
@@ -259,21 +270,6 @@ impl QueryEngineState {
.collect()
}
/// Register an scalar function.
/// Will override if the function with same name is already registered.
pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
let name = func.name().to_string();
let x = self
.scalar_functions
.write()
.unwrap()
.insert(name.clone(), func);
if x.is_some() {
warn!("Already registered scalar function '{name}'");
}
}
/// Register an aggregate function.
///
/// # Panics
@@ -282,10 +278,10 @@ impl QueryEngineState {
/// Panicking consideration: currently the aggregated functions are all statically registered,
/// user cannot define their own aggregate functions on the fly. So we can panic here. If that
/// invariant is broken in the future, we should return an error instead of panicking.
pub fn register_aggr_function(&self, func: AggregateUDF) {
let name = func.name().to_string();
pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
let name = func.name();
let x = self
.aggr_functions
.aggregate_functions
.write()
.unwrap()
.insert(name.clone(), func);

View File

@@ -16,12 +16,11 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use common_function::scalars::aggregate::AggregateFunctionMeta;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Result as QueryResult};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::*;
use common_recordbatch::{RecordBatch, RecordBatches};
use datatypes::prelude::*;
@@ -208,14 +207,11 @@ where
let engine = new_query_engine_with_table(testing_table);
engine.register_aggregate_function(
create_aggregate_function(
"my_sum".to_string(),
1,
Arc::new(MySumAccumulatorCreator::default()),
)
.into(),
);
engine.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"my_sum",
1,
Arc::new(|| Arc::new(MySumAccumulatorCreator::default())),
)));
let sql = format!("select MY_SUM({column_name}) as my_sum from {table_name}");
let batches = exec_selection(engine, &sql).await;

View File

@@ -66,8 +66,6 @@ pub struct GrpcOptions {
pub max_recv_message_size: ReadableSize,
/// Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize,
/// Compression mode in Arrow Flight service.
pub flight_compression: FlightCompression,
pub runtime_size: usize,
#[serde(default = "Default::default")]
pub tls: TlsOption,
@@ -116,7 +114,6 @@ impl Default for GrpcOptions {
server_addr: String::new(),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
}
@@ -135,30 +132,6 @@ impl GrpcOptions {
}
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum FlightCompression {
/// Disable all compression in Arrow Flight service.
None,
/// Enable only transport layer compression (zstd).
Transport,
/// Enable only payload compression (lz4)
#[default]
ArrowIpc,
/// Enable all compression.
All,
}
impl FlightCompression {
pub fn transport_compression(&self) -> bool {
self == &FlightCompression::Transport || self == &FlightCompression::All
}
pub fn arrow_compression(&self) -> bool {
self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
}
}
pub struct GrpcServer {
// states
shutdown_tx: Mutex<Option<Sender<()>>>,

View File

@@ -45,7 +45,7 @@ use tonic::{Request, Response, Status, Streaming};
use crate::error::{InvalidParameterSnafu, ParseJsonSnafu, Result, ToJsonSnafu};
pub use crate::grpc::flight::stream::FlightRecordBatchStream;
use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler};
use crate::grpc::{FlightCompression, TonicResult};
use crate::grpc::TonicResult;
use crate::http::header::constants::GREPTIME_DB_HEADER_NAME;
use crate::http::AUTHORIZATION_HEADER;
use crate::{error, hint_headers};
@@ -195,14 +195,9 @@ impl FlightCraft for GreptimeRequestHandler {
protocol = "grpc",
request_type = get_request_type(&request)
);
let flight_compression = self.flight_compression;
async {
let output = self.handle_request(request, hints).await?;
let stream = to_flight_data_stream(
output,
TracingContext::from_current_span(),
flight_compression,
);
let stream = to_flight_data_stream(output, TracingContext::from_current_span());
Ok(Response::new(stream))
}
.trace(span)
@@ -370,16 +365,14 @@ impl Stream for PutRecordBatchRequestStream {
fn to_flight_data_stream(
output: Output,
tracing_context: TracingContext,
flight_compression: FlightCompression,
) -> TonicStream<FlightData> {
match output.data {
OutputData::Stream(stream) => {
let stream = FlightRecordBatchStream::new(stream, tracing_context, flight_compression);
let stream = FlightRecordBatchStream::new(stream, tracing_context);
Box::pin(stream) as _
}
OutputData::RecordBatches(x) => {
let stream =
FlightRecordBatchStream::new(x.as_stream(), tracing_context, flight_compression);
let stream = FlightRecordBatchStream::new(x.as_stream(), tracing_context);
Box::pin(stream) as _
}
OutputData::AffectedRows(rows) => {

View File

@@ -30,7 +30,6 @@ use tokio::task::JoinHandle;
use crate::error;
use crate::grpc::flight::TonicResult;
use crate::grpc::FlightCompression;
#[pin_project(PinnedDrop)]
pub struct FlightRecordBatchStream {
@@ -42,27 +41,18 @@ pub struct FlightRecordBatchStream {
}
impl FlightRecordBatchStream {
pub fn new(
recordbatches: SendableRecordBatchStream,
tracing_context: TracingContext,
compression: FlightCompression,
) -> Self {
pub fn new(recordbatches: SendableRecordBatchStream, tracing_context: TracingContext) -> Self {
let (tx, rx) = mpsc::channel::<TonicResult<FlightMessage>>(1);
let join_handle = common_runtime::spawn_global(async move {
Self::flight_data_stream(recordbatches, tx)
.trace(tracing_context.attach(info_span!("flight_data_stream")))
.await
});
let encoder = if compression.arrow_compression() {
FlightEncoder::default()
} else {
FlightEncoder::with_compression_disabled()
};
Self {
rx,
join_handle,
done: false,
encoder,
encoder: FlightEncoder::with_compression_disabled(),
}
}
@@ -171,11 +161,7 @@ mod test {
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()])
.unwrap()
.as_stream();
let mut stream = FlightRecordBatchStream::new(
recordbatches,
TracingContext::default(),
FlightCompression::default(),
);
let mut stream = FlightRecordBatchStream::new(recordbatches, TracingContext::default());
let mut raw_data = Vec::with_capacity(2);
raw_data.push(stream.next().await.unwrap().unwrap());

View File

@@ -49,7 +49,7 @@ use crate::error::{
JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result, UnknownHintSnafu,
};
use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
use crate::grpc::{FlightCompression, TonicResult};
use crate::grpc::TonicResult;
use crate::metrics;
use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER};
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
@@ -59,7 +59,6 @@ pub struct GreptimeRequestHandler {
handler: ServerGrpcQueryHandlerRef,
user_provider: Option<UserProviderRef>,
runtime: Option<Runtime>,
pub(crate) flight_compression: FlightCompression,
}
impl GreptimeRequestHandler {
@@ -67,13 +66,11 @@ impl GreptimeRequestHandler {
handler: ServerGrpcQueryHandlerRef,
user_provider: Option<UserProviderRef>,
runtime: Option<Runtime>,
flight_compression: FlightCompression,
) -> Self {
Self {
handler,
user_provider,
runtime,
flight_compression,
}
}

View File

@@ -403,10 +403,7 @@ pub async fn handle_get_trace(
.with_label_values(&[&db, "/api/traces"])
.start_timer();
let output = match handler
.get_trace(query_ctx, &trace_id, query_params.start, query_params.end)
.await
{
let output = match handler.get_trace(query_ctx, &trace_id).await {
Ok(output) => output,
Err(err) => {
return handle_query_error(

View File

@@ -203,13 +203,7 @@ pub trait JaegerQueryHandler {
) -> Result<Output>;
/// Get trace by trace id. It's used for `/api/traces/{trace_id}` API.
async fn get_trace(
&self,
ctx: QueryContextRef,
trace_id: &str,
start_time: Option<i64>,
end_time: Option<i64>,
) -> Result<Output>;
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> Result<Output>;
/// Find traces by query params. It's used for `/api/traces` API.
async fn find_traces(

View File

@@ -56,7 +56,6 @@ use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use servers::grpc::flight::FlightCraftWrapper;
use servers::grpc::region_server::RegionServerRequestHandler;
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::server::ServerHandlers;
use tempfile::TempDir;
@@ -191,10 +190,7 @@ impl GreptimeDbClusterBuilder {
max_running_procedures: 128,
},
wal: self.metasrv_wal_config.clone(),
grpc: GrpcOptions {
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
};

View File

@@ -34,7 +34,7 @@ mod test {
use itertools::Itertools;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{FlightCompression, GrpcServerConfig};
use servers::grpc::GrpcServerConfig;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::server::Server;
@@ -94,7 +94,6 @@ mod test {
)
.ok(),
Some(runtime.clone()),
FlightCompression::default(),
);
let mut grpc_server = GrpcServerBuilder::new(GrpcServerConfig::default(), runtime)
.flight_handler(Arc::new(greptime_request_handler))
@@ -140,7 +139,8 @@ mod test {
let schema = record_batches[0].schema.arrow_schema().clone();
let stream = futures::stream::once(async move {
let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema));
let mut schema_data =
FlightEncoder::with_compression_disabled().encode(FlightMessage::Schema(schema));
let metadata = DoPutMetadata::new(0);
schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();
// first message in "DoPut" stream should carry table name in flight descriptor
@@ -155,7 +155,7 @@ mod test {
tokio_stream::iter(record_batches)
.enumerate()
.map(|(i, x)| {
let mut encoder = FlightEncoder::default();
let mut encoder = FlightEncoder::with_compression_disabled();
let message = FlightMessage::RecordBatch(x.into_df_record_batch());
let mut data = encoder.encode(message);
let metadata = DoPutMetadata::new((i + 1) as i64);

View File

@@ -42,7 +42,7 @@ use object_store::test_util::TempFolder;
use object_store::ObjectStore;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
@@ -585,7 +585,6 @@ pub async fn setup_grpc_server_with(
ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()),
user_provider.clone(),
Some(runtime.clone()),
FlightCompression::default(),
);
let flight_handler = Arc::new(greptime_request_handler.clone());

View File

@@ -1025,7 +1025,6 @@ bind_addr = "127.0.0.1:4001"
server_addr = "127.0.0.1:4001"
max_recv_message_size = "512MiB"
max_send_message_size = "512MiB"
flight_compression = "arrow_ipc"
runtime_size = 8
[grpc.tls]
@@ -4543,7 +4542,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces/{trace_id}` API without start and end.
// Test `/api/traces/{trace_id}` API.
let res = client
.get("/v1/jaeger/api/traces/5611dce1bc9ebed65352d99a027b08ea")
.header("x-greptime-trace-table-name", trace_table_name)
@@ -4659,122 +4658,6 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces/{trace_id}` API with start and end in microseconds.
let res = client
.get("/v1/jaeger/api/traces/5611dce1bc9ebed65352d99a027b08ea?start=1738726754492421&end=1738726754642422")
.header("x-greptime-trace-table-name", trace_table_name)
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"{
"data": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spans": [
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "ffa03416a7b9ea48",
"operationName": "access-redis",
"references": [],
"startTime": 1738726754492422,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-redis"
},
{
"key": "otel.scope.name",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.version",
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
"value": "server"
}
],
"logs": [],
"processID": "p1"
},
{
"traceID": "5611dce1bc9ebed65352d99a027b08ea",
"spanID": "008421dbbd33a3e9",
"operationName": "access-mysql",
"references": [],
"startTime": 1738726754492421,
"duration": 100000,
"tags": [
{
"key": "net.peer.ip",
"type": "string",
"value": "1.2.3.4"
},
{
"key": "operation.type",
"type": "string",
"value": "access-mysql"
},
{
"key": "otel.scope.name",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "otel.scope.version",
"type": "string",
"value": "1.0.0"
},
{
"key": "peer.service",
"type": "string",
"value": "test-jaeger-query-api"
},
{
"key": "span.kind",
"type": "string",
"value": "server"
}
],
"logs": [],
"processID": "p1"
}
],
"processes": {
"p1": {
"serviceName": "test-jaeger-query-api",
"tags": []
}
}
}
],
"total": 0,
"limit": 0,
"offset": 0,
"errors": []
}
"#;
let resp: Value = serde_json::from_str(&res.text().await).unwrap();
let expected: Value = serde_json::from_str(expected).unwrap();
assert_eq!(resp, expected);
// Test `/api/traces` API.
let res = client
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492421&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")