Compare commits

..

1 Commits

Author SHA1 Message Date
discord9
8ca3d72e3f feat(exp): adjust_flow admin function 2025-06-05 13:52:08 +08:00
198 changed files with 8260 additions and 13882 deletions

View File

@@ -64,11 +64,11 @@ inputs:
upload-max-retry-times:
description: Max retry times for uploading artifacts to S3
required: false
default: "30"
default: "20"
upload-retry-timeout:
description: Timeout for uploading artifacts to S3
required: false
default: "120" # minutes
default: "30" # minutes
runs:
using: composite
steps:

View File

@@ -59,7 +59,7 @@ runs:
--set base.podTemplate.main.resources.requests.cpu=50m \
--set base.podTemplate.main.resources.requests.memory=256Mi \
--set base.podTemplate.main.resources.limits.cpu=2000m \
--set base.podTemplate.main.resources.limits.memory=3Gi \
--set base.podTemplate.main.resources.limits.memory=2Gi \
--set frontend.replicas=${{ inputs.frontend-replicas }} \
--set datanode.replicas=${{ inputs.datanode-replicas }} \
--set meta.replicas=${{ inputs.meta-replicas }} \

View File

@@ -250,11 +250,6 @@ jobs:
name: unstable-fuzz-logs
path: /tmp/unstable-greptime/
retention-days: 3
- name: Describe pods
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
build-greptime-ci:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
@@ -410,11 +405,6 @@ jobs:
shell: bash
run: |
kubectl describe nodes
- name: Describe pod
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
- name: Export kind logs
if: failure()
shell: bash
@@ -564,11 +554,6 @@ jobs:
shell: bash
run: |
kubectl describe nodes
- name: Describe pods
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
- name: Export kind logs
if: failure()
shell: bash

View File

@@ -441,8 +441,8 @@ jobs:
aws-region: ${{ vars.EC2_RUNNER_REGION }}
github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }}
bump-downstream-repo-versions:
name: Bump downstream repo versions
bump-doc-version:
name: Bump doc version
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [allocate-runners, publish-github-release]
runs-on: ubuntu-latest
@@ -456,16 +456,36 @@ jobs:
fetch-depth: 0
persist-credentials: false
- uses: ./.github/actions/setup-cyborg
- name: Bump downstream repo versions
- name: Bump doc version
working-directory: cyborg
run: pnpm tsx bin/bump-versions.ts
run: pnpm tsx bin/bump-doc-version.ts
env:
VERSION: ${{ needs.allocate-runners.outputs.version }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
bump-website-version:
name: Bump website version
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
needs: [allocate-runners, publish-github-release]
runs-on: ubuntu-latest
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:
issues: write # Allows the action to create issues for cyborg.
contents: write # Allows the action to create a release.
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- uses: ./.github/actions/setup-cyborg
- name: Bump website version
working-directory: cyborg
run: pnpm tsx bin/bump-website-version.ts
env:
TARGET_REPOS: website,docs,demo
VERSION: ${{ needs.allocate-runners.outputs.version }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
WEBSITE_REPO_TOKEN: ${{ secrets.WEBSITE_REPO_TOKEN }}
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
DEMO_REPO_TOKEN: ${{ secrets.DEMO_REPO_TOKEN }}
bump-helm-charts-version:
name: Bump helm charts version

778
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -30,7 +30,6 @@ members = [
"src/common/recordbatch",
"src/common/runtime",
"src/common/session",
"src/common/stat",
"src/common/substrait",
"src/common/telemetry",
"src/common/test-util",
@@ -149,7 +148,6 @@ meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev =
mockall = "0.13"
moka = "0.12"
nalgebra = "0.33"
nix = { version = "0.30.1", default-features = false, features = ["event", "fs", "process"] }
notify = "8.0"
num_cpus = "1.16"
object_store_opendal = "0.50"
@@ -289,7 +287,6 @@ query = { path = "src/query" }
servers = { path = "src/servers" }
session = { path = "src/session" }
sql = { path = "src/sql" }
stat = { path = "src/common/stat" }
store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" }
table = { path = "src/table" }

View File

@@ -100,7 +100,7 @@
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
@@ -195,13 +195,13 @@
| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. |
| `slow_query.threshold` | String | Unset | The threshold of slow query. |
| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
| `export_metrics` | -- | -- | The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommended to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -232,7 +232,6 @@
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -299,11 +298,13 @@
| `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. |
| `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
| `slow_query.ttl` | String | `30d` | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
| `export_metrics` | -- | -- | The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -313,10 +314,12 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `data_home` | String | `./greptimedb_data` | The working home directory. |
| `data_home` | String | `./greptimedb_data/metasrv/` | The working home directory. |
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store`<br/>- `mysql_store` |
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store` |
| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.<br/>**Only used when backend is `postgres_store`.** |
| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend<br/>Only used when backend is `postgres_store`. |
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
@@ -328,12 +331,6 @@
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
@@ -375,11 +372,13 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -405,7 +404,6 @@
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -448,7 +446,7 @@
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
@@ -538,11 +536,13 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |

View File

@@ -44,12 +44,6 @@ runtime_size = 8
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
## Compression mode for datanode side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
## - `all`: enable all compression.
flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]
@@ -258,7 +252,7 @@ parallelism = 0
## The data storage options.
[storage]
## The working home directory.
data_home = "./greptimedb_data"
data_home = "./greptimedb_data/"
## The storage type used to store the data.
## - `File`: the data is stored in the local file system.
@@ -641,16 +635,24 @@ max_log_files = 720
[logging.tracing_sample_ratio]
default_ratio = 1.0
## The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## @toml2docs:none-default
db = "greptime_metrics"
[export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -54,12 +54,6 @@ bind_addr = "127.0.0.1:4001"
server_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
## Compression mode for frontend side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
## - `all`: enable all compression.
flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]
@@ -253,16 +247,24 @@ sample_ratio = 1.0
## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
ttl = "30d"
## The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## @toml2docs:none-default
db = "greptime_metrics"
[export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -1,5 +1,13 @@
## The working home directory.
data_home = "./greptimedb_data"
data_home = "./greptimedb_data/metasrv/"
## The bind address of metasrv.
bind_addr = "127.0.0.1:3002"
## The communication server address for the frontend and datanode to connect to metasrv.
## If left empty or unset, the server will automatically use the IP address of the first network interface
## on the host, with the same port number as the one specified in `bind_addr`.
server_addr = "127.0.0.1:3002"
## Store server address default to etcd store.
## For postgres store, the format is:
@@ -16,7 +24,6 @@ store_key_prefix = ""
## - `etcd_store` (default value)
## - `memory_store`
## - `postgres_store`
## - `mysql_store`
backend = "etcd_store"
## Table name in RDS to store metadata. Effect when using a RDS kvbackend.
@@ -60,21 +67,6 @@ node_max_idle_time = "24hours"
## The number of threads to execute the runtime for global write operations.
#+ compact_rt_size = 4
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
bind_addr = "127.0.0.1:3002"
## The communication server address for the frontend and datanode to connect to metasrv.
## If left empty or unset, the server will automatically use the IP address of the first network interface
## on the host, with the same port number as the one specified in `bind_addr`.
server_addr = "127.0.0.1:3002"
## The number of server worker threads.
runtime_size = 8
## The maximum receive message size for gRPC server.
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
## The HTTP server options.
[http]
## The address to bind the HTTP server.
@@ -237,16 +229,24 @@ max_log_files = 720
[logging.tracing_sample_ratio]
default_ratio = 1.0
## The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## @toml2docs:none-default
db = "greptime_metrics"
[export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -350,7 +350,7 @@ parallelism = 0
## The data storage options.
[storage]
## The working home directory.
data_home = "./greptimedb_data"
data_home = "./greptimedb_data/"
## The storage type used to store the data.
## - `File`: the data is stored in the local file system.
@@ -750,11 +750,13 @@ default_ratio = 1.0
## @toml2docs:none-default
#+ sample_ratio = 1.0
## The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
## whether enable export metrics.
enable = false
## The interval of export metrics.
write_interval = "30s"
@@ -765,7 +767,7 @@ write_interval = "30s"
db = "greptime_metrics"
[export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
async function triggerWorkflow(workflowId: string, version: string) {
const docsClient = obtainClient("DOCS_REPO_TOKEN")
try {
await docsClient.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: "docs",
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow: ${error.message}`);
}
}
function determineWorkflow(version: string): [string, string] {
// Check if it's a nightly version
if (version.includes('nightly')) {
return ['bump-nightly-version.yml', version];
}
const parts = version.split('.');
if (parts.length !== 3) {
throw new Error('Invalid version format');
}
// If patch version (last number) is 0, it's a major version
// Return only major.minor version
if (parts[2] === '0') {
return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
}
// Otherwise it's a patch version, use full version
return ['bump-patch-version.yml', version];
}
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
try {
const [workflowId, apiVersion] = determineWorkflow(cleanVersion);
triggerWorkflow(workflowId, apiVersion);
} catch (error) {
core.setFailed(`Error processing version: ${error.message}`);
process.exit(1);
}

View File

@@ -1,156 +0,0 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
interface RepoConfig {
tokenEnv: string;
repo: string;
workflowLogic: (version: string) => [string, string] | null;
}
const REPO_CONFIGS: Record<string, RepoConfig> = {
website: {
tokenEnv: "WEBSITE_REPO_TOKEN",
repo: "website",
workflowLogic: (version: string) => {
// Skip nightly versions for website
if (version.includes('nightly')) {
console.log('Nightly version detected for website, skipping workflow trigger.');
return null;
}
return ['bump-patch-version.yml', version];
}
},
demo: {
tokenEnv: "DEMO_REPO_TOKEN",
repo: "demo-scene",
workflowLogic: (version: string) => {
// Skip nightly versions for demo
if (version.includes('nightly')) {
console.log('Nightly version detected for demo, skipping workflow trigger.');
return null;
}
return ['bump-patch-version.yml', version];
}
},
docs: {
tokenEnv: "DOCS_REPO_TOKEN",
repo: "docs",
workflowLogic: (version: string) => {
// Check if it's a nightly version
if (version.includes('nightly')) {
return ['bump-nightly-version.yml', version];
}
const parts = version.split('.');
if (parts.length !== 3) {
throw new Error('Invalid version format');
}
// If patch version (last number) is 0, it's a major version
// Return only major.minor version
if (parts[2] === '0') {
return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
}
// Otherwise it's a patch version, use full version
return ['bump-patch-version.yml', version];
}
}
};
async function triggerWorkflow(repoConfig: RepoConfig, workflowId: string, version: string) {
const client = obtainClient(repoConfig.tokenEnv);
try {
await client.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: repoConfig.repo,
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow for ${repoConfig.repo} with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow for ${repoConfig.repo}: ${error.message}`);
throw error;
}
}
async function processRepo(repoName: string, version: string) {
const repoConfig = REPO_CONFIGS[repoName];
if (!repoConfig) {
throw new Error(`Unknown repository: ${repoName}`);
}
try {
const workflowResult = repoConfig.workflowLogic(version);
if (workflowResult === null) {
// Skip this repo (e.g., nightly version for website)
return;
}
const [workflowId, apiVersion] = workflowResult;
await triggerWorkflow(repoConfig, workflowId, apiVersion);
} catch (error) {
core.setFailed(`Error processing ${repoName} with version ${version}: ${error.message}`);
throw error;
}
}
async function main() {
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
// Get target repositories from environment variable
// Default to both if not specified
const targetRepos = process.env.TARGET_REPOS?.split(',').map(repo => repo.trim()) || ['website', 'docs'];
console.log(`Processing version ${cleanVersion} for repositories: ${targetRepos.join(', ')}`);
const errors: string[] = [];
// Process each repository
for (const repo of targetRepos) {
try {
await processRepo(repo, cleanVersion);
} catch (error) {
errors.push(`${repo}: ${error.message}`);
}
}
if (errors.length > 0) {
core.setFailed(`Failed to process some repositories: ${errors.join('; ')}`);
process.exit(1);
}
console.log('All repositories processed successfully');
}
// Execute main function
main().catch((error) => {
core.setFailed(`Unexpected error: ${error.message}`);
process.exit(1);
});

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
async function triggerWorkflow(workflowId: string, version: string) {
const websiteClient = obtainClient("WEBSITE_REPO_TOKEN")
try {
await websiteClient.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: "website",
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow: ${error.message}`);
}
}
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
if (cleanVersion.includes('nightly')) {
console.log('Nightly version detected, skipping workflow trigger.');
process.exit(0);
}
try {
triggerWorkflow('bump-patch-version.yml', cleanVersion);
} catch (error) {
core.setFailed(`Error processing version: ${error.message}`);
process.exit(1);
}

File diff suppressed because it is too large Load Diff

View File

@@ -60,7 +60,7 @@
| Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` |
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))`<br/>`sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))`<br/>`sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` |
| WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` |
| Cached Bytes per Instance | `greptime_mito_cache_bytes{instance=~"$datanode"}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` |
@@ -69,7 +69,7 @@
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
@@ -88,19 +88,9 @@
# Metasrv
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `status-history` | Counter of region migration by source and destination | `prometheus` | -- | `from-datanode-{{datanode_id}}` |
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `{{pod}}-{{state}}-{{error_type}}` |
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `binBps` | `Datanode-{{datanode_id}}-writeload` |
| Rate of SQL Executions (RDS) | `rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])` | `timeseries` | Displays the rate of SQL executions processed by the Meta service using the RDS backend. | `prometheus` | `none` | `{{pod}} {{op}} {{type}} {{result}} ` |
| SQL Execution Latency (RDS) | `histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))` | `timeseries` | Measures the response time of SQL executions via the RDS backend. | `prometheus` | `ms` | `{{pod}} {{op}} {{type}} {{result}} p90` |
| Handler Execution Latency | `histogram_quantile(0.90, sum by(pod, le, name) (
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
))` | `timeseries` | Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.<br/> | `prometheus` | `s` | `{{pod}} {{name}} p90` |
| Heartbeat Packet Size | `histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))` | `timeseries` | Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.<br/> | `prometheus` | `bytes` | `{{pod}}` |
| Meta Heartbeat Receive Rate | `rate(greptime_meta_heartbeat_rate[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}` |
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `state-timeline` | Counter of region migration by source and destination | `prometheus` | `none` | `from-datanode-{{datanode_id}}` |
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `__auto` |
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `__auto` |
# Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |

View File

@@ -497,7 +497,7 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99'
- expr: sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))
- expr: sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
@@ -607,7 +607,7 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
@@ -741,8 +741,9 @@ groups:
- title: Metasrv
panels:
- title: Region migration datanode
type: status-history
type: state-timeline
description: Counter of region migration by source and destination
unit: none
queries:
- expr: greptime_meta_region_migration_stat{datanode_type="src"}
datasource:
@@ -763,127 +764,17 @@ groups:
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{state}}-{{error_type}}'
legendFormat: __auto
- title: Datanode load
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: binBps
unit: none
queries:
- expr: greptime_datanode_load
datasource:
type: prometheus
uid: ${metrics}
legendFormat: Datanode-{{datanode_id}}-writeload
- title: Rate of SQL Executions (RDS)
type: timeseries
description: Displays the rate of SQL executions processed by the Meta service using the RDS backend.
unit: none
queries:
- expr: rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{op}} {{type}} {{result}} '
- title: SQL Execution Latency (RDS)
type: timeseries
description: 'Measures the response time of SQL executions via the RDS backend. '
unit: ms
queries:
- expr: histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{op}} {{type}} {{result}} p90'
- title: Handler Execution Latency
type: timeseries
description: |
Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.
unit: s
queries:
- expr: |-
histogram_quantile(0.90, sum by(pod, le, name) (
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{name}} p90'
- title: Heartbeat Packet Size
type: timeseries
description: |
Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.
unit: bytes
queries:
- expr: histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}'
- title: Meta Heartbeat Receive Rate
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: rate(greptime_meta_heartbeat_rate[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}'
- title: Meta KV Ops Latency
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{op}} p99'
- title: Rate of meta KV Ops
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: none
queries:
- expr: rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{op}} p99'
- title: DDL Latency
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateLogicalTables-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateTable-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateView-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateFlow-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: DropTable-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: AlterTable-{{step}} p90
legendFormat: __auto
- title: Flownode
panels:
- title: Flow Ingest / Output Rate

File diff suppressed because it is too large Load Diff

View File

@@ -60,7 +60,7 @@
| Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` |
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{}[$__rate_interval])))`<br/>`sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{}[$__rate_interval])))`<br/>`sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` |
| WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` |
| Cached Bytes per Instance | `greptime_mito_cache_bytes{}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` |
@@ -69,7 +69,7 @@
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
@@ -88,19 +88,9 @@
# Metasrv
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `status-history` | Counter of region migration by source and destination | `prometheus` | -- | `from-datanode-{{datanode_id}}` |
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `{{pod}}-{{state}}-{{error_type}}` |
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `binBps` | `Datanode-{{datanode_id}}-writeload` |
| Rate of SQL Executions (RDS) | `rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])` | `timeseries` | Displays the rate of SQL executions processed by the Meta service using the RDS backend. | `prometheus` | `none` | `{{pod}} {{op}} {{type}} {{result}} ` |
| SQL Execution Latency (RDS) | `histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))` | `timeseries` | Measures the response time of SQL executions via the RDS backend. | `prometheus` | `ms` | `{{pod}} {{op}} {{type}} {{result}} p90` |
| Handler Execution Latency | `histogram_quantile(0.90, sum by(pod, le, name) (
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
))` | `timeseries` | Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.<br/> | `prometheus` | `s` | `{{pod}} {{name}} p90` |
| Heartbeat Packet Size | `histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))` | `timeseries` | Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.<br/> | `prometheus` | `bytes` | `{{pod}}` |
| Meta Heartbeat Receive Rate | `rate(greptime_meta_heartbeat_rate[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}` |
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `state-timeline` | Counter of region migration by source and destination | `prometheus` | `none` | `from-datanode-{{datanode_id}}` |
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `__auto` |
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `__auto` |
# Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |

View File

@@ -497,7 +497,7 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99'
- expr: sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))
- expr: sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
@@ -607,7 +607,7 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
@@ -741,8 +741,9 @@ groups:
- title: Metasrv
panels:
- title: Region migration datanode
type: status-history
type: state-timeline
description: Counter of region migration by source and destination
unit: none
queries:
- expr: greptime_meta_region_migration_stat{datanode_type="src"}
datasource:
@@ -763,127 +764,17 @@ groups:
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{state}}-{{error_type}}'
legendFormat: __auto
- title: Datanode load
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: binBps
unit: none
queries:
- expr: greptime_datanode_load
datasource:
type: prometheus
uid: ${metrics}
legendFormat: Datanode-{{datanode_id}}-writeload
- title: Rate of SQL Executions (RDS)
type: timeseries
description: Displays the rate of SQL executions processed by the Meta service using the RDS backend.
unit: none
queries:
- expr: rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{op}} {{type}} {{result}} '
- title: SQL Execution Latency (RDS)
type: timeseries
description: 'Measures the response time of SQL executions via the RDS backend. '
unit: ms
queries:
- expr: histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{op}} {{type}} {{result}} p90'
- title: Handler Execution Latency
type: timeseries
description: |
Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.
unit: s
queries:
- expr: |-
histogram_quantile(0.90, sum by(pod, le, name) (
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{name}} p90'
- title: Heartbeat Packet Size
type: timeseries
description: |
Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.
unit: bytes
queries:
- expr: histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}'
- title: Meta Heartbeat Receive Rate
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: rate(greptime_meta_heartbeat_rate[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}'
- title: Meta KV Ops Latency
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{op}} p99'
- title: Rate of meta KV Ops
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: none
queries:
- expr: rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{op}} p99'
- title: DDL Latency
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateLogicalTables-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateTable-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateView-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateFlow-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: DropTable-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: AlterTable-{{step}} p90
legendFormat: __auto
- title: Flownode
panels:
- title: Flow Ingest / Output Rate

View File

@@ -19,11 +19,9 @@ use std::time::Duration;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use object_store::layers::LoggingLayer;
use object_store::services::Oss;
use object_store::{services, ObjectStore};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
@@ -112,15 +110,15 @@ pub struct ExportCommand {
#[clap(long)]
s3: bool,
/// if both `ddl_local_dir` and remote storage (s3/oss) are set, `ddl_local_dir` will be only used for
/// exported SQL files, and the data will be exported to remote storage.
/// if both `s3_ddl_local_dir` and `s3` are set, `s3_ddl_local_dir` will be only used for
/// exported SQL files, and the data will be exported to s3.
///
/// Note that `ddl_local_dir` export sql files to **LOCAL** file system, this is useful if export client don't have
/// direct access to remote storage.
/// Note that `s3_ddl_local_dir` export sql files to **LOCAL** file system, this is useful if export client don't have
/// direct access to s3.
///
/// if remote storage is set but `ddl_local_dir` is not set, both SQL&data will be exported to remote storage.
/// if `s3` is set but `s3_ddl_local_dir` is not set, both SQL&data will be exported to s3.
#[clap(long)]
ddl_local_dir: Option<String>,
s3_ddl_local_dir: Option<String>,
/// The s3 bucket name
/// if s3 is set, this is required
@@ -151,30 +149,6 @@ pub struct ExportCommand {
/// if s3 is set, this is required
#[clap(long)]
s3_region: Option<String>,
/// if export data to oss
#[clap(long)]
oss: bool,
/// The oss bucket name
/// if oss is set, this is required
#[clap(long)]
oss_bucket: Option<String>,
/// The oss endpoint
/// if oss is set, this is required
#[clap(long)]
oss_endpoint: Option<String>,
/// The oss access key id
/// if oss is set, this is required
#[clap(long)]
oss_access_key_id: Option<String>,
/// The oss access key secret
/// if oss is set, this is required
#[clap(long)]
oss_access_key_secret: Option<String>,
}
impl ExportCommand {
@@ -188,7 +162,7 @@ impl ExportCommand {
{
return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
}
if !self.s3 && !self.oss && self.output_dir.is_none() {
if !self.s3 && self.output_dir.is_none() {
return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
}
let (catalog, schema) =
@@ -213,32 +187,13 @@ impl ExportCommand {
start_time: self.start_time.clone(),
end_time: self.end_time.clone(),
s3: self.s3,
ddl_local_dir: self.ddl_local_dir.clone(),
s3_ddl_local_dir: self.s3_ddl_local_dir.clone(),
s3_bucket: self.s3_bucket.clone(),
s3_root: self.s3_root.clone(),
s3_endpoint: self.s3_endpoint.clone(),
// Wrap sensitive values in SecretString
s3_access_key: self
.s3_access_key
.as_ref()
.map(|k| SecretString::from(k.clone())),
s3_secret_key: self
.s3_secret_key
.as_ref()
.map(|k| SecretString::from(k.clone())),
s3_access_key: self.s3_access_key.clone(),
s3_secret_key: self.s3_secret_key.clone(),
s3_region: self.s3_region.clone(),
oss: self.oss,
oss_bucket: self.oss_bucket.clone(),
oss_endpoint: self.oss_endpoint.clone(),
// Wrap sensitive values in SecretString
oss_access_key_id: self
.oss_access_key_id
.as_ref()
.map(|k| SecretString::from(k.clone())),
oss_access_key_secret: self
.oss_access_key_secret
.as_ref()
.map(|k| SecretString::from(k.clone())),
}))
}
}
@@ -254,30 +209,23 @@ pub struct Export {
start_time: Option<String>,
end_time: Option<String>,
s3: bool,
ddl_local_dir: Option<String>,
s3_ddl_local_dir: Option<String>,
s3_bucket: Option<String>,
s3_root: Option<String>,
s3_endpoint: Option<String>,
// Changed to SecretString for sensitive data
s3_access_key: Option<SecretString>,
s3_secret_key: Option<SecretString>,
s3_access_key: Option<String>,
s3_secret_key: Option<String>,
s3_region: Option<String>,
oss: bool,
oss_bucket: Option<String>,
oss_endpoint: Option<String>,
// Changed to SecretString for sensitive data
oss_access_key_id: Option<SecretString>,
oss_access_key_secret: Option<SecretString>,
}
impl Export {
fn catalog_path(&self) -> PathBuf {
if self.s3 || self.oss {
if self.s3 {
PathBuf::from(&self.catalog)
} else if let Some(dir) = &self.output_dir {
PathBuf::from(dir).join(&self.catalog)
} else {
unreachable!("catalog_path: output_dir must be set when not using remote storage")
unreachable!("catalog_path: output_dir must be set when not using s3")
}
}
@@ -479,7 +427,7 @@ impl Export {
.await?;
// Create directory if needed for file system storage
if !export_self.s3 && !export_self.oss {
if !export_self.s3 {
let db_dir = format!("{}/{}/", export_self.catalog, schema);
operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
}
@@ -525,8 +473,6 @@ impl Export {
async fn build_operator(&self) -> Result<ObjectStore> {
if self.s3 {
self.build_s3_operator().await
} else if self.oss {
self.build_oss_operator().await
} else {
self.build_fs_operator().await
}
@@ -534,8 +480,9 @@ impl Export {
/// build operator with preference for file system
async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
if (self.s3 || self.oss) && self.ddl_local_dir.is_some() {
let root = self.ddl_local_dir.as_ref().unwrap().clone();
// is under s3 mode and s3_ddl_dir is set, use it as root
if self.s3 && self.s3_ddl_local_dir.is_some() {
let root = self.s3_ddl_local_dir.as_ref().unwrap().clone();
let op = ObjectStore::new(services::Fs::default().root(&root))
.context(OpenDalSnafu)?
.layer(LoggingLayer::default())
@@ -543,8 +490,6 @@ impl Export {
Ok(op)
} else if self.s3 {
self.build_s3_operator().await
} else if self.oss {
self.build_oss_operator().await
} else {
self.build_fs_operator().await
}
@@ -570,35 +515,11 @@ impl Export {
}
if let Some(key_id) = self.s3_access_key.as_ref() {
builder = builder.access_key_id(key_id.expose_secret());
builder = builder.access_key_id(key_id);
}
if let Some(secret_key) = self.s3_secret_key.as_ref() {
builder = builder.secret_access_key(secret_key.expose_secret());
}
let op = ObjectStore::new(builder)
.context(OpenDalSnafu)?
.layer(LoggingLayer::default())
.finish();
Ok(op)
}
async fn build_oss_operator(&self) -> Result<ObjectStore> {
let mut builder = Oss::default()
.bucket(self.oss_bucket.as_ref().expect("oss_bucket must be set"))
.endpoint(
self.oss_endpoint
.as_ref()
.expect("oss_endpoint must be set"),
);
// Use expose_secret() to access the actual secret value
if let Some(key_id) = self.oss_access_key_id.as_ref() {
builder = builder.access_key_id(key_id.expose_secret());
}
if let Some(secret_key) = self.oss_access_key_secret.as_ref() {
builder = builder.access_key_secret(secret_key.expose_secret());
builder = builder.secret_access_key(secret_key);
}
let op = ObjectStore::new(builder)
@@ -641,8 +562,8 @@ impl Export {
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
// Create directory if not using remote storage
if !export_self.s3 && !export_self.oss {
// Create directory if not using S3
if !export_self.s3 {
let db_dir = format!("{}/{}/", export_self.catalog, schema);
operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
}
@@ -654,11 +575,7 @@ impl Export {
r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
export_self.catalog, schema, path, with_options_clone, connection_part
);
// Log SQL command but mask sensitive information
let safe_sql = export_self.mask_sensitive_sql(&sql);
info!("Executing sql: {}", safe_sql);
info!("Executing sql: {sql}");
export_self.database_client.sql_in_public(&sql).await?;
info!(
"Finished exporting {}.{} data to {}",
@@ -698,29 +615,6 @@ impl Export {
Ok(())
}
/// Mask sensitive information in SQL commands for safe logging
fn mask_sensitive_sql(&self, sql: &str) -> String {
let mut masked_sql = sql.to_string();
// Mask S3 credentials
if let Some(access_key) = &self.s3_access_key {
masked_sql = masked_sql.replace(access_key.expose_secret(), "[REDACTED]");
}
if let Some(secret_key) = &self.s3_secret_key {
masked_sql = masked_sql.replace(secret_key.expose_secret(), "[REDACTED]");
}
// Mask OSS credentials
if let Some(access_key_id) = &self.oss_access_key_id {
masked_sql = masked_sql.replace(access_key_id.expose_secret(), "[REDACTED]");
}
if let Some(access_key_secret) = &self.oss_access_key_secret {
masked_sql = masked_sql.replace(access_key_secret.expose_secret(), "[REDACTED]");
}
masked_sql
}
fn get_file_path(&self, schema: &str, file_name: &str) -> String {
format!("{}/{}/{}", self.catalog, schema, file_name)
}
@@ -737,13 +631,6 @@ impl Export {
},
file_path
)
} else if self.oss {
format!(
"oss://{}/{}/{}",
self.oss_bucket.as_ref().unwrap_or(&String::new()),
self.catalog,
file_path
)
} else {
format!(
"{}/{}",
@@ -788,36 +675,15 @@ impl Export {
};
// Safety: All s3 options are required
// Use expose_secret() to access the actual secret values
let connection_options = format!(
"ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
self.s3_access_key.as_ref().unwrap().expose_secret(),
self.s3_secret_key.as_ref().unwrap().expose_secret(),
self.s3_access_key.as_ref().unwrap(),
self.s3_secret_key.as_ref().unwrap(),
self.s3_region.as_ref().unwrap(),
endpoint_option
);
(s3_path, format!(" CONNECTION ({})", connection_options))
} else if self.oss {
let oss_path = format!(
"oss://{}/{}/{}/",
self.oss_bucket.as_ref().unwrap(),
self.catalog,
schema
);
let endpoint_option = if let Some(endpoint) = self.oss_endpoint.as_ref() {
format!(", ENDPOINT='{}'", endpoint)
} else {
String::new()
};
let connection_options = format!(
"ACCESS_KEY_ID='{}', ACCESS_KEY_SECRET='{}'{}",
self.oss_access_key_id.as_ref().unwrap().expose_secret(),
self.oss_access_key_secret.as_ref().unwrap().expose_secret(),
endpoint_option
);
(oss_path, format!(" CONNECTION ({})", connection_options))
} else {
(
self.catalog_path()

View File

@@ -167,7 +167,9 @@ impl Client {
let client = FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size());
.max_encoding_message_size(self.max_grpc_send_message_size())
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);
Ok(FlightClient { addr, client })
}
@@ -176,7 +178,9 @@ impl Client {
let (addr, channel) = self.find_channel()?;
let client = PbRegionClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size());
.max_encoding_message_size(self.max_grpc_send_message_size())
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);
Ok((addr, client))
}

View File

@@ -80,7 +80,6 @@ servers.workspace = true
session.workspace = true
similar-asserts.workspace = true
snafu.workspace = true
stat.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true

View File

@@ -14,13 +14,12 @@
pub mod builder;
use std::path::Path;
use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_config::Configurable;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_telemetry::logging::TracingOptions;
use common_telemetry::{info, warn};
use common_wal::config::DatanodeWalConfig;
use datanode::datanode::Datanode;
@@ -249,14 +248,6 @@ impl StartCommand {
raft_engine_config.dir.replace(wal_dir.clone());
}
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(&opts.storage.data_home)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if let Some(http_addr) = &self.http_addr {
opts.http.addr.clone_from(http_addr);
}

View File

@@ -28,7 +28,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::datanode::{DatanodeOptions, Instance, APP_NAME};
use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu};
use crate::{create_resource_limit_metrics, log_versions};
use crate::log_versions;
/// Builder for Datanode instance.
pub struct InstanceBuilder {
@@ -68,7 +68,6 @@ impl InstanceBuilder {
);
log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)
.await

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -22,7 +21,7 @@ use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKv
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
@@ -31,7 +30,7 @@ use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
@@ -46,7 +45,7 @@ use crate::error::{
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{create_resource_limit_metrics, log_versions, App};
use crate::{log_versions, App};
pub const APP_NAME: &str = "greptime-flownode";
@@ -187,14 +186,6 @@ impl StartCommand {
opts.logging.dir.clone_from(dir);
}
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if global_options.log_level.is_some() {
opts.logging.level.clone_from(&global_options.log_level);
}
@@ -255,9 +246,7 @@ impl StartCommand {
opts.component.node_id.map(|x| x.to_string()),
None,
);
log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Flownode start command: {:#?}", self);
info!("Flownode options: {:#?}", opts);

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -23,14 +22,14 @@ use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKv
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_telemetry::logging::TracingOptions;
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use frontend::frontend::Frontend;
@@ -45,7 +44,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{self, Result};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{create_resource_limit_metrics, log_versions, App};
use crate::{log_versions, App};
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
@@ -195,14 +194,6 @@ impl StartCommand {
opts.logging.dir.clone_from(dir);
}
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if global_options.log_level.is_some() {
opts.logging.level.clone_from(&global_options.log_level);
}
@@ -279,9 +270,7 @@ impl StartCommand {
opts.component.node_id.clone(),
opts.component.slow_query.as_ref(),
);
log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Frontend start command: {:#?}", self);
info!("Frontend options: {:#?}", opts);

View File

@@ -16,7 +16,6 @@
use async_trait::async_trait;
use common_telemetry::{error, info};
use stat::{get_cpu_limit, get_memory_limit};
use crate::error::Result;
@@ -32,12 +31,6 @@ pub mod standalone;
lazy_static::lazy_static! {
static ref APP_VERSION: prometheus::IntGaugeVec =
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap();
static ref CPU_LIMIT: prometheus::IntGaugeVec =
prometheus::register_int_gauge_vec!("greptime_cpu_limit_in_millicores", "cpu limit in millicores", &["app"]).unwrap();
static ref MEMORY_LIMIT: prometheus::IntGaugeVec =
prometheus::register_int_gauge_vec!("greptime_memory_limit_in_bytes", "memory limit in bytes", &["app"]).unwrap();
}
/// wait for the close signal, for unix platform it's SIGINT or SIGTERM
@@ -121,24 +114,6 @@ pub fn log_versions(version: &str, short_version: &str, app: &str) {
log_env_flags();
}
pub fn create_resource_limit_metrics(app: &str) {
if let Some(cpu_limit) = get_cpu_limit() {
info!(
"GreptimeDB start with cpu limit in millicores: {}",
cpu_limit
);
CPU_LIMIT.with_label_values(&[app]).set(cpu_limit);
}
if let Some(memory_limit) = get_memory_limit() {
info!(
"GreptimeDB start with memory limit in bytes: {}",
memory_limit
);
MEMORY_LIMIT.with_label_values(&[app]).set(memory_limit);
}
}
fn log_env_flags() {
info!("command line arguments");
for argument in std::env::args() {

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::fmt;
use std::path::Path;
use std::time::Duration;
use async_trait::async_trait;
@@ -21,7 +20,7 @@ use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::BackendImpl;
@@ -30,7 +29,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{create_resource_limit_metrics, log_versions, App};
use crate::{log_versions, App};
type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;
@@ -237,20 +236,12 @@ impl StartCommand {
tokio_console_addr: global_options.tokio_console_addr.clone(),
};
#[allow(deprecated)]
if let Some(addr) = &self.rpc_bind_addr {
opts.bind_addr.clone_from(addr);
opts.grpc.bind_addr.clone_from(addr);
} else if !opts.bind_addr.is_empty() {
opts.grpc.bind_addr.clone_from(&opts.bind_addr);
}
#[allow(deprecated)]
if let Some(addr) = &self.rpc_server_addr {
opts.server_addr.clone_from(addr);
opts.grpc.server_addr.clone_from(addr);
} else if !opts.server_addr.is_empty() {
opts.grpc.server_addr.clone_from(&opts.server_addr);
}
if let Some(addrs) = &self.store_addrs {
@@ -283,14 +274,6 @@ impl StartCommand {
opts.data_home.clone_from(data_home);
}
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(&opts.data_home)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if !self.store_key_prefix.is_empty() {
opts.store_key_prefix.clone_from(&self.store_key_prefix)
}
@@ -319,15 +302,13 @@ impl StartCommand {
None,
None,
);
log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Metasrv start command: {:#?}", self);
let plugin_opts = opts.plugins;
let mut opts = opts.component;
opts.grpc.detect_server_addr();
opts.detect_server_addr();
info!("Metasrv options: {:#?}", opts);
@@ -371,7 +352,7 @@ mod tests {
};
let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
assert_eq!(SelectorType::LoadBased, options.selector);
}
@@ -404,8 +385,8 @@ mod tests {
};
let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
assert_eq!(SelectorType::LeaseBased, options.selector);
assert_eq!("debug", options.logging.level.as_ref().unwrap());
@@ -517,10 +498,10 @@ mod tests {
let opts = command.load_options(&Default::default()).unwrap().component;
// Should be read from env, env > default values.
assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");
assert_eq!(opts.bind_addr, "127.0.0.1:14002");
// Should be read from config file, config file > env > default values.
assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");
assert_eq!(opts.server_addr, "127.0.0.1:3002");
// Should be read from cli, cli > config file > env > default values.
assert_eq!(opts.http.addr, "127.0.0.1:14000");

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::{fs, path};
@@ -50,9 +49,7 @@ use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_telemetry::info;
use common_telemetry::logging::{
LoggingOptions, SlowQueryOptions, TracingOptions, DEFAULT_LOGGING_DIR,
};
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use common_wal::config::DatanodeWalConfig;
@@ -86,7 +83,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{Result, StartFlownodeSnafu};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{create_resource_limit_metrics, error, log_versions, App};
use crate::{error, log_versions, App};
pub const APP_NAME: &str = "greptime-standalone";
@@ -410,14 +407,6 @@ impl StartCommand {
opts.storage.data_home.clone_from(data_home);
}
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(&opts.storage.data_home)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if let Some(addr) = &self.rpc_bind_addr {
// frontend grpc addr conflict with datanode default grpc addr
let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
@@ -468,9 +457,7 @@ impl StartCommand {
None,
opts.component.slow_query.as_ref(),
);
log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Standalone start command: {:#?}", self);
info!("Standalone options: {opts:#?}");

View File

@@ -12,14 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::time::Duration;
use cmd::options::GreptimeOptions;
use cmd::standalone::StandaloneOptions;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_config::Configurable;
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
use common_telemetry::logging::{LoggingOptions, DEFAULT_LOGGING_DIR, DEFAULT_OTLP_ENDPOINT};
use common_telemetry::logging::{LoggingOptions, DEFAULT_OTLP_ENDPOINT};
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
@@ -33,7 +32,6 @@ use mito2::config::MitoConfig;
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use store_api::path_utils::WAL_DIR;
#[allow(deprecated)]
#[test]
@@ -58,18 +56,13 @@ fn test_load_datanode_example_config() {
metadata_cache_tti: Duration::from_secs(300),
}),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some(
Path::new(DEFAULT_DATA_HOME)
.join(WAL_DIR)
.to_string_lossy()
.to_string(),
),
dir: Some("./greptimedb_data/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
}),
storage: StorageConfig {
data_home: DEFAULT_DATA_HOME.to_string(),
data_home: "./greptimedb_data/".to_string(),
..Default::default()
},
region_engine: vec![
@@ -86,16 +79,12 @@ fn test_load_datanode_example_config() {
],
logging: LoggingOptions {
level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
export_metrics: ExportMetricsOption {
self_import: None,
self_import: Some(Default::default()),
remote_write: Some(Default::default()),
..Default::default()
},
@@ -132,10 +121,6 @@ fn test_load_frontend_example_config() {
}),
logging: LoggingOptions {
level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
@@ -148,7 +133,7 @@ fn test_load_frontend_example_config() {
},
},
export_metrics: ExportMetricsOption {
self_import: None,
self_import: Some(Default::default()),
remote_write: Some(Default::default()),
..Default::default()
},
@@ -175,17 +160,10 @@ fn test_load_metasrv_example_config() {
let expected = GreptimeOptions::<MetasrvOptions> {
component: MetasrvOptions {
selector: SelectorType::default(),
data_home: DEFAULT_DATA_HOME.to_string(),
grpc: GrpcOptions {
bind_addr: "127.0.0.1:3002".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
data_home: "./greptimedb_data/metasrv/".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
logging: LoggingOptions {
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
dir: "./greptimedb_data/logs".to_string(),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
@@ -199,7 +177,7 @@ fn test_load_metasrv_example_config() {
},
},
export_metrics: ExportMetricsOption {
self_import: None,
self_import: Some(Default::default()),
remote_write: Some(Default::default()),
..Default::default()
},
@@ -220,12 +198,7 @@ fn test_load_standalone_example_config() {
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some(
Path::new(DEFAULT_DATA_HOME)
.join(WAL_DIR)
.to_string_lossy()
.to_string(),
),
dir: Some("./greptimedb_data/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
@@ -243,15 +216,11 @@ fn test_load_standalone_example_config() {
}),
],
storage: StorageConfig {
data_home: DEFAULT_DATA_HOME.to_string(),
data_home: "./greptimedb_data/".to_string(),
..Default::default()
},
logging: LoggingOptions {
level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()

View File

@@ -26,9 +26,6 @@ pub fn metadata_store_dir(store_dir: &str) -> String {
format!("{store_dir}/metadata")
}
/// The default data home directory.
pub const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KvBackendConfig {

View File

@@ -13,9 +13,7 @@
// limitations under the License.
pub mod fs;
pub mod oss;
pub mod s3;
use std::collections::HashMap;
use lazy_static::lazy_static;
@@ -27,12 +25,10 @@ use url::{ParseError, Url};
use self::fs::build_fs_backend;
use self::s3::build_s3_backend;
use crate::error::{self, Result};
use crate::object_store::oss::build_oss_backend;
use crate::util::find_dir_and_filename;
pub const FS_SCHEMA: &str = "FS";
pub const S3_SCHEMA: &str = "S3";
pub const OSS_SCHEMA: &str = "OSS";
/// Returns `(schema, Option<host>, path)`
pub fn parse_url(url: &str) -> Result<(String, Option<String>, String)> {
@@ -68,12 +64,6 @@ pub fn build_backend(url: &str, connection: &HashMap<String, String>) -> Result<
})?;
Ok(build_s3_backend(&host, &root, connection)?)
}
OSS_SCHEMA => {
let host = host.context(error::EmptyHostPathSnafu {
url: url.to_string(),
})?;
Ok(build_oss_backend(&host, &root, connection)?)
}
FS_SCHEMA => Ok(build_fs_backend(&root)?),
_ => error::UnsupportedBackendProtocolSnafu {

View File

@@ -1,118 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use object_store::services::Oss;
use object_store::ObjectStore;
use snafu::ResultExt;
use crate::error::{self, Result};
const BUCKET: &str = "bucket";
const ENDPOINT: &str = "endpoint";
const ACCESS_KEY_ID: &str = "access_key_id";
const ACCESS_KEY_SECRET: &str = "access_key_secret";
const ROOT: &str = "root";
const ALLOW_ANONYMOUS: &str = "allow_anonymous";
/// Check if the key is supported in OSS configuration.
pub fn is_supported_in_oss(key: &str) -> bool {
[
ROOT,
ALLOW_ANONYMOUS,
BUCKET,
ENDPOINT,
ACCESS_KEY_ID,
ACCESS_KEY_SECRET,
]
.contains(&key)
}
/// Build an OSS backend using the provided bucket, root, and connection parameters.
pub fn build_oss_backend(
bucket: &str,
root: &str,
connection: &HashMap<String, String>,
) -> Result<ObjectStore> {
let mut builder = Oss::default().bucket(bucket).root(root);
if let Some(endpoint) = connection.get(ENDPOINT) {
builder = builder.endpoint(endpoint);
}
if let Some(access_key_id) = connection.get(ACCESS_KEY_ID) {
builder = builder.access_key_id(access_key_id);
}
if let Some(access_key_secret) = connection.get(ACCESS_KEY_SECRET) {
builder = builder.access_key_secret(access_key_secret);
}
if let Some(allow_anonymous) = connection.get(ALLOW_ANONYMOUS) {
let allow = allow_anonymous.as_str().parse::<bool>().map_err(|e| {
error::InvalidConnectionSnafu {
msg: format!(
"failed to parse the option {}={}, {}",
ALLOW_ANONYMOUS, allow_anonymous, e
),
}
.build()
})?;
if allow {
builder = builder.allow_anonymous();
}
}
let op = ObjectStore::new(builder)
.context(error::BuildBackendSnafu)?
.layer(object_store::layers::LoggingLayer::default())
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::build_prometheus_metrics_layer(true))
.finish();
Ok(op)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_is_supported_in_oss() {
assert!(is_supported_in_oss(ROOT));
assert!(is_supported_in_oss(ALLOW_ANONYMOUS));
assert!(is_supported_in_oss(BUCKET));
assert!(is_supported_in_oss(ENDPOINT));
assert!(is_supported_in_oss(ACCESS_KEY_ID));
assert!(is_supported_in_oss(ACCESS_KEY_SECRET));
assert!(!is_supported_in_oss("foo"));
assert!(!is_supported_in_oss("BAR"));
}
#[test]
fn test_build_oss_backend_all_fields_valid() {
let mut connection = HashMap::new();
connection.insert(
ENDPOINT.to_string(),
"http://oss-ap-southeast-1.aliyuncs.com".to_string(),
);
connection.insert(ACCESS_KEY_ID.to_string(), "key_id".to_string());
connection.insert(ACCESS_KEY_SECRET.to_string(), "key_secret".to_string());
connection.insert(ALLOW_ANONYMOUS.to_string(), "true".to_string());
let result = build_oss_backend("my-bucket", "my-root", &connection);
assert!(result.is_ok());
}
}

View File

@@ -12,6 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod cgroups;
mod geo_path;
mod hll;
mod uddsketch_state;
pub use cgroups::*;
pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
pub(crate) use hll::HllStateType;
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME};

View File

@@ -47,7 +47,7 @@ impl GeoPathAccumulator {
Self::default()
}
pub fn uadf_impl() -> AggregateUDF {
pub fn udf_impl() -> AggregateUDF {
create_udaf(
GEO_PATH_NAME,
// Input types: lat, lng, timestamp

View File

@@ -1,18 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod approximate;
#[cfg(feature = "geo")]
pub mod geo;
pub mod vector;

View File

@@ -1,32 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::function_registry::FunctionRegistry;
pub(crate) mod hll;
mod uddsketch;
pub(crate) struct ApproximateFunction;
impl ApproximateFunction {
pub fn register(registry: &FunctionRegistry) {
// uddsketch
registry.register_aggr(uddsketch::UddSketchState::state_udf_impl());
registry.register_aggr(uddsketch::UddSketchState::merge_udf_impl());
// hll
registry.register_aggr(hll::HllState::state_udf_impl());
registry.register_aggr(hll::HllState::merge_udf_impl());
}
}

View File

@@ -1,27 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::function_registry::FunctionRegistry;
mod encoding;
mod geo_path;
pub(crate) struct GeoFunction;
impl GeoFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(geo_path::GeoPathAccumulator::uadf_impl());
registry.register_aggr(encoding::JsonPathAccumulator::uadf_impl());
}
}

View File

@@ -1,29 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::aggrs::vector::product::VectorProduct;
use crate::aggrs::vector::sum::VectorSum;
use crate::function_registry::FunctionRegistry;
mod product;
mod sum;
pub(crate) struct VectorFunction;
impl VectorFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(VectorSum::uadf_impl());
registry.register_aggr(VectorProduct::uadf_impl());
}
}

View File

@@ -1,63 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion_expr::ScalarUDF;
use crate::function::{FunctionContext, FunctionRef};
use crate::scalars::udf::create_udf;
/// A factory for creating `ScalarUDF` that require a function context.
#[derive(Clone)]
pub struct ScalarFunctionFactory {
name: String,
factory: Arc<dyn Fn(FunctionContext) -> ScalarUDF + Send + Sync>,
}
impl ScalarFunctionFactory {
/// Returns the name of the function.
pub fn name(&self) -> &str {
&self.name
}
/// Returns a `ScalarUDF` when given a function context.
pub fn provide(&self, ctx: FunctionContext) -> ScalarUDF {
(self.factory)(ctx)
}
}
impl From<ScalarUDF> for ScalarFunctionFactory {
fn from(df_udf: ScalarUDF) -> Self {
let name = df_udf.name().to_string();
let func = Arc::new(move |_ctx| df_udf.clone());
Self {
name,
factory: func,
}
}
}
impl From<FunctionRef> for ScalarFunctionFactory {
fn from(func: FunctionRef) -> Self {
let name = func.name().to_string();
let func = Arc::new(move |ctx: FunctionContext| {
create_udf(func.clone(), ctx.query_ctx, ctx.state)
});
Self {
name,
factory: func,
}
}
}

View File

@@ -16,14 +16,11 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use datafusion_expr::AggregateUDF;
use once_cell::sync::Lazy;
use crate::admin::AdminFunction;
use crate::aggrs::approximate::ApproximateFunction;
use crate::aggrs::vector::VectorFunction as VectorAggrFunction;
use crate::function::{AsyncFunctionRef, Function, FunctionRef};
use crate::function_factory::ScalarFunctionFactory;
use crate::function::{AsyncFunctionRef, FunctionRef};
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
use crate::scalars::date::DateFunction;
use crate::scalars::expression::ExpressionFunction;
use crate::scalars::hll_count::HllCalcFunction;
@@ -34,19 +31,18 @@ use crate::scalars::matches_term::MatchesTermFunction;
use crate::scalars::math::MathFunction;
use crate::scalars::timestamp::TimestampFunction;
use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
use crate::scalars::vector::VectorFunction as VectorScalarFunction;
use crate::scalars::vector::VectorFunction;
use crate::system::SystemFunction;
#[derive(Default)]
pub struct FunctionRegistry {
functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
functions: RwLock<HashMap<String, FunctionRef>>,
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
aggregate_functions: RwLock<HashMap<String, AggregateUDF>>,
aggregate_functions: RwLock<HashMap<String, AggregateFunctionMetaRef>>,
}
impl FunctionRegistry {
pub fn register(&self, func: impl Into<ScalarFunctionFactory>) {
let func = func.into();
pub fn register(&self, func: FunctionRef) {
let _ = self
.functions
.write()
@@ -54,10 +50,6 @@ impl FunctionRegistry {
.insert(func.name().to_string(), func);
}
pub fn register_scalar(&self, func: impl Function + 'static) {
self.register(Arc::new(func) as FunctionRef);
}
pub fn register_async(&self, func: AsyncFunctionRef) {
let _ = self
.async_functions
@@ -66,14 +58,6 @@ impl FunctionRegistry {
.insert(func.name().to_string(), func);
}
pub fn register_aggr(&self, func: AggregateUDF) {
let _ = self
.aggregate_functions
.write()
.unwrap()
.insert(func.name().to_string(), func);
}
pub fn get_async_function(&self, name: &str) -> Option<AsyncFunctionRef> {
self.async_functions.read().unwrap().get(name).cloned()
}
@@ -87,16 +71,27 @@ impl FunctionRegistry {
.collect()
}
#[cfg(test)]
pub fn get_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
let _ = self
.aggregate_functions
.write()
.unwrap()
.insert(func.name(), func);
}
pub fn get_aggr_function(&self, name: &str) -> Option<AggregateFunctionMetaRef> {
self.aggregate_functions.read().unwrap().get(name).cloned()
}
pub fn get_function(&self, name: &str) -> Option<FunctionRef> {
self.functions.read().unwrap().get(name).cloned()
}
pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> {
pub fn functions(&self) -> Vec<FunctionRef> {
self.functions.read().unwrap().values().cloned().collect()
}
pub fn aggregate_functions(&self) -> Vec<AggregateUDF> {
pub fn aggregate_functions(&self) -> Vec<AggregateFunctionMetaRef> {
self.aggregate_functions
.read()
.unwrap()
@@ -117,6 +112,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
UddSketchCalcFunction::register(&function_registry);
HllCalcFunction::register(&function_registry);
// Aggregate functions
AggregateFunctions::register(&function_registry);
// Full text search function
MatchesFunction::register(&function_registry);
MatchesTermFunction::register(&function_registry);
@@ -129,21 +127,15 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
JsonFunction::register(&function_registry);
// Vector related functions
VectorScalarFunction::register(&function_registry);
VectorAggrFunction::register(&function_registry);
VectorFunction::register(&function_registry);
// Geo functions
#[cfg(feature = "geo")]
crate::scalars::geo::GeoFunctions::register(&function_registry);
#[cfg(feature = "geo")]
crate::aggrs::geo::GeoFunction::register(&function_registry);
// Ip functions
IpFunctions::register(&function_registry);
// Approximate functions
ApproximateFunction::register(&function_registry);
Arc::new(function_registry)
});
@@ -155,11 +147,12 @@ mod tests {
#[test]
fn test_function_registry() {
let registry = FunctionRegistry::default();
let func = Arc::new(TestAndFunction);
assert!(registry.get_function("test_and").is_none());
assert!(registry.scalar_functions().is_empty());
registry.register_scalar(TestAndFunction);
assert!(registry.functions().is_empty());
registry.register(func);
let _ = registry.get_function("test_and").unwrap();
assert_eq!(1, registry.scalar_functions().len());
assert_eq!(1, registry.functions().len());
}
}

View File

@@ -19,14 +19,13 @@ mod adjust_flow;
mod admin;
mod flush_flow;
mod macros;
pub mod scalars;
mod system;
pub mod aggrs;
pub mod aggr;
pub mod function;
pub mod function_factory;
pub mod function_registry;
pub mod handlers;
pub mod helper;
pub mod scalars;
pub mod state;
pub mod utils;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod aggregate;
pub(crate) mod date;
pub mod expression;
#[cfg(feature = "geo")]

View File

@@ -0,0 +1,89 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! # Deprecate Warning:
//!
//! This module is deprecated and will be removed in the future.
//! All UDAF implementation here are not maintained and should
//! not be used before they are refactored into the `src/aggr`
//! version.
use std::sync::Arc;
use common_query::logical_plan::AggregateFunctionCreatorRef;
use crate::function_registry::FunctionRegistry;
use crate::scalars::vector::product::VectorProductCreator;
use crate::scalars::vector::sum::VectorSumCreator;
/// A function creates `AggregateFunctionCreator`.
/// "Aggregator" *is* AggregatorFunction. Since the later one is long, we named an short alias for it.
/// The two names might be used interchangeably.
type AggregatorCreatorFunction = Arc<dyn Fn() -> AggregateFunctionCreatorRef + Send + Sync>;
/// `AggregateFunctionMeta` dynamically creates AggregateFunctionCreator.
#[derive(Clone)]
pub struct AggregateFunctionMeta {
name: String,
args_count: u8,
creator: AggregatorCreatorFunction,
}
pub type AggregateFunctionMetaRef = Arc<AggregateFunctionMeta>;
impl AggregateFunctionMeta {
pub fn new(name: &str, args_count: u8, creator: AggregatorCreatorFunction) -> Self {
Self {
name: name.to_string(),
args_count,
creator,
}
}
pub fn name(&self) -> String {
self.name.to_string()
}
pub fn args_count(&self) -> u8 {
self.args_count
}
pub fn create(&self) -> AggregateFunctionCreatorRef {
(self.creator)()
}
}
pub(crate) struct AggregateFunctions;
impl AggregateFunctions {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"vec_sum",
1,
Arc::new(|| Arc::new(VectorSumCreator::default())),
)));
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"vec_product",
1,
Arc::new(|| Arc::new(VectorProductCreator::default())),
)));
#[cfg(feature = "geo")]
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"json_encode_path",
3,
Arc::new(|| Arc::new(super::geo::encoding::JsonPathEncodeFunctionCreator::default())),
)));
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
mod date_add;
mod date_format;
mod date_sub;
@@ -26,8 +27,8 @@ pub(crate) struct DateFunction;
impl DateFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(DateAddFunction);
registry.register_scalar(DateSubFunction);
registry.register_scalar(DateFormatFunction);
registry.register(Arc::new(DateAddFunction));
registry.register(Arc::new(DateSubFunction));
registry.register(Arc::new(DateFormatFunction));
}
}

View File

@@ -17,6 +17,8 @@ mod ctx;
mod is_null;
mod unary;
use std::sync::Arc;
pub use binary::scalar_binary_op;
pub use ctx::EvalContext;
pub use unary::scalar_unary_op;
@@ -28,6 +30,6 @@ pub(crate) struct ExpressionFunction;
impl ExpressionFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(IsNullFunction);
registry.register(Arc::new(IsNullFunction));
}
}

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
pub(crate) mod encoding;
mod geohash;
mod h3;
pub(crate) mod helpers;
mod helpers;
mod measure;
mod relation;
mod s2;
@@ -27,57 +29,57 @@ pub(crate) struct GeoFunctions;
impl GeoFunctions {
pub fn register(registry: &FunctionRegistry) {
// geohash
registry.register_scalar(geohash::GeohashFunction);
registry.register_scalar(geohash::GeohashNeighboursFunction);
registry.register(Arc::new(geohash::GeohashFunction));
registry.register(Arc::new(geohash::GeohashNeighboursFunction));
// h3 index
registry.register_scalar(h3::H3LatLngToCell);
registry.register_scalar(h3::H3LatLngToCellString);
registry.register(Arc::new(h3::H3LatLngToCell));
registry.register(Arc::new(h3::H3LatLngToCellString));
// h3 index inspection
registry.register_scalar(h3::H3CellBase);
registry.register_scalar(h3::H3CellIsPentagon);
registry.register_scalar(h3::H3StringToCell);
registry.register_scalar(h3::H3CellToString);
registry.register_scalar(h3::H3CellCenterLatLng);
registry.register_scalar(h3::H3CellResolution);
registry.register(Arc::new(h3::H3CellBase));
registry.register(Arc::new(h3::H3CellIsPentagon));
registry.register(Arc::new(h3::H3StringToCell));
registry.register(Arc::new(h3::H3CellToString));
registry.register(Arc::new(h3::H3CellCenterLatLng));
registry.register(Arc::new(h3::H3CellResolution));
// h3 hierarchical grid
registry.register_scalar(h3::H3CellCenterChild);
registry.register_scalar(h3::H3CellParent);
registry.register_scalar(h3::H3CellToChildren);
registry.register_scalar(h3::H3CellToChildrenSize);
registry.register_scalar(h3::H3CellToChildPos);
registry.register_scalar(h3::H3ChildPosToCell);
registry.register_scalar(h3::H3CellContains);
registry.register(Arc::new(h3::H3CellCenterChild));
registry.register(Arc::new(h3::H3CellParent));
registry.register(Arc::new(h3::H3CellToChildren));
registry.register(Arc::new(h3::H3CellToChildrenSize));
registry.register(Arc::new(h3::H3CellToChildPos));
registry.register(Arc::new(h3::H3ChildPosToCell));
registry.register(Arc::new(h3::H3CellContains));
// h3 grid traversal
registry.register_scalar(h3::H3GridDisk);
registry.register_scalar(h3::H3GridDiskDistances);
registry.register_scalar(h3::H3GridDistance);
registry.register_scalar(h3::H3GridPathCells);
registry.register(Arc::new(h3::H3GridDisk));
registry.register(Arc::new(h3::H3GridDiskDistances));
registry.register(Arc::new(h3::H3GridDistance));
registry.register(Arc::new(h3::H3GridPathCells));
// h3 measurement
registry.register_scalar(h3::H3CellDistanceSphereKm);
registry.register_scalar(h3::H3CellDistanceEuclideanDegree);
registry.register(Arc::new(h3::H3CellDistanceSphereKm));
registry.register(Arc::new(h3::H3CellDistanceEuclideanDegree));
// s2
registry.register_scalar(s2::S2LatLngToCell);
registry.register_scalar(s2::S2CellLevel);
registry.register_scalar(s2::S2CellToToken);
registry.register_scalar(s2::S2CellParent);
registry.register(Arc::new(s2::S2LatLngToCell));
registry.register(Arc::new(s2::S2CellLevel));
registry.register(Arc::new(s2::S2CellToToken));
registry.register(Arc::new(s2::S2CellParent));
// spatial data type
registry.register_scalar(wkt::LatLngToPointWkt);
registry.register(Arc::new(wkt::LatLngToPointWkt));
// spatial relation
registry.register_scalar(relation::STContains);
registry.register_scalar(relation::STWithin);
registry.register_scalar(relation::STIntersects);
registry.register(Arc::new(relation::STContains));
registry.register(Arc::new(relation::STWithin));
registry.register(Arc::new(relation::STIntersects));
// spatial measure
registry.register_scalar(measure::STDistance);
registry.register_scalar(measure::STDistanceSphere);
registry.register_scalar(measure::STArea);
registry.register(Arc::new(measure::STDistance));
registry.register(Arc::new(measure::STDistanceSphere));
registry.register(Arc::new(measure::STArea));
}
}

View File

@@ -19,12 +19,9 @@ use common_error::status_code::StatusCode;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{self, InvalidInputStateSnafu, Result};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::AccumulatorCreatorFunction;
use common_time::Timestamp;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{ListValue, Value};
use datatypes::vectors::VectorRef;
@@ -50,16 +47,6 @@ impl JsonPathAccumulator {
timestamp_type,
}
}
/// Create a new `AggregateUDF` for the `json_encode_path` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"json_encode_path".to_string(),
3,
Arc::new(JsonPathEncodeFunctionCreator::default()),
)
.into()
}
}
impl Accumulator for JsonPathAccumulator {

View File

@@ -37,7 +37,7 @@ macro_rules! ensure_columns_len {
};
}
pub(crate) use ensure_columns_len;
pub(super) use ensure_columns_len;
macro_rules! ensure_columns_n {
($columns:ident, $n:literal) => {
@@ -58,7 +58,7 @@ macro_rules! ensure_columns_n {
};
}
pub(crate) use ensure_columns_n;
pub(super) use ensure_columns_n;
macro_rules! ensure_and_coerce {
($compare:expr, $coerce:expr) => {{
@@ -72,4 +72,4 @@ macro_rules! ensure_and_coerce {
}};
}
pub(crate) use ensure_and_coerce;
pub(super) use ensure_and_coerce;

View File

@@ -16,6 +16,7 @@
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
@@ -26,7 +27,7 @@ use datatypes::vectors::{BinaryVector, MutableVector, UInt64VectorBuilder, Vecto
use hyperloglogplus::HyperLogLog;
use snafu::OptionExt;
use crate::aggrs::approximate::hll::HllStateType;
use crate::aggr::HllStateType;
use crate::function::{Function, FunctionContext};
use crate::function_registry::FunctionRegistry;
@@ -43,7 +44,7 @@ pub struct HllCalcFunction;
impl HllCalcFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(HllCalcFunction);
registry.register(Arc::new(HllCalcFunction));
}
}
@@ -116,8 +117,6 @@ impl Function for HllCalcFunction {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::BinaryVector;
use super::*;

View File

@@ -17,6 +17,8 @@ mod ipv4;
mod ipv6;
mod range;
use std::sync::Arc;
use cidr::{Ipv4ToCidr, Ipv6ToCidr};
use ipv4::{Ipv4NumToString, Ipv4StringToNum};
use ipv6::{Ipv6NumToString, Ipv6StringToNum};
@@ -29,15 +31,15 @@ pub(crate) struct IpFunctions;
impl IpFunctions {
pub fn register(registry: &FunctionRegistry) {
// Register IPv4 functions
registry.register_scalar(Ipv4NumToString);
registry.register_scalar(Ipv4StringToNum);
registry.register_scalar(Ipv4ToCidr);
registry.register_scalar(Ipv4InRange);
registry.register(Arc::new(Ipv4NumToString));
registry.register(Arc::new(Ipv4StringToNum));
registry.register(Arc::new(Ipv4ToCidr));
registry.register(Arc::new(Ipv4InRange));
// Register IPv6 functions
registry.register_scalar(Ipv6NumToString);
registry.register_scalar(Ipv6StringToNum);
registry.register_scalar(Ipv6ToCidr);
registry.register_scalar(Ipv6InRange);
registry.register(Arc::new(Ipv6NumToString));
registry.register(Arc::new(Ipv6StringToNum));
registry.register(Arc::new(Ipv6ToCidr));
registry.register(Arc::new(Ipv6InRange));
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
pub mod json_get;
mod json_is;
mod json_path_exists;
@@ -32,23 +33,23 @@ pub(crate) struct JsonFunction;
impl JsonFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(JsonToStringFunction);
registry.register_scalar(ParseJsonFunction);
registry.register(Arc::new(JsonToStringFunction));
registry.register(Arc::new(ParseJsonFunction));
registry.register_scalar(JsonGetInt);
registry.register_scalar(JsonGetFloat);
registry.register_scalar(JsonGetString);
registry.register_scalar(JsonGetBool);
registry.register(Arc::new(JsonGetInt));
registry.register(Arc::new(JsonGetFloat));
registry.register(Arc::new(JsonGetString));
registry.register(Arc::new(JsonGetBool));
registry.register_scalar(JsonIsNull);
registry.register_scalar(JsonIsInt);
registry.register_scalar(JsonIsFloat);
registry.register_scalar(JsonIsString);
registry.register_scalar(JsonIsBool);
registry.register_scalar(JsonIsArray);
registry.register_scalar(JsonIsObject);
registry.register(Arc::new(JsonIsNull));
registry.register(Arc::new(JsonIsInt));
registry.register(Arc::new(JsonIsFloat));
registry.register(Arc::new(JsonIsString));
registry.register(Arc::new(JsonIsBool));
registry.register(Arc::new(JsonIsArray));
registry.register(Arc::new(JsonIsObject));
registry.register_scalar(json_path_exists::JsonPathExistsFunction);
registry.register_scalar(json_path_match::JsonPathMatchFunction);
registry.register(Arc::new(json_path_exists::JsonPathExistsFunction));
registry.register(Arc::new(json_path_match::JsonPathMatchFunction));
}
}

View File

@@ -38,11 +38,11 @@ use crate::function_registry::FunctionRegistry;
///
/// Usage: matches(`<col>`, `<pattern>`) -> boolean
#[derive(Clone, Debug, Default)]
pub struct MatchesFunction;
pub(crate) struct MatchesFunction;
impl MatchesFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(MatchesFunction);
registry.register(Arc::new(MatchesFunction));
}
}

View File

@@ -77,7 +77,7 @@ pub struct MatchesTermFunction;
impl MatchesTermFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(MatchesTermFunction);
registry.register(Arc::new(MatchesTermFunction));
}
}

View File

@@ -18,6 +18,7 @@ mod pow;
mod rate;
use std::fmt;
use std::sync::Arc;
pub use clamp::{ClampFunction, ClampMaxFunction, ClampMinFunction};
use common_query::error::{GeneralDataFusionSnafu, Result};
@@ -38,13 +39,13 @@ pub(crate) struct MathFunction;
impl MathFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(ModuloFunction);
registry.register_scalar(PowFunction);
registry.register_scalar(RateFunction);
registry.register_scalar(RangeFunction);
registry.register_scalar(ClampFunction);
registry.register_scalar(ClampMinFunction);
registry.register_scalar(ClampMaxFunction);
registry.register(Arc::new(ModuloFunction));
registry.register(Arc::new(PowFunction));
registry.register(Arc::new(RateFunction));
registry.register(Arc::new(RangeFunction));
registry.register(Arc::new(ClampFunction));
registry.register(Arc::new(ClampMinFunction));
registry.register(Arc::new(ClampMaxFunction));
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
mod to_unixtime;
use to_unixtime::ToUnixtimeFunction;
@@ -22,6 +23,6 @@ pub(crate) struct TimestampFunction;
impl TimestampFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(ToUnixtimeFunction);
registry.register(Arc::new(ToUnixtimeFunction));
}
}

View File

@@ -16,6 +16,7 @@
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
@@ -43,7 +44,7 @@ pub struct UddSketchCalcFunction;
impl UddSketchCalcFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(UddSketchCalcFunction);
registry.register(Arc::new(UddSketchCalcFunction));
}
}

View File

@@ -17,8 +17,10 @@ mod distance;
mod elem_product;
mod elem_sum;
pub mod impl_conv;
pub(crate) mod product;
mod scalar_add;
mod scalar_mul;
pub(crate) mod sum;
mod vector_add;
mod vector_dim;
mod vector_div;
@@ -28,34 +30,37 @@ mod vector_norm;
mod vector_sub;
mod vector_subvector;
use std::sync::Arc;
use crate::function_registry::FunctionRegistry;
pub(crate) struct VectorFunction;
impl VectorFunction {
pub fn register(registry: &FunctionRegistry) {
// conversion
registry.register_scalar(convert::ParseVectorFunction);
registry.register_scalar(convert::VectorToStringFunction);
registry.register(Arc::new(convert::ParseVectorFunction));
registry.register(Arc::new(convert::VectorToStringFunction));
// distance
registry.register_scalar(distance::CosDistanceFunction);
registry.register_scalar(distance::DotProductFunction);
registry.register_scalar(distance::L2SqDistanceFunction);
registry.register(Arc::new(distance::CosDistanceFunction));
registry.register(Arc::new(distance::DotProductFunction));
registry.register(Arc::new(distance::L2SqDistanceFunction));
// scalar calculation
registry.register_scalar(scalar_add::ScalarAddFunction);
registry.register_scalar(scalar_mul::ScalarMulFunction);
registry.register(Arc::new(scalar_add::ScalarAddFunction));
registry.register(Arc::new(scalar_mul::ScalarMulFunction));
// vector calculation
registry.register_scalar(vector_add::VectorAddFunction);
registry.register_scalar(vector_sub::VectorSubFunction);
registry.register_scalar(vector_mul::VectorMulFunction);
registry.register_scalar(vector_div::VectorDivFunction);
registry.register_scalar(vector_norm::VectorNormFunction);
registry.register_scalar(vector_dim::VectorDimFunction);
registry.register_scalar(vector_kth_elem::VectorKthElemFunction);
registry.register_scalar(vector_subvector::VectorSubvectorFunction);
registry.register_scalar(elem_sum::ElemSumFunction);
registry.register_scalar(elem_product::ElemProductFunction);
registry.register(Arc::new(vector_add::VectorAddFunction));
registry.register(Arc::new(vector_sub::VectorSubFunction));
registry.register(Arc::new(vector_mul::VectorMulFunction));
registry.register(Arc::new(vector_div::VectorDivFunction));
registry.register(Arc::new(vector_norm::VectorNormFunction));
registry.register(Arc::new(vector_dim::VectorDimFunction));
registry.register(Arc::new(vector_kth_elem::VectorKthElemFunction));
registry.register(Arc::new(vector_subvector::VectorSubvectorFunction));
registry.register(Arc::new(elem_sum::ElemSumFunction));
registry.register(Arc::new(elem_product::ElemProductFunction));
}
}

View File

@@ -16,11 +16,8 @@ use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
use common_query::logical_plan::{
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::AccumulatorCreatorFunction;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::{ConcreteDataType, Value, *};
use datatypes::vectors::VectorRef;
use nalgebra::{Const, DVectorView, Dyn, OVector};
@@ -78,16 +75,6 @@ impl AggregateFunctionCreator for VectorProductCreator {
}
impl VectorProduct {
/// Create a new `AggregateUDF` for the `vec_product` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"vec_product".to_string(),
1,
Arc::new(VectorProductCreator::default()),
)
.into()
}
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
self.product.get_or_insert_with(|| {
OVector::from_iterator_generic(Dyn(len), Const::<1>, (0..len).map(|_| 1.0))

View File

@@ -16,11 +16,8 @@ use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
use common_query::logical_plan::{
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::AccumulatorCreatorFunction;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::{ConcreteDataType, Value, *};
use datatypes::vectors::VectorRef;
use nalgebra::{Const, DVectorView, Dyn, OVector};
@@ -28,7 +25,6 @@ use snafu::ensure;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
/// The accumulator for the `vec_sum` aggregate function.
#[derive(Debug, Default)]
pub struct VectorSum {
sum: Option<OVector<f32, Dyn>>,
@@ -78,16 +74,6 @@ impl AggregateFunctionCreator for VectorSumCreator {
}
impl VectorSum {
/// Create a new `AggregateUDF` for the `vec_sum` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"vec_sum".to_string(),
1,
Arc::new(VectorSumCreator::default()),
)
.into()
}
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
self.sum
.get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>))

View File

@@ -36,13 +36,13 @@ pub(crate) struct SystemFunction;
impl SystemFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(BuildFunction);
registry.register_scalar(VersionFunction);
registry.register_scalar(CurrentSchemaFunction);
registry.register_scalar(DatabaseFunction);
registry.register_scalar(SessionUserFunction);
registry.register_scalar(ReadPreferenceFunction);
registry.register_scalar(TimezoneFunction);
registry.register(Arc::new(BuildFunction));
registry.register(Arc::new(VersionFunction));
registry.register(Arc::new(CurrentSchemaFunction));
registry.register(Arc::new(DatabaseFunction));
registry.register(Arc::new(SessionUserFunction));
registry.register(Arc::new(ReadPreferenceFunction));
registry.register(Arc::new(TimezoneFunction));
registry.register_async(Arc::new(ProcedureStateFunction));
PGCatalogFunction::register(registry);
}

View File

@@ -16,6 +16,8 @@ mod pg_get_userbyid;
mod table_is_visible;
mod version;
use std::sync::Arc;
use pg_get_userbyid::PGGetUserByIdFunction;
use table_is_visible::PGTableIsVisibleFunction;
use version::PGVersionFunction;
@@ -33,8 +35,8 @@ pub(super) struct PGCatalogFunction;
impl PGCatalogFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(PGTableIsVisibleFunction);
registry.register_scalar(PGGetUserByIdFunction);
registry.register_scalar(PGVersionFunction);
registry.register(Arc::new(PGTableIsVisibleFunction));
registry.register(Arc::new(PGGetUserByIdFunction));
registry.register(Arc::new(PGVersionFunction));
}
}

View File

@@ -64,19 +64,6 @@ impl Default for FlightEncoder {
}
impl FlightEncoder {
/// Creates new [FlightEncoder] with compression disabled.
pub fn with_compression_disabled() -> Self {
let write_options = writer::IpcWriteOptions::default()
.try_with_compression(None)
.unwrap();
Self {
write_options,
data_gen: writer::IpcDataGenerator::default(),
dictionary_tracker: writer::DictionaryTracker::new(false),
}
}
pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData {
match flight_message {
FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(),

View File

@@ -18,7 +18,6 @@ pub mod create_table;
pub mod datanode_handler;
pub mod flownode_handler;
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use api::v1::meta::Partition;
@@ -76,6 +75,8 @@ pub async fn create_logical_table(
physical_table_id: TableId,
table_name: &str,
) -> TableId {
use std::assert_matches::assert_matches;
let tasks = vec![test_create_logical_table_task(table_name)];
let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
let status = procedure.on_prepare().await.unwrap();

View File

@@ -35,9 +35,6 @@ pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 5;
/// The keep-alive interval of the Postgres connection.
pub const POSTGRES_KEEP_ALIVE_SECS: u64 = 30;
/// In a lease, there are two opportunities for renewal.
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;

View File

@@ -188,71 +188,7 @@ pub const CACHE_KEY_PREFIXES: [&str; 5] = [
NODE_ADDRESS_PREFIX,
];
/// A set of regions with the same role.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
/// Leader regions.
pub leader_regions: Vec<RegionNumber>,
/// Follower regions.
pub follower_regions: Vec<RegionNumber>,
}
impl<'de> Deserialize<'de> for RegionRoleSet {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum RegionRoleSetOrLeaderOnly {
Full {
leader_regions: Vec<RegionNumber>,
follower_regions: Vec<RegionNumber>,
},
LeaderOnly(Vec<RegionNumber>),
}
match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
RegionRoleSetOrLeaderOnly::Full {
leader_regions,
follower_regions,
} => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
Ok(RegionRoleSet::new(leader_regions, vec![]))
}
}
}
}
impl RegionRoleSet {
/// Create a new region role set.
pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
Self {
leader_regions,
follower_regions,
}
}
/// Add a leader region to the set.
pub fn add_leader_region(&mut self, region_number: RegionNumber) {
self.leader_regions.push(region_number);
}
/// Add a follower region to the set.
pub fn add_follower_region(&mut self, region_number: RegionNumber) {
self.follower_regions.push(region_number);
}
/// Sort the regions.
pub fn sort(&mut self) {
self.follower_regions.sort();
self.leader_regions.sort();
}
}
/// The distribution of regions.
///
/// The key is the datanode id, the value is the region role set.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
/// The id of flow.
pub type FlowId = u32;
@@ -1432,8 +1368,7 @@ mod tests {
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::{
DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TableMetadataManager,
ViewInfoValue, TOPIC_REGION_PREFIX,
DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue, TOPIC_REGION_PREFIX,
};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackend;
@@ -2060,8 +1995,7 @@ mod tests {
.unwrap()
.unwrap();
assert_eq!(got.regions, regions.leader_regions);
assert_eq!(got.follower_regions, regions.follower_regions);
assert_eq!(got.regions, regions)
}
}
@@ -2478,28 +2412,4 @@ mod tests {
assert_eq!(current_view_info.columns, new_columns);
assert_eq!(current_view_info.plan_columns, new_plan_columns);
}
#[test]
fn test_region_role_set_deserialize() {
let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
let s = r#"[1, 2, 3]"#;
let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
assert!(region_role_set.follower_regions.is_empty());
}
#[test]
fn test_region_distribution_deserialize() {
let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
assert_eq!(region_distribution.len(), 2);
assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
assert!(region_distribution[&1].follower_regions.is_empty());
assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
}
}

View File

@@ -24,7 +24,7 @@ use table::metadata::TableId;
use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::{
MetadataKey, MetadataValue, RegionDistribution, RegionRoleSet, DATANODE_TABLE_KEY_PATTERN,
MetadataKey, MetadataValue, RegionDistribution, DATANODE_TABLE_KEY_PATTERN,
DATANODE_TABLE_KEY_PREFIX,
};
use crate::kv_backend::txn::{Txn, TxnOp};
@@ -118,31 +118,23 @@ impl Display for DatanodeTableKey {
pub struct DatanodeTableValue {
pub table_id: TableId,
pub regions: Vec<RegionNumber>,
#[serde(default)]
pub follower_regions: Vec<RegionNumber>,
#[serde(flatten)]
pub region_info: RegionInfo,
version: u64,
}
impl DatanodeTableValue {
pub fn new(table_id: TableId, region_role_set: RegionRoleSet, region_info: RegionInfo) -> Self {
let RegionRoleSet {
leader_regions,
follower_regions,
} = region_role_set;
pub fn new(table_id: TableId, regions: Vec<RegionNumber>, region_info: RegionInfo) -> Self {
Self {
table_id,
regions: leader_regions,
follower_regions,
regions,
region_info,
version: 0,
}
}
}
/// Decodes [`KeyValue`] to [`DatanodeTableValue`].
/// Decodes `KeyValue` to ((),`DatanodeTableValue`)
pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
DatanodeTableValue::try_from_raw_value(&kv.value)
}
@@ -381,11 +373,10 @@ mod tests {
let value = DatanodeTableValue {
table_id: 42,
regions: vec![1, 2, 3],
follower_regions: vec![],
region_info: RegionInfo::default(),
version: 1,
};
let literal = br#"{"table_id":42,"regions":[1,2,3],"follower_regions":[],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;
let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;
let raw_value = value.try_as_raw_value().unwrap();
assert_eq!(raw_value, literal);
@@ -476,7 +467,6 @@ mod tests {
let table_value = DatanodeTableValue {
table_id: 1,
regions: vec![],
follower_regions: vec![],
region_info,
version: 1,
};

View File

@@ -14,7 +14,7 @@
pub mod flow_info;
pub(crate) mod flow_name;
pub mod flow_route;
pub(crate) mod flow_route;
pub mod flow_state;
mod flownode_addr_helper;
pub(crate) mod flownode_flow;

View File

@@ -114,37 +114,37 @@ impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FlowInfoValue {
/// The source tables used by the flow.
pub source_table_ids: Vec<TableId>,
pub(crate) source_table_ids: Vec<TableId>,
/// The sink table used by the flow.
pub sink_table_name: TableName,
pub(crate) sink_table_name: TableName,
/// Which flow nodes this flow is running on.
pub flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
pub(crate) flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
/// The catalog name.
pub catalog_name: String,
pub(crate) catalog_name: String,
/// The query context used when create flow.
/// Although flow doesn't belong to any schema, this query_context is needed to remember
/// the query context when `create_flow` is executed
/// for recovering flow using the same sql&query_context after db restart.
/// if none, should use default query context
#[serde(default)]
pub query_context: Option<crate::rpc::ddl::QueryContext>,
pub(crate) query_context: Option<crate::rpc::ddl::QueryContext>,
/// The flow name.
pub flow_name: String,
pub(crate) flow_name: String,
/// The raw sql.
pub raw_sql: String,
pub(crate) raw_sql: String,
/// The expr of expire.
/// Duration in seconds as `i64`.
pub expire_after: Option<i64>,
pub(crate) expire_after: Option<i64>,
/// The comment.
pub comment: String,
pub(crate) comment: String,
/// The options.
pub options: HashMap<String, String>,
pub(crate) options: HashMap<String, String>,
/// The created time
#[serde(default)]
pub created_time: DateTime<Utc>,
pub(crate) created_time: DateTime<Utc>,
/// The updated time.
#[serde(default)]
pub updated_time: DateTime<Utc>,
pub(crate) updated_time: DateTime<Utc>,
}
impl FlowInfoValue {

View File

@@ -40,23 +40,17 @@ pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution
let mut regions_id_map = RegionDistribution::new();
for route in region_routes.iter() {
if let Some(peer) = route.leader_peer.as_ref() {
let region_number = route.region.id.region_number();
regions_id_map
.entry(peer.id)
.or_default()
.add_leader_region(region_number);
let region_id = route.region.id.region_number();
regions_id_map.entry(peer.id).or_default().push(region_id);
}
for peer in route.follower_peers.iter() {
let region_number = route.region.id.region_number();
regions_id_map
.entry(peer.id)
.or_default()
.add_follower_region(region_number);
let region_id = route.region.id.region_number();
regions_id_map.entry(peer.id).or_default().push(region_id);
}
}
for (_, region_role_set) in regions_id_map.iter_mut() {
// Sort the regions in ascending order.
region_role_set.sort()
for (_, regions) in regions_id_map.iter_mut() {
// id asc
regions.sort()
}
regions_id_map
}
@@ -461,7 +455,6 @@ impl From<PbPartition> for Partition {
#[cfg(test)]
mod tests {
use super::*;
use crate::key::RegionRoleSet;
#[test]
fn test_leader_is_downgraded() {
@@ -618,8 +611,8 @@ mod tests {
let distribution = region_distribution(&region_routes);
assert_eq!(distribution.len(), 3);
assert_eq!(distribution[&1], RegionRoleSet::new(vec![1], vec![2]));
assert_eq!(distribution[&2], RegionRoleSet::new(vec![2], vec![1]));
assert_eq!(distribution[&3], RegionRoleSet::new(vec![], vec![1, 2]));
assert_eq!(distribution[&1], vec![1, 2]);
assert_eq!(distribution[&2], vec![1, 2]);
assert_eq!(distribution[&3], vec![1, 2]);
}
}

View File

@@ -133,18 +133,6 @@ pub enum Error {
source: datatypes::error::Error,
},
#[snafu(display(
"Failed to downcast vector of type '{:?}' to type '{:?}'",
from_type,
to_type
))]
DowncastVector {
from_type: ConcreteDataType,
to_type: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Error occurs when performing arrow computation"))]
ArrowCompute {
#[snafu(source)]
@@ -204,8 +192,6 @@ impl ErrorExt for Error {
| Error::PhysicalExpr { .. }
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
Error::DowncastVector { .. } => StatusCode::Unexpected,
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
Error::ArrowCompute { .. } => StatusCode::IllegalState,

View File

@@ -30,16 +30,13 @@ pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecord
use datatypes::arrow::compute::SortOptions;
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::util::pretty;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::types::json_type_value_to_string;
use datatypes::vectors::{BinaryVector, StringVectorBuilder};
use datatypes::prelude::VectorRef;
use datatypes::schema::{Schema, SchemaRef};
use error::Result;
use futures::task::{Context, Poll};
use futures::{Stream, TryStreamExt};
pub use recordbatch::RecordBatch;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, ResultExt};
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn name(&self) -> &str {
@@ -61,146 +58,6 @@ pub struct OrderOption {
pub options: SortOptions,
}
/// A wrapper that maps a [RecordBatchStream] to a new [RecordBatchStream] by applying a function to each [RecordBatch].
///
/// The mapper function is applied to each [RecordBatch] in the stream.
/// The schema of the new [RecordBatchStream] is the same as the schema of the inner [RecordBatchStream] after applying the schema mapper function.
/// The output ordering of the new [RecordBatchStream] is the same as the output ordering of the inner [RecordBatchStream].
/// The metrics of the new [RecordBatchStream] is the same as the metrics of the inner [RecordBatchStream] if it is not `None`.
pub struct SendableRecordBatchMapper {
inner: SendableRecordBatchStream,
/// The mapper function is applied to each [RecordBatch] in the stream.
/// The original schema and the mapped schema are passed to the mapper function.
mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
/// The schema of the new [RecordBatchStream] is the same as the schema of the inner [RecordBatchStream] after applying the schema mapper function.
schema: SchemaRef,
/// Whether the mapper function is applied to each [RecordBatch] in the stream.
apply_mapper: bool,
}
/// Maps the json type to string in the batch.
///
/// The json type is mapped to string by converting the json value to string.
/// The batch is updated to have the same number of columns as the original batch,
/// but with the json type mapped to string.
pub fn map_json_type_to_string(
batch: RecordBatch,
original_schema: &SchemaRef,
mapped_schema: &SchemaRef,
) -> Result<RecordBatch> {
let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
if let ConcreteDataType::Json(j) = schema.data_type {
let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
let binary_vector = vector
.as_any()
.downcast_ref::<BinaryVector>()
.with_context(|| error::DowncastVectorSnafu {
from_type: schema.data_type.clone(),
to_type: ConcreteDataType::binary_datatype(),
})?;
for value in binary_vector.iter_data() {
let Some(value) = value else {
string_vector_builder.push(None);
continue;
};
let string_value =
json_type_value_to_string(value, &j.format).with_context(|_| {
error::CastVectorSnafu {
from_type: schema.data_type.clone(),
to_type: ConcreteDataType::string_datatype(),
}
})?;
string_vector_builder.push(Some(string_value.as_str()));
}
let string_vector = string_vector_builder.finish();
vectors.push(Arc::new(string_vector) as VectorRef);
} else {
vectors.push(vector.clone());
}
}
RecordBatch::new(mapped_schema.clone(), vectors)
}
/// Maps the json type to string in the schema.
///
/// The json type is mapped to string by converting the json value to string.
/// The schema is updated to have the same number of columns as the original schema,
/// but with the json type mapped to string.
///
/// Returns the new schema and whether the schema needs to be mapped to string.
pub fn map_json_type_to_string_schema(schema: SchemaRef) -> (SchemaRef, bool) {
let mut new_columns = Vec::with_capacity(schema.column_schemas().len());
let mut apply_mapper = false;
for column in schema.column_schemas() {
if matches!(column.data_type, ConcreteDataType::Json(_)) {
new_columns.push(ColumnSchema::new(
column.name.to_string(),
ConcreteDataType::string_datatype(),
column.is_nullable(),
));
apply_mapper = true;
} else {
new_columns.push(column.clone());
}
}
(Arc::new(Schema::new(new_columns)), apply_mapper)
}
impl SendableRecordBatchMapper {
/// Creates a new [SendableRecordBatchMapper] with the given inner [RecordBatchStream], mapper function, and schema mapper function.
pub fn new(
inner: SendableRecordBatchStream,
mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
schema_mapper: fn(SchemaRef) -> (SchemaRef, bool),
) -> Self {
let (mapped_schema, apply_mapper) = schema_mapper(inner.schema());
Self {
inner,
mapper,
schema: mapped_schema,
apply_mapper,
}
}
}
impl RecordBatchStream for SendableRecordBatchMapper {
fn name(&self) -> &str {
"SendableRecordBatchMapper"
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_ordering(&self) -> Option<&[OrderOption]> {
self.inner.output_ordering()
}
fn metrics(&self) -> Option<RecordBatchMetrics> {
self.inner.metrics()
}
}
impl Stream for SendableRecordBatchMapper {
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.apply_mapper {
Pin::new(&mut self.inner).poll_next(cx).map(|opt| {
opt.map(|result| {
result
.and_then(|batch| (self.mapper)(batch, &self.inner.schema(), &self.schema))
})
})
} else {
Pin::new(&mut self.inner).poll_next(cx)
}
}
}
/// EmptyRecordBatchStream can be used to create a RecordBatchStream
/// that will produce no results
pub struct EmptyRecordBatchStream {

View File

@@ -1,11 +0,0 @@
[package]
name = "stat"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
nix.workspace = true
[lints]
workspace = true

View File

@@ -1,183 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use std::fs::read_to_string;
use std::path::Path;
#[cfg(target_os = "linux")]
use nix::sys::{statfs, statfs::statfs};
/// `MAX_VALUE` is used to indicate that the resource is unlimited.
pub const MAX_VALUE: i64 = -1;
const CGROUP_UNIFIED_MOUNTPOINT: &str = "/sys/fs/cgroup";
const MEMORY_MAX_FILE_CGROUP_V2: &str = "memory.max";
const MEMORY_MAX_FILE_CGROUP_V1: &str = "memory.limit_in_bytes";
const CPU_MAX_FILE_CGROUP_V2: &str = "cpu.max";
const CPU_QUOTA_FILE_CGROUP_V1: &str = "cpu.cfs_quota_us";
const CPU_PERIOD_FILE_CGROUP_V1: &str = "cpu.cfs_period_us";
// `MAX_VALUE_CGROUP_V2` string in `/sys/fs/cgroup/cpu.max` and `/sys/fs/cgroup/memory.max` to indicate that the resource is unlimited.
const MAX_VALUE_CGROUP_V2: &str = "max";
// For cgroup v1, if the memory is unlimited, it will return a very large value(different from platform) that close to 2^63.
// For easier comparison, if the memory limit is larger than 1PB we consider it as unlimited.
const MAX_MEMORY_IN_BYTES: i64 = 1125899906842624; // 1PB
/// Get the limit of memory in bytes.
///
/// - If the memory is unlimited, return `-1`.
/// - Return `None` if it fails to read the memory limit or not on linux.
pub fn get_memory_limit() -> Option<i64> {
#[cfg(target_os = "linux")]
{
let memory_max_file = if is_cgroup_v2()? {
// Read `/sys/fs/cgroup/memory.max` to get the memory limit.
MEMORY_MAX_FILE_CGROUP_V2
} else {
// Read `/sys/fs/cgroup/memory.limit_in_bytes` to get the memory limit.
MEMORY_MAX_FILE_CGROUP_V1
};
// For cgroup v1, it will return a very large value(different from platform) if the memory is unlimited.
let memory_limit =
read_value_from_file(Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(memory_max_file))?;
// If memory limit exceeds 1PB(cgroup v1), consider it as unlimited.
if memory_limit > MAX_MEMORY_IN_BYTES {
return Some(MAX_VALUE);
}
Some(memory_limit)
}
#[cfg(not(target_os = "linux"))]
None
}
/// Get the limit of cpu in millicores.
///
/// - If the cpu is unlimited, return `-1`.
/// - Return `None` if it fails to read the cpu limit or not on linux.
pub fn get_cpu_limit() -> Option<i64> {
#[cfg(target_os = "linux")]
if is_cgroup_v2()? {
// Read `/sys/fs/cgroup/cpu.max` to get the cpu limit.
get_cgroup_v2_cpu_limit(Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(CPU_MAX_FILE_CGROUP_V2))
} else {
// Read `/sys/fs/cgroup/cpu.cfs_quota_us` and `/sys/fs/cgroup/cpu.cfs_period_us` to get the cpu limit.
let quota = read_value_from_file(
Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(CPU_QUOTA_FILE_CGROUP_V1),
)?;
if quota == MAX_VALUE {
return Some(MAX_VALUE);
}
let period = read_value_from_file(
Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(CPU_PERIOD_FILE_CGROUP_V1),
)?;
// Return the cpu limit in millicores.
Some(quota * 1000 / period)
}
#[cfg(not(target_os = "linux"))]
None
}
// Check whether the cgroup is v2.
// - Return `true` if the cgroup is v2, otherwise return `false`.
// - Return `None` if the detection fails or not on linux.
fn is_cgroup_v2() -> Option<bool> {
#[cfg(target_os = "linux")]
{
let path = Path::new(CGROUP_UNIFIED_MOUNTPOINT);
let fs_stat = statfs(path).ok()?;
Some(fs_stat.filesystem_type() == statfs::CGROUP2_SUPER_MAGIC)
}
#[cfg(not(target_os = "linux"))]
None
}
fn read_value_from_file<P: AsRef<Path>>(path: P) -> Option<i64> {
let content = read_to_string(&path).ok()?;
// If the content starts with "max", return `MAX_VALUE`.
if content.starts_with(MAX_VALUE_CGROUP_V2) {
return Some(MAX_VALUE);
}
content.trim().parse::<i64>().ok()
}
fn get_cgroup_v2_cpu_limit<P: AsRef<Path>>(path: P) -> Option<i64> {
let content = read_to_string(&path).ok()?;
let fields = content.trim().split(' ').collect::<Vec<&str>>();
if fields.len() != 2 {
return None;
}
// If the cpu is unlimited, it will be `-1`.
let quota = fields[0].trim();
if quota == MAX_VALUE_CGROUP_V2 {
return Some(MAX_VALUE);
}
let quota = quota.parse::<i64>().ok()?;
let period = fields[1].trim();
let period = period.parse::<i64>().ok()?;
// Return the cpu limit in millicores.
Some(quota * 1000 / period)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_read_value_from_file() {
assert_eq!(
read_value_from_file(Path::new("testdata").join("memory.max")).unwrap(),
100000
);
assert_eq!(
read_value_from_file(Path::new("testdata").join("memory.max.unlimited")).unwrap(),
MAX_VALUE
);
assert_eq!(read_value_from_file(Path::new("non_existent_file")), None);
}
#[test]
fn test_get_cgroup_v2_cpu_limit() {
assert_eq!(
get_cgroup_v2_cpu_limit(Path::new("testdata").join("cpu.max")).unwrap(),
1500
);
assert_eq!(
get_cgroup_v2_cpu_limit(Path::new("testdata").join("cpu.max.unlimited")).unwrap(),
MAX_VALUE
);
assert_eq!(
get_cgroup_v2_cpu_limit(Path::new("non_existent_file")),
None
);
}
}

View File

@@ -1 +0,0 @@
150000 100000

View File

@@ -1 +0,0 @@
max 100000

View File

@@ -1 +0,0 @@
100000

View File

@@ -1 +0,0 @@
max

View File

@@ -37,9 +37,6 @@ use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
/// The default logs directory.
pub const DEFAULT_LOGGING_DIR: &str = "logs";
// Handle for reloading log level
pub static RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
OnceCell::new();
@@ -136,8 +133,7 @@ impl Eq for LoggingOptions {}
impl Default for LoggingOptions {
fn default() -> Self {
Self {
// The directory path will be configured at application startup, typically using the data home directory as a base.
dir: "".to_string(),
dir: "./greptimedb_data/logs".to_string(),
level: None,
log_format: LogFormat::Text,
enable_otlp_tracing: false,

View File

@@ -6,7 +6,6 @@ license.workspace = true
[features]
testing = []
enterprise = []
[lints]
workspace = true

View File

@@ -18,7 +18,7 @@ use core::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_config::Configurable;
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_wal::config::DatanodeWalConfig;
@@ -36,6 +36,9 @@ use servers::http::HttpOptions;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
/// Default data home in file storage
const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
/// Object storage config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]

View File

@@ -372,7 +372,6 @@ impl DatanodeBuilder {
opts.max_concurrent_queries,
//TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
Duration::from_millis(100),
opts.grpc.flight_compression,
);
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
@@ -560,8 +559,6 @@ async fn open_all_regions(
init_regions_parallelism: usize,
) -> Result<()> {
let mut regions = vec![];
#[cfg(feature = "enterprise")]
let mut follower_regions = vec![];
for table_value in table_values {
for region_number in table_value.regions {
// Augments region options with wal options if a wal options is provided.
@@ -579,24 +576,6 @@ async fn open_all_regions(
region_options,
));
}
#[cfg(feature = "enterprise")]
for region_number in table_value.follower_regions {
// Augments region options with wal options if a wal options is provided.
let mut region_options = table_value.region_info.region_options.clone();
prepare_wal_options(
&mut region_options,
RegionId::new(table_value.table_id, region_number),
&table_value.region_info.region_wal_options,
);
follower_regions.push((
RegionId::new(table_value.table_id, region_number),
table_value.region_info.engine.clone(),
table_value.region_info.region_storage_path.clone(),
region_options,
));
}
}
let num_regions = regions.len();
info!("going to open {} region(s)", num_regions);
@@ -638,43 +617,6 @@ async fn open_all_regions(
}
}
}
#[cfg(feature = "enterprise")]
if !follower_regions.is_empty() {
info!(
"going to open {} follower region(s)",
follower_regions.len()
);
let mut region_requests = Vec::with_capacity(follower_regions.len());
for (region_id, engine, store_path, options) in follower_regions {
let region_dir = region_dir(&store_path, region_id);
region_requests.push((
region_id,
RegionOpenRequest {
engine,
region_dir,
options,
skip_wal_replay: true,
},
));
}
let open_regions = region_server
.handle_batch_open_requests(init_regions_parallelism, region_requests)
.await?;
ensure!(
open_regions.len() == num_regions,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} of follower regions, only {} of regions has opened",
num_regions,
open_regions.len()
)
}
);
}
info!("all regions are opened");
Ok(())
@@ -690,7 +632,6 @@ mod tests {
use common_base::Plugins;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::key::datanode_table::DatanodeTableManager;
use common_meta::key::RegionRoleSet;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use mito2::engine::MITO_ENGINE_NAME;
@@ -710,7 +651,7 @@ mod tests {
"foo/bar/weny",
HashMap::from([("foo".to_string(), "bar".to_string())]),
HashMap::default(),
BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
BTreeMap::from([(0, vec![0, 1, 2])]),
)
.unwrap();

View File

@@ -50,7 +50,6 @@ use query::QueryEngineRef;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
use servers::grpc::FlightCompression;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
@@ -81,7 +80,6 @@ use crate::event_listener::RegionServerEventListenerRef;
#[derive(Clone)]
pub struct RegionServer {
inner: Arc<RegionServerInner>,
flight_compression: FlightCompression,
}
pub struct RegionStat {
@@ -95,7 +93,6 @@ impl RegionServer {
query_engine: QueryEngineRef,
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
flight_compression: FlightCompression,
) -> Self {
Self::with_table_provider(
query_engine,
@@ -104,7 +101,6 @@ impl RegionServer {
Arc::new(DummyTableProviderFactory),
0,
Duration::from_millis(0),
flight_compression,
)
}
@@ -115,7 +111,6 @@ impl RegionServer {
table_provider_factory: TableProviderFactoryRef,
max_concurrent_queries: usize,
concurrent_query_limiter_timeout: Duration,
flight_compression: FlightCompression,
) -> Self {
Self {
inner: Arc::new(RegionServerInner::new(
@@ -128,7 +123,6 @@ impl RegionServer {
concurrent_query_limiter_timeout,
),
)),
flight_compression,
}
}
@@ -542,11 +536,7 @@ impl FlightCraft for RegionServer {
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
.await?;
let stream = Box::pin(FlightRecordBatchStream::new(
result,
tracing_context,
self.flight_compression,
));
let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
Ok(Response::new(stream))
}
}

View File

@@ -19,16 +19,16 @@ use std::time::Duration;
use api::region::RegionResponse;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_function::function_factory::ScalarFunctionFactory;
use common_function::function::FunctionRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::Output;
use common_runtime::runtime::{BuilderBuild, RuntimeTrait};
use common_runtime::Runtime;
use datafusion_expr::{AggregateUDF, LogicalPlan};
use datafusion_expr::LogicalPlan;
use query::dataframe::DataFrame;
use query::planner::LogicalPlanner;
use query::query_engine::{DescribeResult, QueryEngineState};
use query::{QueryEngine, QueryEngineContext};
use servers::grpc::FlightCompression;
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
@@ -76,9 +76,9 @@ impl QueryEngine for MockQueryEngine {
unimplemented!()
}
fn register_aggregate_function(&self, _func: AggregateUDF) {}
fn register_aggregate_function(&self, _func: AggregateFunctionMetaRef) {}
fn register_scalar_function(&self, _func: ScalarFunctionFactory) {}
fn register_function(&self, _func: FunctionRef) {}
fn read_table(&self, _table: TableRef) -> query::error::Result<DataFrame> {
unimplemented!()
@@ -98,7 +98,6 @@ pub fn mock_region_server() -> RegionServer {
Arc::new(MockQueryEngine),
Runtime::builder().build().unwrap(),
Box::new(NoopRegionServerEventListener),
FlightCompression::default(),
)
}

View File

@@ -14,9 +14,8 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant, SystemTime};
use std::sync::{Arc, Weak};
use std::time::SystemTime;
use api::v1::greptime_request::Request;
use api::v1::CreateTableExpr;
@@ -27,21 +26,20 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::{debug, warn};
use itertools::Itertools;
use common_telemetry::warn;
use meta_client::client::MetaClient;
use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
GRPC_MAX_RETRIES,
};
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
use crate::{Error, FlowAuthHeader, FlowId};
use crate::{Error, FlowAuthHeader};
/// Just like [`GrpcQueryHandler`] but use BoxedError
///
@@ -76,105 +74,6 @@ impl<
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
/// Statistics about running query on this frontend from flownode
#[derive(Debug, Default, Clone)]
struct FrontendStat {
/// The query for flow id has been running since this timestamp
since: HashMap<FlowId, Instant>,
/// The average query time for each flow id
/// This is used to calculate the average query time for each flow id
past_query_avg: HashMap<FlowId, (usize, Duration)>,
}
#[derive(Debug, Default, Clone)]
pub struct FrontendStats {
/// The statistics for each flow id
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
}
impl FrontendStats {
pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
let stat = stats.entry(frontend_addr.to_string()).or_default();
stat.since.insert(flow_id, Instant::now());
FrontendStatsGuard {
stats: self.stats.clone(),
frontend_addr: frontend_addr.to_string(),
cur: flow_id,
}
}
/// return frontend addrs sorted by load, from lightest to heaviest
/// The load is calculated as the total average query time for each flow id plus running query's total running time elapsed
pub fn sort_by_load(&self) -> Vec<String> {
let stats = self.stats.lock().expect("Failed to lock frontend stats");
let fe_load_factor = stats
.iter()
.map(|(node_addr, stat)| {
// total expected avg running time for all currently running queries
let total_expect_avg_run_time = stat
.since
.keys()
.map(|f| {
let (count, total_duration) =
stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
if *count == 0 {
0.0
} else {
total_duration.as_secs_f64() / *count as f64
}
})
.sum::<f64>();
let total_cur_running_time = stat
.since
.values()
.map(|since| since.elapsed().as_secs_f64())
.sum::<f64>();
(
node_addr.to_string(),
total_expect_avg_run_time + total_cur_running_time,
)
})
.sorted_by(|(_, load_a), (_, load_b)| {
load_a
.partial_cmp(load_b)
.unwrap_or(std::cmp::Ordering::Equal)
})
.collect::<Vec<_>>();
debug!("Frontend load factor: {:?}", fe_load_factor);
for (node_addr, load) in &fe_load_factor {
METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
.with_label_values(&[&node_addr.to_string()])
.observe(*load);
}
fe_load_factor
.into_iter()
.map(|(addr, _)| addr)
.collect::<Vec<_>>()
}
}
pub struct FrontendStatsGuard {
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
frontend_addr: String,
cur: FlowId,
}
impl Drop for FrontendStatsGuard {
fn drop(&mut self) {
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
if let Some(stat) = stats.get_mut(&self.frontend_addr) {
if let Some(since) = stat.since.remove(&self.cur) {
let elapsed = since.elapsed();
let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
*count += 1;
*total_duration += elapsed;
}
}
}
}
/// A simple frontend client able to execute sql using grpc protocol
///
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
@@ -184,7 +83,6 @@ pub enum FrontendClient {
meta_client: Arc<MetaClient>,
chnl_mgr: ChannelManager,
auth: Option<FlowAuthHeader>,
fe_stats: FrontendStats,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
@@ -216,7 +114,6 @@ impl FrontendClient {
ChannelManager::with_config(cfg)
},
auth,
fe_stats: Default::default(),
}
}
@@ -286,7 +183,7 @@ impl FrontendClient {
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
/// and is able to process query
pub(crate) async fn get_random_active_frontend(
async fn get_random_active_frontend(
&self,
catalog: &str,
schema: &str,
@@ -295,7 +192,6 @@ impl FrontendClient {
meta_client: _,
chnl_mgr,
auth,
fe_stats,
} = self
else {
return UnexpectedSnafu {
@@ -312,21 +208,8 @@ impl FrontendClient {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
let node_addrs_by_load = fe_stats.sort_by_load();
// index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0
let addr2load = node_addrs_by_load
.iter()
.enumerate()
.map(|(i, id)| (id.clone(), i + 1))
.collect::<HashMap<_, _>>();
// sort frontends by load, from lightest to heaviest
frontends.sort_by(|(_, a), (_, b)| {
// if not even in stats, treat as 0 load since never been queried
let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
load_a.cmp(load_b)
});
debug!("Frontend nodes sorted by load: {:?}", frontends);
// shuffle the frontends to avoid always pick the same one
frontends.shuffle(&mut rng());
// found node with maximum last_activity_ts
for (_, node_info) in frontends
@@ -374,7 +257,6 @@ impl FrontendClient {
create: CreateTableExpr,
catalog: &str,
schema: &str,
task: Option<&BatchingTask>,
) -> Result<u32, Error> {
self.handle(
Request::Ddl(api::v1::DdlRequest {
@@ -382,8 +264,7 @@ impl FrontendClient {
}),
catalog,
schema,
None,
task,
&mut None,
)
.await
}
@@ -394,31 +275,15 @@ impl FrontendClient {
req: api::v1::greptime_request::Request,
catalog: &str,
schema: &str,
use_peer: Option<Peer>,
task: Option<&BatchingTask>,
peer_desc: &mut Option<PeerDesc>,
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed {
fe_stats, chnl_mgr, ..
} => {
let db = if let Some(peer) = use_peer {
DatabaseWithPeer::new(
Database::new(
catalog,
schema,
Client::with_manager_and_urls(
chnl_mgr.clone(),
vec![peer.addr.clone()],
),
),
peer,
)
} else {
self.get_random_active_frontend(catalog, schema).await?
};
FrontendClient::Distributed { .. } => {
let db = self.get_random_active_frontend(catalog, schema).await?;
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),
});
db.database
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)

View File

@@ -53,8 +53,6 @@ pub struct TaskState {
pub(crate) shutdown_rx: oneshot::Receiver<()>,
/// Task handle
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
/// Slow Query metrics update task handle
pub(crate) slow_query_metric_task: Option<tokio::task::JoinHandle<()>>,
/// min run interval in seconds
pub(crate) min_run_interval: Option<u64>,
@@ -71,7 +69,6 @@ impl TaskState {
exec_state: ExecState::Idle,
shutdown_rx,
task_handle: None,
slow_query_metric_task: None,
min_run_interval: None,
max_filter_num: None,
}
@@ -106,7 +103,7 @@ impl TaskState {
.min(self.last_query_duration)
.max(
self.min_run_interval
.map(Duration::from_secs)
.map(|s| Duration::from_secs(s))
.unwrap_or(MIN_REFRESH_DURATION),
);
@@ -214,69 +211,51 @@ impl DirtyTimeWindows {
// get the first `window_cnt` time windows
let max_time_range = window_size * window_cnt as i32;
let mut to_be_query = BTreeMap::new();
let mut new_windows = self.windows.clone();
let mut cur_time_range = chrono::Duration::zero();
for (idx, (start, end)) in self.windows.iter().enumerate() {
let first_end = start
.add_duration(window_size.to_std().unwrap())
.context(TimeSnafu)?;
let end = end.unwrap_or(first_end);
// if time range is too long, stop
if cur_time_range >= max_time_range {
break;
}
// if we have enough time windows, stop
if idx >= window_cnt {
break;
}
if let Some(x) = end.sub(start) {
if cur_time_range + x <= max_time_range {
to_be_query.insert(*start, Some(end));
new_windows.remove(start);
cur_time_range += x;
} else {
// too large a window, split it
// split at window_size * times
let surplus = max_time_range - cur_time_range;
let times = surplus.num_seconds() / window_size.num_seconds();
let split_offset = window_size * times as i32;
let split_at = start
.add_duration(split_offset.to_std().unwrap())
.context(TimeSnafu)?;
to_be_query.insert(*start, Some(split_at));
// remove the original window
new_windows.remove(start);
new_windows.insert(split_at, Some(end));
cur_time_range += split_offset;
let nth = {
let mut cur_time_range = chrono::Duration::zero();
let mut nth_key = None;
for (idx, (start, end)) in self.windows.iter().enumerate() {
// if time range is too long, stop
if cur_time_range > max_time_range {
nth_key = Some(*start);
break;
}
}
}
self.windows = new_windows;
// if we have enough time windows, stop
if idx >= window_cnt {
nth_key = Some(*start);
break;
}
if let Some(end) = end {
if let Some(x) = end.sub(start) {
cur_time_range += x;
}
}
}
nth_key
};
let first_nth = {
if let Some(nth) = nth {
let mut after = self.windows.split_off(&nth);
std::mem::swap(&mut self.windows, &mut after);
after
} else {
std::mem::take(&mut self.windows)
}
};
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(to_be_query.len() as f64);
.with_label_values(&[flow_id.to_string().as_str()])
.observe(first_nth.len() as f64);
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.with_label_values(&[flow_id.to_string().as_str()])
.observe(self.windows.len() as f64);
let full_time_range = to_be_query
let full_time_range = first_nth
.iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| {
if let Some(end) = end {
@@ -287,14 +266,11 @@ impl DirtyTimeWindows {
})
.num_seconds() as f64;
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.with_label_values(&[flow_id.to_string().as_str()])
.observe(full_time_range);
let mut expr_lst = vec![];
for (start, end) in to_be_query.into_iter() {
for (start, end) in first_nth.into_iter() {
// align using time window exprs
let (start, end) = if let Some(ctx) = task_ctx {
let Some(time_window_expr) = &ctx.config.time_window_expr else {
@@ -528,64 +504,6 @@ mod test {
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
)
),
// split range
(
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
60
)),
),
(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
Some(Timestamp::new_second(
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 2 min into 1 min
(
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
40 * 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 3s + 1min into 3s + 57s
(
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
3
)),
),(
Timestamp::new_second(20),
Some(Timestamp::new_second(
140
)),
)]),
Some(
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
)
),
// expired
(
vec![
@@ -602,8 +520,6 @@ mod test {
None
),
];
// let len = testcases.len();
// let testcases = testcases[(len - 2)..(len - 1)].to_vec();
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
testcases
{

View File

@@ -61,8 +61,7 @@ use crate::error::{
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
};
use crate::{Error, FlowId};
@@ -82,14 +81,6 @@ pub struct TaskConfig {
query_type: QueryType,
}
impl TaskConfig {
pub fn time_window_size(&self) -> Option<Duration> {
self.time_window_expr
.as_ref()
.and_then(|expr| *expr.time_window_size())
}
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
let stmts =
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
@@ -295,7 +286,7 @@ impl BatchingTask {
let catalog = &self.config.sink_table_name[0];
let schema = &self.config.sink_table_name[1];
frontend_client
.create(expr.clone(), catalog, schema, Some(self))
.create(expr.clone(), catalog, schema)
.await?;
Ok(())
}
@@ -343,53 +334,11 @@ impl BatchingTask {
})?;
let plan = expanded_plan;
let db = frontend_client
.get_random_active_frontend(catalog, schema)
.await?;
let peer_desc = db.peer.clone();
let (tx, mut rx) = oneshot::channel();
let peer_inner = peer_desc.clone();
let window_size_pretty = format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
);
let inner_window_size_pretty = window_size_pretty.clone();
let flow_id = self.config.flow_id;
let slow_query_metric_task = tokio::task::spawn(async move {
tokio::time::sleep(SLOW_QUERY_THRESHOLD).await;
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.add(1.0);
while rx.try_recv() == Err(TryRecvError::Empty) {
// sleep for a while before next update
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.sub(1.0);
});
self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task);
let mut peer_desc = None;
let res = {
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[
flow_id.to_string().as_str(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
])
.with_label_values(&[flow_id.to_string().as_str()])
.start_timer();
// hack and special handling the insert logical plan
@@ -418,12 +367,10 @@ impl BatchingTask {
};
frontend_client
.handle(req, catalog, schema, Some(db.peer), Some(self))
.handle(req, catalog, schema, &mut peer_desc)
.await
};
// signaling the slow query metric task to stop
let _ = tx.send(());
let elapsed = instant.elapsed();
if let Ok(affected_rows) = &res {
debug!(
@@ -446,12 +393,7 @@ impl BatchingTask {
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_desc.to_string(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
&peer_desc.unwrap_or_default().to_string(),
])
.observe(elapsed.as_secs_f64());
}

View File

@@ -31,29 +31,22 @@ lazy_static! {
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_query_time_secs",
"flow batching engine query time(seconds)",
&["flow_id", "time_window_granularity"],
&["flow_id"],
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_slow_query_secs",
"flow batching engine slow query(seconds), updated after query finished",
&["flow_id", "peer", "time_window_granularity"],
"flow batching engine slow query(seconds)",
&["flow_id", "peer"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec =
register_gauge_vec!(
"greptime_flow_batching_engine_real_time_slow_query_number",
"flow batching engine real time slow query number, updated in real time",
&["flow_id", "peer", "time_window_granularity"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_stalled_query_window_cnt",
"flow batching engine stalled query time window count",
&["flow_id", "time_window_granularity"],
&["flow_id"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
@@ -61,7 +54,7 @@ lazy_static! {
register_histogram_vec!(
"greptime_flow_batching_engine_query_window_cnt",
"flow batching engine query time window count",
&["flow_id", "time_window_granularity"],
&["flow_id"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
@@ -69,15 +62,7 @@ lazy_static! {
register_histogram_vec!(
"greptime_flow_batching_engine_query_time_range_secs",
"flow batching engine query time range(seconds)",
&["flow_id", "time_window_granularity"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_guess_fe_load",
"flow batching engine guessed frontend load",
&["fe_addr"],
&["flow_id"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();

View File

@@ -17,7 +17,7 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_function::function::{FunctionContext, FunctionRef};
use common_function::function::FunctionContext;
use datafusion_substrait::extensions::Extensions;
use datatypes::data_type::ConcreteDataType as CDT;
use query::QueryEngine;
@@ -108,13 +108,9 @@ impl FunctionExtensions {
/// register flow-specific functions to the query engine
pub fn register_function_to_query_engine(engine: &Arc<dyn QueryEngine>) {
let tumble_fn = Arc::new(TumbleFunction::new("tumble")) as FunctionRef;
let tumble_start_fn = Arc::new(TumbleFunction::new(TUMBLE_START)) as FunctionRef;
let tumble_end_fn = Arc::new(TumbleFunction::new(TUMBLE_END)) as FunctionRef;
engine.register_scalar_function(tumble_fn.into());
engine.register_scalar_function(tumble_start_fn.into());
engine.register_scalar_function(tumble_end_fn.into());
engine.register_function(Arc::new(TumbleFunction::new("tumble")));
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_START)));
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_END)));
}
#[derive(Debug)]

View File

@@ -130,13 +130,7 @@ impl JaegerQueryHandler for Instance {
.await?)
}
async fn get_trace(
&self,
ctx: QueryContextRef,
trace_id: &str,
start_time: Option<i64>,
end_time: Option<i64>,
) -> ServerResult<Output> {
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
// It's equivalent to
//
// ```
@@ -145,25 +139,13 @@ impl JaegerQueryHandler for Instance {
// FROM
// {db}.{trace_table}
// WHERE
// trace_id = '{trace_id}' AND
// timestamp >= {start_time} AND
// timestamp <= {end_time}
// trace_id = '{trace_id}'
// ORDER BY
// timestamp DESC
// ```.
let selects = vec![wildcard()];
let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
if let Some(start_time) = start_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
}
if let Some(end_time) = end_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
}
let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
Ok(query_trace_table(
ctx,

View File

@@ -154,7 +154,6 @@ where
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
user_provider.clone(),
runtime,
opts.grpc.flight_compression,
);
let grpc_server = builder

View File

@@ -31,6 +31,8 @@ use common_meta::kv_backend::rds::MySqlStore;
#[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::rds::PgStore;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
#[cfg(feature = "pg_kvbackend")]
use common_telemetry::error;
use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime};
@@ -142,8 +144,7 @@ impl MetasrvInstance {
let (serve_state_tx, serve_state_rx) = oneshot::channel();
let socket_addr =
bootstrap_metasrv_with_router(&self.opts.grpc.bind_addr, router, serve_state_tx, rx)
.await?;
bootstrap_metasrv_with_router(&self.opts.bind_addr, router, serve_state_tx, rx).await?;
self.bind_addr = Some(socket_addr);
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
@@ -259,7 +260,7 @@ pub async fn metasrv_builder(
let etcd_client = create_etcd_client(&opts.store_addrs).await?;
let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
let election = EtcdElection::with_etcd_client(
&opts.grpc.server_addr,
&opts.server_addr,
etcd_client,
opts.store_key_prefix.clone(),
)
@@ -269,41 +270,22 @@ pub async fn metasrv_builder(
}
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
use std::time::Duration;
use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
use crate::election::rds::postgres::ElectionPgClient;
let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
let execution_timeout = Duration::from_secs(META_LEASE_SECS);
let statement_timeout = Duration::from_secs(META_LEASE_SECS);
let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
let mut cfg = Config::new();
cfg.keepalives = Some(true);
cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
// We use a separate pool for election since we need a different session keep-alive idle time.
let pool = create_postgres_pool_with(&opts.store_addrs, cfg).await?;
let election_client =
ElectionPgClient::new(pool, execution_timeout, meta_lease_ttl, statement_timeout)?;
let election = PgElection::with_pg_client(
opts.grpc.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
candidate_lease_ttl,
meta_lease_ttl,
&opts.meta_table_name,
opts.meta_election_lock_id,
)
.await?;
let pool = create_postgres_pool(&opts.store_addrs).await?;
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
// Client for election should be created separately since we need a different session keep-alive idle time.
let election_client = create_postgres_client(opts).await?;
let election = PgElection::with_pg_client(
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
META_LEASE_SECS,
&opts.meta_table_name,
opts.meta_election_lock_id,
)
.await?;
(kv_backend, Some(election))
}
#[cfg(feature = "mysql_kvbackend")]
@@ -317,7 +299,7 @@ pub async fn metasrv_builder(
let election_table_name = opts.meta_table_name.clone() + "_election";
let election_client = create_mysql_client(opts).await?;
let election = MySqlElection::with_mysql_client(
opts.grpc.server_addr.clone(),
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
@@ -390,24 +372,31 @@ pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
}
#[cfg(feature = "pg_kvbackend")]
/// Creates a pool for the Postgres backend.
///
/// It only use first store addr to create a pool.
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
create_postgres_pool_with(store_addrs, Config::new()).await
async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
let postgres_url = opts
.store_addrs
.first()
.context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
.await
.context(error::ConnectPostgresSnafu)?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!(e; "connection error");
}
});
Ok(client)
}
#[cfg(feature = "pg_kvbackend")]
/// Creates a pool for the Postgres backend.
///
/// It only use first store addr to create a pool, and use the given config to create a pool.
pub async fn create_postgres_pool_with(
store_addrs: &[String],
mut cfg: Config,
) -> Result<deadpool_postgres::Pool> {
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let mut cfg = Config::new();
cfg.url = Some(postgres_url.to_string());
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)

View File

@@ -157,11 +157,6 @@ pub trait Election: Send + Sync {
/// but only one can be the leader at a time.
async fn campaign(&self) -> Result<()>;
/// Resets the campaign.
///
/// Reset the client and the leader flag if needed.
async fn reset_campaign(&self) {}
/// Returns the leader value for the current election.
async fn leader(&self) -> Result<Self::Leader>;

View File

@@ -18,12 +18,11 @@ use std::time::Duration;
use common_telemetry::{error, warn};
use common_time::Timestamp;
use deadpool_postgres::{Manager, Pool};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::{broadcast, RwLock};
use tokio::sync::broadcast;
use tokio::time::MissedTickBehavior;
use tokio_postgres::types::ToSql;
use tokio_postgres::Row;
use tokio_postgres::Client;
use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
use crate::election::{
@@ -31,14 +30,15 @@ use crate::election::{
CANDIDATES_ROOT, ELECTION_KEY,
};
use crate::error::{
DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu,
UnexpectedSnafu,
};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
struct ElectionSqlFactory<'a> {
lock_id: u64,
table_name: &'a str,
meta_lease_ttl_secs: u64,
}
struct ElectionSqlSet {
@@ -88,10 +88,11 @@ struct ElectionSqlSet {
}
impl<'a> ElectionSqlFactory<'a> {
fn new(lock_id: u64, table_name: &'a str) -> Self {
fn new(lock_id: u64, table_name: &'a str, meta_lease_ttl_secs: u64) -> Self {
Self {
lock_id,
table_name,
meta_lease_ttl_secs,
}
}
@@ -107,6 +108,15 @@ impl<'a> ElectionSqlFactory<'a> {
}
}
// Currently the session timeout is longer than the leader lease time.
// So the leader will renew the lease twice before the session timeout if everything goes well.
fn set_idle_session_timeout_sql(&self) -> String {
format!(
"SET idle_session_timeout = '{}s';",
self.meta_lease_ttl_secs + 1
)
}
fn campaign_sql(&self) -> String {
format!("SELECT pg_try_advisory_lock({})", self.lock_id)
}
@@ -161,165 +171,46 @@ impl<'a> ElectionSqlFactory<'a> {
}
}
/// PgClient for election.
pub struct ElectionPgClient {
current: Option<deadpool::managed::Object<Manager>>,
pool: Pool,
/// The client-side timeout for statement execution.
///
/// This timeout is enforced by the client application and is independent of any server-side timeouts.
/// If a statement takes longer than this duration to execute, the client will abort the operation.
execution_timeout: Duration,
/// The idle session timeout.
///
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
/// If a session remains idle for longer than this duration, the server will terminate it.
idle_session_timeout: Duration,
/// The statement timeout.
///
/// This timeout is configured per client session and is enforced by the PostgreSQL server.
/// If a statement takes longer than this duration to execute, the server will abort it.
statement_timeout: Duration,
}
impl ElectionPgClient {
pub fn new(
pool: Pool,
execution_timeout: Duration,
idle_session_timeout: Duration,
statement_timeout: Duration,
) -> Result<ElectionPgClient> {
Ok(ElectionPgClient {
current: None,
pool,
execution_timeout,
idle_session_timeout,
statement_timeout,
})
}
fn set_idle_session_timeout_sql(&self) -> String {
format!(
"SET idle_session_timeout = '{}s';",
self.idle_session_timeout.as_secs()
)
}
fn set_statement_timeout_sql(&self) -> String {
format!(
"SET statement_timeout = '{}s';",
self.statement_timeout.as_secs()
)
}
async fn reset_client(&mut self) -> Result<()> {
self.current = None;
self.maybe_init_client().await
}
async fn maybe_init_client(&mut self) -> Result<()> {
if self.current.is_none() {
let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
self.current = Some(client);
// Set idle session timeout and statement timeout.
let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
self.execute(&idle_session_timeout_sql, &[]).await?;
let statement_timeout_sql = self.set_statement_timeout_sql();
self.execute(&statement_timeout_sql, &[]).await?;
}
Ok(())
}
/// Returns the result of the query.
///
/// # Panics
/// if `current` is `None`.
async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
let result = tokio::time::timeout(
self.execution_timeout,
self.current.as_ref().unwrap().execute(sql, params),
)
.await
.map_err(|_| {
SqlExecutionTimeoutSnafu {
sql: sql.to_string(),
duration: self.execution_timeout,
}
.build()
})?;
result.context(PostgresExecutionSnafu { sql })
}
/// Returns the result of the query.
///
/// # Panics
/// if `current` is `None`.
async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
let result = tokio::time::timeout(
self.execution_timeout,
self.current.as_ref().unwrap().query(sql, params),
)
.await
.map_err(|_| {
SqlExecutionTimeoutSnafu {
sql: sql.to_string(),
duration: self.execution_timeout,
}
.build()
})?;
result.context(PostgresExecutionSnafu { sql })
}
}
/// PostgreSql implementation of Election.
pub struct PgElection {
leader_value: String,
pg_client: RwLock<ElectionPgClient>,
client: Client,
is_leader: AtomicBool,
leader_infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String,
candidate_lease_ttl: Duration,
meta_lease_ttl: Duration,
candidate_lease_ttl_secs: u64,
meta_lease_ttl_secs: u64,
sql_set: ElectionSqlSet,
}
impl PgElection {
async fn maybe_init_client(&self) -> Result<()> {
if self.pg_client.read().await.current.is_none() {
self.pg_client.write().await.maybe_init_client().await?;
}
Ok(())
}
pub async fn with_pg_client(
leader_value: String,
pg_client: ElectionPgClient,
client: Client,
store_key_prefix: String,
candidate_lease_ttl: Duration,
meta_lease_ttl: Duration,
candidate_lease_ttl_secs: u64,
meta_lease_ttl_secs: u64,
table_name: &str,
lock_id: u64,
) -> Result<ElectionRef> {
let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
let sql_factory = ElectionSqlFactory::new(lock_id, table_name, meta_lease_ttl_secs);
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
client
.execute(&sql_factory.set_idle_session_timeout_sql(), &[])
.await
.context(PostgresExecutionSnafu)?;
let tx = listen_leader_change(leader_value.clone());
Ok(Arc::new(Self {
leader_value,
pg_client: RwLock::new(pg_client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(false),
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl,
meta_lease_ttl,
candidate_lease_ttl_secs,
meta_lease_ttl_secs,
sql_set: sql_factory.build(),
}))
}
@@ -358,17 +249,18 @@ impl Election for PgElection {
input: format!("{node_info:?}"),
})?;
let res = self
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?;
// May registered before, just update the lease.
if !res {
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?;
}
// Check if the current lease has expired and renew the lease.
let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
loop {
let _ = keep_alive_interval.tick().await;
@@ -390,8 +282,13 @@ impl Election for PgElection {
);
// Safety: origin is Some since we are using `get_value_with_lease` with `true`.
self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
.await?;
self.update_value_with_lease(
&key,
&lease.origin,
&node_info,
self.candidate_lease_ttl_secs,
)
.await?;
}
}
@@ -424,17 +321,16 @@ impl Election for PgElection {
/// - If the lock is not acquired (result is false), it calls the `follower_action` method
/// to perform actions as a follower.
async fn campaign(&self) -> Result<()> {
let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.meta_lease_ttl_secs / 2));
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
self.maybe_init_client().await?;
loop {
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.campaign, &[])
.await?;
.await
.context(PostgresExecutionSnafu)?;
let row = res.first().context(UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock",
})?;
@@ -453,12 +349,6 @@ impl Election for PgElection {
}
}
async fn reset_campaign(&self) {
if let Err(err) = self.pg_client.write().await.reset_client().await {
error!(err; "Failed to reset client");
}
}
async fn leader(&self) -> Result<Self::Leader> {
if self.is_leader.load(Ordering::Relaxed) {
Ok(self.leader_value.as_bytes().into())
@@ -486,13 +376,11 @@ impl PgElection {
/// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
let key = key.as_bytes();
self.maybe_init_client().await?;
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.get_value_with_lease, &[&key])
.await?;
.await
.context(PostgresExecutionSnafu)?;
if res.is_empty() {
Ok(None)
@@ -526,13 +414,11 @@ impl PgElection {
key_prefix: &str,
) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
self.maybe_init_client().await?;
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
.await?;
.await
.context(PostgresExecutionSnafu)?;
let mut values_with_leases = vec![];
let mut current = Timestamp::default();
@@ -559,21 +445,18 @@ impl PgElection {
key: &str,
prev: &str,
updated: &str,
lease_ttl: Duration,
lease_ttl: u64,
) -> Result<()> {
let key = key.as_bytes();
let prev = prev.as_bytes();
self.maybe_init_client().await?;
let lease_ttl_secs = lease_ttl.as_secs() as f64;
let res = self
.pg_client
.read()
.await
.client
.execute(
&self.sql_set.update_value_with_lease,
&[&key, &prev, &updated, &lease_ttl_secs],
&[&key, &prev, &updated, &(lease_ttl as f64)],
)
.await?;
.await
.context(PostgresExecutionSnafu)?;
ensure!(
res == 1,
@@ -590,18 +473,16 @@ impl PgElection {
&self,
key: &str,
value: &str,
lease_ttl: Duration,
lease_ttl_secs: u64,
) -> Result<bool> {
let key = key.as_bytes();
let lease_ttl_secs = lease_ttl.as_secs() as f64;
let lease_ttl_secs = lease_ttl_secs as f64;
let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
self.maybe_init_client().await?;
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.put_value_with_lease, &params)
.await?;
.await
.context(PostgresExecutionSnafu)?;
Ok(res.is_empty())
}
@@ -609,13 +490,11 @@ impl PgElection {
/// Caution: Should only delete the key if the lease is expired.
async fn delete_value(&self, key: &str) -> Result<bool> {
let key = key.as_bytes();
self.maybe_init_client().await?;
let res = self
.pg_client
.read()
.await
.client
.query(&self.sql_set.delete_value, &[&key])
.await?;
.await
.context(PostgresExecutionSnafu)?;
Ok(res.len() == 1)
}
@@ -657,7 +536,7 @@ impl PgElection {
&key,
&lease.origin,
&self.leader_value,
self.meta_lease_ttl,
self.meta_lease_ttl_secs,
)
.await?;
}
@@ -726,12 +605,10 @@ impl PgElection {
..Default::default()
};
self.delete_value(&key).await?;
self.maybe_init_client().await?;
self.pg_client
.read()
.await
self.client
.query(&self.sql_set.step_down, &[])
.await?;
.await
.context(PostgresExecutionSnafu)?;
send_leader_change_and_set_flags(
&self.is_leader,
&self.leader_infancy,
@@ -774,7 +651,7 @@ impl PgElection {
..Default::default()
};
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs)
.await?;
if self
@@ -797,21 +674,15 @@ impl PgElection {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::env;
use common_meta::maybe_skip_postgres_integration_test;
use tokio_postgres::{Client, NoTls};
use super::*;
use crate::bootstrap::create_postgres_pool;
use crate::error;
use crate::error::PostgresExecutionSnafu;
async fn create_postgres_client(
table_name: Option<&str>,
execution_timeout: Duration,
idle_session_timeout: Duration,
statement_timeout: Duration,
) -> Result<ElectionPgClient> {
async fn create_postgres_client(table_name: Option<&str>) -> Result<Client> {
let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
if endpoint.is_empty() {
return UnexpectedSnafu {
@@ -819,34 +690,25 @@ mod tests {
}
.fail();
}
let pool = create_postgres_pool(&[endpoint]).await.unwrap();
let mut pg_client = ElectionPgClient::new(
pool,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.unwrap();
pg_client.maybe_init_client().await?;
let (client, connection) = tokio_postgres::connect(&endpoint, NoTls)
.await
.context(PostgresExecutionSnafu)?;
tokio::spawn(async move {
connection.await.context(PostgresExecutionSnafu).unwrap();
});
if let Some(table_name) = table_name {
let create_table_sql = format!(
"CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
table_name
);
pg_client.execute(&create_table_sql, &[]).await?;
client.execute(&create_table_sql, &[]).await.unwrap();
}
Ok(pg_client)
Ok(client)
}
async fn drop_table(pg_election: &PgElection, table_name: &str) {
async fn drop_table(client: &Client, table_name: &str) {
let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
pg_election
.pg_client
.read()
.await
.execute(&sql, &[])
.await
.unwrap();
client.execute(&sql, &[]).await.unwrap();
}
#[tokio::test]
@@ -857,35 +719,23 @@ mod tests {
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_postgres_crud_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(10);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, _) = broadcast::channel(100);
let pg_election = PgElection {
leader_value: "test_leader".to_string(),
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
candidate_lease_ttl_secs: 10,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
};
let res = pg_election
.put_value_with_lease(&key, &value, candidate_lease_ttl)
.put_value_with_lease(&key, &value, 10)
.await
.unwrap();
assert!(res);
@@ -898,7 +748,7 @@ mod tests {
assert_eq!(lease.leader_value, value);
pg_election
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
.update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl_secs)
.await
.unwrap();
@@ -912,7 +762,7 @@ mod tests {
let key = format!("test_key_{}", i);
let value = format!("test_value_{}", i);
pg_election
.put_value_with_lease(&key, &value, candidate_lease_ttl)
.put_value_with_lease(&key, &value, 10)
.await
.unwrap();
}
@@ -937,39 +787,28 @@ mod tests {
assert!(res.is_empty());
assert!(current == Timestamp::default());
drop_table(&pg_election, table_name).await;
drop_table(&pg_election.client, table_name).await;
}
async fn candidate(
leader_value: String,
candidate_lease_ttl: Duration,
candidate_lease_ttl_secs: u64,
store_key_prefix: String,
table_name: String,
) {
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
None,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let client = create_postgres_client(None).await.unwrap();
let (tx, _) = broadcast::channel(100);
let pg_election = PgElection {
leader_value,
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28319, &table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, &table_name, 2).build(),
};
let node_info = MetasrvNodeInfo {
@@ -985,28 +824,17 @@ mod tests {
async fn test_candidate_registration() {
maybe_skip_postgres_integration_test!();
let leader_value_prefix = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_candidate_registration_greptime_metakv";
let mut handles = vec![];
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let client = create_postgres_client(Some(table_name)).await.unwrap();
for i in 0..10 {
let leader_value = format!("{}{}", leader_value_prefix, i);
let handle = tokio::spawn(candidate(
leader_value,
candidate_lease_ttl,
candidate_lease_ttl_secs,
uuid.clone(),
table_name.to_string(),
));
@@ -1019,14 +847,14 @@ mod tests {
let leader_value = "test_leader".to_string();
let pg_election = PgElection {
leader_value,
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid.clone(),
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28319, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28319, table_name, 2).build(),
};
let candidates = pg_election.all_candidates().await.unwrap();
@@ -1048,40 +876,29 @@ mod tests {
assert!(res);
}
drop_table(&pg_election, table_name).await;
drop_table(&pg_election.client, table_name).await;
}
#[tokio::test]
async fn test_elected_and_step_down() {
maybe_skip_postgres_integration_test!();
let leader_value = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_elected_and_step_down_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28320, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28320, table_name, 2).build(),
};
leader_pg_election.elected().await.unwrap();
@@ -1173,7 +990,7 @@ mod tests {
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
drop_table(&leader_pg_election, table_name).await;
drop_table(&leader_pg_election.client, table_name).await;
}
#[tokio::test]
@@ -1182,38 +999,25 @@ mod tests {
let leader_value = "test_leader".to_string();
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_leader_action_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let candidate_lease_ttl_secs = 5;
let client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
pg_client: RwLock::new(client),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28321, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28321, table_name, 2).build(),
};
// Step 1: No leader exists, campaign and elected.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1244,9 +1048,7 @@ mod tests {
// Step 2: As a leader, renew the lease.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1268,9 +1070,7 @@ mod tests {
tokio::time::sleep(Duration::from_secs(2)).await;
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1298,9 +1098,7 @@ mod tests {
// Step 4: Re-campaign and elected.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1357,9 +1155,7 @@ mod tests {
// Step 6: Re-campaign and elected.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1390,9 +1186,7 @@ mod tests {
// Step 7: Something wrong, the leader key changed by others.
let res = leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1403,11 +1197,7 @@ mod tests {
.await
.unwrap();
leader_pg_election
.put_value_with_lease(
&leader_pg_election.election_key(),
"test",
Duration::from_secs(10),
)
.put_value_with_lease(&leader_pg_election.election_key(), "test", 10)
.await
.unwrap();
leader_pg_election.leader_action().await.unwrap();
@@ -1433,74 +1223,52 @@ mod tests {
// Clean up
leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.step_down, &[])
.await
.unwrap();
drop_table(&leader_pg_election, table_name).await;
drop_table(&leader_pg_election.client, table_name).await;
}
#[tokio::test]
async fn test_follower_action() {
maybe_skip_postgres_integration_test!();
common_telemetry::init_default_ut_logging();
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_follower_action_greptime_metakv";
let candidate_lease_ttl = Duration::from_secs(5);
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let meta_lease_ttl = Duration::from_secs(2);
let idle_session_timeout = Duration::from_secs(0);
let follower_client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let follower_client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let follower_pg_election = PgElection {
leader_value: "test_follower".to_string(),
pg_client: RwLock::new(follower_client),
client: follower_client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid.clone(),
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
};
let leader_client = create_postgres_client(
Some(table_name),
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
let leader_client = create_postgres_client(Some(table_name)).await.unwrap();
let (tx, _) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: "test_leader".to_string(),
pg_client: RwLock::new(leader_client),
client: leader_client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
sql_set: ElectionSqlFactory::new(28322, table_name).build(),
candidate_lease_ttl_secs,
meta_lease_ttl_secs: 2,
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
};
leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
@@ -1541,41 +1309,11 @@ mod tests {
// Clean up
leader_pg_election
.pg_client
.read()
.await
.client
.query(&leader_pg_election.sql_set.step_down, &[])
.await
.unwrap();
drop_table(&follower_pg_election, table_name).await;
}
#[tokio::test]
async fn test_idle_session_timeout() {
maybe_skip_postgres_integration_test!();
common_telemetry::init_default_ut_logging();
let execution_timeout = Duration::from_secs(10);
let statement_timeout = Duration::from_secs(10);
let idle_session_timeout = Duration::from_secs(1);
let mut client = create_postgres_client(
None,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(1100)).await;
// Wait for the idle session timeout.
let err = client.query("SELECT 1", &[]).await.unwrap_err();
assert_matches!(err, error::Error::PostgresExecution { .. });
let error::Error::PostgresExecution { error, .. } = err else {
panic!("Expected PostgresExecution error");
};
assert!(error.is_closed());
// Reset the client and try again.
client.reset_client().await.unwrap();
let _ = client.query("SELECT 1", &[]).await.unwrap();
drop_table(&follower_pg_election.client, table_name).await;
}
}

View File

@@ -748,31 +748,21 @@ pub enum Error {
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to execute via postgres, sql: {}", sql))]
#[snafu(display("Failed to execute via postgres"))]
PostgresExecution {
#[snafu(source)]
error: tokio_postgres::Error,
sql: String,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to get Postgres client"))]
GetPostgresClient {
#[snafu(implicit)]
location: Location,
#[snafu(display("Failed to connect to Postgres"))]
ConnectPostgres {
#[snafu(source)]
error: deadpool::managed::PoolError<tokio_postgres::Error>,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
SqlExecutionTimeout {
error: tokio_postgres::Error,
#[snafu(implicit)]
location: Location,
sql: String,
duration: std::time::Duration,
},
#[cfg(feature = "pg_kvbackend")]
@@ -1015,10 +1005,9 @@ impl ErrorExt for Error {
Error::LookupPeer { source, .. } => source.status_code(),
#[cfg(feature = "pg_kvbackend")]
Error::CreatePostgresPool { .. }
| Error::GetPostgresClient { .. }
| Error::GetPostgresConnection { .. }
| Error::PostgresExecution { .. }
| Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
| Error::ConnectPostgres { .. } => StatusCode::Internal,
#[cfg(feature = "mysql_kvbackend")]
Error::MySqlExecution { .. }
| Error::CreateMySqlPool { .. }

View File

@@ -375,7 +375,7 @@ impl HeartbeatMailbox {
/// Parses the [Instruction] from [MailboxMessage].
#[cfg(test)]
pub fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
pub(crate) fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
let Payload::Json(payload) =
msg.payload
.as_ref()

View File

@@ -22,7 +22,7 @@ use std::time::Duration;
use clap::ValueEnum;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_config::Configurable;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
@@ -46,7 +46,6 @@ use common_telemetry::{error, info, warn};
use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -74,7 +73,7 @@ use crate::state::{become_follower, become_leader, StateRef};
pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_DATA_DIR: &str = "metasrv";
pub const METASRV_HOME: &str = "./greptimedb_data/metasrv";
// The datastores that implements metadata kvbackend.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
@@ -97,10 +96,8 @@ pub enum BackendImpl {
#[serde(default)]
pub struct MetasrvOptions {
/// The address the server listens on.
#[deprecated(note = "Use grpc.bind_addr instead")]
pub bind_addr: String,
/// The address the server advertises to the clients.
#[deprecated(note = "Use grpc.server_addr instead")]
pub server_addr: String,
/// The address of the store, e.g., etcd.
pub store_addrs: Vec<String>,
@@ -115,7 +112,6 @@ pub struct MetasrvOptions {
/// If it's true, the region failover will be allowed even if the local WAL is used.
/// Note that this option is not recommended to be set to true, because it may lead to data loss during failover.
pub allow_region_failover_on_local_wal: bool,
pub grpc: GrpcOptions,
/// The HTTP server options.
pub http: HttpOptions,
/// The logging options.
@@ -170,6 +166,8 @@ impl fmt::Debug for MetasrvOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug_struct = f.debug_struct("MetasrvOptions");
debug_struct
.field("bind_addr", &self.bind_addr)
.field("server_addr", &self.server_addr)
.field("store_addrs", &self.sanitize_store_addrs())
.field("selector", &self.selector)
.field("use_memory_store", &self.use_memory_store)
@@ -178,7 +176,6 @@ impl fmt::Debug for MetasrvOptions {
"allow_region_failover_on_local_wal",
&self.allow_region_failover_on_local_wal,
)
.field("grpc", &self.grpc)
.field("http", &self.http)
.field("logging", &self.logging)
.field("procedure", &self.procedure)
@@ -211,21 +208,19 @@ const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
impl Default for MetasrvOptions {
fn default() -> Self {
Self {
#[allow(deprecated)]
bind_addr: String::new(),
#[allow(deprecated)]
bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
// If server_addr is not set, the server will use the local ip address as the server address.
server_addr: String::new(),
store_addrs: vec!["127.0.0.1:2379".to_string()],
selector: SelectorType::default(),
use_memory_store: false,
enable_region_failover: false,
allow_region_failover_on_local_wal: false,
grpc: GrpcOptions {
bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
http: HttpOptions::default(),
logging: LoggingOptions {
dir: format!("{METASRV_HOME}/logs"),
..Default::default()
},
http: HttpOptions::default(),
logging: LoggingOptions::default(),
procedure: ProcedureConfig {
max_retry_times: 12,
retry_delay: Duration::from_millis(500),
@@ -237,7 +232,7 @@ impl Default for MetasrvOptions {
failure_detector: PhiAccrualFailureDetectorOptions::default(),
datanode: DatanodeClientOptions::default(),
enable_telemetry: true,
data_home: DEFAULT_DATA_HOME.to_string(),
data_home: METASRV_HOME.to_string(),
wal: MetasrvWalConfig::default(),
export_metrics: ExportMetricsOption::default(),
store_key_prefix: String::new(),
@@ -261,6 +256,37 @@ impl Configurable for MetasrvOptions {
}
impl MetasrvOptions {
/// Detect server address.
#[cfg(not(target_os = "android"))]
pub fn detect_server_addr(&mut self) {
if self.server_addr.is_empty() {
match local_ip_address::local_ip() {
Ok(ip) => {
let detected_addr = format!(
"{}:{}",
ip,
self.bind_addr
.split(':')
.nth(1)
.unwrap_or(DEFAULT_METASRV_ADDR_PORT)
);
info!("Using detected: {} as server address", detected_addr);
self.server_addr = detected_addr;
}
Err(e) => {
error!("Failed to detect local ip address: {}", e);
}
}
}
}
#[cfg(target_os = "android")]
pub fn detect_server_addr(&mut self) {
if self.server_addr.is_empty() {
common_telemetry::debug!("detect local IP is not supported on Android");
}
}
fn sanitize_store_addrs(&self) -> Vec<String> {
self.store_addrs
.iter()
@@ -559,7 +585,6 @@ impl Metasrv {
if let Err(e) = res {
warn!(e; "Metasrv election error");
}
election.reset_campaign().await;
info!("Metasrv re-initiate election");
}
info!("Metasrv stopped");
@@ -616,7 +641,7 @@ impl Metasrv {
pub fn node_info(&self) -> MetasrvNodeInfo {
let build_info = common_version::build_info();
MetasrvNodeInfo {
addr: self.options().grpc.server_addr.clone(),
addr: self.options().server_addr.clone(),
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
start_time_ms: self.start_time_ms(),
@@ -708,7 +733,7 @@ impl Metasrv {
#[inline]
pub fn new_ctx(&self) -> Context {
let server_addr = self.options().grpc.server_addr.clone();
let server_addr = self.options().server_addr.clone();
let in_memory = self.in_memory.clone();
let kv_backend = self.kv_backend.clone();
let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();

Some files were not shown because too many files have changed in this diff Show More