Compare commits


1 Commit

Author: liyang
SHA1: e631d0c4ae
Message: test!:Update config.yml
Date: 2025-05-21 15:47:40 +08:00
431 changed files with 10797 additions and 23434 deletions

View File

@@ -2,7 +2,7 @@ blank_issues_enabled: false
contact_links: contact_links:
- name: Greptime Community Slack - name: Greptime Community Slack
url: https://greptime.com/slack url: https://greptime.com/slack
about: Get free help from the Greptime community about: Get free help from the Greptime community.
- name: Greptime Community Discussion - name: Greptime Community Discussion
url: https://github.com/greptimeTeam/discussions url: https://github.com/greptimeTeam/discussions
about: Get free help from the Greptime community about: Get free help from the Greptime community.

View File

@@ -52,7 +52,7 @@ runs:
uses: ./.github/actions/build-greptime-binary uses: ./.github/actions/build-greptime-binary
with: with:
base-image: ubuntu base-image: ubuntu
features: servers/dashboard features: servers/dashboard,pg_kvbackend,mysql_kvbackend
cargo-profile: ${{ inputs.cargo-profile }} cargo-profile: ${{ inputs.cargo-profile }}
artifacts-dir: greptime-linux-${{ inputs.arch }}-${{ inputs.version }} artifacts-dir: greptime-linux-${{ inputs.arch }}-${{ inputs.version }}
version: ${{ inputs.version }} version: ${{ inputs.version }}
@@ -70,7 +70,7 @@ runs:
if: ${{ inputs.arch == 'amd64' && inputs.dev-mode == 'false' }} # Builds greptime for centos if the host machine is amd64. if: ${{ inputs.arch == 'amd64' && inputs.dev-mode == 'false' }} # Builds greptime for centos if the host machine is amd64.
with: with:
base-image: centos base-image: centos
features: servers/dashboard features: servers/dashboard,pg_kvbackend,mysql_kvbackend
cargo-profile: ${{ inputs.cargo-profile }} cargo-profile: ${{ inputs.cargo-profile }}
artifacts-dir: greptime-linux-${{ inputs.arch }}-centos-${{ inputs.version }} artifacts-dir: greptime-linux-${{ inputs.arch }}-centos-${{ inputs.version }}
version: ${{ inputs.version }} version: ${{ inputs.version }}
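Both hunks above extend the release build's feature list with `pg_kvbackend` and `mysql_kvbackend` alongside `servers/dashboard`. Purely as a hypothetical illustration of how such switches are usually declared (the real definitions live in GreptimeDB's own Cargo.toml and will differ), a feature of this kind gates an optional backend dependency:

```toml
# Hypothetical sketch only; the feature names come from the diff, the wiring is assumed.
[features]
default = []
pg_kvbackend = ["dep:tokio-postgres"]   # assumed PostgreSQL client crate
mysql_kvbackend = ["dep:mysql_async"]   # assumed MySQL client crate

[dependencies]
tokio-postgres = { version = "0.7", optional = true }
mysql_async = { version = "0.34", optional = true }
```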

View File

@@ -64,11 +64,11 @@ inputs:
upload-max-retry-times: upload-max-retry-times:
description: Max retry times for uploading artifacts to S3 description: Max retry times for uploading artifacts to S3
required: false required: false
default: "30" default: "20"
upload-retry-timeout: upload-retry-timeout:
description: Timeout for uploading artifacts to S3 description: Timeout for uploading artifacts to S3
required: false required: false
default: "120" # minutes default: "30" # minutes
runs: runs:
using: composite using: composite
steps: steps:

View File

@@ -59,7 +59,7 @@ runs:
--set base.podTemplate.main.resources.requests.cpu=50m \ --set base.podTemplate.main.resources.requests.cpu=50m \
--set base.podTemplate.main.resources.requests.memory=256Mi \ --set base.podTemplate.main.resources.requests.memory=256Mi \
--set base.podTemplate.main.resources.limits.cpu=2000m \ --set base.podTemplate.main.resources.limits.cpu=2000m \
--set base.podTemplate.main.resources.limits.memory=3Gi \ --set base.podTemplate.main.resources.limits.memory=2Gi \
--set frontend.replicas=${{ inputs.frontend-replicas }} \ --set frontend.replicas=${{ inputs.frontend-replicas }} \
--set datanode.replicas=${{ inputs.datanode-replicas }} \ --set datanode.replicas=${{ inputs.datanode-replicas }} \
--set meta.replicas=${{ inputs.meta-replicas }} \ --set meta.replicas=${{ inputs.meta-replicas }} \

View File

@@ -16,8 +16,7 @@ function create_version() {
if [ -z "$NEXT_RELEASE_VERSION" ]; then if [ -z "$NEXT_RELEASE_VERSION" ]; then
echo "NEXT_RELEASE_VERSION is empty, use version from Cargo.toml" >&2 echo "NEXT_RELEASE_VERSION is empty, use version from Cargo.toml" >&2
# NOTE: Need a `v` prefix for the version string. export NEXT_RELEASE_VERSION=$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
export NEXT_RELEASE_VERSION=v$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
fi fi
if [ -z "$NIGHTLY_RELEASE_PREFIX" ]; then if [ -z "$NIGHTLY_RELEASE_PREFIX" ]; then

View File

@@ -55,11 +55,6 @@ on:
description: Build and push images to DockerHub and ACR description: Build and push images to DockerHub and ACR
required: false required: false
default: true default: true
upload_artifacts_to_s3:
type: boolean
description: Whether upload artifacts to s3
required: false
default: false
cargo_profile: cargo_profile:
type: choice type: choice
description: The cargo profile to use in building GreptimeDB. description: The cargo profile to use in building GreptimeDB.
@@ -243,7 +238,7 @@ jobs:
version: ${{ needs.allocate-runners.outputs.version }} version: ${{ needs.allocate-runners.outputs.version }}
push-latest-tag: false # Don't push the latest tag to registry. push-latest-tag: false # Don't push the latest tag to registry.
dev-mode: true # Only build the standard images. dev-mode: true # Only build the standard images.
- name: Echo Docker image tag to step summary - name: Echo Docker image tag to step summary
run: | run: |
echo "## Docker Image Tag" >> $GITHUB_STEP_SUMMARY echo "## Docker Image Tag" >> $GITHUB_STEP_SUMMARY
@@ -286,7 +281,7 @@ jobs:
aws-cn-access-key-id: ${{ secrets.AWS_CN_ACCESS_KEY_ID }} aws-cn-access-key-id: ${{ secrets.AWS_CN_ACCESS_KEY_ID }}
aws-cn-secret-access-key: ${{ secrets.AWS_CN_SECRET_ACCESS_KEY }} aws-cn-secret-access-key: ${{ secrets.AWS_CN_SECRET_ACCESS_KEY }}
aws-cn-region: ${{ vars.AWS_RELEASE_BUCKET_REGION }} aws-cn-region: ${{ vars.AWS_RELEASE_BUCKET_REGION }}
upload-to-s3: ${{ inputs.upload_artifacts_to_s3 }} upload-to-s3: false
dev-mode: true # Only build the standard images(exclude centos images). dev-mode: true # Only build the standard images(exclude centos images).
push-latest-tag: false # Don't push the latest tag to registry. push-latest-tag: false # Don't push the latest tag to registry.
update-version-info: false # Don't update the version info in S3. update-version-info: false # Don't update the version info in S3.

View File

@@ -195,7 +195,6 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
timeout-minutes: 60 timeout-minutes: 60
strategy: strategy:
fail-fast: false
matrix: matrix:
target: [ "unstable_fuzz_create_table_standalone" ] target: [ "unstable_fuzz_create_table_standalone" ]
steps: steps:
@@ -250,11 +249,6 @@ jobs:
name: unstable-fuzz-logs name: unstable-fuzz-logs
path: /tmp/unstable-greptime/ path: /tmp/unstable-greptime/
retention-days: 3 retention-days: 3
- name: Describe pods
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
build-greptime-ci: build-greptime-ci:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }} if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
@@ -305,7 +299,6 @@ jobs:
needs: build-greptime-ci needs: build-greptime-ci
timeout-minutes: 60 timeout-minutes: 60
strategy: strategy:
fail-fast: false
matrix: matrix:
target: [ "fuzz_create_table", "fuzz_alter_table", "fuzz_create_database", "fuzz_create_logical_table", "fuzz_alter_logical_table", "fuzz_insert", "fuzz_insert_logical_table" ] target: [ "fuzz_create_table", "fuzz_alter_table", "fuzz_create_database", "fuzz_create_logical_table", "fuzz_alter_logical_table", "fuzz_insert", "fuzz_insert_logical_table" ]
mode: mode:
@@ -410,11 +403,6 @@ jobs:
shell: bash shell: bash
run: | run: |
kubectl describe nodes kubectl describe nodes
- name: Describe pod
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
- name: Export kind logs - name: Export kind logs
if: failure() if: failure()
shell: bash shell: bash
@@ -443,7 +431,6 @@ jobs:
needs: build-greptime-ci needs: build-greptime-ci
timeout-minutes: 60 timeout-minutes: 60
strategy: strategy:
fail-fast: false
matrix: matrix:
target: ["fuzz_migrate_mito_regions", "fuzz_migrate_metric_regions", "fuzz_failover_mito_regions", "fuzz_failover_metric_regions"] target: ["fuzz_migrate_mito_regions", "fuzz_migrate_metric_regions", "fuzz_failover_mito_regions", "fuzz_failover_metric_regions"]
mode: mode:
@@ -564,11 +551,6 @@ jobs:
shell: bash shell: bash
run: | run: |
kubectl describe nodes kubectl describe nodes
- name: Describe pods
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
- name: Export kind logs - name: Export kind logs
if: failure() if: failure()
shell: bash shell: bash
@@ -596,7 +578,6 @@ jobs:
needs: build needs: build
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
strategy: strategy:
fail-fast: false
matrix: matrix:
os: [ ubuntu-latest ] os: [ ubuntu-latest ]
mode: mode:

View File

@@ -124,7 +124,9 @@ jobs:
fetch-depth: 0 fetch-depth: 0
persist-credentials: false persist-credentials: false
- uses: cachix/install-nix-action@v31 - uses: cachix/install-nix-action@v31
- run: nix develop --command cargo check --bin greptime with:
nix_path: nixpkgs=channel:nixos-24.11
- run: nix develop --command cargo build --bin greptime
env: env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold" CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"

View File

@@ -441,8 +441,8 @@ jobs:
aws-region: ${{ vars.EC2_RUNNER_REGION }} aws-region: ${{ vars.EC2_RUNNER_REGION }}
github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }} github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }}
bump-downstream-repo-versions: bump-doc-version:
name: Bump downstream repo versions name: Bump doc version
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }} if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [allocate-runners, publish-github-release] needs: [allocate-runners, publish-github-release]
runs-on: ubuntu-latest runs-on: ubuntu-latest
@@ -456,16 +456,36 @@ jobs:
fetch-depth: 0 fetch-depth: 0
persist-credentials: false persist-credentials: false
- uses: ./.github/actions/setup-cyborg - uses: ./.github/actions/setup-cyborg
- name: Bump downstream repo versions - name: Bump doc version
working-directory: cyborg working-directory: cyborg
run: pnpm tsx bin/bump-versions.ts run: pnpm tsx bin/bump-doc-version.ts
env:
VERSION: ${{ needs.allocate-runners.outputs.version }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
bump-website-version:
name: Bump website version
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
needs: [allocate-runners, publish-github-release]
runs-on: ubuntu-latest
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:
issues: write # Allows the action to create issues for cyborg.
contents: write # Allows the action to create a release.
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- uses: ./.github/actions/setup-cyborg
- name: Bump website version
working-directory: cyborg
run: pnpm tsx bin/bump-website-version.ts
env: env:
TARGET_REPOS: website,docs,demo
VERSION: ${{ needs.allocate-runners.outputs.version }} VERSION: ${{ needs.allocate-runners.outputs.version }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
WEBSITE_REPO_TOKEN: ${{ secrets.WEBSITE_REPO_TOKEN }} WEBSITE_REPO_TOKEN: ${{ secrets.WEBSITE_REPO_TOKEN }}
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
DEMO_REPO_TOKEN: ${{ secrets.DEMO_REPO_TOKEN }}
bump-helm-charts-version: bump-helm-charts-version:
name: Bump helm charts version name: Bump helm charts version

View File

@@ -16,7 +16,6 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
permissions: permissions:
pull-requests: write # Add permissions to modify PRs pull-requests: write # Add permissions to modify PRs
issues: write
timeout-minutes: 10 timeout-minutes: 10
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4

View File

@@ -108,7 +108,7 @@ of what you were trying to do and what went wrong. You can also reach for help i
The core team will be thrilled if you would like to participate in any way you like. When you are stuck, try to ask for help by filing an issue, with a detailed description of what you were trying to do and what went wrong. If you have any questions or if you would like to get involved in our community, please check out: The core team will be thrilled if you would like to participate in any way you like. When you are stuck, try to ask for help by filing an issue, with a detailed description of what you were trying to do and what went wrong. If you have any questions or if you would like to get involved in our community, please check out:
- [GreptimeDB Community Slack](https://greptime.com/slack) - [GreptimeDB Community Slack](https://greptime.com/slack)
- [GreptimeDB GitHub Discussions](https://github.com/GreptimeTeam/greptimedb/discussions) - [GreptimeDB Github Discussions](https://github.com/GreptimeTeam/greptimedb/discussions)
Also, see some extra GreptimeDB content: Also, see some extra GreptimeDB content:

Cargo.lock (generated, 860 changes): file diff suppressed because it is too large.

View File

@@ -30,7 +30,6 @@ members = [
"src/common/recordbatch", "src/common/recordbatch",
"src/common/runtime", "src/common/runtime",
"src/common/session", "src/common/session",
"src/common/stat",
"src/common/substrait", "src/common/substrait",
"src/common/telemetry", "src/common/telemetry",
"src/common/test-util", "src/common/test-util",
@@ -133,7 +132,7 @@ etcd-client = "0.14"
fst = "0.4.7" fst = "0.4.7"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" } greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7668a882d57ca6a2333146e0574b8f0c9d5008ae" }
hex = "0.4" hex = "0.4"
http = "1" http = "1"
humantime = "2.1" humantime = "2.1"
@@ -149,7 +148,6 @@ meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev =
mockall = "0.13" mockall = "0.13"
moka = "0.12" moka = "0.12"
nalgebra = "0.33" nalgebra = "0.33"
nix = { version = "0.30.1", default-features = false, features = ["event", "fs", "process"] }
notify = "8.0" notify = "8.0"
num_cpus = "1.16" num_cpus = "1.16"
object_store_opendal = "0.50" object_store_opendal = "0.50"
@@ -289,7 +287,6 @@ query = { path = "src/query" }
servers = { path = "src/servers" } servers = { path = "src/servers" }
session = { path = "src/session" } session = { path = "src/session" }
sql = { path = "src/sql" } sql = { path = "src/sql" }
stat = { path = "src/common/stat" }
store-api = { path = "src/store-api" } store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" } substrait = { path = "src/common/substrait" }
table = { path = "src/table" } table = { path = "src/table" }
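The Cargo.toml hunks above drop the `src/common/stat` crate from the workspace (both its member entry and its path dependency), remove the `nix` dependency, and pin `greptime-proto` to a newer revision. The removed member/dependency pair followed the usual workspace pattern, roughly:

```toml
# Reconstructed from the deleted lines above: a workspace member plus its matching
# [workspace.dependencies] entries, both removed by this change.
[workspace]
members = [
    "src/common/stat",
]

[workspace.dependencies]
stat = { path = "src/common/stat" }
nix = { version = "0.30.1", default-features = false, features = ["event", "fs", "process"] }
```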

View File

@@ -27,7 +27,6 @@
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. | | `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions | | `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. | | `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not valid strings. |
| `grpc` | -- | -- | The gRPC server options. | | `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. | | `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
@@ -100,7 +99,7 @@
| `query` | -- | -- | The query engine options. | | `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. | | `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. |
| `storage` | -- | -- | The data storage options. | | `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. | | `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. | | `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. | | `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. | | `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
@@ -195,13 +194,13 @@
| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. | | `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. |
| `slow_query.threshold` | String | Unset | The threshold of slow query. | | `slow_query.threshold` | String | Unset | The threshold of slow query. |
| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. | | `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
| `export_metrics` | -- | -- | The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. | | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. | | `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. | | `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommended to collect metrics generated by itself<br/>You must create the database before enabling it. | | `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommended to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- | | `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- | | `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. | | `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. | | `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. | | `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. | | `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -227,12 +226,10 @@
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. | | `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions | | `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. | | `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not valid strings. |
| `grpc` | -- | -- | The gRPC server options. | | `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. | | `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. | | `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. | | `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. | | `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. | | `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -299,11 +296,13 @@
| `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. | | `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. |
| `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. | | `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
| `slow_query.ttl` | String | `30d` | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. | | `slow_query.ttl` | String | `30d` | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
| `export_metrics` | -- | -- | The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. | | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. | | `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. | | `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- | | `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. | | `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. | | `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. | | `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. | | `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -313,10 +312,12 @@
| Key | Type | Default | Descriptions | | Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- | | --- | -----| ------- | ----------- |
| `data_home` | String | `./greptimedb_data` | The working home directory. | | `data_home` | String | `./greptimedb_data/metasrv/` | The working home directory. |
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" | | `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. | | `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store`<br/>- `mysql_store` | | `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store` |
| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.<br/>**Only used when backend is `postgres_store`.** | | `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.<br/>**Only used when backend is `postgres_store`.** |
| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend<br/>Only used when backend is `postgres_store`. | | `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend<br/>Only used when backend is `postgres_store`. |
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". | | `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
@@ -328,16 +329,6 @@
| `runtime` | -- | -- | The runtime options. | | `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. | | `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `procedure` | -- | -- | Procedure storage options. | | `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. | | `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially | | `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
@@ -375,11 +366,13 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. | | `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | | `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- | | `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. | | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. | | `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. | | `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- | | `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. | | `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. | | `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. | | `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. | | `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
@@ -405,7 +398,6 @@
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. | | `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. | | `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. | | `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. | | `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. | | `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -448,7 +440,7 @@
| `query` | -- | -- | The query engine options. | | `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. | | `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. |
| `storage` | -- | -- | The data storage options. | | `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. | | `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. | | `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. | | `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. | | `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
@@ -538,11 +530,13 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. | | `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | | `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- | | `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. | | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. | | `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. | | `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | Unset | -- |
| `export_metrics.remote_write` | -- | -- | -- | | `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. | | `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. | | `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. | | `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. | | `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
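The `export_metrics` rows above (and the matching changes in the example TOML files further down) restore the `self_import` sub-section and reword the section description. Assembled from the documented keys and defaults, the resulting block looks roughly like this; the URL is the documented example, not a required value:

```toml
# Sketch assembled from the documented keys; defaults shown where the tables give them.
[export_metrics]
enable = false            # whether to enable exporting metrics
write_interval = "30s"    # export interval

# Recommended for `standalone` mode; the database must exist before enabling it.
[export_metrics.self_import]
db = "greptime_metrics"

[export_metrics.remote_write]
url = "http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics"   # documented example endpoint
headers = { }             # extra HTTP headers carried by the remote-write requests
```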

View File

@@ -44,12 +44,6 @@ runtime_size = 8
max_recv_message_size = "512MB" max_recv_message_size = "512MB"
## The maximum send message size for gRPC server. ## The maximum send message size for gRPC server.
max_send_message_size = "512MB" max_send_message_size = "512MB"
## Compression mode for datanode side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
## - `all`: enable all compression.
flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section. ## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls] [grpc.tls]
@@ -258,7 +252,7 @@ parallelism = 0
## The data storage options. ## The data storage options.
[storage] [storage]
## The working home directory. ## The working home directory.
data_home = "./greptimedb_data" data_home = "./greptimedb_data/"
## The storage type used to store the data. ## The storage type used to store the data.
## - `File`: the data is stored in the local file system. ## - `File`: the data is stored in the local file system.
@@ -641,16 +635,24 @@ max_log_files = 720
[logging.tracing_sample_ratio] [logging.tracing_sample_ratio]
default_ratio = 1.0 default_ratio = 1.0
## The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API. ## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. ## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics] [export_metrics]
## whether enable export metrics. ## whether enable export metrics.
enable = false enable = false
## The interval of export metrics. ## The interval of export metrics.
write_interval = "30s" write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## @toml2docs:none-default
db = "greptime_metrics"
[export_metrics.remote_write] [export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. ## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = "" url = ""
## HTTP headers of Prometheus remote-write carry. ## HTTP headers of Prometheus remote-write carry.

View File

@@ -37,12 +37,6 @@ enable_cors = true
## Customize allowed origins for HTTP CORS. ## Customize allowed origins for HTTP CORS.
## @toml2docs:none-default ## @toml2docs:none-default
cors_allowed_origins = ["https://example.com"] cors_allowed_origins = ["https://example.com"]
## Whether to enable validation for Prometheus remote write requests.
## Available options:
## - strict: deny invalid UTF-8 strings (default).
## - lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).
## - unchecked: do not valid strings.
prom_validation_mode = "strict"
## The gRPC server options. ## The gRPC server options.
[grpc] [grpc]
@@ -54,12 +48,6 @@ bind_addr = "127.0.0.1:4001"
server_addr = "127.0.0.1:4001" server_addr = "127.0.0.1:4001"
## The number of server worker threads. ## The number of server worker threads.
runtime_size = 8 runtime_size = 8
## Compression mode for frontend side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
## - `all`: enable all compression.
flight_compression = "arrow_ipc"
## gRPC server TLS options, see `mysql.tls` section. ## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls] [grpc.tls]
@@ -253,16 +241,24 @@ sample_ratio = 1.0
## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. ## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
ttl = "30d" ttl = "30d"
## The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API. ## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. ## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics] [export_metrics]
## whether enable export metrics. ## whether enable export metrics.
enable = false enable = false
## The interval of export metrics. ## The interval of export metrics.
write_interval = "30s" write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## @toml2docs:none-default
db = "greptime_metrics"
[export_metrics.remote_write] [export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. ## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = "" url = ""
## HTTP headers of Prometheus remote-write carry. ## HTTP headers of Prometheus remote-write carry.
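For reference, these are the two frontend options removed in this file, reconstructed from the deleted lines above (`prom_validation_mode` under `[http]` and `flight_compression` under `[grpc]`):

```toml
# Reconstructed from the deleted lines; these keys no longer appear in frontend.example.toml.
[http]
## Whether to validate Prometheus remote write requests:
## - strict: deny invalid UTF-8 strings (default)
## - lossy: replace invalid characters with REPLACEMENT_CHARACTER (U+FFFD)
## - unchecked: do not validate strings
prom_validation_mode = "strict"

[grpc]
## Compression mode for the frontend-side Arrow IPC service:
## none | transport (gRPC zstd only) | arrow_ipc (Arrow IPC lz4 only) | all
flight_compression = "arrow_ipc"
```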

View File

@@ -1,5 +1,13 @@
## The working home directory. ## The working home directory.
data_home = "./greptimedb_data" data_home = "./greptimedb_data/metasrv/"
## The bind address of metasrv.
bind_addr = "127.0.0.1:3002"
## The communication server address for the frontend and datanode to connect to metasrv.
## If left empty or unset, the server will automatically use the IP address of the first network interface
## on the host, with the same port number as the one specified in `bind_addr`.
server_addr = "127.0.0.1:3002"
## Store server address default to etcd store. ## Store server address default to etcd store.
## For postgres store, the format is: ## For postgres store, the format is:
@@ -16,7 +24,6 @@ store_key_prefix = ""
## - `etcd_store` (default value) ## - `etcd_store` (default value)
## - `memory_store` ## - `memory_store`
## - `postgres_store` ## - `postgres_store`
## - `mysql_store`
backend = "etcd_store" backend = "etcd_store"
## Table name in RDS to store metadata. Effect when using a RDS kvbackend. ## Table name in RDS to store metadata. Effect when using a RDS kvbackend.
@@ -60,32 +67,6 @@ node_max_idle_time = "24hours"
## The number of threads to execute the runtime for global write operations. ## The number of threads to execute the runtime for global write operations.
#+ compact_rt_size = 4 #+ compact_rt_size = 4
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
bind_addr = "127.0.0.1:3002"
## The communication server address for the frontend and datanode to connect to metasrv.
## If left empty or unset, the server will automatically use the IP address of the first network interface
## on the host, with the same port number as the one specified in `bind_addr`.
server_addr = "127.0.0.1:3002"
## The number of server worker threads.
runtime_size = 8
## The maximum receive message size for gRPC server.
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
## The HTTP server options.
[http]
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
## Procedure storage options. ## Procedure storage options.
[procedure] [procedure]
@@ -237,16 +218,24 @@ max_log_files = 720
[logging.tracing_sample_ratio] [logging.tracing_sample_ratio]
default_ratio = 1.0 default_ratio = 1.0
## The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API. ## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. ## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics] [export_metrics]
## whether enable export metrics. ## whether enable export metrics.
enable = false enable = false
## The interval of export metrics. ## The interval of export metrics.
write_interval = "30s" write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## @toml2docs:none-default
db = "greptime_metrics"
[export_metrics.remote_write] [export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. ## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = "" url = ""
## HTTP headers of Prometheus remote-write carry. ## HTTP headers of Prometheus remote-write carry.
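Assembled from the added lines above, the top of `metasrv.example.toml` now reads roughly as follows: the address settings move to the top level and the dedicated `[grpc]` and `[http]` blocks are gone (the `store_addrs` value is the documented etcd example):

```toml
## The working home directory.
data_home = "./greptimedb_data/metasrv/"

## The bind address of metasrv.
bind_addr = "127.0.0.1:3002"

## The communication server address for the frontend and datanode to connect to metasrv.
## If left empty or unset, the first network interface's IP is used with the port from `bind_addr`.
server_addr = "127.0.0.1:3002"

## Store server addresses, defaulting to the etcd store.
store_addrs = ["127.0.0.1:2379"]

## The datastore for meta server: `etcd_store` (default), `memory_store`, or `postgres_store`.
backend = "etcd_store"
```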

View File

@@ -43,13 +43,6 @@ enable_cors = true
## @toml2docs:none-default ## @toml2docs:none-default
cors_allowed_origins = ["https://example.com"] cors_allowed_origins = ["https://example.com"]
## Whether to enable validation for Prometheus remote write requests.
## Available options:
## - strict: deny invalid UTF-8 strings (default).
## - lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).
## - unchecked: do not valid strings.
prom_validation_mode = "strict"
## The gRPC server options. ## The gRPC server options.
[grpc] [grpc]
## The address to bind the gRPC server. ## The address to bind the gRPC server.
@@ -350,7 +343,7 @@ parallelism = 0
## The data storage options. ## The data storage options.
[storage] [storage]
## The working home directory. ## The working home directory.
data_home = "./greptimedb_data" data_home = "./greptimedb_data/"
## The storage type used to store the data. ## The storage type used to store the data.
## - `File`: the data is stored in the local file system. ## - `File`: the data is stored in the local file system.
@@ -750,11 +743,13 @@ default_ratio = 1.0
## @toml2docs:none-default ## @toml2docs:none-default
#+ sample_ratio = 1.0 #+ sample_ratio = 1.0
## The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API. ## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. ## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics] [export_metrics]
## whether enable export metrics. ## whether enable export metrics.
enable = false enable = false
## The interval of export metrics. ## The interval of export metrics.
write_interval = "30s" write_interval = "30s"
@@ -765,7 +760,7 @@ write_interval = "30s"
db = "greptime_metrics" db = "greptime_metrics"
[export_metrics.remote_write] [export_metrics.remote_write]
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. ## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = "" url = ""
## HTTP headers of Prometheus remote-write carry. ## HTTP headers of Prometheus remote-write carry.

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
async function triggerWorkflow(workflowId: string, version: string) {
const docsClient = obtainClient("DOCS_REPO_TOKEN")
try {
await docsClient.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: "docs",
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow: ${error.message}`);
}
}
function determineWorkflow(version: string): [string, string] {
// Check if it's a nightly version
if (version.includes('nightly')) {
return ['bump-nightly-version.yml', version];
}
const parts = version.split('.');
if (parts.length !== 3) {
throw new Error('Invalid version format');
}
// If patch version (last number) is 0, it's a major version
// Return only major.minor version
if (parts[2] === '0') {
return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
}
// Otherwise it's a patch version, use full version
return ['bump-patch-version.yml', version];
}
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
try {
const [workflowId, apiVersion] = determineWorkflow(cleanVersion);
triggerWorkflow(workflowId, apiVersion);
} catch (error) {
core.setFailed(`Error processing version: ${error.message}`);
process.exit(1);
}


@@ -1,156 +0,0 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
interface RepoConfig {
tokenEnv: string;
repo: string;
workflowLogic: (version: string) => [string, string] | null;
}
const REPO_CONFIGS: Record<string, RepoConfig> = {
website: {
tokenEnv: "WEBSITE_REPO_TOKEN",
repo: "website",
workflowLogic: (version: string) => {
// Skip nightly versions for website
if (version.includes('nightly')) {
console.log('Nightly version detected for website, skipping workflow trigger.');
return null;
}
return ['bump-patch-version.yml', version];
}
},
demo: {
tokenEnv: "DEMO_REPO_TOKEN",
repo: "demo-scene",
workflowLogic: (version: string) => {
// Skip nightly versions for demo
if (version.includes('nightly')) {
console.log('Nightly version detected for demo, skipping workflow trigger.');
return null;
}
return ['bump-patch-version.yml', version];
}
},
docs: {
tokenEnv: "DOCS_REPO_TOKEN",
repo: "docs",
workflowLogic: (version: string) => {
// Check if it's a nightly version
if (version.includes('nightly')) {
return ['bump-nightly-version.yml', version];
}
const parts = version.split('.');
if (parts.length !== 3) {
throw new Error('Invalid version format');
}
// If patch version (last number) is 0, it's a major version
// Return only major.minor version
if (parts[2] === '0') {
return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
}
// Otherwise it's a patch version, use full version
return ['bump-patch-version.yml', version];
}
}
};
async function triggerWorkflow(repoConfig: RepoConfig, workflowId: string, version: string) {
const client = obtainClient(repoConfig.tokenEnv);
try {
await client.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: repoConfig.repo,
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow for ${repoConfig.repo} with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow for ${repoConfig.repo}: ${error.message}`);
throw error;
}
}
async function processRepo(repoName: string, version: string) {
const repoConfig = REPO_CONFIGS[repoName];
if (!repoConfig) {
throw new Error(`Unknown repository: ${repoName}`);
}
try {
const workflowResult = repoConfig.workflowLogic(version);
if (workflowResult === null) {
// Skip this repo (e.g., nightly version for website)
return;
}
const [workflowId, apiVersion] = workflowResult;
await triggerWorkflow(repoConfig, workflowId, apiVersion);
} catch (error) {
core.setFailed(`Error processing ${repoName} with version ${version}: ${error.message}`);
throw error;
}
}
async function main() {
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
// Get target repositories from environment variable
// Default to both if not specified
const targetRepos = process.env.TARGET_REPOS?.split(',').map(repo => repo.trim()) || ['website', 'docs'];
console.log(`Processing version ${cleanVersion} for repositories: ${targetRepos.join(', ')}`);
const errors: string[] = [];
// Process each repository
for (const repo of targetRepos) {
try {
await processRepo(repo, cleanVersion);
} catch (error) {
errors.push(`${repo}: ${error.message}`);
}
}
if (errors.length > 0) {
core.setFailed(`Failed to process some repositories: ${errors.join('; ')}`);
process.exit(1);
}
console.log('All repositories processed successfully');
}
// Execute main function
main().catch((error) => {
core.setFailed(`Unexpected error: ${error.message}`);
process.exit(1);
});


@@ -0,0 +1,57 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
async function triggerWorkflow(workflowId: string, version: string) {
const websiteClient = obtainClient("WEBSITE_REPO_TOKEN")
try {
await websiteClient.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: "website",
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow: ${error.message}`);
}
}
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
if (cleanVersion.includes('nightly')) {
console.log('Nightly version detected, skipping workflow trigger.');
process.exit(0);
}
try {
triggerWorkflow('bump-patch-version.yml', cleanVersion);
} catch (error) {
core.setFailed(`Error processing version: ${error.message}`);
process.exit(1);
}


@@ -1,6 +1,6 @@
# Profile memory usage of GreptimeDB # Profile memory usage of GreptimeDB
This crate provides an easy approach to dump memory profiling info. A set of ready-to-use scripts is provided in [docs/how-to/memory-profile-scripts](./memory-profile-scripts/scripts). This crate provides an easy approach to dump memory profiling info. A set of ready-to-use scripts is provided in [docs/how-to/memory-profile-scripts](docs/how-to/memory-profile-scripts).
## Prerequisites ## Prerequisites
### jemalloc ### jemalloc

flake.lock (generated)

@@ -41,16 +41,16 @@
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1748162331, "lastModified": 1745487689,
"narHash": "sha256-rqc2RKYTxP3tbjA+PB3VMRQNnjesrT0pEofXQTrMsS8=", "narHash": "sha256-FQoi3R0NjQeBAsEOo49b5tbDPcJSMWc3QhhaIi9eddw=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "7c43f080a7f28b2774f3b3f43234ca11661bf334", "rev": "5630cf13cceac06cefe9fc607e8dfa8fb342dde3",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "NixOS", "owner": "NixOS",
"ref": "nixos-25.05", "ref": "nixos-24.11",
"repo": "nixpkgs", "repo": "nixpkgs",
"type": "github" "type": "github"
} }


@@ -2,7 +2,7 @@
description = "Development environment flake"; description = "Development environment flake";
inputs = { inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-25.05"; nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.11";
fenix = { fenix = {
url = "github:nix-community/fenix"; url = "github:nix-community/fenix";
inputs.nixpkgs.follows = "nixpkgs"; inputs.nixpkgs.follows = "nixpkgs";
@@ -51,7 +51,6 @@
]; ];
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath buildInputs; LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath buildInputs;
NIX_HARDENING_ENABLE = "";
}; };
}); });
} }

File diff suppressed because it is too large.


@@ -60,7 +60,7 @@
| Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` | | Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` | | Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` | | Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` |
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))`<br/>`sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` | | Compaction P99 per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` | | Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` |
| WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` | | WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` |
| Cached Bytes per Instance | `greptime_mito_cache_bytes{instance=~"$datanode"}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` | | Cached Bytes per Instance | `greptime_mito_cache_bytes{instance=~"$datanode"}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` |
@@ -69,8 +69,7 @@
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` | | Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` | | Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction input/output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` | | Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction input/output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` | | Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL # OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format | | Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- | | --- | --- | --- | --- | --- | --- | --- |
@@ -88,19 +87,9 @@
# Metasrv # Metasrv
| Title | Query | Type | Description | Datasource | Unit | Legend Format | | Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- | | --- | --- | --- | --- | --- | --- | --- |
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `status-history` | Counter of region migration by source and destination | `prometheus` | -- | `from-datanode-{{datanode_id}}` | | Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `state-timeline` | Counter of region migration by source and destination | `prometheus` | `none` | `from-datanode-{{datanode_id}}` |
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `{{pod}}-{{state}}-{{error_type}}` | | Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `__auto` |
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `binBps` | `Datanode-{{datanode_id}}-writeload` | | Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `__auto` |
| Rate of SQL Executions (RDS) | `rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])` | `timeseries` | Displays the rate of SQL executions processed by the Meta service using the RDS backend. | `prometheus` | `none` | `{{pod}} {{op}} {{type}} {{result}} ` |
| SQL Execution Latency (RDS) | `histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))` | `timeseries` | Measures the response time of SQL executions via the RDS backend. | `prometheus` | `ms` | `{{pod}} {{op}} {{type}} {{result}} p90` |
| Handler Execution Latency | `histogram_quantile(0.90, sum by(pod, le, name) (rate(greptime_meta_handler_execute_bucket[$__rate_interval])))` | `timeseries` | Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.<br/> | `prometheus` | `s` | `{{pod}} {{name}} p90` |
| Heartbeat Packet Size | `histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))` | `timeseries` | Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.<br/> | `prometheus` | `bytes` | `{{pod}}` |
| Meta Heartbeat Receive Rate | `rate(greptime_meta_heartbeat_rate[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}` |
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
# Flownode # Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format | | Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- | | --- | --- | --- | --- | --- | --- | --- |


@@ -487,7 +487,7 @@ groups:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: '[{{ instance }}]-[{{pod}}]' legendFormat: '[{{ instance }}]-[{{pod}}]'
- title: Compaction Elapsed Time per Instance by Stage - title: Compaction P99 per Instance by Stage
type: timeseries type: timeseries
description: Compaction latency by stage description: Compaction latency by stage
unit: s unit: s
@@ -497,11 +497,6 @@ groups:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99' legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99'
- expr: sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-avg'
- title: Compaction P99 per Instance - title: Compaction P99 per Instance
type: timeseries type: timeseries
description: Compaction P99 per Instance. description: Compaction P99 per Instance.
@@ -607,22 +602,7 @@ groups:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95' legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval])) - expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- title: Region Worker Convert Requests
type: timeseries
description: Per-stage elapsed time for region worker to decode requests.
unit: s
queries:
- expr: histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))
datasource: datasource:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
@@ -741,8 +721,9 @@ groups:
- title: Metasrv - title: Metasrv
panels: panels:
- title: Region migration datanode - title: Region migration datanode
type: status-history type: state-timeline
description: Counter of region migration by source and destination description: Counter of region migration by source and destination
unit: none
queries: queries:
- expr: greptime_meta_region_migration_stat{datanode_type="src"} - expr: greptime_meta_region_migration_stat{datanode_type="src"}
datasource: datasource:
@@ -763,127 +744,17 @@ groups:
datasource: datasource:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: '{{pod}}-{{state}}-{{error_type}}' legendFormat: __auto
- title: Datanode load - title: Datanode load
type: timeseries type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: binBps unit: none
queries: queries:
- expr: greptime_datanode_load - expr: greptime_datanode_load
datasource: datasource:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: Datanode-{{datanode_id}}-writeload legendFormat: __auto
- title: Rate of SQL Executions (RDS)
type: timeseries
description: Displays the rate of SQL executions processed by the Meta service using the RDS backend.
unit: none
queries:
- expr: rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{op}} {{type}} {{result}} '
- title: SQL Execution Latency (RDS)
type: timeseries
description: 'Measures the response time of SQL executions via the RDS backend. '
unit: ms
queries:
- expr: histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{op}} {{type}} {{result}} p90'
- title: Handler Execution Latency
type: timeseries
description: |
Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.
unit: s
queries:
- expr: |-
histogram_quantile(0.90, sum by(pod, le, name) (
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{name}} p90'
- title: Heartbeat Packet Size
type: timeseries
description: |
Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.
unit: bytes
queries:
- expr: histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}'
- title: Meta Heartbeat Receive Rate
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: rate(greptime_meta_heartbeat_rate[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}'
- title: Meta KV Ops Latency
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{op}} p99'
- title: Rate of meta KV Ops
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: none
queries:
- expr: rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{op}} p99'
- title: DDL Latency
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateLogicalTables-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateTable-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateView-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateFlow-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: DropTable-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: AlterTable-{{step}} p90
- title: Flownode - title: Flownode
panels: panels:
- title: Flow Ingest / Output Rate - title: Flow Ingest / Output Rate

File diff suppressed because it is too large.


@@ -60,7 +60,7 @@
| Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` | | Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` | | Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` | | Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` |
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{}[$__rate_interval])))`<br/>`sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` | | Compaction P99 per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` | | Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` |
| WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` | | WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` |
| Cached Bytes per Instance | `greptime_mito_cache_bytes{}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` | | Cached Bytes per Instance | `greptime_mito_cache_bytes{}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` |
@@ -69,8 +69,7 @@
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` | | Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` | | Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction input/output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` | | Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction input/output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` | | Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL # OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format | | Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- | | --- | --- | --- | --- | --- | --- | --- |
@@ -88,19 +87,9 @@
# Metasrv # Metasrv
| Title | Query | Type | Description | Datasource | Unit | Legend Format | | Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- | | --- | --- | --- | --- | --- | --- | --- |
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `status-history` | Counter of region migration by source and destination | `prometheus` | -- | `from-datanode-{{datanode_id}}` | | Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `state-timeline` | Counter of region migration by source and destination | `prometheus` | `none` | `from-datanode-{{datanode_id}}` |
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `{{pod}}-{{state}}-{{error_type}}` | | Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `__auto` |
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `binBps` | `Datanode-{{datanode_id}}-writeload` | | Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `__auto` |
| Rate of SQL Executions (RDS) | `rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])` | `timeseries` | Displays the rate of SQL executions processed by the Meta service using the RDS backend. | `prometheus` | `none` | `{{pod}} {{op}} {{type}} {{result}} ` |
| SQL Execution Latency (RDS) | `histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))` | `timeseries` | Measures the response time of SQL executions via the RDS backend. | `prometheus` | `ms` | `{{pod}} {{op}} {{type}} {{result}} p90` |
| Handler Execution Latency | `histogram_quantile(0.90, sum by(pod, le, name) (rate(greptime_meta_handler_execute_bucket[$__rate_interval])))` | `timeseries` | Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.<br/> | `prometheus` | `s` | `{{pod}} {{name}} p90` |
| Heartbeat Packet Size | `histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))` | `timeseries` | Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.<br/> | `prometheus` | `bytes` | `{{pod}}` |
| Meta Heartbeat Receive Rate | `rate(greptime_meta_heartbeat_rate[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}` |
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
# Flownode # Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format | | Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- | | --- | --- | --- | --- | --- | --- | --- |


@@ -487,7 +487,7 @@ groups:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: '[{{ instance }}]-[{{pod}}]' legendFormat: '[{{ instance }}]-[{{pod}}]'
- title: Compaction Elapsed Time per Instance by Stage - title: Compaction P99 per Instance by Stage
type: timeseries type: timeseries
description: Compaction latency by stage description: Compaction latency by stage
unit: s unit: s
@@ -497,11 +497,6 @@ groups:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99' legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99'
- expr: sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-avg'
- title: Compaction P99 per Instance - title: Compaction P99 per Instance
type: timeseries type: timeseries
description: Compaction P99 per Instance. description: Compaction P99 per Instance.
@@ -607,22 +602,7 @@ groups:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95' legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval])) - expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- title: Region Worker Convert Requests
type: timeseries
description: Per-stage elapsed time for region worker to decode requests.
unit: s
queries:
- expr: histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))
datasource: datasource:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
@@ -741,8 +721,9 @@ groups:
- title: Metasrv - title: Metasrv
panels: panels:
- title: Region migration datanode - title: Region migration datanode
type: status-history type: state-timeline
description: Counter of region migration by source and destination description: Counter of region migration by source and destination
unit: none
queries: queries:
- expr: greptime_meta_region_migration_stat{datanode_type="src"} - expr: greptime_meta_region_migration_stat{datanode_type="src"}
datasource: datasource:
@@ -763,127 +744,17 @@ groups:
datasource: datasource:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: '{{pod}}-{{state}}-{{error_type}}' legendFormat: __auto
- title: Datanode load - title: Datanode load
type: timeseries type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: binBps unit: none
queries: queries:
- expr: greptime_datanode_load - expr: greptime_datanode_load
datasource: datasource:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: Datanode-{{datanode_id}}-writeload legendFormat: __auto
- title: Rate of SQL Executions (RDS)
type: timeseries
description: Displays the rate of SQL executions processed by the Meta service using the RDS backend.
unit: none
queries:
- expr: rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{op}} {{type}} {{result}} '
- title: SQL Execution Latency (RDS)
type: timeseries
description: 'Measures the response time of SQL executions via the RDS backend. '
unit: ms
queries:
- expr: histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{op}} {{type}} {{result}} p90'
- title: Handler Execution Latency
type: timeseries
description: |
Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.
unit: s
queries:
- expr: |-
histogram_quantile(0.90, sum by(pod, le, name) (
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}} {{name}} p90'
- title: Heartbeat Packet Size
type: timeseries
description: |
Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.
unit: bytes
queries:
- expr: histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}'
- title: Meta Heartbeat Receive Rate
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: rate(greptime_meta_heartbeat_rate[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}'
- title: Meta KV Ops Latency
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{op}} p99'
- title: Rate of meta KV Ops
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: none
queries:
- expr: rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{op}} p99'
- title: DDL Latency
type: timeseries
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
unit: s
queries:
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateLogicalTables-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateTable-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateView-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: CreateFlow-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: DropTable-{{step}} p90
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: AlterTable-{{step}} p90
- title: Flownode - title: Flownode
panels: panels:
- title: Flow Ingest / Output Rate - title: Flow Ingest / Output Rate


@@ -6,7 +6,7 @@ DAC_IMAGE=ghcr.io/zyy17/dac:20250423-522bd35
remove_instance_filters() { remove_instance_filters() {
# Remove the instance filters for the standalone dashboards. # Remove the instance filters for the standalone dashboards.
sed -E 's/instance=~\\"(\$datanode|\$frontend|\$metasrv|\$flownode)\\",?//g' "$CLUSTER_DASHBOARD_DIR/dashboard.json" > "$STANDALONE_DASHBOARD_DIR/dashboard.json" sed 's/instance=~\\"$datanode\\",//; s/instance=~\\"$datanode\\"//; s/instance=~\\"$frontend\\",//; s/instance=~\\"$frontend\\"//; s/instance=~\\"$metasrv\\",//; s/instance=~\\"$metasrv\\"//; s/instance=~\\"$flownode\\",//; s/instance=~\\"$flownode\\"//;' $CLUSTER_DASHBOARD_DIR/dashboard.json > $STANDALONE_DASHBOARD_DIR/dashboard.json
} }
generate_intermediate_dashboards_and_docs() { generate_intermediate_dashboards_and_docs() {
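The consolidated `sed -E` expression above replaces eight separate substitutions with a single alternation. As a quick sanity check of what it strips, here is the same pattern replayed with the Rust `regex` crate (a sketch that assumes the dashboard JSON escapes quotes as `\"`):

```rust
use regex::Regex;

fn main() {
    // Same pattern as the sed call: drop any `instance=~"$<role>"` matcher,
    // together with a trailing comma if present.
    let re = Regex::new(r#"instance=~\\"(\$datanode|\$frontend|\$metasrv|\$flownode)\\",?"#).unwrap();

    let expr = r#"rate(greptime_mito_read_stage_elapsed_bucket{instance=~\"$datanode\",stage=\"scan\"}[5m])"#;
    let stripped = re.replace_all(expr, "");

    assert_eq!(
        stripped,
        r#"rate(greptime_mito_read_stage_elapsed_bucket{stage=\"scan\"}[5m])"#
    );
    println!("{stripped}");
}
```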


@@ -26,13 +26,6 @@ excludes = [
"src/common/base/src/secrets.rs", "src/common/base/src/secrets.rs",
"src/servers/src/repeated_field.rs", "src/servers/src/repeated_field.rs",
"src/servers/src/http/test_helpers.rs", "src/servers/src/http/test_helpers.rs",
# enterprise
"src/common/meta/src/rpc/ddl/trigger.rs",
"src/operator/src/expr_helper/trigger.rs",
"src/sql/src/statements/create/trigger.rs",
"src/sql/src/statements/show/trigger.rs",
"src/sql/src/parsers/create_parser/trigger.rs",
"src/sql/src/parsers/show_parser/trigger.rs",
] ]
[properties] [properties]


@@ -5,12 +5,8 @@ edition.workspace = true
license.workspace = true license.workspace = true
[features] [features]
default = [ pg_kvbackend = ["common-meta/pg_kvbackend"]
"pg_kvbackend", mysql_kvbackend = ["common-meta/mysql_kvbackend"]
"mysql_kvbackend",
]
pg_kvbackend = ["common-meta/pg_kvbackend", "meta-srv/pg_kvbackend"]
mysql_kvbackend = ["common-meta/mysql_kvbackend", "meta-srv/mysql_kvbackend"]
[lints] [lints]
workspace = true workspace = true
@@ -47,9 +43,11 @@ etcd-client.workspace = true
futures.workspace = true futures.workspace = true
humantime.workspace = true humantime.workspace = true
meta-client.workspace = true meta-client.workspace = true
meta-srv.workspace = true
nu-ansi-term = "0.46" nu-ansi-term = "0.46"
object-store.workspace = true opendal = { version = "0.51.1", features = [
"services-fs",
"services-s3",
] }
query.workspace = true query.workspace = true
rand.workspace = true rand.workspace = true
reqwest.workspace = true reqwest.workspace = true


@@ -17,7 +17,6 @@ use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt}; use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode; use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug; use common_macro::stack_trace_debug;
use object_store::Error as ObjectStoreError;
use snafu::{Location, Snafu}; use snafu::{Location, Snafu};
#[derive(Snafu)] #[derive(Snafu)]
@@ -226,7 +225,7 @@ pub enum Error {
#[snafu(implicit)] #[snafu(implicit)]
location: Location, location: Location,
#[snafu(source)] #[snafu(source)]
error: ObjectStoreError, error: opendal::Error,
}, },
#[snafu(display("S3 config need be set"))] #[snafu(display("S3 config need be set"))]
S3ConfigNotSet { S3ConfigNotSet {
@@ -238,12 +237,6 @@ pub enum Error {
#[snafu(implicit)] #[snafu(implicit)]
location: Location, location: Location,
}, },
#[snafu(display("KV backend not set: {}", backend))]
KvBackendNotSet {
backend: String,
#[snafu(implicit)]
location: Location,
},
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@@ -280,9 +273,8 @@ impl ErrorExt for Error {
Error::Other { source, .. } => source.status_code(), Error::Other { source, .. } => source.status_code(),
Error::OpenDal { .. } => StatusCode::Internal, Error::OpenDal { .. } => StatusCode::Internal,
Error::S3ConfigNotSet { .. } Error::S3ConfigNotSet { .. } => StatusCode::InvalidArguments,
| Error::OutputDirNotSet { .. } Error::OutputDirNotSet { .. } => StatusCode::InvalidArguments,
| Error::KvBackendNotSet { .. } => StatusCode::InvalidArguments,
Error::BuildRuntime { source, .. } => source.status_code(), Error::BuildRuntime { source, .. } => source.status_code(),


@@ -19,12 +19,10 @@ use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info}; use common_telemetry::{debug, error, info};
use object_store::layers::LoggingLayer; use opendal::layers::LoggingLayer;
use object_store::services::Oss; use opendal::{services, Operator};
use object_store::{services, ObjectStore};
use serde_json::Value; use serde_json::Value;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore; use tokio::sync::Semaphore;
@@ -112,15 +110,15 @@ pub struct ExportCommand {
#[clap(long)] #[clap(long)]
s3: bool, s3: bool,
/// if both `ddl_local_dir` and remote storage (s3/oss) are set, `ddl_local_dir` will only be used for /// if both `s3_ddl_local_dir` and `s3` are set, `s3_ddl_local_dir` will only be used for
/// exported SQL files, and the data will be exported to remote storage. /// exported SQL files, and the data will be exported to s3.
/// ///
/// Note that `ddl_local_dir` exports SQL files to the **LOCAL** file system; this is useful if the export client doesn't have /// Note that `s3_ddl_local_dir` exports SQL files to the **LOCAL** file system; this is useful if the export client doesn't have
/// direct access to remote storage. /// direct access to s3.
/// ///
/// if remote storage is set but `ddl_local_dir` is not set, both SQL and data will be exported to remote storage. /// if `s3` is set but `s3_ddl_local_dir` is not set, both SQL and data will be exported to s3.
#[clap(long)] #[clap(long)]
ddl_local_dir: Option<String>, s3_ddl_local_dir: Option<String>,
/// The s3 bucket name /// The s3 bucket name
/// if s3 is set, this is required /// if s3 is set, this is required
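The doc comments on `ddl_local_dir` above describe three routing cases. Written out as a tiny sketch (the `Target`/`plan` names are illustrative, not the CLI's real types), the intent is:

```rust
#[derive(Debug, PartialEq)]
enum Target {
    Local(String),
    Remote,
}

/// Returns where (DDL SQL files, data) are written, given the flags.
fn plan(remote_storage: bool, ddl_local_dir: Option<&str>, output_dir: Option<&str>) -> (Target, Target) {
    match (remote_storage, ddl_local_dir, output_dir) {
        // DDL SQL files stay on the local file system; data goes to s3/oss.
        (true, Some(dir), _) => (Target::Local(dir.to_string()), Target::Remote),
        // Remote storage only: both SQL and data go to s3/oss.
        (true, None, _) => (Target::Remote, Target::Remote),
        // No remote storage: both land under the output directory.
        (false, _, Some(dir)) => (Target::Local(dir.to_string()), Target::Local(dir.to_string())),
        (false, _, None) => panic!("either remote storage or an output directory must be set"),
    }
}

fn main() {
    assert_eq!(
        plan(true, Some("/tmp/ddl"), None),
        (Target::Local("/tmp/ddl".to_string()), Target::Remote)
    );
}
```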
@@ -151,30 +149,6 @@ pub struct ExportCommand {
/// if s3 is set, this is required /// if s3 is set, this is required
#[clap(long)] #[clap(long)]
s3_region: Option<String>, s3_region: Option<String>,
/// if export data to oss
#[clap(long)]
oss: bool,
/// The oss bucket name
/// if oss is set, this is required
#[clap(long)]
oss_bucket: Option<String>,
/// The oss endpoint
/// if oss is set, this is required
#[clap(long)]
oss_endpoint: Option<String>,
/// The oss access key id
/// if oss is set, this is required
#[clap(long)]
oss_access_key_id: Option<String>,
/// The oss access key secret
/// if oss is set, this is required
#[clap(long)]
oss_access_key_secret: Option<String>,
} }
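
Taken together, the doc comments above describe a small target-resolution rule: data goes to remote storage whenever a remote flag is set, the DDL/SQL files can be redirected to a local directory, and a purely local export requires an output directory. The sketch below is a hypothetical restatement of that rule (ExportTarget, ResolvedTargets and resolve_targets are invented names, not part of this CLI), meant only to make the valid combinations explicit.

// Hypothetical model of the export-target rules described above.
#[derive(Debug, PartialEq)]
enum ExportTarget {
    Local { dir: String },
    Remote, // s3 or oss in the real tool
}

#[derive(Debug, PartialEq)]
struct ResolvedTargets {
    ddl: ExportTarget,  // where the CREATE TABLE / DDL SQL files go
    data: ExportTarget, // where the COPY'd data goes
}

fn resolve_targets(
    remote: bool,                // the `s3` / `oss` flags
    ddl_local_dir: Option<&str>, // `ddl_local_dir`
    output_dir: Option<&str>,    // `output_dir`
) -> Result<ResolvedTargets, &'static str> {
    match (remote, ddl_local_dir, output_dir) {
        // Remote data, but DDL kept on the local file system.
        (true, Some(dir), _) => Ok(ResolvedTargets {
            ddl: ExportTarget::Local { dir: dir.to_string() },
            data: ExportTarget::Remote,
        }),
        // Everything goes to remote storage.
        (true, None, _) => Ok(ResolvedTargets {
            ddl: ExportTarget::Remote,
            data: ExportTarget::Remote,
        }),
        // A purely local export needs an output directory.
        (false, _, Some(dir)) => Ok(ResolvedTargets {
            ddl: ExportTarget::Local { dir: dir.to_string() },
            data: ExportTarget::Local { dir: dir.to_string() },
        }),
        (false, _, None) => Err("output_dir must be set when no remote storage is configured"),
    }
}

fn main() {
    let targets = resolve_targets(true, Some("/tmp/ddl"), None).unwrap();
    assert_eq!(targets.data, ExportTarget::Remote);
}
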
impl ExportCommand { impl ExportCommand {
@@ -188,7 +162,7 @@ impl ExportCommand {
{ {
return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build())); return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
} }
if !self.s3 && !self.oss && self.output_dir.is_none() { if !self.s3 && self.output_dir.is_none() {
return Err(BoxedError::new(OutputDirNotSetSnafu {}.build())); return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
} }
let (catalog, schema) = let (catalog, schema) =
@@ -213,32 +187,13 @@ impl ExportCommand {
start_time: self.start_time.clone(), start_time: self.start_time.clone(),
end_time: self.end_time.clone(), end_time: self.end_time.clone(),
s3: self.s3, s3: self.s3,
ddl_local_dir: self.ddl_local_dir.clone(), s3_ddl_local_dir: self.s3_ddl_local_dir.clone(),
s3_bucket: self.s3_bucket.clone(), s3_bucket: self.s3_bucket.clone(),
s3_root: self.s3_root.clone(), s3_root: self.s3_root.clone(),
s3_endpoint: self.s3_endpoint.clone(), s3_endpoint: self.s3_endpoint.clone(),
// Wrap sensitive values in SecretString s3_access_key: self.s3_access_key.clone(),
s3_access_key: self s3_secret_key: self.s3_secret_key.clone(),
.s3_access_key
.as_ref()
.map(|k| SecretString::from(k.clone())),
s3_secret_key: self
.s3_secret_key
.as_ref()
.map(|k| SecretString::from(k.clone())),
s3_region: self.s3_region.clone(), s3_region: self.s3_region.clone(),
oss: self.oss,
oss_bucket: self.oss_bucket.clone(),
oss_endpoint: self.oss_endpoint.clone(),
// Wrap sensitive values in SecretString
oss_access_key_id: self
.oss_access_key_id
.as_ref()
.map(|k| SecretString::from(k.clone())),
oss_access_key_secret: self
.oss_access_key_secret
.as_ref()
.map(|k| SecretString::from(k.clone())),
})) }))
} }
} }
@@ -254,30 +209,23 @@ pub struct Export {
start_time: Option<String>, start_time: Option<String>,
end_time: Option<String>, end_time: Option<String>,
s3: bool, s3: bool,
ddl_local_dir: Option<String>, s3_ddl_local_dir: Option<String>,
s3_bucket: Option<String>, s3_bucket: Option<String>,
s3_root: Option<String>, s3_root: Option<String>,
s3_endpoint: Option<String>, s3_endpoint: Option<String>,
// Changed to SecretString for sensitive data s3_access_key: Option<String>,
s3_access_key: Option<SecretString>, s3_secret_key: Option<String>,
s3_secret_key: Option<SecretString>,
s3_region: Option<String>, s3_region: Option<String>,
oss: bool,
oss_bucket: Option<String>,
oss_endpoint: Option<String>,
// Changed to SecretString for sensitive data
oss_access_key_id: Option<SecretString>,
oss_access_key_secret: Option<SecretString>,
} }
impl Export { impl Export {
fn catalog_path(&self) -> PathBuf { fn catalog_path(&self) -> PathBuf {
if self.s3 || self.oss { if self.s3 {
PathBuf::from(&self.catalog) PathBuf::from(&self.catalog)
} else if let Some(dir) = &self.output_dir { } else if let Some(dir) = &self.output_dir {
PathBuf::from(dir).join(&self.catalog) PathBuf::from(dir).join(&self.catalog)
} else { } else {
unreachable!("catalog_path: output_dir must be set when not using remote storage") unreachable!("catalog_path: output_dir must be set when not using s3")
} }
} }
@@ -479,7 +427,7 @@ impl Export {
.await?; .await?;
// Create directory if needed for file system storage // Create directory if needed for file system storage
if !export_self.s3 && !export_self.oss { if !export_self.s3 {
let db_dir = format!("{}/{}/", export_self.catalog, schema); let db_dir = format!("{}/{}/", export_self.catalog, schema);
operator.create_dir(&db_dir).await.context(OpenDalSnafu)?; operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
} }
@@ -522,35 +470,32 @@ impl Export {
Ok(()) Ok(())
} }
async fn build_operator(&self) -> Result<ObjectStore> { async fn build_operator(&self) -> Result<Operator> {
if self.s3 { if self.s3 {
self.build_s3_operator().await self.build_s3_operator().await
} else if self.oss {
self.build_oss_operator().await
} else { } else {
self.build_fs_operator().await self.build_fs_operator().await
} }
} }
/// build operator with preference for file system /// build operator with preference for file system
async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> { async fn build_prefer_fs_operator(&self) -> Result<Operator> {
if (self.s3 || self.oss) && self.ddl_local_dir.is_some() { // is under s3 mode and s3_ddl_dir is set, use it as root
let root = self.ddl_local_dir.as_ref().unwrap().clone(); if self.s3 && self.s3_ddl_local_dir.is_some() {
let op = ObjectStore::new(services::Fs::default().root(&root)) let root = self.s3_ddl_local_dir.as_ref().unwrap().clone();
let op = Operator::new(services::Fs::default().root(&root))
.context(OpenDalSnafu)? .context(OpenDalSnafu)?
.layer(LoggingLayer::default()) .layer(LoggingLayer::default())
.finish(); .finish();
Ok(op) Ok(op)
} else if self.s3 { } else if self.s3 {
self.build_s3_operator().await self.build_s3_operator().await
} else if self.oss {
self.build_oss_operator().await
} else { } else {
self.build_fs_operator().await self.build_fs_operator().await
} }
} }
async fn build_s3_operator(&self) -> Result<ObjectStore> { async fn build_s3_operator(&self) -> Result<Operator> {
let mut builder = services::S3::default().bucket( let mut builder = services::S3::default().bucket(
self.s3_bucket self.s3_bucket
.as_ref() .as_ref()
@@ -570,51 +515,27 @@ impl Export {
} }
if let Some(key_id) = self.s3_access_key.as_ref() { if let Some(key_id) = self.s3_access_key.as_ref() {
builder = builder.access_key_id(key_id.expose_secret()); builder = builder.access_key_id(key_id);
} }
if let Some(secret_key) = self.s3_secret_key.as_ref() { if let Some(secret_key) = self.s3_secret_key.as_ref() {
builder = builder.secret_access_key(secret_key.expose_secret()); builder = builder.secret_access_key(secret_key);
} }
let op = ObjectStore::new(builder) let op = Operator::new(builder)
.context(OpenDalSnafu)? .context(OpenDalSnafu)?
.layer(LoggingLayer::default()) .layer(LoggingLayer::default())
.finish(); .finish();
Ok(op) Ok(op)
} }
async fn build_oss_operator(&self) -> Result<ObjectStore> { async fn build_fs_operator(&self) -> Result<Operator> {
let mut builder = Oss::default()
.bucket(self.oss_bucket.as_ref().expect("oss_bucket must be set"))
.endpoint(
self.oss_endpoint
.as_ref()
.expect("oss_endpoint must be set"),
);
// Use expose_secret() to access the actual secret value
if let Some(key_id) = self.oss_access_key_id.as_ref() {
builder = builder.access_key_id(key_id.expose_secret());
}
if let Some(secret_key) = self.oss_access_key_secret.as_ref() {
builder = builder.access_key_secret(secret_key.expose_secret());
}
let op = ObjectStore::new(builder)
.context(OpenDalSnafu)?
.layer(LoggingLayer::default())
.finish();
Ok(op)
}
async fn build_fs_operator(&self) -> Result<ObjectStore> {
let root = self let root = self
.output_dir .output_dir
.as_ref() .as_ref()
.context(OutputDirNotSetSnafu)? .context(OutputDirNotSetSnafu)?
.clone(); .clone();
let op = ObjectStore::new(services::Fs::default().root(&root)) let op = Operator::new(services::Fs::default().root(&root))
.context(OpenDalSnafu)? .context(OpenDalSnafu)?
.layer(LoggingLayer::default()) .layer(LoggingLayer::default())
.finish(); .finish();
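
Both columns of this hunk build the same kind of OpenDAL operator, just under different aliases (ObjectStore on the left, Operator on the right). A minimal sketch of the file-system flavour, using the opendal crate directly, a temporary directory in place of the real output_dir, and only the create_dir and write calls the export path relies on, might look like the following; the S3 branch differs only in the builder, as shown above.

// Minimal sketch of build_fs_operator: an OpenDAL operator rooted at a local directory.
// Assumes the `opendal` and `tokio` crates; the root path is a placeholder.
use opendal::layers::LoggingLayer;
use opendal::{services, Operator};

#[tokio::main]
async fn main() -> opendal::Result<()> {
    let root = std::env::temp_dir().join("greptime-export-sketch");
    let op = Operator::new(services::Fs::default().root(&root.to_string_lossy()))?
        .layer(LoggingLayer::default())
        .finish();

    // The export code only needs `create_dir` and `write` on the finished operator.
    op.create_dir("my_catalog/my_schema/").await?;
    op.write(
        "my_catalog/my_schema/create_tables.sql",
        b"-- ddl goes here".to_vec(),
    )
    .await?;
    Ok(())
}
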
@@ -641,8 +562,8 @@ impl Export {
tasks.push(async move { tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap(); let _permit = semaphore_moved.acquire().await.unwrap();
// Create directory if not using remote storage // Create directory if not using S3
if !export_self.s3 && !export_self.oss { if !export_self.s3 {
let db_dir = format!("{}/{}/", export_self.catalog, schema); let db_dir = format!("{}/{}/", export_self.catalog, schema);
operator.create_dir(&db_dir).await.context(OpenDalSnafu)?; operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
} }
@@ -654,11 +575,7 @@ impl Export {
r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#, r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
export_self.catalog, schema, path, with_options_clone, connection_part export_self.catalog, schema, path, with_options_clone, connection_part
); );
info!("Executing sql: {sql}");
// Log SQL command but mask sensitive information
let safe_sql = export_self.mask_sensitive_sql(&sql);
info!("Executing sql: {}", safe_sql);
export_self.database_client.sql_in_public(&sql).await?; export_self.database_client.sql_in_public(&sql).await?;
info!( info!(
"Finished exporting {}.{} data to {}", "Finished exporting {}.{} data to {}",
@@ -698,29 +615,6 @@ impl Export {
Ok(()) Ok(())
} }
/// Mask sensitive information in SQL commands for safe logging
fn mask_sensitive_sql(&self, sql: &str) -> String {
let mut masked_sql = sql.to_string();
// Mask S3 credentials
if let Some(access_key) = &self.s3_access_key {
masked_sql = masked_sql.replace(access_key.expose_secret(), "[REDACTED]");
}
if let Some(secret_key) = &self.s3_secret_key {
masked_sql = masked_sql.replace(secret_key.expose_secret(), "[REDACTED]");
}
// Mask OSS credentials
if let Some(access_key_id) = &self.oss_access_key_id {
masked_sql = masked_sql.replace(access_key_id.expose_secret(), "[REDACTED]");
}
if let Some(access_key_secret) = &self.oss_access_key_secret {
masked_sql = masked_sql.replace(access_key_secret.expose_secret(), "[REDACTED]");
}
masked_sql
}
fn get_file_path(&self, schema: &str, file_name: &str) -> String { fn get_file_path(&self, schema: &str, file_name: &str) -> String {
format!("{}/{}/{}", self.catalog, schema, file_name) format!("{}/{}/{}", self.catalog, schema, file_name)
} }
@@ -737,13 +631,6 @@ impl Export {
}, },
file_path file_path
) )
} else if self.oss {
format!(
"oss://{}/{}/{}",
self.oss_bucket.as_ref().unwrap_or(&String::new()),
self.catalog,
file_path
)
} else { } else {
format!( format!(
"{}/{}", "{}/{}",
@@ -755,14 +642,11 @@ impl Export {
async fn write_to_storage( async fn write_to_storage(
&self, &self,
op: &ObjectStore, op: &Operator,
file_path: &str, file_path: &str,
content: Vec<u8>, content: Vec<u8>,
) -> Result<()> { ) -> Result<()> {
op.write(file_path, content) op.write(file_path, content).await.context(OpenDalSnafu)
.await
.context(OpenDalSnafu)
.map(|_| ())
} }
fn get_storage_params(&self, schema: &str) -> (String, String) { fn get_storage_params(&self, schema: &str) -> (String, String) {
@@ -788,36 +672,15 @@ impl Export {
}; };
// Safety: All s3 options are required // Safety: All s3 options are required
// Use expose_secret() to access the actual secret values
let connection_options = format!( let connection_options = format!(
"ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}", "ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
self.s3_access_key.as_ref().unwrap().expose_secret(), self.s3_access_key.as_ref().unwrap(),
self.s3_secret_key.as_ref().unwrap().expose_secret(), self.s3_secret_key.as_ref().unwrap(),
self.s3_region.as_ref().unwrap(), self.s3_region.as_ref().unwrap(),
endpoint_option endpoint_option
); );
(s3_path, format!(" CONNECTION ({})", connection_options)) (s3_path, format!(" CONNECTION ({})", connection_options))
} else if self.oss {
let oss_path = format!(
"oss://{}/{}/{}/",
self.oss_bucket.as_ref().unwrap(),
self.catalog,
schema
);
let endpoint_option = if let Some(endpoint) = self.oss_endpoint.as_ref() {
format!(", ENDPOINT='{}'", endpoint)
} else {
String::new()
};
let connection_options = format!(
"ACCESS_KEY_ID='{}', ACCESS_KEY_SECRET='{}'{}",
self.oss_access_key_id.as_ref().unwrap().expose_secret(),
self.oss_access_key_secret.as_ref().unwrap().expose_secret(),
endpoint_option
);
(oss_path, format!(" CONNECTION ({})", connection_options))
} else { } else {
( (
self.catalog_path() self.catalog_path()
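
For reference, the connection options assembled here feed the COPY DATABASE statement built earlier in this file. A small sketch of how such a statement could be put together for the S3 case, with placeholder bucket, region, credentials and WITH options rather than real values, is:

// Sketch only: assemble a COPY DATABASE statement for an S3 target, following the
// format! calls shown in this file. Every value below is a placeholder.
fn main() {
    let (catalog, schema) = ("greptime", "public");
    let s3_path = format!("s3://{}/{}/{}/", "my-bucket", catalog, schema);
    let with_options = "format = 'parquet'"; // placeholder WITH options
    let connection_part = format!(
        " CONNECTION (ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}')",
        "[REDACTED]", "[REDACTED]", "us-west-2"
    );
    let sql = format!(
        r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
        catalog, schema, s3_path, with_options, connection_part
    );
    // Credentials would be masked before logging, as mask_sensitive_sql does above.
    println!("{sql}");
}
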

View File

@@ -17,7 +17,6 @@ mod database;
pub mod error; pub mod error;
mod export; mod export;
mod import; mod import;
mod meta_snapshot;
use async_trait::async_trait; use async_trait::async_trait;
use clap::Parser; use clap::Parser;
@@ -28,7 +27,6 @@ use error::Result;
pub use crate::bench::BenchTableMetadataCommand; pub use crate::bench::BenchTableMetadataCommand;
pub use crate::export::ExportCommand; pub use crate::export::ExportCommand;
pub use crate::import::ImportCommand; pub use crate::import::ImportCommand;
pub use crate::meta_snapshot::{MetaRestoreCommand, MetaSnapshotCommand};
#[async_trait] #[async_trait]
pub trait Tool: Send + Sync { pub trait Tool: Send + Sync {

View File

@@ -1,329 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use clap::Parser;
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use common_meta::snapshot::MetadataSnapshotManager;
use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;
use object_store::services::{Fs, S3};
use object_store::ObjectStore;
use snafu::ResultExt;
use crate::error::{KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
use crate::Tool;
#[derive(Debug, Default, Parser)]
struct MetaConnection {
/// The endpoint(s) of the metadata store, which is one of etcd, pg or mysql.
#[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
store_addrs: Vec<String>,
/// The database backend.
#[clap(long, value_enum)]
backend: Option<BackendImpl>,
#[clap(long, default_value = "")]
store_key_prefix: String,
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
#[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
meta_table_name: String,
#[clap(long, default_value = "128")]
max_txn_ops: usize,
}
impl MetaConnection {
pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
let max_txn_ops = self.max_txn_ops;
let store_addrs = &self.store_addrs;
if store_addrs.is_empty() {
KvBackendNotSetSnafu { backend: "all" }
.fail()
.map_err(BoxedError::new)
} else {
let kvbackend = match self.backend {
Some(BackendImpl::EtcdStore) => {
let etcd_client = create_etcd_client(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
}
#[cfg(feature = "pg_kvbackend")]
Some(BackendImpl::PostgresStore) => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
#[cfg(feature = "mysql_kvbackend")]
Some(BackendImpl::MysqlStore) => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
_ => KvBackendNotSetSnafu { backend: "all" }
.fail()
.map_err(BoxedError::new),
};
if self.store_key_prefix.is_empty() {
kvbackend
} else {
let chroot_kvbackend =
ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
Ok(Arc::new(chroot_kvbackend))
}
}
}
}
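
Stripped of the etcd, pg and mysql specifics, MetaConnection::build has a simple shape: pick a concrete backend from the flags, then wrap it in a ChrootKvBackend when a key prefix is configured so that every key is namespaced under that prefix. The sketch below restates that shape with an invented InMemoryBackend and a pared-down KvBackend trait standing in for the real common_meta types.

use std::sync::Arc;

// Pared-down stand-ins; only the wrapping shape matters here.
trait KvBackend {
    fn name(&self) -> String;
}
type KvBackendRef = Arc<dyn KvBackend>;

struct InMemoryBackend;
impl KvBackend for InMemoryBackend {
    fn name(&self) -> String {
        "in-memory".to_string()
    }
}

// Mirrors ChrootKvBackend: every key is transparently prefixed before reaching `inner`.
struct ChrootKvBackend {
    prefix: Vec<u8>,
    inner: KvBackendRef,
}
impl KvBackend for ChrootKvBackend {
    fn name(&self) -> String {
        format!(
            "chroot({}) over {}",
            String::from_utf8_lossy(&self.prefix),
            self.inner.name()
        )
    }
}

fn build(store_key_prefix: &str) -> KvBackendRef {
    let inner: KvBackendRef = Arc::new(InMemoryBackend);
    if store_key_prefix.is_empty() {
        inner
    } else {
        Arc::new(ChrootKvBackend {
            prefix: store_key_prefix.as_bytes().to_vec(),
            inner,
        })
    }
}

fn main() {
    assert_eq!(build("").name(), "in-memory");
    println!("{}", build("/my-cluster").name());
}
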
// TODO(qtang): Abstract a generic s3 config shared by export, import and meta snapshot restore
#[derive(Debug, Default, Parser)]
struct S3Config {
/// Whether to use s3 as the output storage. Defaults to false.
#[clap(long, default_value = "false")]
s3: bool,
/// The s3 bucket name.
#[clap(long)]
s3_bucket: Option<String>,
/// The s3 region.
#[clap(long)]
s3_region: Option<String>,
/// The s3 access key.
#[clap(long)]
s3_access_key: Option<SecretString>,
/// The s3 secret key.
#[clap(long)]
s3_secret_key: Option<SecretString>,
/// The s3 endpoint. If not set, the default endpoint for the region is used automatically.
#[clap(long)]
s3_endpoint: Option<String>,
}
impl S3Config {
pub fn build(&self, root: &str) -> Result<Option<ObjectStore>, BoxedError> {
if !self.s3 {
Ok(None)
} else {
if self.s3_region.is_none()
|| self.s3_access_key.is_none()
|| self.s3_secret_key.is_none()
|| self.s3_bucket.is_none()
{
return S3ConfigNotSetSnafu.fail().map_err(BoxedError::new);
}
// Safety: unwrap is safe because we have checked the options above.
let mut config = S3::default()
.bucket(self.s3_bucket.as_ref().unwrap())
.region(self.s3_region.as_ref().unwrap())
.access_key_id(self.s3_access_key.as_ref().unwrap().expose_secret())
.secret_access_key(self.s3_secret_key.as_ref().unwrap().expose_secret());
if !root.is_empty() && root != "." {
config = config.root(root);
}
if let Some(endpoint) = &self.s3_endpoint {
config = config.endpoint(endpoint);
}
Ok(Some(
ObjectStore::new(config)
.context(OpenDalSnafu)
.map_err(BoxedError::new)?
.finish(),
))
}
}
}
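
S3Config::build and the create_local_file_object_store helper defined further down together act as a single "pick the storage backend" step: an S3-backed operator when the s3 flag is set and all required options are present, otherwise a file-system operator rooted at the given directory. A condensed sketch of that selection, written directly against the opendal crate with placeholder bucket and credential values, could look like:

// Condensed sketch of the S3-vs-local selection; bucket, region and credentials are
// placeholders, and both branches reuse the builder calls shown in this file.
use opendal::{services, Operator};

fn build_store(use_s3: bool, root: &str) -> opendal::Result<Operator> {
    if use_s3 {
        let builder = services::S3::default()
            .bucket("my-bucket")
            .region("us-west-2")
            .access_key_id("AKIA-PLACEHOLDER")
            .secret_access_key("SECRET-PLACEHOLDER")
            .root(root);
        Ok(Operator::new(builder)?.finish())
    } else {
        let root = if root.is_empty() { "." } else { root };
        Ok(Operator::new(services::Fs::default().root(root))?.finish())
    }
}

fn main() -> opendal::Result<()> {
    let local = build_store(false, "")?;
    println!("local backend scheme: {:?}", local.info().scheme());
    Ok(())
}
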
/// Export metadata snapshot tool.
/// This tool exports a metadata snapshot from etcd, pg or mysql.
/// It dumps the snapshot to a local file or an s3 bucket.
/// The snapshot file is in binary format.
#[derive(Debug, Default, Parser)]
pub struct MetaSnapshotCommand {
/// The connection to the metadata store.
#[clap(flatten)]
connection: MetaConnection,
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
/// The name of the target snapshot file. The file extension is added automatically.
#[clap(long, default_value = "metadata_snapshot")]
file_name: String,
/// The directory to store the snapshot file.
/// If the output target is an s3 bucket, this is the root directory within the bucket.
/// If the output target is the local file system, this is the local directory.
#[clap(long, default_value = "")]
output_dir: String,
}
fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError> {
let root = if root.is_empty() { "." } else { root };
let object_store = ObjectStore::new(Fs::default().root(root))
.context(OpenDalSnafu)
.map_err(BoxedError::new)?
.finish();
Ok(object_store)
}
impl MetaSnapshotCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.connection.build().await?;
let output_dir = &self.output_dir;
let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?;
if let Some(store) = object_store {
let tool = MetaSnapshotTool {
inner: MetadataSnapshotManager::new(kvbackend, store),
target_file: self.file_name.clone(),
};
Ok(Box::new(tool))
} else {
let object_store = create_local_file_object_store(output_dir)?;
let tool = MetaSnapshotTool {
inner: MetadataSnapshotManager::new(kvbackend, object_store),
target_file: self.file_name.clone(),
};
Ok(Box::new(tool))
}
}
}
pub struct MetaSnapshotTool {
inner: MetadataSnapshotManager,
target_file: String,
}
#[async_trait]
impl Tool for MetaSnapshotTool {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
self.inner
.dump("", &self.target_file)
.await
.map_err(BoxedError::new)?;
Ok(())
}
}
/// Restore metadata snapshot tool.
/// This tool restores a metadata snapshot into etcd, pg or mysql.
/// It reads the snapshot from a local file or an s3 bucket.
#[derive(Debug, Default, Parser)]
pub struct MetaRestoreCommand {
/// The connection to the metadata store.
#[clap(flatten)]
connection: MetaConnection,
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
/// The name of the target snapshot file.
#[clap(long, default_value = "metadata_snapshot.metadata.fb")]
file_name: String,
/// The directory containing the snapshot file to restore from.
#[clap(long, default_value = ".")]
input_dir: String,
#[clap(long, default_value = "false")]
force: bool,
}
impl MetaRestoreCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.connection.build().await?;
let input_dir = &self.input_dir;
let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?;
if let Some(store) = object_store {
let tool = MetaRestoreTool::new(
MetadataSnapshotManager::new(kvbackend, store),
self.file_name.clone(),
self.force,
);
Ok(Box::new(tool))
} else {
let object_store = create_local_file_object_store(input_dir)?;
let tool = MetaRestoreTool::new(
MetadataSnapshotManager::new(kvbackend, object_store),
self.file_name.clone(),
self.force,
);
Ok(Box::new(tool))
}
}
}
pub struct MetaRestoreTool {
inner: MetadataSnapshotManager,
source_file: String,
force: bool,
}
impl MetaRestoreTool {
pub fn new(inner: MetadataSnapshotManager, source_file: String, force: bool) -> Self {
Self {
inner,
source_file,
force,
}
}
}
#[async_trait]
impl Tool for MetaRestoreTool {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
let clean = self
.inner
.check_target_source_clean()
.await
.map_err(BoxedError::new)?;
if clean {
common_telemetry::info!(
"The target source is clean, we will restore the metadata snapshot."
);
self.inner
.restore(&self.source_file)
.await
.map_err(BoxedError::new)?;
Ok(())
} else if !self.force {
common_telemetry::warn!(
"The target source is not clean, if you want to restore the metadata snapshot forcefully, please use --force option."
);
Ok(())
} else {
common_telemetry::info!("The target source is not clean, We will restore the metadata snapshot with --force.");
self.inner
.restore(&self.source_file)
.await
.map_err(BoxedError::new)?;
Ok(())
}
}
}
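
The restore path is essentially a three-way decision on whether the target store is clean and whether --force was passed. The sketch below isolates just that decision; SnapshotManager is an invented stand-in for MetadataSnapshotManager that exposes only the two calls the tool relies on.

// Stand-in for MetadataSnapshotManager: only the two calls used by the restore tool.
struct SnapshotManager {
    clean: bool,
}

impl SnapshotManager {
    fn check_target_source_clean(&self) -> bool {
        self.clean
    }
    fn restore(&self, file: &str) {
        println!("restoring from {file}");
    }
}

// Mirrors MetaRestoreTool::do_work: restore when clean, otherwise only with --force.
fn restore(manager: &SnapshotManager, source_file: &str, force: bool) {
    if manager.check_target_source_clean() {
        println!("target store is clean, restoring the metadata snapshot");
        manager.restore(source_file);
    } else if !force {
        println!("target store is not clean; pass --force to restore anyway");
    } else {
        println!("target store is not clean, restoring because --force was given");
        manager.restore(source_file);
    }
}

fn main() {
    restore(&SnapshotManager { clean: true }, "metadata_snapshot.metadata.fb", false);
    restore(&SnapshotManager { clean: false }, "metadata_snapshot.metadata.fb", false);
}
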

View File

@@ -25,7 +25,6 @@ common-meta.workspace = true
common-query.workspace = true common-query.workspace = true
common-recordbatch.workspace = true common-recordbatch.workspace = true
common-telemetry.workspace = true common-telemetry.workspace = true
datatypes.workspace = true
enum_dispatch = "0.3" enum_dispatch = "0.3"
futures.workspace = true futures.workspace = true
futures-util.workspace = true futures-util.workspace = true

View File

@@ -167,7 +167,9 @@ impl Client {
let client = FlightServiceClient::new(channel) let client = FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size()) .max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size()); .max_encoding_message_size(self.max_grpc_send_message_size())
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);
Ok(FlightClient { addr, client }) Ok(FlightClient { addr, client })
} }
@@ -176,7 +178,9 @@ impl Client {
let (addr, channel) = self.find_channel()?; let (addr, channel) = self.find_channel()?;
let client = PbRegionClient::new(channel) let client = PbRegionClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size()) .max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size()); .max_encoding_message_size(self.max_grpc_send_message_size())
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);
Ok((addr, client)) Ok((addr, client))
} }
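
The only change in these two hunks is enabling zstd compression on the generated tonic clients. Assuming the arrow-flight crate's generated FlightServiceClient and a tonic build with its zstd feature enabled (the real client here is generated from GreptimeDB's own protos, so the exact type differs), the pattern in isolation is:

// Sketch: a Flight client that advertises and sends zstd-compressed gRPC messages.
use arrow_flight::flight_service_client::FlightServiceClient;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;

fn make_flight_client(channel: Channel) -> FlightServiceClient<Channel> {
    FlightServiceClient::new(channel)
        // Example size limits, not the values used by the real config.
        .max_decoding_message_size(512 * 1024 * 1024)
        .max_encoding_message_size(512 * 1024 * 1024)
        // Accept zstd-compressed responses from the server...
        .accept_compressed(CompressionEncoding::Zstd)
        // ...and compress outgoing requests with zstd.
        .send_compressed(CompressionEncoding::Zstd)
}

#[tokio::main]
async fn main() {
    // connect_lazy avoids needing a live server for this sketch; the address is a placeholder.
    let channel = Channel::from_static("http://127.0.0.1:4001").connect_lazy();
    let _client = make_flight_client(channel);
}
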

View File

@@ -14,7 +14,6 @@
use std::pin::Pin; use std::pin::Pin;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use api::v1::auth_header::AuthScheme; use api::v1::auth_header::AuthScheme;
use api::v1::ddl_request::Expr as DdlExpr; use api::v1::ddl_request::Expr as DdlExpr;
@@ -36,7 +35,7 @@ use common_grpc::flight::do_put::DoPutResponse;
use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output; use common_query::Output;
use common_recordbatch::error::ExternalSnafu; use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper}; use common_recordbatch::RecordBatchStreamWrapper;
use common_telemetry::tracing_context::W3cTrace; use common_telemetry::tracing_context::W3cTrace;
use common_telemetry::{error, warn}; use common_telemetry::{error, warn};
use futures::future; use futures::future;
@@ -50,7 +49,7 @@ use crate::error::{
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu, ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu,
InvalidTonicMetadataValueSnafu, ServerSnafu, InvalidTonicMetadataValueSnafu, ServerSnafu,
}; };
use crate::{error, from_grpc_response, Client, Result}; use crate::{from_grpc_response, Client, Result};
type FlightDataStream = Pin<Box<dyn Stream<Item = FlightData> + Send>>; type FlightDataStream = Pin<Box<dyn Stream<Item = FlightData> + Send>>;
@@ -338,30 +337,20 @@ impl Database {
); );
Ok(Output::new_with_affected_rows(rows)) Ok(Output::new_with_affected_rows(rows))
} }
FlightMessage::RecordBatch(_) | FlightMessage::Metrics(_) => { FlightMessage::Recordbatch(_) | FlightMessage::Metrics(_) => {
IllegalFlightMessagesSnafu { IllegalFlightMessagesSnafu {
reason: "The first flight message cannot be a RecordBatch or Metrics message", reason: "The first flight message cannot be a RecordBatch or Metrics message",
} }
.fail() .fail()
} }
FlightMessage::Schema(schema) => { FlightMessage::Schema(schema) => {
let schema = Arc::new(
datatypes::schema::Schema::try_from(schema)
.context(error::ConvertSchemaSnafu)?,
);
let schema_cloned = schema.clone();
let stream = Box::pin(stream!({ let stream = Box::pin(stream!({
while let Some(flight_message) = flight_message_stream.next().await { while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message let flight_message = flight_message
.map_err(BoxedError::new) .map_err(BoxedError::new)
.context(ExternalSnafu)?; .context(ExternalSnafu)?;
match flight_message { match flight_message {
FlightMessage::RecordBatch(arrow_batch) => { FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
yield RecordBatch::try_from_df_record_batch(
schema_cloned.clone(),
arrow_batch,
)
}
FlightMessage::Metrics(_) => {} FlightMessage::Metrics(_) => {}
FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => { FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => {
yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)} yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)}
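
Both this client code and the region client further down enforce the same small protocol on the decoded stream: the first message must be AffectedRows (a DML result) or Schema (the start of a query result), and a Schema may only be followed by RecordBatch and Metrics messages. A standalone sketch of that state machine, with a simplified FlightMessage enum in place of the real one, is:

// Simplified stand-in for the real FlightMessage; payloads are elided.
#[derive(Debug)]
enum FlightMessage {
    AffectedRows(usize),
    Schema,      // starts a stream of record batches
    RecordBatch, // only valid after a Schema
    Metrics,     // may appear between record batches
}

#[derive(Debug)]
enum Reply {
    AffectedRows(usize),
    Batches(usize),
}

fn interpret(messages: &[FlightMessage]) -> Result<Reply, String> {
    let mut iter = messages.iter();
    match iter.next() {
        Some(FlightMessage::AffectedRows(rows)) => Ok(Reply::AffectedRows(*rows)),
        Some(FlightMessage::Schema) => {
            let mut batches = 0;
            for message in iter {
                match message {
                    FlightMessage::RecordBatch => batches += 1,
                    FlightMessage::Metrics => {}
                    other => return Err(format!("unexpected message after schema: {other:?}")),
                }
            }
            Ok(Reply::Batches(batches))
        }
        other => Err(format!("the first flight message cannot be {other:?}")),
    }
}

fn main() {
    let messages = [
        FlightMessage::Schema,
        FlightMessage::RecordBatch,
        FlightMessage::Metrics,
        FlightMessage::RecordBatch,
    ];
    println!("{:?}", interpret(&messages));
}
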

View File

@@ -117,13 +117,6 @@ pub enum Error {
#[snafu(implicit)] #[snafu(implicit)]
location: Location, location: Location,
}, },
#[snafu(display("Failed to convert Schema"))]
ConvertSchema {
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
},
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@@ -144,7 +137,6 @@ impl ErrorExt for Error {
| Error::CreateTlsChannel { source, .. } => source.status_code(), | Error::CreateTlsChannel { source, .. } => source.status_code(),
Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected, Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected,
Error::InvalidTonicMetadataValue { .. } => StatusCode::InvalidArguments, Error::InvalidTonicMetadataValue { .. } => StatusCode::InvalidArguments,
Error::ConvertSchema { source, .. } => source.status_code(),
} }
} }

View File

@@ -28,7 +28,7 @@ use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::node_manager::Datanode; use common_meta::node_manager::Datanode;
use common_query::request::QueryRequest; use common_query::request::QueryRequest;
use common_recordbatch::error::ExternalSnafu; use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::error; use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext; use common_telemetry::tracing_context::TracingContext;
use prost::Message; use prost::Message;
@@ -55,7 +55,6 @@ impl Datanode for RegionRequester {
if err.should_retry() { if err.should_retry() {
meta_error::Error::RetryLater { meta_error::Error::RetryLater {
source: BoxedError::new(err), source: BoxedError::new(err),
clean_poisons: false,
} }
} else { } else {
meta_error::Error::External { meta_error::Error::External {
@@ -147,10 +146,6 @@ impl RegionRequester {
let tracing_context = TracingContext::from_current_span(); let tracing_context = TracingContext::from_current_span();
let schema = Arc::new(
datatypes::schema::Schema::try_from(schema).context(error::ConvertSchemaSnafu)?,
);
let schema_cloned = schema.clone();
let stream = Box::pin(stream!({ let stream = Box::pin(stream!({
let _span = tracing_context.attach(common_telemetry::tracing::info_span!( let _span = tracing_context.attach(common_telemetry::tracing::info_span!(
"poll_flight_data_stream" "poll_flight_data_stream"
@@ -161,12 +156,7 @@ impl RegionRequester {
.context(ExternalSnafu)?; .context(ExternalSnafu)?;
match flight_message { match flight_message {
FlightMessage::RecordBatch(record_batch) => { FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
yield RecordBatch::try_from_df_record_batch(
schema_cloned.clone(),
record_batch,
)
}
FlightMessage::Metrics(s) => { FlightMessage::Metrics(s) => {
let m = serde_json::from_str(&s).ok().map(Arc::new); let m = serde_json::from_str(&s).ok().map(Arc::new);
metrics_ref.swap(m); metrics_ref.swap(m);

View File

@@ -10,13 +10,7 @@ name = "greptime"
path = "src/bin/greptime.rs" path = "src/bin/greptime.rs"
[features] [features]
default = [ default = ["servers/pprof", "servers/mem-prof"]
"servers/pprof",
"servers/mem-prof",
"meta-srv/pg_kvbackend",
"meta-srv/mysql_kvbackend",
]
enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise"]
tokio-console = ["common-telemetry/tokio-console"] tokio-console = ["common-telemetry/tokio-console"]
[lints] [lints]
@@ -80,7 +74,6 @@ servers.workspace = true
session.workspace = true session.workspace = true
similar-asserts.workspace = true similar-asserts.workspace = true
snafu.workspace = true snafu.workspace = true
stat.workspace = true
store-api.workspace = true store-api.workspace = true
substrait.workspace = true substrait.workspace = true
table.workspace = true table.workspace = true

View File

@@ -14,13 +14,12 @@
pub mod builder; pub mod builder;
use std::path::Path;
use std::time::Duration; use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use clap::Parser; use clap::Parser;
use common_config::Configurable; use common_config::Configurable;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR}; use common_telemetry::logging::TracingOptions;
use common_telemetry::{info, warn}; use common_telemetry::{info, warn};
use common_wal::config::DatanodeWalConfig; use common_wal::config::DatanodeWalConfig;
use datanode::datanode::Datanode; use datanode::datanode::Datanode;
@@ -249,14 +248,6 @@ impl StartCommand {
raft_engine_config.dir.replace(wal_dir.clone()); raft_engine_config.dir.replace(wal_dir.clone());
} }
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(&opts.storage.data_home)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if let Some(http_addr) = &self.http_addr { if let Some(http_addr) = &self.http_addr {
opts.http.addr.clone_from(http_addr); opts.http.addr.clone_from(http_addr);
} }

View File

@@ -28,7 +28,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::datanode::{DatanodeOptions, Instance, APP_NAME}; use crate::datanode::{DatanodeOptions, Instance, APP_NAME};
use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu}; use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu};
use crate::{create_resource_limit_metrics, log_versions}; use crate::log_versions;
/// Builder for Datanode instance. /// Builder for Datanode instance.
pub struct InstanceBuilder { pub struct InstanceBuilder {
@@ -68,7 +68,6 @@ impl InstanceBuilder {
); );
log_versions(version(), short_version(), APP_NAME); log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts) plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)
.await .await

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -22,7 +21,7 @@ use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKv
use clap::Parser; use clap::Parser;
use client::client_manager::NodeClients; use client::client_manager::NodeClients;
use common_base::Plugins; use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME}; use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig; use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
@@ -31,7 +30,7 @@ use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::flow::FlowMetadataManager; use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager; use common_meta::key::TableMetadataManager;
use common_telemetry::info; use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR}; use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version}; use common_version::{short_version, version};
use flow::{ use flow::{
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
@@ -46,7 +45,7 @@ use crate::error::{
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
}; };
use crate::options::{GlobalOptions, GreptimeOptions}; use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{create_resource_limit_metrics, log_versions, App}; use crate::{log_versions, App};
pub const APP_NAME: &str = "greptime-flownode"; pub const APP_NAME: &str = "greptime-flownode";
@@ -187,14 +186,6 @@ impl StartCommand {
opts.logging.dir.clone_from(dir); opts.logging.dir.clone_from(dir);
} }
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if global_options.log_level.is_some() { if global_options.log_level.is_some() {
opts.logging.level.clone_from(&global_options.log_level); opts.logging.level.clone_from(&global_options.log_level);
} }
@@ -255,9 +246,7 @@ impl StartCommand {
opts.component.node_id.map(|x| x.to_string()), opts.component.node_id.map(|x| x.to_string()),
None, None,
); );
log_versions(version(), short_version(), APP_NAME); log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Flownode start command: {:#?}", self); info!("Flownode start command: {:#?}", self);
info!("Flownode options: {:#?}", opts); info!("Flownode options: {:#?}", opts);

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -23,14 +22,14 @@ use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKv
use clap::Parser; use clap::Parser;
use client::client_manager::NodeClients; use client::client_manager::NodeClients;
use common_base::Plugins; use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME}; use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig; use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::info; use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR}; use common_telemetry::logging::TracingOptions;
use common_time::timezone::set_default_timezone; use common_time::timezone::set_default_timezone;
use common_version::{short_version, version}; use common_version::{short_version, version};
use frontend::frontend::Frontend; use frontend::frontend::Frontend;
@@ -45,7 +44,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{self, Result}; use crate::error::{self, Result};
use crate::options::{GlobalOptions, GreptimeOptions}; use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{create_resource_limit_metrics, log_versions, App}; use crate::{log_versions, App};
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>; type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
@@ -195,14 +194,6 @@ impl StartCommand {
opts.logging.dir.clone_from(dir); opts.logging.dir.clone_from(dir);
} }
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if global_options.log_level.is_some() { if global_options.log_level.is_some() {
opts.logging.level.clone_from(&global_options.log_level); opts.logging.level.clone_from(&global_options.log_level);
} }
@@ -279,9 +270,7 @@ impl StartCommand {
opts.component.node_id.clone(), opts.component.node_id.clone(),
opts.component.slow_query.as_ref(), opts.component.slow_query.as_ref(),
); );
log_versions(version(), short_version(), APP_NAME); log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Frontend start command: {:#?}", self); info!("Frontend start command: {:#?}", self);
info!("Frontend options: {:#?}", opts); info!("Frontend options: {:#?}", opts);

View File

@@ -16,7 +16,6 @@
use async_trait::async_trait; use async_trait::async_trait;
use common_telemetry::{error, info}; use common_telemetry::{error, info};
use stat::{get_cpu_limit, get_memory_limit};
use crate::error::Result; use crate::error::Result;
@@ -32,12 +31,6 @@ pub mod standalone;
lazy_static::lazy_static! { lazy_static::lazy_static! {
static ref APP_VERSION: prometheus::IntGaugeVec = static ref APP_VERSION: prometheus::IntGaugeVec =
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap(); prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap();
static ref CPU_LIMIT: prometheus::IntGaugeVec =
prometheus::register_int_gauge_vec!("greptime_cpu_limit_in_millicores", "cpu limit in millicores", &["app"]).unwrap();
static ref MEMORY_LIMIT: prometheus::IntGaugeVec =
prometheus::register_int_gauge_vec!("greptime_memory_limit_in_bytes", "memory limit in bytes", &["app"]).unwrap();
} }
/// wait for the close signal, for unix platform it's SIGINT or SIGTERM /// wait for the close signal, for unix platform it's SIGINT or SIGTERM
@@ -121,24 +114,6 @@ pub fn log_versions(version: &str, short_version: &str, app: &str) {
log_env_flags(); log_env_flags();
} }
pub fn create_resource_limit_metrics(app: &str) {
if let Some(cpu_limit) = get_cpu_limit() {
info!(
"GreptimeDB start with cpu limit in millicores: {}",
cpu_limit
);
CPU_LIMIT.with_label_values(&[app]).set(cpu_limit);
}
if let Some(memory_limit) = get_memory_limit() {
info!(
"GreptimeDB start with memory limit in bytes: {}",
memory_limit
);
MEMORY_LIMIT.with_label_values(&[app]).set(memory_limit);
}
}
fn log_env_flags() { fn log_env_flags() {
info!("command line arguments"); info!("command line arguments");
for argument in std::env::args() { for argument in std::env::args() {

View File

@@ -13,7 +13,6 @@
// limitations under the License. // limitations under the License.
use std::fmt; use std::fmt;
use std::path::Path;
use std::time::Duration; use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
@@ -21,7 +20,7 @@ use clap::Parser;
use common_base::Plugins; use common_base::Plugins;
use common_config::Configurable; use common_config::Configurable;
use common_telemetry::info; use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR}; use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version}; use common_version::{short_version, version};
use meta_srv::bootstrap::MetasrvInstance; use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::BackendImpl; use meta_srv::metasrv::BackendImpl;
@@ -30,7 +29,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu}; use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
use crate::options::{GlobalOptions, GreptimeOptions}; use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{create_resource_limit_metrics, log_versions, App}; use crate::{log_versions, App};
type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>; type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;
@@ -237,20 +236,12 @@ impl StartCommand {
tokio_console_addr: global_options.tokio_console_addr.clone(), tokio_console_addr: global_options.tokio_console_addr.clone(),
}; };
#[allow(deprecated)]
if let Some(addr) = &self.rpc_bind_addr { if let Some(addr) = &self.rpc_bind_addr {
opts.bind_addr.clone_from(addr); opts.bind_addr.clone_from(addr);
opts.grpc.bind_addr.clone_from(addr);
} else if !opts.bind_addr.is_empty() {
opts.grpc.bind_addr.clone_from(&opts.bind_addr);
} }
#[allow(deprecated)]
if let Some(addr) = &self.rpc_server_addr { if let Some(addr) = &self.rpc_server_addr {
opts.server_addr.clone_from(addr); opts.server_addr.clone_from(addr);
opts.grpc.server_addr.clone_from(addr);
} else if !opts.server_addr.is_empty() {
opts.grpc.server_addr.clone_from(&opts.server_addr);
} }
if let Some(addrs) = &self.store_addrs { if let Some(addrs) = &self.store_addrs {
@@ -283,14 +274,6 @@ impl StartCommand {
opts.data_home.clone_from(data_home); opts.data_home.clone_from(data_home);
} }
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(&opts.data_home)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if !self.store_key_prefix.is_empty() { if !self.store_key_prefix.is_empty() {
opts.store_key_prefix.clone_from(&self.store_key_prefix) opts.store_key_prefix.clone_from(&self.store_key_prefix)
} }
@@ -319,15 +302,13 @@ impl StartCommand {
None, None,
None, None,
); );
log_versions(version(), short_version(), APP_NAME); log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Metasrv start command: {:#?}", self); info!("Metasrv start command: {:#?}", self);
let plugin_opts = opts.plugins; let plugin_opts = opts.plugins;
let mut opts = opts.component; let mut opts = opts.component;
opts.grpc.detect_server_addr(); opts.detect_server_addr();
info!("Metasrv options: {:#?}", opts); info!("Metasrv options: {:#?}", opts);
@@ -371,7 +352,7 @@ mod tests {
}; };
let options = cmd.load_options(&Default::default()).unwrap().component; let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr); assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs); assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
assert_eq!(SelectorType::LoadBased, options.selector); assert_eq!(SelectorType::LoadBased, options.selector);
} }
@@ -404,8 +385,8 @@ mod tests {
}; };
let options = cmd.load_options(&Default::default()).unwrap().component; let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr); assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr); assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs); assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
assert_eq!(SelectorType::LeaseBased, options.selector); assert_eq!(SelectorType::LeaseBased, options.selector);
assert_eq!("debug", options.logging.level.as_ref().unwrap()); assert_eq!("debug", options.logging.level.as_ref().unwrap());
@@ -517,10 +498,10 @@ mod tests {
let opts = command.load_options(&Default::default()).unwrap().component; let opts = command.load_options(&Default::default()).unwrap().component;
// Should be read from env, env > default values. // Should be read from env, env > default values.
assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002"); assert_eq!(opts.bind_addr, "127.0.0.1:14002");
// Should be read from config file, config file > env > default values. // Should be read from config file, config file > env > default values.
assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002"); assert_eq!(opts.server_addr, "127.0.0.1:3002");
// Should be read from cli, cli > config file > env > default values. // Should be read from cli, cli > config file > env > default values.
assert_eq!(opts.http.addr, "127.0.0.1:14000"); assert_eq!(opts.http.addr, "127.0.0.1:14000");

View File

@@ -13,7 +13,6 @@
// limitations under the License. // limitations under the License.
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::{fs, path}; use std::{fs, path};
@@ -36,8 +35,6 @@ use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRe
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef}; use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager; use common_meta::ddl_manager::DdlManager;
#[cfg(feature = "enterprise")]
use common_meta::ddl_manager::TriggerDdlManagerRef;
use common_meta::key::flow::flow_state::FlowStat; use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef}; use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
@@ -50,9 +47,7 @@ use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef}; use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_procedure::{ProcedureInfo, ProcedureManagerRef}; use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_telemetry::info; use common_telemetry::info;
use common_telemetry::logging::{ use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
LoggingOptions, SlowQueryOptions, TracingOptions, DEFAULT_LOGGING_DIR,
};
use common_time::timezone::set_default_timezone; use common_time::timezone::set_default_timezone;
use common_version::{short_version, version}; use common_version::{short_version, version};
use common_wal::config::DatanodeWalConfig; use common_wal::config::DatanodeWalConfig;
@@ -74,7 +69,6 @@ use frontend::service_config::{
}; };
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig; use mito2::config::MitoConfig;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask}; use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
use servers::grpc::GrpcOptions; use servers::grpc::GrpcOptions;
@@ -86,7 +80,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{Result, StartFlownodeSnafu}; use crate::error::{Result, StartFlownodeSnafu};
use crate::options::{GlobalOptions, GreptimeOptions}; use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{create_resource_limit_metrics, error, log_versions, App}; use crate::{error, log_versions, App};
pub const APP_NAME: &str = "greptime-standalone"; pub const APP_NAME: &str = "greptime-standalone";
@@ -159,7 +153,6 @@ pub struct StandaloneOptions {
pub init_regions_parallelism: usize, pub init_regions_parallelism: usize,
pub max_in_flight_write_bytes: Option<ReadableSize>, pub max_in_flight_write_bytes: Option<ReadableSize>,
pub slow_query: Option<SlowQueryOptions>, pub slow_query: Option<SlowQueryOptions>,
pub query: QueryOptions,
} }
impl Default for StandaloneOptions { impl Default for StandaloneOptions {
@@ -192,7 +185,6 @@ impl Default for StandaloneOptions {
init_regions_parallelism: 16, init_regions_parallelism: 16,
max_in_flight_write_bytes: None, max_in_flight_write_bytes: None,
slow_query: Some(SlowQueryOptions::default()), slow_query: Some(SlowQueryOptions::default()),
query: QueryOptions::default(),
} }
} }
} }
@@ -248,7 +240,6 @@ impl StandaloneOptions {
grpc: cloned_opts.grpc, grpc: cloned_opts.grpc,
init_regions_in_background: cloned_opts.init_regions_in_background, init_regions_in_background: cloned_opts.init_regions_in_background,
init_regions_parallelism: cloned_opts.init_regions_parallelism, init_regions_parallelism: cloned_opts.init_regions_parallelism,
query: cloned_opts.query,
..Default::default() ..Default::default()
} }
} }
@@ -410,14 +401,6 @@ impl StartCommand {
opts.storage.data_home.clone_from(data_home); opts.storage.data_home.clone_from(data_home);
} }
// If the logging dir is not set, use the default logs dir in the data home.
if opts.logging.dir.is_empty() {
opts.logging.dir = Path::new(&opts.storage.data_home)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string();
}
if let Some(addr) = &self.rpc_bind_addr { if let Some(addr) = &self.rpc_bind_addr {
// frontend grpc addr conflict with datanode default grpc addr // frontend grpc addr conflict with datanode default grpc addr
let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr; let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
@@ -468,9 +451,7 @@ impl StartCommand {
None, None,
opts.component.slow_query.as_ref(), opts.component.slow_query.as_ref(),
); );
log_versions(version(), short_version(), APP_NAME); log_versions(version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Standalone start command: {:#?}", self); info!("Standalone start command: {:#?}", self);
info!("Standalone options: {opts:#?}"); info!("Standalone options: {opts:#?}");
@@ -598,8 +579,6 @@ impl StartCommand {
flow_id_sequence, flow_id_sequence,
)); ));
#[cfg(feature = "enterprise")]
let trigger_ddl_manager: Option<TriggerDdlManagerRef> = plugins.get();
let ddl_task_executor = Self::create_ddl_task_executor( let ddl_task_executor = Self::create_ddl_task_executor(
procedure_manager.clone(), procedure_manager.clone(),
node_manager.clone(), node_manager.clone(),
@@ -608,8 +587,6 @@ impl StartCommand {
table_meta_allocator, table_meta_allocator,
flow_metadata_manager, flow_metadata_manager,
flow_meta_allocator, flow_meta_allocator,
#[cfg(feature = "enterprise")]
trigger_ddl_manager,
) )
.await?; .await?;
@@ -674,7 +651,6 @@ impl StartCommand {
}) })
} }
#[allow(clippy::too_many_arguments)]
pub async fn create_ddl_task_executor( pub async fn create_ddl_task_executor(
procedure_manager: ProcedureManagerRef, procedure_manager: ProcedureManagerRef,
node_manager: NodeManagerRef, node_manager: NodeManagerRef,
@@ -683,7 +659,6 @@ impl StartCommand {
table_metadata_allocator: TableMetadataAllocatorRef, table_metadata_allocator: TableMetadataAllocatorRef,
flow_metadata_manager: FlowMetadataManagerRef, flow_metadata_manager: FlowMetadataManagerRef,
flow_metadata_allocator: FlowMetadataAllocatorRef, flow_metadata_allocator: FlowMetadataAllocatorRef,
#[cfg(feature = "enterprise")] trigger_ddl_manager: Option<TriggerDdlManagerRef>,
) -> Result<ProcedureExecutorRef> { ) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new( let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new( DdlManager::try_new(
@@ -700,8 +675,6 @@ impl StartCommand {
}, },
procedure_manager, procedure_manager,
true, true,
#[cfg(feature = "enterprise")]
trigger_ddl_manager,
) )
.context(error::InitDdlManagerSnafu)?, .context(error::InitDdlManagerSnafu)?,
); );

View File

@@ -12,14 +12,13 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::path::Path;
use std::time::Duration; use std::time::Duration;
use cmd::options::GreptimeOptions; use cmd::options::GreptimeOptions;
use cmd::standalone::StandaloneOptions; use cmd::standalone::StandaloneOptions;
use common_config::{Configurable, DEFAULT_DATA_HOME}; use common_config::Configurable;
use common_options::datanode::{ClientOptions, DatanodeClientOptions}; use common_options::datanode::{ClientOptions, DatanodeClientOptions};
use common_telemetry::logging::{LoggingOptions, DEFAULT_LOGGING_DIR, DEFAULT_OTLP_ENDPOINT}; use common_telemetry::logging::{LoggingOptions, DEFAULT_OTLP_ENDPOINT};
use common_wal::config::raft_engine::RaftEngineConfig; use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig; use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig}; use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
@@ -33,7 +32,6 @@ use mito2::config::MitoConfig;
use servers::export_metrics::ExportMetricsOption; use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions; use servers::grpc::GrpcOptions;
use servers::http::HttpOptions; use servers::http::HttpOptions;
use store_api::path_utils::WAL_DIR;
#[allow(deprecated)] #[allow(deprecated)]
#[test] #[test]
@@ -58,18 +56,13 @@ fn test_load_datanode_example_config() {
metadata_cache_tti: Duration::from_secs(300), metadata_cache_tti: Duration::from_secs(300),
}), }),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig { wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some( dir: Some("./greptimedb_data/wal".to_string()),
Path::new(DEFAULT_DATA_HOME)
.join(WAL_DIR)
.to_string_lossy()
.to_string(),
),
sync_period: Some(Duration::from_secs(10)), sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2, recovery_parallelism: 2,
..Default::default() ..Default::default()
}), }),
storage: StorageConfig { storage: StorageConfig {
data_home: DEFAULT_DATA_HOME.to_string(), data_home: "./greptimedb_data/".to_string(),
..Default::default() ..Default::default()
}, },
region_engine: vec![ region_engine: vec![
@@ -86,16 +79,12 @@ fn test_load_datanode_example_config() {
], ],
logging: LoggingOptions { logging: LoggingOptions {
level: Some("info".to_string()), level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()), otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()), tracing_sample_ratio: Some(Default::default()),
..Default::default() ..Default::default()
}, },
export_metrics: ExportMetricsOption { export_metrics: ExportMetricsOption {
self_import: None, self_import: Some(Default::default()),
remote_write: Some(Default::default()), remote_write: Some(Default::default()),
..Default::default() ..Default::default()
}, },
@@ -132,10 +121,6 @@ fn test_load_frontend_example_config() {
}), }),
logging: LoggingOptions { logging: LoggingOptions {
level: Some("info".to_string()), level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()), otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()), tracing_sample_ratio: Some(Default::default()),
..Default::default() ..Default::default()
@@ -148,7 +133,7 @@ fn test_load_frontend_example_config() {
}, },
}, },
export_metrics: ExportMetricsOption { export_metrics: ExportMetricsOption {
self_import: None, self_import: Some(Default::default()),
remote_write: Some(Default::default()), remote_write: Some(Default::default()),
..Default::default() ..Default::default()
}, },
@@ -175,17 +160,10 @@ fn test_load_metasrv_example_config() {
let expected = GreptimeOptions::<MetasrvOptions> { let expected = GreptimeOptions::<MetasrvOptions> {
component: MetasrvOptions { component: MetasrvOptions {
selector: SelectorType::default(), selector: SelectorType::default(),
data_home: DEFAULT_DATA_HOME.to_string(), data_home: "./greptimedb_data/metasrv/".to_string(),
grpc: GrpcOptions { server_addr: "127.0.0.1:3002".to_string(),
bind_addr: "127.0.0.1:3002".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
..Default::default()
},
logging: LoggingOptions { logging: LoggingOptions {
dir: Path::new(DEFAULT_DATA_HOME) dir: "./greptimedb_data/logs".to_string(),
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
level: Some("info".to_string()), level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()), otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()), tracing_sample_ratio: Some(Default::default()),
@@ -199,7 +177,7 @@ fn test_load_metasrv_example_config() {
}, },
}, },
export_metrics: ExportMetricsOption { export_metrics: ExportMetricsOption {
self_import: None, self_import: Some(Default::default()),
remote_write: Some(Default::default()), remote_write: Some(Default::default()),
..Default::default() ..Default::default()
}, },
@@ -220,12 +198,7 @@ fn test_load_standalone_example_config() {
component: StandaloneOptions { component: StandaloneOptions {
default_timezone: Some("UTC".to_string()), default_timezone: Some("UTC".to_string()),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig { wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some( dir: Some("./greptimedb_data/wal".to_string()),
Path::new(DEFAULT_DATA_HOME)
.join(WAL_DIR)
.to_string_lossy()
.to_string(),
),
sync_period: Some(Duration::from_secs(10)), sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2, recovery_parallelism: 2,
..Default::default() ..Default::default()
@@ -243,15 +216,11 @@ fn test_load_standalone_example_config() {
}), }),
], ],
storage: StorageConfig { storage: StorageConfig {
data_home: DEFAULT_DATA_HOME.to_string(), data_home: "./greptimedb_data/".to_string(),
..Default::default() ..Default::default()
}, },
logging: LoggingOptions { logging: LoggingOptions {
level: Some("info".to_string()), level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()), otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()), tracing_sample_ratio: Some(Default::default()),
..Default::default() ..Default::default()
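
The config-test hunks above replace directories that the old test derived with `Path::join` by the literal strings the defaults now resolve to. A quick, hedged illustration of why the two forms name the same directories; the constants are local stand-ins, assuming `WAL_DIR = "wal"` and `DEFAULT_LOGGING_DIR = "logs"`:

```rust
// Shows that joining the base data home with the sub-directory constants
// yields the same literals the updated tests assert against.
use std::path::Path;

const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
const WAL_DIR: &str = "wal"; // assumed value
const DEFAULT_LOGGING_DIR: &str = "logs"; // assumed value

fn main() {
    let wal_dir = Path::new(DEFAULT_DATA_HOME)
        .join(WAL_DIR)
        .to_string_lossy()
        .to_string();
    let log_dir = Path::new(DEFAULT_DATA_HOME)
        .join(DEFAULT_LOGGING_DIR)
        .to_string_lossy()
        .to_string();
    // On Unix these print "./greptimedb_data/wal" and "./greptimedb_data/logs",
    // matching the literal expectations in the new test code.
    println!("{wal_dir}");
    println!("{log_dir}");
}
```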

View File

@@ -26,9 +26,6 @@ pub fn metadata_store_dir(store_dir: &str) -> String {
format!("{store_dir}/metadata") format!("{store_dir}/metadata")
} }
/// The default data home directory.
pub const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)] #[serde(default)]
pub struct KvBackendConfig { pub struct KvBackendConfig {

View File

@@ -13,9 +13,7 @@
// limitations under the License. // limitations under the License.
pub mod fs; pub mod fs;
pub mod oss;
pub mod s3; pub mod s3;
use std::collections::HashMap; use std::collections::HashMap;
use lazy_static::lazy_static; use lazy_static::lazy_static;
@@ -27,12 +25,10 @@ use url::{ParseError, Url};
use self::fs::build_fs_backend; use self::fs::build_fs_backend;
use self::s3::build_s3_backend; use self::s3::build_s3_backend;
use crate::error::{self, Result}; use crate::error::{self, Result};
use crate::object_store::oss::build_oss_backend;
use crate::util::find_dir_and_filename; use crate::util::find_dir_and_filename;
pub const FS_SCHEMA: &str = "FS"; pub const FS_SCHEMA: &str = "FS";
pub const S3_SCHEMA: &str = "S3"; pub const S3_SCHEMA: &str = "S3";
pub const OSS_SCHEMA: &str = "OSS";
/// Returns `(schema, Option<host>, path)` /// Returns `(schema, Option<host>, path)`
pub fn parse_url(url: &str) -> Result<(String, Option<String>, String)> { pub fn parse_url(url: &str) -> Result<(String, Option<String>, String)> {
@@ -68,12 +64,6 @@ pub fn build_backend(url: &str, connection: &HashMap<String, String>) -> Result<
})?; })?;
Ok(build_s3_backend(&host, &root, connection)?) Ok(build_s3_backend(&host, &root, connection)?)
} }
OSS_SCHEMA => {
let host = host.context(error::EmptyHostPathSnafu {
url: url.to_string(),
})?;
Ok(build_oss_backend(&host, &root, connection)?)
}
FS_SCHEMA => Ok(build_fs_backend(&root)?), FS_SCHEMA => Ok(build_fs_backend(&root)?),
_ => error::UnsupportedBackendProtocolSnafu { _ => error::UnsupportedBackendProtocolSnafu {
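
With the OSS arm removed, `build_backend` keeps the same parse-then-dispatch shape for the remaining schemas. A minimal std-only sketch of that pattern; the `Backend` enum, `parse_url` implementation, and error strings here are illustrative stand-ins, not the crate's real opendal-based types:

```rust
// Parse a URL into (schema, host, path) and dispatch to a backend builder.
#[derive(Debug)]
enum Backend {
    Fs { root: String },
    S3 { bucket: String, root: String },
}

fn parse_url(url: &str) -> Result<(String, Option<String>, String), String> {
    // "s3://bucket/path" -> ("S3", Some("bucket"), "/path"); bare paths map to FS.
    match url.split_once("://") {
        Some((schema, rest)) => {
            let (host, path) = rest.split_once('/').unwrap_or((rest, ""));
            Ok((schema.to_uppercase(), Some(host.to_string()), format!("/{path}")))
        }
        None => Ok(("FS".to_string(), None, url.to_string())),
    }
}

fn build_backend(url: &str) -> Result<Backend, String> {
    let (schema, host, root) = parse_url(url)?;
    match schema.as_str() {
        "S3" => Ok(Backend::S3 {
            bucket: host.ok_or_else(|| format!("empty host in {url}"))?,
            root,
        }),
        "FS" => Ok(Backend::Fs { root }),
        other => Err(format!("unsupported backend protocol: {other}")),
    }
}

fn main() {
    println!("{:?}", build_backend("s3://my-bucket/data"));
    println!("{:?}", build_backend("/tmp/local-data"));
    println!("{:?}", build_backend("oss://my-bucket/data")); // rejected after this change
}
```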

View File

@@ -1,118 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use object_store::services::Oss;
use object_store::ObjectStore;
use snafu::ResultExt;
use crate::error::{self, Result};
const BUCKET: &str = "bucket";
const ENDPOINT: &str = "endpoint";
const ACCESS_KEY_ID: &str = "access_key_id";
const ACCESS_KEY_SECRET: &str = "access_key_secret";
const ROOT: &str = "root";
const ALLOW_ANONYMOUS: &str = "allow_anonymous";
/// Check if the key is supported in OSS configuration.
pub fn is_supported_in_oss(key: &str) -> bool {
[
ROOT,
ALLOW_ANONYMOUS,
BUCKET,
ENDPOINT,
ACCESS_KEY_ID,
ACCESS_KEY_SECRET,
]
.contains(&key)
}
/// Build an OSS backend using the provided bucket, root, and connection parameters.
pub fn build_oss_backend(
bucket: &str,
root: &str,
connection: &HashMap<String, String>,
) -> Result<ObjectStore> {
let mut builder = Oss::default().bucket(bucket).root(root);
if let Some(endpoint) = connection.get(ENDPOINT) {
builder = builder.endpoint(endpoint);
}
if let Some(access_key_id) = connection.get(ACCESS_KEY_ID) {
builder = builder.access_key_id(access_key_id);
}
if let Some(access_key_secret) = connection.get(ACCESS_KEY_SECRET) {
builder = builder.access_key_secret(access_key_secret);
}
if let Some(allow_anonymous) = connection.get(ALLOW_ANONYMOUS) {
let allow = allow_anonymous.as_str().parse::<bool>().map_err(|e| {
error::InvalidConnectionSnafu {
msg: format!(
"failed to parse the option {}={}, {}",
ALLOW_ANONYMOUS, allow_anonymous, e
),
}
.build()
})?;
if allow {
builder = builder.allow_anonymous();
}
}
let op = ObjectStore::new(builder)
.context(error::BuildBackendSnafu)?
.layer(object_store::layers::LoggingLayer::default())
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::build_prometheus_metrics_layer(true))
.finish();
Ok(op)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_is_supported_in_oss() {
assert!(is_supported_in_oss(ROOT));
assert!(is_supported_in_oss(ALLOW_ANONYMOUS));
assert!(is_supported_in_oss(BUCKET));
assert!(is_supported_in_oss(ENDPOINT));
assert!(is_supported_in_oss(ACCESS_KEY_ID));
assert!(is_supported_in_oss(ACCESS_KEY_SECRET));
assert!(!is_supported_in_oss("foo"));
assert!(!is_supported_in_oss("BAR"));
}
#[test]
fn test_build_oss_backend_all_fields_valid() {
let mut connection = HashMap::new();
connection.insert(
ENDPOINT.to_string(),
"http://oss-ap-southeast-1.aliyuncs.com".to_string(),
);
connection.insert(ACCESS_KEY_ID.to_string(), "key_id".to_string());
connection.insert(ACCESS_KEY_SECRET.to_string(), "key_secret".to_string());
connection.insert(ALLOW_ANONYMOUS.to_string(), "true".to_string());
let result = build_oss_backend("my-bucket", "my-root", &connection);
assert!(result.is_ok());
}
}

View File

@@ -1,90 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_query::error::{
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use store_api::storage::ConcreteDataType;
use crate::handlers::FlowServiceHandlerRef;
use crate::helper::parse_catalog_flow;
fn adjust_signature() -> Signature {
Signature::exact(
vec![
ConcreteDataType::string_datatype(), // flow name
ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
ConcreteDataType::uint64_datatype(), // max filter number per query
],
Volatility::Immutable,
)
}
#[admin_fn(
name = AdjustFlowFunction,
display_name = adjust_flow,
sig_fn = adjust_signature,
ret = uint64
)]
pub(crate) async fn adjust_flow(
flow_service_handler: &FlowServiceHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 3, have: {}",
params.len()
),
}
);
let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
(
ValueRef::String(flow_name),
ValueRef::UInt64(min_run_interval),
ValueRef::UInt64(max_filter_num),
) => (flow_name, min_run_interval, max_filter_num),
_ => {
return UnsupportedInputDataTypeSnafu {
function: "adjust_flow",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
}
};
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
let res = flow_service_handler
.adjust(
&catalog_name,
&flow_name,
min_run_interval,
max_filter_num as usize,
query_ctx.clone(),
)
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}
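
The deleted `adjust_flow` follows the same admin-function shape as `flush_flow`: check the argument count first, then destructure the typed values in one match. A self-contained sketch of that validation step, with a simplified `Value` enum standing in for the real `ValueRef`:

```rust
// Arity check followed by typed destructuring, mirroring the removed function.
#[derive(Debug, Clone)]
enum Value {
    Str(String),
    U64(u64),
}

fn parse_adjust_args(params: &[Value]) -> Result<(String, u64, u64), String> {
    if params.len() != 3 {
        return Err(format!(
            "expected 3 arguments, got: {}",
            params.len()
        ));
    }
    match (&params[0], &params[1], &params[2]) {
        (Value::Str(flow), Value::U64(min_interval), Value::U64(max_filter)) => {
            Ok((flow.clone(), *min_interval, *max_filter))
        }
        _ => Err("unsupported input data types for adjust_flow".to_string()),
    }
}

fn main() {
    let args = vec![
        Value::Str("my_flow".to_string()),
        Value::U64(10),
        Value::U64(100),
    ];
    println!("{:?}", parse_adjust_args(&args));
}
```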

View File

@@ -26,7 +26,6 @@ use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction; use migrate_region::MigrateRegionFunction;
use remove_region_follower::RemoveRegionFollowerFunction; use remove_region_follower::RemoveRegionFollowerFunction;
use crate::adjust_flow::AdjustFlowFunction;
use crate::flush_flow::FlushFlowFunction; use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry; use crate::function_registry::FunctionRegistry;
@@ -44,6 +43,5 @@ impl AdminFunction {
registry.register_async(Arc::new(FlushTableFunction)); registry.register_async(Arc::new(FlushTableFunction));
registry.register_async(Arc::new(CompactTableFunction)); registry.register_async(Arc::new(CompactTableFunction));
registry.register_async(Arc::new(FlushFlowFunction)); registry.register_async(Arc::new(FlushFlowFunction));
registry.register_async(Arc::new(AdjustFlowFunction));
} }
} }

View File

@@ -12,6 +12,11 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
mod cgroups; mod geo_path;
mod hll;
mod uddsketch_state;
pub use cgroups::*; pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
pub(crate) use hll::HllStateType;
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME};

View File

@@ -47,7 +47,7 @@ impl GeoPathAccumulator {
Self::default() Self::default()
} }
pub fn uadf_impl() -> AggregateUDF { pub fn udf_impl() -> AggregateUDF {
create_udaf( create_udaf(
GEO_PATH_NAME, GEO_PATH_NAME,
// Input types: lat, lng, timestamp // Input types: lat, lng, timestamp

View File

@@ -1,18 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod approximate;
#[cfg(feature = "geo")]
pub mod geo;
pub mod vector;

View File

@@ -1,32 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::function_registry::FunctionRegistry;
pub(crate) mod hll;
mod uddsketch;
pub(crate) struct ApproximateFunction;
impl ApproximateFunction {
pub fn register(registry: &FunctionRegistry) {
// uddsketch
registry.register_aggr(uddsketch::UddSketchState::state_udf_impl());
registry.register_aggr(uddsketch::UddSketchState::merge_udf_impl());
// hll
registry.register_aggr(hll::HllState::state_udf_impl());
registry.register_aggr(hll::HllState::merge_udf_impl());
}
}

View File

@@ -1,27 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::function_registry::FunctionRegistry;
mod encoding;
mod geo_path;
pub(crate) struct GeoFunction;
impl GeoFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(geo_path::GeoPathAccumulator::uadf_impl());
registry.register_aggr(encoding::JsonPathAccumulator::uadf_impl());
}
}

View File

@@ -1,29 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::aggrs::vector::product::VectorProduct;
use crate::aggrs::vector::sum::VectorSum;
use crate::function_registry::FunctionRegistry;
mod product;
mod sum;
pub(crate) struct VectorFunction;
impl VectorFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(VectorSum::uadf_impl());
registry.register_aggr(VectorProduct::uadf_impl());
}
}

View File

@@ -12,19 +12,21 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use common_error::ext::BoxedError;
use common_macro::admin_fn; use common_macro::admin_fn;
use common_query::error::{ use common_query::error::{
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu, ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
}; };
use common_query::prelude::Signature; use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility; use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef}; use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::ensure; use snafu::{ensure, ResultExt};
use sql::parser::ParserContext;
use store_api::storage::ConcreteDataType; use store_api::storage::ConcreteDataType;
use crate::handlers::FlowServiceHandlerRef; use crate::handlers::FlowServiceHandlerRef;
use crate::helper::parse_catalog_flow;
fn flush_signature() -> Signature { fn flush_signature() -> Signature {
Signature::uniform( Signature::uniform(
@@ -45,6 +47,20 @@ pub(crate) async fn flush_flow(
query_ctx: &QueryContextRef, query_ctx: &QueryContextRef,
params: &[ValueRef<'_>], params: &[ValueRef<'_>],
) -> Result<Value> { ) -> Result<Value> {
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
let res = flow_service_handler
.flush(&catalog_name, &flow_name, query_ctx.clone())
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}
fn parse_flush_flow(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
ensure!( ensure!(
params.len() == 1, params.len() == 1,
InvalidFuncArgsSnafu { InvalidFuncArgsSnafu {
@@ -54,6 +70,7 @@ pub(crate) async fn flush_flow(
), ),
} }
); );
let ValueRef::String(flow_name) = params[0] else { let ValueRef::String(flow_name) = params[0] else {
return UnsupportedInputDataTypeSnafu { return UnsupportedInputDataTypeSnafu {
function: "flush_flow", function: "flush_flow",
@@ -61,14 +78,27 @@ pub(crate) async fn flush_flow(
} }
.fail(); .fail();
}; };
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?; let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;
let res = flow_service_handler let (catalog_name, flow_name) = match &obj_name.0[..] {
.flush(&catalog_name, &flow_name, query_ctx.clone()) [flow_name] => (
.await?; query_ctx.current_catalog().to_string(),
let affected_rows = res.affected_rows; flow_name.value.clone(),
),
Ok(Value::from(affected_rows)) [catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
} }
#[cfg(test)] #[cfg(test)]
@@ -124,7 +154,10 @@ mod test {
("catalog.flow_name", ("catalog", "flow_name")), ("catalog.flow_name", ("catalog", "flow_name")),
]; ];
for (input, expected) in testcases.iter() { for (input, expected) in testcases.iter() {
let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap(); let args = vec![*input];
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
assert_eq!(*expected, (result.0.as_str(), result.1.as_str())); assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
} }
} }
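
The new `parse_flush_flow` resolves a bare flow name against the current catalog and accepts `<catalog>.<flow-name>` as an override. A std-only sketch of that resolution, using `split('.')` in place of the real `ParserContext::parse_table_name` and a hypothetical `resolve_flow_name` helper:

```rust
// Resolve a flow identifier to (catalog, flow_name).
fn resolve_flow_name(input: &str, current_catalog: &str) -> Result<(String, String), String> {
    let parts: Vec<&str> = input.split('.').collect();
    match parts.as_slice() {
        [flow] => Ok((current_catalog.to_string(), flow.to_string())),
        [catalog, flow] => Ok((catalog.to_string(), flow.to_string())),
        _ => Err(format!(
            "expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {input}"
        )),
    }
}

fn main() {
    assert_eq!(
        resolve_flow_name("flow_name", "greptime").unwrap(),
        ("greptime".to_string(), "flow_name".to_string())
    );
    assert_eq!(
        resolve_flow_name("catalog.flow_name", "greptime").unwrap(),
        ("catalog".to_string(), "flow_name".to_string())
    );
    assert!(resolve_flow_name("a.b.c", "greptime").is_err());
}
```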

View File

@@ -1,63 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion_expr::ScalarUDF;
use crate::function::{FunctionContext, FunctionRef};
use crate::scalars::udf::create_udf;
/// A factory for creating `ScalarUDF`s that require a function context.
#[derive(Clone)]
pub struct ScalarFunctionFactory {
name: String,
factory: Arc<dyn Fn(FunctionContext) -> ScalarUDF + Send + Sync>,
}
impl ScalarFunctionFactory {
/// Returns the name of the function.
pub fn name(&self) -> &str {
&self.name
}
/// Returns a `ScalarUDF` when given a function context.
pub fn provide(&self, ctx: FunctionContext) -> ScalarUDF {
(self.factory)(ctx)
}
}
impl From<ScalarUDF> for ScalarFunctionFactory {
fn from(df_udf: ScalarUDF) -> Self {
let name = df_udf.name().to_string();
let func = Arc::new(move |_ctx| df_udf.clone());
Self {
name,
factory: func,
}
}
}
impl From<FunctionRef> for ScalarFunctionFactory {
fn from(func: FunctionRef) -> Self {
let name = func.name().to_string();
let func = Arc::new(move |ctx: FunctionContext| {
create_udf(func.clone(), ctx.query_ctx, ctx.state)
});
Self {
name,
factory: func,
}
}
}
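
The removed `ScalarFunctionFactory` wraps either a ready-made UDF or a closure that builds one per query context. A condensed sketch of that pattern with placeholder `Udf` and `Ctx` types, not the real DataFusion/Greptime ones:

```rust
use std::sync::Arc;

#[derive(Clone, Debug)]
struct Udf {
    name: String,
}

struct Ctx;

#[derive(Clone)]
struct Factory {
    name: String,
    build: Arc<dyn Fn(&Ctx) -> Udf + Send + Sync>,
}

impl Factory {
    /// Build the concrete function for one query context.
    fn provide(&self, ctx: &Ctx) -> Udf {
        (self.build)(ctx)
    }
}

impl From<Udf> for Factory {
    /// A context-independent function is simply cloned out on every call.
    fn from(udf: Udf) -> Self {
        let name = udf.name.clone();
        Factory {
            name,
            build: Arc::new(move |_ctx: &Ctx| udf.clone()),
        }
    }
}

fn main() {
    let factory: Factory = Udf { name: "now".into() }.into();
    println!("{} -> {:?}", factory.name, factory.provide(&Ctx));
}
```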

View File

@@ -16,14 +16,11 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use datafusion_expr::AggregateUDF;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use crate::admin::AdminFunction; use crate::admin::AdminFunction;
use crate::aggrs::approximate::ApproximateFunction; use crate::function::{AsyncFunctionRef, FunctionRef};
use crate::aggrs::vector::VectorFunction as VectorAggrFunction; use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
use crate::function::{AsyncFunctionRef, Function, FunctionRef};
use crate::function_factory::ScalarFunctionFactory;
use crate::scalars::date::DateFunction; use crate::scalars::date::DateFunction;
use crate::scalars::expression::ExpressionFunction; use crate::scalars::expression::ExpressionFunction;
use crate::scalars::hll_count::HllCalcFunction; use crate::scalars::hll_count::HllCalcFunction;
@@ -34,19 +31,18 @@ use crate::scalars::matches_term::MatchesTermFunction;
use crate::scalars::math::MathFunction; use crate::scalars::math::MathFunction;
use crate::scalars::timestamp::TimestampFunction; use crate::scalars::timestamp::TimestampFunction;
use crate::scalars::uddsketch_calc::UddSketchCalcFunction; use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
use crate::scalars::vector::VectorFunction as VectorScalarFunction; use crate::scalars::vector::VectorFunction;
use crate::system::SystemFunction; use crate::system::SystemFunction;
#[derive(Default)] #[derive(Default)]
pub struct FunctionRegistry { pub struct FunctionRegistry {
functions: RwLock<HashMap<String, ScalarFunctionFactory>>, functions: RwLock<HashMap<String, FunctionRef>>,
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>, async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
aggregate_functions: RwLock<HashMap<String, AggregateUDF>>, aggregate_functions: RwLock<HashMap<String, AggregateFunctionMetaRef>>,
} }
impl FunctionRegistry { impl FunctionRegistry {
pub fn register(&self, func: impl Into<ScalarFunctionFactory>) { pub fn register(&self, func: FunctionRef) {
let func = func.into();
let _ = self let _ = self
.functions .functions
.write() .write()
@@ -54,10 +50,6 @@ impl FunctionRegistry {
.insert(func.name().to_string(), func); .insert(func.name().to_string(), func);
} }
pub fn register_scalar(&self, func: impl Function + 'static) {
self.register(Arc::new(func) as FunctionRef);
}
pub fn register_async(&self, func: AsyncFunctionRef) { pub fn register_async(&self, func: AsyncFunctionRef) {
let _ = self let _ = self
.async_functions .async_functions
@@ -66,14 +58,6 @@ impl FunctionRegistry {
.insert(func.name().to_string(), func); .insert(func.name().to_string(), func);
} }
pub fn register_aggr(&self, func: AggregateUDF) {
let _ = self
.aggregate_functions
.write()
.unwrap()
.insert(func.name().to_string(), func);
}
pub fn get_async_function(&self, name: &str) -> Option<AsyncFunctionRef> { pub fn get_async_function(&self, name: &str) -> Option<AsyncFunctionRef> {
self.async_functions.read().unwrap().get(name).cloned() self.async_functions.read().unwrap().get(name).cloned()
} }
@@ -87,16 +71,27 @@ impl FunctionRegistry {
.collect() .collect()
} }
#[cfg(test)] pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
pub fn get_function(&self, name: &str) -> Option<ScalarFunctionFactory> { let _ = self
.aggregate_functions
.write()
.unwrap()
.insert(func.name(), func);
}
pub fn get_aggr_function(&self, name: &str) -> Option<AggregateFunctionMetaRef> {
self.aggregate_functions.read().unwrap().get(name).cloned()
}
pub fn get_function(&self, name: &str) -> Option<FunctionRef> {
self.functions.read().unwrap().get(name).cloned() self.functions.read().unwrap().get(name).cloned()
} }
pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> { pub fn functions(&self) -> Vec<FunctionRef> {
self.functions.read().unwrap().values().cloned().collect() self.functions.read().unwrap().values().cloned().collect()
} }
pub fn aggregate_functions(&self) -> Vec<AggregateUDF> { pub fn aggregate_functions(&self) -> Vec<AggregateFunctionMetaRef> {
self.aggregate_functions self.aggregate_functions
.read() .read()
.unwrap() .unwrap()
@@ -117,6 +112,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
UddSketchCalcFunction::register(&function_registry); UddSketchCalcFunction::register(&function_registry);
HllCalcFunction::register(&function_registry); HllCalcFunction::register(&function_registry);
// Aggregate functions
AggregateFunctions::register(&function_registry);
// Full text search function // Full text search function
MatchesFunction::register(&function_registry); MatchesFunction::register(&function_registry);
MatchesTermFunction::register(&function_registry); MatchesTermFunction::register(&function_registry);
@@ -129,21 +127,15 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
JsonFunction::register(&function_registry); JsonFunction::register(&function_registry);
// Vector related functions // Vector related functions
VectorScalarFunction::register(&function_registry); VectorFunction::register(&function_registry);
VectorAggrFunction::register(&function_registry);
// Geo functions // Geo functions
#[cfg(feature = "geo")] #[cfg(feature = "geo")]
crate::scalars::geo::GeoFunctions::register(&function_registry); crate::scalars::geo::GeoFunctions::register(&function_registry);
#[cfg(feature = "geo")]
crate::aggrs::geo::GeoFunction::register(&function_registry);
// Ip functions // Ip functions
IpFunctions::register(&function_registry); IpFunctions::register(&function_registry);
// Approximate functions
ApproximateFunction::register(&function_registry);
Arc::new(function_registry) Arc::new(function_registry)
}); });
@@ -155,11 +147,12 @@ mod tests {
#[test] #[test]
fn test_function_registry() { fn test_function_registry() {
let registry = FunctionRegistry::default(); let registry = FunctionRegistry::default();
let func = Arc::new(TestAndFunction);
assert!(registry.get_function("test_and").is_none()); assert!(registry.get_function("test_and").is_none());
assert!(registry.scalar_functions().is_empty()); assert!(registry.functions().is_empty());
registry.register_scalar(TestAndFunction); registry.register(func);
let _ = registry.get_function("test_and").unwrap(); let _ = registry.get_function("test_and").unwrap();
assert_eq!(1, registry.scalar_functions().len()); assert_eq!(1, registry.functions().len());
} }
} }
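
Both sides of this registry diff share the same core: a name-keyed map behind an `RwLock`, an insert-by-name `register`, and getters that clone handles out. A minimal sketch with a placeholder `Function` trait rather than the crate's real trait objects:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

trait Function: Send + Sync {
    fn name(&self) -> &str;
}

#[derive(Default)]
struct Registry {
    functions: RwLock<HashMap<String, Arc<dyn Function>>>,
}

impl Registry {
    // Insert by the function's own name; a re-registration replaces the old entry.
    fn register(&self, func: Arc<dyn Function>) {
        let _ = self
            .functions
            .write()
            .unwrap()
            .insert(func.name().to_string(), func);
    }

    fn get(&self, name: &str) -> Option<Arc<dyn Function>> {
        self.functions.read().unwrap().get(name).cloned()
    }
}

struct TestAnd;
impl Function for TestAnd {
    fn name(&self) -> &str {
        "test_and"
    }
}

fn main() {
    let registry = Registry::default();
    assert!(registry.get("test_and").is_none());
    registry.register(Arc::new(TestAnd));
    assert!(registry.get("test_and").is_some());
}
```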

View File

@@ -87,15 +87,6 @@ pub trait FlowServiceHandler: Send + Sync {
flow: &str, flow: &str,
ctx: QueryContextRef, ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>; ) -> Result<api::v1::flow::FlowResponse>;
async fn adjust(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>;
} }
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>; pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;

View File

@@ -12,15 +12,12 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use common_error::ext::BoxedError; use common_query::error::{InvalidInputTypeSnafu, Result};
use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility}; use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType; use datatypes::prelude::ConcreteDataType;
use datatypes::types::cast::cast; use datatypes::types::cast::cast;
use datatypes::value::ValueRef; use datatypes::value::ValueRef;
use session::context::QueryContextRef;
use snafu::ResultExt; use snafu::ResultExt;
use sql::parser::ParserContext;
/// Create a function signature with oneof signatures of interleaving two arguments. /// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature { pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
@@ -46,30 +43,3 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
}) })
.map(|v| v.as_u64()) .map(|v| v.as_u64())
} }
pub fn parse_catalog_flow(
flow_name: &str,
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[flow_name] => (
query_ctx.current_catalog().to_string(),
flow_name.value.clone(),
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
}

View File

@@ -15,18 +15,16 @@
#![feature(let_chains)] #![feature(let_chains)]
#![feature(try_blocks)] #![feature(try_blocks)]
mod adjust_flow;
mod admin; mod admin;
mod flush_flow; mod flush_flow;
mod macros; mod macros;
pub mod scalars;
mod system; mod system;
pub mod aggrs; pub mod aggr;
pub mod function; pub mod function;
pub mod function_factory;
pub mod function_registry; pub mod function_registry;
pub mod handlers; pub mod handlers;
pub mod helper; pub mod helper;
pub mod scalars;
pub mod state; pub mod state;
pub mod utils; pub mod utils;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
pub mod aggregate;
pub(crate) mod date; pub(crate) mod date;
pub mod expression; pub mod expression;
#[cfg(feature = "geo")] #[cfg(feature = "geo")]

View File

@@ -0,0 +1,89 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! # Deprecation Warning:
//!
//! This module is deprecated and will be removed in the future.
//! The UDAF implementations here are not maintained and should
//! not be used until they are refactored into the `src/aggr`
//! version.
use std::sync::Arc;
use common_query::logical_plan::AggregateFunctionCreatorRef;
use crate::function_registry::FunctionRegistry;
use crate::scalars::vector::product::VectorProductCreator;
use crate::scalars::vector::sum::VectorSumCreator;
/// A function that creates an `AggregateFunctionCreator`.
/// "Aggregator" *is* AggregatorFunction. Since the latter is long, we named a short alias for it.
/// The two names might be used interchangeably.
type AggregatorCreatorFunction = Arc<dyn Fn() -> AggregateFunctionCreatorRef + Send + Sync>;
/// `AggregateFunctionMeta` dynamically creates AggregateFunctionCreator.
#[derive(Clone)]
pub struct AggregateFunctionMeta {
name: String,
args_count: u8,
creator: AggregatorCreatorFunction,
}
pub type AggregateFunctionMetaRef = Arc<AggregateFunctionMeta>;
impl AggregateFunctionMeta {
pub fn new(name: &str, args_count: u8, creator: AggregatorCreatorFunction) -> Self {
Self {
name: name.to_string(),
args_count,
creator,
}
}
pub fn name(&self) -> String {
self.name.to_string()
}
pub fn args_count(&self) -> u8 {
self.args_count
}
pub fn create(&self) -> AggregateFunctionCreatorRef {
(self.creator)()
}
}
pub(crate) struct AggregateFunctions;
impl AggregateFunctions {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"vec_sum",
1,
Arc::new(|| Arc::new(VectorSumCreator::default())),
)));
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"vec_product",
1,
Arc::new(|| Arc::new(VectorProductCreator::default())),
)));
#[cfg(feature = "geo")]
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"json_encode_path",
3,
Arc::new(|| Arc::new(super::geo::encoding::JsonPathEncodeFunctionCreator::default())),
)));
}
}
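
The new `AggregateFunctionMeta` couples a name and arity with a closure that lazily builds the creator when the aggregate is actually planned. A compact sketch of that shape, with a placeholder `Creator` trait instead of the real `AggregateFunctionCreatorRef`:

```rust
use std::sync::Arc;

trait Creator: Send + Sync {
    fn describe(&self) -> String;
}

#[derive(Default)]
struct VecSumCreator;

impl Creator for VecSumCreator {
    fn describe(&self) -> String {
        "sums vector columns element-wise".to_string()
    }
}

/// A closure that lazily builds the creator on demand.
type CreatorFn = Arc<dyn Fn() -> Arc<dyn Creator> + Send + Sync>;

struct AggregateMeta {
    name: String,
    args_count: u8,
    creator: CreatorFn,
}

impl AggregateMeta {
    fn new(name: &str, args_count: u8, creator: CreatorFn) -> Self {
        Self {
            name: name.to_string(),
            args_count,
            creator,
        }
    }

    fn create(&self) -> Arc<dyn Creator> {
        (self.creator)()
    }
}

fn main() {
    let meta = AggregateMeta::new(
        "vec_sum",
        1,
        Arc::new(|| Arc::new(VecSumCreator::default()) as Arc<dyn Creator>),
    );
    println!(
        "{}({} arg): {}",
        meta.name,
        meta.args_count,
        meta.create().describe()
    );
}
```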

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Arc;
mod date_add; mod date_add;
mod date_format; mod date_format;
mod date_sub; mod date_sub;
@@ -26,8 +27,8 @@ pub(crate) struct DateFunction;
impl DateFunction { impl DateFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(DateAddFunction); registry.register(Arc::new(DateAddFunction));
registry.register_scalar(DateSubFunction); registry.register(Arc::new(DateSubFunction));
registry.register_scalar(DateFormatFunction); registry.register(Arc::new(DateFormatFunction));
} }
} }

View File

@@ -17,6 +17,8 @@ mod ctx;
mod is_null; mod is_null;
mod unary; mod unary;
use std::sync::Arc;
pub use binary::scalar_binary_op; pub use binary::scalar_binary_op;
pub use ctx::EvalContext; pub use ctx::EvalContext;
pub use unary::scalar_unary_op; pub use unary::scalar_unary_op;
@@ -28,6 +30,6 @@ pub(crate) struct ExpressionFunction;
impl ExpressionFunction { impl ExpressionFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(IsNullFunction); registry.register(Arc::new(IsNullFunction));
} }
} }

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Arc;
pub(crate) mod encoding;
mod geohash; mod geohash;
mod h3; mod h3;
pub(crate) mod helpers; mod helpers;
mod measure; mod measure;
mod relation; mod relation;
mod s2; mod s2;
@@ -27,57 +29,57 @@ pub(crate) struct GeoFunctions;
impl GeoFunctions { impl GeoFunctions {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
// geohash // geohash
registry.register_scalar(geohash::GeohashFunction); registry.register(Arc::new(geohash::GeohashFunction));
registry.register_scalar(geohash::GeohashNeighboursFunction); registry.register(Arc::new(geohash::GeohashNeighboursFunction));
// h3 index // h3 index
registry.register_scalar(h3::H3LatLngToCell); registry.register(Arc::new(h3::H3LatLngToCell));
registry.register_scalar(h3::H3LatLngToCellString); registry.register(Arc::new(h3::H3LatLngToCellString));
// h3 index inspection // h3 index inspection
registry.register_scalar(h3::H3CellBase); registry.register(Arc::new(h3::H3CellBase));
registry.register_scalar(h3::H3CellIsPentagon); registry.register(Arc::new(h3::H3CellIsPentagon));
registry.register_scalar(h3::H3StringToCell); registry.register(Arc::new(h3::H3StringToCell));
registry.register_scalar(h3::H3CellToString); registry.register(Arc::new(h3::H3CellToString));
registry.register_scalar(h3::H3CellCenterLatLng); registry.register(Arc::new(h3::H3CellCenterLatLng));
registry.register_scalar(h3::H3CellResolution); registry.register(Arc::new(h3::H3CellResolution));
// h3 hierarchical grid // h3 hierarchical grid
registry.register_scalar(h3::H3CellCenterChild); registry.register(Arc::new(h3::H3CellCenterChild));
registry.register_scalar(h3::H3CellParent); registry.register(Arc::new(h3::H3CellParent));
registry.register_scalar(h3::H3CellToChildren); registry.register(Arc::new(h3::H3CellToChildren));
registry.register_scalar(h3::H3CellToChildrenSize); registry.register(Arc::new(h3::H3CellToChildrenSize));
registry.register_scalar(h3::H3CellToChildPos); registry.register(Arc::new(h3::H3CellToChildPos));
registry.register_scalar(h3::H3ChildPosToCell); registry.register(Arc::new(h3::H3ChildPosToCell));
registry.register_scalar(h3::H3CellContains); registry.register(Arc::new(h3::H3CellContains));
// h3 grid traversal // h3 grid traversal
registry.register_scalar(h3::H3GridDisk); registry.register(Arc::new(h3::H3GridDisk));
registry.register_scalar(h3::H3GridDiskDistances); registry.register(Arc::new(h3::H3GridDiskDistances));
registry.register_scalar(h3::H3GridDistance); registry.register(Arc::new(h3::H3GridDistance));
registry.register_scalar(h3::H3GridPathCells); registry.register(Arc::new(h3::H3GridPathCells));
// h3 measurement // h3 measurement
registry.register_scalar(h3::H3CellDistanceSphereKm); registry.register(Arc::new(h3::H3CellDistanceSphereKm));
registry.register_scalar(h3::H3CellDistanceEuclideanDegree); registry.register(Arc::new(h3::H3CellDistanceEuclideanDegree));
// s2 // s2
registry.register_scalar(s2::S2LatLngToCell); registry.register(Arc::new(s2::S2LatLngToCell));
registry.register_scalar(s2::S2CellLevel); registry.register(Arc::new(s2::S2CellLevel));
registry.register_scalar(s2::S2CellToToken); registry.register(Arc::new(s2::S2CellToToken));
registry.register_scalar(s2::S2CellParent); registry.register(Arc::new(s2::S2CellParent));
// spatial data type // spatial data type
registry.register_scalar(wkt::LatLngToPointWkt); registry.register(Arc::new(wkt::LatLngToPointWkt));
// spatial relation // spatial relation
registry.register_scalar(relation::STContains); registry.register(Arc::new(relation::STContains));
registry.register_scalar(relation::STWithin); registry.register(Arc::new(relation::STWithin));
registry.register_scalar(relation::STIntersects); registry.register(Arc::new(relation::STIntersects));
// spatial measure // spatial measure
registry.register_scalar(measure::STDistance); registry.register(Arc::new(measure::STDistance));
registry.register_scalar(measure::STDistanceSphere); registry.register(Arc::new(measure::STDistanceSphere));
registry.register_scalar(measure::STArea); registry.register(Arc::new(measure::STArea));
} }
} }

View File

@@ -19,12 +19,9 @@ use common_error::status_code::StatusCode;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore}; use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{self, InvalidInputStateSnafu, Result}; use common_query::error::{self, InvalidInputStateSnafu, Result};
use common_query::logical_plan::accumulator::AggrFuncTypeStore; use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{ use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::prelude::AccumulatorCreatorFunction; use common_query::prelude::AccumulatorCreatorFunction;
use common_time::Timestamp; use common_time::Timestamp;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::ConcreteDataType; use datatypes::prelude::ConcreteDataType;
use datatypes::value::{ListValue, Value}; use datatypes::value::{ListValue, Value};
use datatypes::vectors::VectorRef; use datatypes::vectors::VectorRef;
@@ -50,16 +47,6 @@ impl JsonPathAccumulator {
timestamp_type, timestamp_type,
} }
} }
/// Create a new `AggregateUDF` for the `json_encode_path` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"json_encode_path".to_string(),
3,
Arc::new(JsonPathEncodeFunctionCreator::default()),
)
.into()
}
} }
impl Accumulator for JsonPathAccumulator { impl Accumulator for JsonPathAccumulator {

View File

@@ -37,7 +37,7 @@ macro_rules! ensure_columns_len {
}; };
} }
pub(crate) use ensure_columns_len; pub(super) use ensure_columns_len;
macro_rules! ensure_columns_n { macro_rules! ensure_columns_n {
($columns:ident, $n:literal) => { ($columns:ident, $n:literal) => {
@@ -58,7 +58,7 @@ macro_rules! ensure_columns_n {
}; };
} }
pub(crate) use ensure_columns_n; pub(super) use ensure_columns_n;
macro_rules! ensure_and_coerce { macro_rules! ensure_and_coerce {
($compare:expr, $coerce:expr) => {{ ($compare:expr, $coerce:expr) => {{
@@ -72,4 +72,4 @@ macro_rules! ensure_and_coerce {
}}; }};
} }
pub(crate) use ensure_and_coerce; pub(super) use ensure_and_coerce;

View File

@@ -16,6 +16,7 @@
use std::fmt; use std::fmt;
use std::fmt::Display; use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result}; use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility}; use common_query::prelude::{Signature, Volatility};
@@ -26,7 +27,7 @@ use datatypes::vectors::{BinaryVector, MutableVector, UInt64VectorBuilder, Vecto
use hyperloglogplus::HyperLogLog; use hyperloglogplus::HyperLogLog;
use snafu::OptionExt; use snafu::OptionExt;
use crate::aggrs::approximate::hll::HllStateType; use crate::aggr::HllStateType;
use crate::function::{Function, FunctionContext}; use crate::function::{Function, FunctionContext};
use crate::function_registry::FunctionRegistry; use crate::function_registry::FunctionRegistry;
@@ -43,7 +44,7 @@ pub struct HllCalcFunction;
impl HllCalcFunction { impl HllCalcFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(HllCalcFunction); registry.register(Arc::new(HllCalcFunction));
} }
} }
@@ -116,8 +117,6 @@ impl Function for HllCalcFunction {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Arc;
use datatypes::vectors::BinaryVector; use datatypes::vectors::BinaryVector;
use super::*; use super::*;

View File

@@ -17,6 +17,8 @@ mod ipv4;
mod ipv6; mod ipv6;
mod range; mod range;
use std::sync::Arc;
use cidr::{Ipv4ToCidr, Ipv6ToCidr}; use cidr::{Ipv4ToCidr, Ipv6ToCidr};
use ipv4::{Ipv4NumToString, Ipv4StringToNum}; use ipv4::{Ipv4NumToString, Ipv4StringToNum};
use ipv6::{Ipv6NumToString, Ipv6StringToNum}; use ipv6::{Ipv6NumToString, Ipv6StringToNum};
@@ -29,15 +31,15 @@ pub(crate) struct IpFunctions;
impl IpFunctions { impl IpFunctions {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
// Register IPv4 functions // Register IPv4 functions
registry.register_scalar(Ipv4NumToString); registry.register(Arc::new(Ipv4NumToString));
registry.register_scalar(Ipv4StringToNum); registry.register(Arc::new(Ipv4StringToNum));
registry.register_scalar(Ipv4ToCidr); registry.register(Arc::new(Ipv4ToCidr));
registry.register_scalar(Ipv4InRange); registry.register(Arc::new(Ipv4InRange));
// Register IPv6 functions // Register IPv6 functions
registry.register_scalar(Ipv6NumToString); registry.register(Arc::new(Ipv6NumToString));
registry.register_scalar(Ipv6StringToNum); registry.register(Arc::new(Ipv6StringToNum));
registry.register_scalar(Ipv6ToCidr); registry.register(Arc::new(Ipv6ToCidr));
registry.register_scalar(Ipv6InRange); registry.register(Arc::new(Ipv6InRange));
} }
} }

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Arc;
pub mod json_get; pub mod json_get;
mod json_is; mod json_is;
mod json_path_exists; mod json_path_exists;
@@ -32,23 +33,23 @@ pub(crate) struct JsonFunction;
impl JsonFunction { impl JsonFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(JsonToStringFunction); registry.register(Arc::new(JsonToStringFunction));
registry.register_scalar(ParseJsonFunction); registry.register(Arc::new(ParseJsonFunction));
registry.register_scalar(JsonGetInt); registry.register(Arc::new(JsonGetInt));
registry.register_scalar(JsonGetFloat); registry.register(Arc::new(JsonGetFloat));
registry.register_scalar(JsonGetString); registry.register(Arc::new(JsonGetString));
registry.register_scalar(JsonGetBool); registry.register(Arc::new(JsonGetBool));
registry.register_scalar(JsonIsNull); registry.register(Arc::new(JsonIsNull));
registry.register_scalar(JsonIsInt); registry.register(Arc::new(JsonIsInt));
registry.register_scalar(JsonIsFloat); registry.register(Arc::new(JsonIsFloat));
registry.register_scalar(JsonIsString); registry.register(Arc::new(JsonIsString));
registry.register_scalar(JsonIsBool); registry.register(Arc::new(JsonIsBool));
registry.register_scalar(JsonIsArray); registry.register(Arc::new(JsonIsArray));
registry.register_scalar(JsonIsObject); registry.register(Arc::new(JsonIsObject));
registry.register_scalar(json_path_exists::JsonPathExistsFunction); registry.register(Arc::new(json_path_exists::JsonPathExistsFunction));
registry.register_scalar(json_path_match::JsonPathMatchFunction); registry.register(Arc::new(json_path_match::JsonPathMatchFunction));
} }
} }

View File

@@ -38,11 +38,11 @@ use crate::function_registry::FunctionRegistry;
/// ///
/// Usage: matches(`<col>`, `<pattern>`) -> boolean /// Usage: matches(`<col>`, `<pattern>`) -> boolean
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct MatchesFunction; pub(crate) struct MatchesFunction;
impl MatchesFunction { impl MatchesFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(MatchesFunction); registry.register(Arc::new(MatchesFunction));
} }
} }

View File

@@ -77,7 +77,7 @@ pub struct MatchesTermFunction;
impl MatchesTermFunction { impl MatchesTermFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(MatchesTermFunction); registry.register(Arc::new(MatchesTermFunction));
} }
} }

View File

@@ -18,6 +18,7 @@ mod pow;
mod rate; mod rate;
use std::fmt; use std::fmt;
use std::sync::Arc;
pub use clamp::{ClampFunction, ClampMaxFunction, ClampMinFunction}; pub use clamp::{ClampFunction, ClampMaxFunction, ClampMinFunction};
use common_query::error::{GeneralDataFusionSnafu, Result}; use common_query::error::{GeneralDataFusionSnafu, Result};
@@ -38,13 +39,13 @@ pub(crate) struct MathFunction;
impl MathFunction { impl MathFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(ModuloFunction); registry.register(Arc::new(ModuloFunction));
registry.register_scalar(PowFunction); registry.register(Arc::new(PowFunction));
registry.register_scalar(RateFunction); registry.register(Arc::new(RateFunction));
registry.register_scalar(RangeFunction); registry.register(Arc::new(RangeFunction));
registry.register_scalar(ClampFunction); registry.register(Arc::new(ClampFunction));
registry.register_scalar(ClampMinFunction); registry.register(Arc::new(ClampMinFunction));
registry.register_scalar(ClampMaxFunction); registry.register(Arc::new(ClampMaxFunction));
} }
} }

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Arc;
mod to_unixtime; mod to_unixtime;
use to_unixtime::ToUnixtimeFunction; use to_unixtime::ToUnixtimeFunction;
@@ -22,6 +23,6 @@ pub(crate) struct TimestampFunction;
impl TimestampFunction { impl TimestampFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(ToUnixtimeFunction); registry.register(Arc::new(ToUnixtimeFunction));
} }
} }

View File

@@ -16,6 +16,7 @@
use std::fmt; use std::fmt;
use std::fmt::Display; use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result}; use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility}; use common_query::prelude::{Signature, Volatility};
@@ -43,7 +44,7 @@ pub struct UddSketchCalcFunction;
impl UddSketchCalcFunction { impl UddSketchCalcFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(UddSketchCalcFunction); registry.register(Arc::new(UddSketchCalcFunction));
} }
} }

View File

@@ -17,8 +17,10 @@ mod distance;
mod elem_product; mod elem_product;
mod elem_sum; mod elem_sum;
pub mod impl_conv; pub mod impl_conv;
pub(crate) mod product;
mod scalar_add; mod scalar_add;
mod scalar_mul; mod scalar_mul;
pub(crate) mod sum;
mod vector_add; mod vector_add;
mod vector_dim; mod vector_dim;
mod vector_div; mod vector_div;
@@ -28,34 +30,37 @@ mod vector_norm;
mod vector_sub; mod vector_sub;
mod vector_subvector; mod vector_subvector;
use std::sync::Arc;
use crate::function_registry::FunctionRegistry; use crate::function_registry::FunctionRegistry;
pub(crate) struct VectorFunction; pub(crate) struct VectorFunction;
impl VectorFunction { impl VectorFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
// conversion // conversion
registry.register_scalar(convert::ParseVectorFunction); registry.register(Arc::new(convert::ParseVectorFunction));
registry.register_scalar(convert::VectorToStringFunction); registry.register(Arc::new(convert::VectorToStringFunction));
// distance // distance
registry.register_scalar(distance::CosDistanceFunction); registry.register(Arc::new(distance::CosDistanceFunction));
registry.register_scalar(distance::DotProductFunction); registry.register(Arc::new(distance::DotProductFunction));
registry.register_scalar(distance::L2SqDistanceFunction); registry.register(Arc::new(distance::L2SqDistanceFunction));
// scalar calculation // scalar calculation
registry.register_scalar(scalar_add::ScalarAddFunction); registry.register(Arc::new(scalar_add::ScalarAddFunction));
registry.register_scalar(scalar_mul::ScalarMulFunction); registry.register(Arc::new(scalar_mul::ScalarMulFunction));
// vector calculation // vector calculation
registry.register_scalar(vector_add::VectorAddFunction); registry.register(Arc::new(vector_add::VectorAddFunction));
registry.register_scalar(vector_sub::VectorSubFunction); registry.register(Arc::new(vector_sub::VectorSubFunction));
registry.register_scalar(vector_mul::VectorMulFunction); registry.register(Arc::new(vector_mul::VectorMulFunction));
registry.register_scalar(vector_div::VectorDivFunction); registry.register(Arc::new(vector_div::VectorDivFunction));
registry.register_scalar(vector_norm::VectorNormFunction); registry.register(Arc::new(vector_norm::VectorNormFunction));
registry.register_scalar(vector_dim::VectorDimFunction); registry.register(Arc::new(vector_dim::VectorDimFunction));
registry.register_scalar(vector_kth_elem::VectorKthElemFunction); registry.register(Arc::new(vector_kth_elem::VectorKthElemFunction));
registry.register_scalar(vector_subvector::VectorSubvectorFunction); registry.register(Arc::new(vector_subvector::VectorSubvectorFunction));
registry.register_scalar(elem_sum::ElemSumFunction); registry.register(Arc::new(elem_sum::ElemSumFunction));
registry.register_scalar(elem_product::ElemProductFunction); registry.register(Arc::new(elem_product::ElemProductFunction));
} }
} }

View File

@@ -16,11 +16,8 @@ use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore}; use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu}; use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
use common_query::logical_plan::{ use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::prelude::AccumulatorCreatorFunction; use common_query::prelude::AccumulatorCreatorFunction;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::{ConcreteDataType, Value, *}; use datatypes::prelude::{ConcreteDataType, Value, *};
use datatypes::vectors::VectorRef; use datatypes::vectors::VectorRef;
use nalgebra::{Const, DVectorView, Dyn, OVector}; use nalgebra::{Const, DVectorView, Dyn, OVector};
@@ -78,16 +75,6 @@ impl AggregateFunctionCreator for VectorProductCreator {
} }
impl VectorProduct { impl VectorProduct {
/// Create a new `AggregateUDF` for the `vec_product` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"vec_product".to_string(),
1,
Arc::new(VectorProductCreator::default()),
)
.into()
}
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> { fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
self.product.get_or_insert_with(|| { self.product.get_or_insert_with(|| {
OVector::from_iterator_generic(Dyn(len), Const::<1>, (0..len).map(|_| 1.0)) OVector::from_iterator_generic(Dyn(len), Const::<1>, (0..len).map(|_| 1.0))

View File

@@ -16,11 +16,8 @@ use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore}; use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu}; use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
use common_query::logical_plan::{ use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::prelude::AccumulatorCreatorFunction; use common_query::prelude::AccumulatorCreatorFunction;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::{ConcreteDataType, Value, *}; use datatypes::prelude::{ConcreteDataType, Value, *};
use datatypes::vectors::VectorRef; use datatypes::vectors::VectorRef;
use nalgebra::{Const, DVectorView, Dyn, OVector}; use nalgebra::{Const, DVectorView, Dyn, OVector};
@@ -28,7 +25,6 @@ use snafu::ensure;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit}; use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
/// The accumulator for the `vec_sum` aggregate function.
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct VectorSum { pub struct VectorSum {
sum: Option<OVector<f32, Dyn>>, sum: Option<OVector<f32, Dyn>>,
@@ -78,16 +74,6 @@ impl AggregateFunctionCreator for VectorSumCreator {
} }
impl VectorSum { impl VectorSum {
/// Create a new `AggregateUDF` for the `vec_sum` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"vec_sum".to_string(),
1,
Arc::new(VectorSumCreator::default()),
)
.into()
}
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> { fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
self.sum self.sum
.get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>)) .get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>))


@@ -148,17 +148,6 @@ impl FunctionState {
) -> Result<api::v1::flow::FlowResponse> { ) -> Result<api::v1::flow::FlowResponse> {
todo!() todo!()
} }
async fn adjust(
&self,
_catalog: &str,
_flow: &str,
_min_run_interval_secs: u64,
_max_filter_num_per_query: usize,
_ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
todo!()
}
} }
Self { Self {


@@ -36,13 +36,13 @@ pub(crate) struct SystemFunction;
impl SystemFunction { impl SystemFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(BuildFunction); registry.register(Arc::new(BuildFunction));
registry.register_scalar(VersionFunction); registry.register(Arc::new(VersionFunction));
registry.register_scalar(CurrentSchemaFunction); registry.register(Arc::new(CurrentSchemaFunction));
registry.register_scalar(DatabaseFunction); registry.register(Arc::new(DatabaseFunction));
registry.register_scalar(SessionUserFunction); registry.register(Arc::new(SessionUserFunction));
registry.register_scalar(ReadPreferenceFunction); registry.register(Arc::new(ReadPreferenceFunction));
registry.register_scalar(TimezoneFunction); registry.register(Arc::new(TimezoneFunction));
registry.register_async(Arc::new(ProcedureStateFunction)); registry.register_async(Arc::new(ProcedureStateFunction));
PGCatalogFunction::register(registry); PGCatalogFunction::register(registry);
} }


@@ -16,6 +16,8 @@ mod pg_get_userbyid;
mod table_is_visible; mod table_is_visible;
mod version; mod version;
use std::sync::Arc;
use pg_get_userbyid::PGGetUserByIdFunction; use pg_get_userbyid::PGGetUserByIdFunction;
use table_is_visible::PGTableIsVisibleFunction; use table_is_visible::PGTableIsVisibleFunction;
use version::PGVersionFunction; use version::PGVersionFunction;
@@ -33,8 +35,8 @@ pub(super) struct PGCatalogFunction;
impl PGCatalogFunction { impl PGCatalogFunction {
pub fn register(registry: &FunctionRegistry) { pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(PGTableIsVisibleFunction); registry.register(Arc::new(PGTableIsVisibleFunction));
registry.register_scalar(PGGetUserByIdFunction); registry.register(Arc::new(PGGetUserByIdFunction));
registry.register_scalar(PGVersionFunction); registry.register(Arc::new(PGVersionFunction));
} }
} }


@@ -10,7 +10,6 @@ workspace = true
[dependencies] [dependencies]
api.workspace = true api.workspace = true
arrow-flight.workspace = true arrow-flight.workspace = true
bytes.workspace = true
common-base.workspace = true common-base.workspace = true
common-error.workspace = true common-error.workspace = true
common-macro.workspace = true common-macro.workspace = true


@@ -1,146 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow_flight::FlightData;
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch;
use criterion::{criterion_group, criterion_main, Criterion};
use datatypes::arrow;
use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use prost::Message;
fn schema() -> arrow::datatypes::SchemaRef {
let schema = Schema::new(vec![
ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), false),
ColumnSchema::new("v1", ConcreteDataType::int64_datatype(), false),
]);
schema.arrow_schema().clone()
}
/// Generate record batch according to provided schema and num rows.
fn prepare_random_record_batch(
schema: arrow::datatypes::SchemaRef,
num_rows: usize,
) -> DfRecordBatch {
let tag_candidates = (0..10000).map(|i| i.to_string()).collect::<Vec<_>>();
let columns: Vec<ArrayRef> = schema
.fields
.iter()
.map(|col| match col.data_type() {
DataType::Utf8 => {
let array = StringArray::from(
(0..num_rows)
.map(|_| {
let idx: usize = rand::random_range(0..10000);
format!("tag-{}", tag_candidates[idx])
})
.collect::<Vec<_>>(),
);
Arc::new(array) as ArrayRef
}
DataType::Timestamp(_, _) => {
let now = common_time::util::current_time_millis();
let array = TimestampMillisecondArray::from(
(0..num_rows).map(|i| now + i as i64).collect::<Vec<_>>(),
);
Arc::new(array) as ArrayRef
}
DataType::Int64 => {
let array = Int64Array::from((0..num_rows).map(|i| i as i64).collect::<Vec<_>>());
Arc::new(array) as ArrayRef
}
_ => unreachable!(),
})
.collect();
DfRecordBatch::try_new(schema, columns).unwrap()
}
fn prepare_flight_data(num_rows: usize) -> (FlightData, FlightData) {
let schema = schema();
let mut encoder = FlightEncoder::default();
let schema_data = encoder.encode(FlightMessage::Schema(schema.clone()));
let rb = prepare_random_record_batch(schema, num_rows);
let rb_data = encoder.encode(FlightMessage::RecordBatch(rb));
(schema_data, rb_data)
}
fn decode_flight_data_from_protobuf(schema: &Bytes, payload: &Bytes) -> DfRecordBatch {
let schema = FlightData::decode(&schema[..]).unwrap();
let payload = FlightData::decode(&payload[..]).unwrap();
let mut decoder = FlightDecoder::default();
let _schema = decoder.try_decode(&schema).unwrap();
let message = decoder.try_decode(&payload).unwrap();
let FlightMessage::RecordBatch(batch) = message else {
unreachable!("unexpected message");
};
batch
}
fn decode_flight_data_from_header_and_body(
schema: &Bytes,
data_header: &Bytes,
data_body: &Bytes,
) -> DfRecordBatch {
let mut decoder = FlightDecoder::try_from_schema_bytes(schema).unwrap();
decoder
.try_decode_record_batch(data_header, data_body)
.unwrap()
}
fn bench_decode_flight_data(c: &mut Criterion) {
let row_counts = [100000, 200000, 1000000];
for row_count in row_counts {
let (schema, payload) = prepare_flight_data(row_count);
// arguments for decode_flight_data_from_protobuf
let schema_bytes = Bytes::from(schema.encode_to_vec());
let payload_bytes = Bytes::from(payload.encode_to_vec());
let mut group = c.benchmark_group(format!("flight_decoder_{}_rows", row_count));
group.bench_function("decode_from_protobuf", |b| {
b.iter(|| decode_flight_data_from_protobuf(&schema_bytes, &payload_bytes));
});
group.bench_function("decode_from_header_and_body", |b| {
b.iter(|| {
decode_flight_data_from_header_and_body(
&schema.data_header,
&payload.data_header,
&payload.data_body,
)
});
});
group.finish();
}
}
criterion_group!(benches, bench_decode_flight_data);
criterion_main!(benches);


@@ -14,10 +14,8 @@
use criterion::criterion_main; use criterion::criterion_main;
mod bench_flight_decoder;
mod channel_manager; mod channel_manager;
criterion_main! { criterion_main! {
channel_manager::benches, channel_manager::benches
bench_flight_decoder::benches
} }


@@ -18,7 +18,6 @@ use std::io;
use common_error::ext::ErrorExt; use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode; use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug; use common_macro::stack_trace_debug;
use datatypes::arrow::error::ArrowError;
use snafu::{Location, Snafu}; use snafu::{Location, Snafu};
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@@ -60,6 +59,13 @@ pub enum Error {
location: Location, location: Location,
}, },
#[snafu(display("Failed to create RecordBatch"))]
CreateRecordBatch {
#[snafu(implicit)]
location: Location,
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to convert Arrow type: {}", from))] #[snafu(display("Failed to convert Arrow type: {}", from))]
Conversion { Conversion {
from: String, from: String,
@@ -82,6 +88,13 @@ pub enum Error {
location: Location, location: Location,
}, },
#[snafu(display("Failed to convert Arrow Schema"))]
ConvertArrowSchema {
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("Not supported: {}", feat))] #[snafu(display("Not supported: {}", feat))]
NotSupported { feat: String }, NotSupported { feat: String },
@@ -92,14 +105,6 @@ pub enum Error {
#[snafu(implicit)] #[snafu(implicit)]
location: Location, location: Location,
}, },
#[snafu(display("Failed arrow operation"))]
Arrow {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: ArrowError,
},
} }
impl ErrorExt for Error { impl ErrorExt for Error {
@@ -116,7 +121,8 @@ impl ErrorExt for Error {
| Error::DecodeFlightData { .. } | Error::DecodeFlightData { .. }
| Error::SerdeJson { .. } => StatusCode::Internal, | Error::SerdeJson { .. } => StatusCode::Internal,
Error::Arrow { .. } => StatusCode::Internal, Error::CreateRecordBatch { source, .. } => source.status_code(),
Error::ConvertArrowSchema { source, .. } => source.status_code(),
} }
} }
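The new `CreateRecordBatch` and `ConvertArrowSchema` variants follow the usual snafu pattern: a `source` error plus an implicit location, with `status_code` delegated to the source. The sketch below shows only the context-selector mechanics (the `.context(ConvertArrowSchemaSnafu)` call used later in the diff); the location field is omitted and a standard-library error stands in for the real `source` types.

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    // In the real enum the source is `datatypes::error::Error`; ParseIntError is a stand-in.
    #[snafu(display("Failed to convert Arrow Schema"))]
    ConvertArrowSchema { source: std::num::ParseIntError },
}

// `.context(..)` wraps the underlying error into the named variant.
fn parse(s: &str) -> Result<i64, Error> {
    s.parse::<i64>().context(ConvertArrowSchemaSnafu)
}

fn main() {
    assert!(parse("not-a-number").is_err());
    assert_eq!(parse("42").unwrap(), 42);
}
```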


@@ -21,24 +21,25 @@ use api::v1::{AffectedRows, FlightMetadata, Metrics};
use arrow_flight::utils::flight_data_to_arrow_batch; use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightData, SchemaAsIpc}; use arrow_flight::{FlightData, SchemaAsIpc};
use common_base::bytes::Bytes; use common_base::bytes::Bytes;
use common_recordbatch::DfRecordBatch; use common_recordbatch::{RecordBatch, RecordBatches};
use datatypes::arrow; use datatypes::arrow;
use datatypes::arrow::buffer::Buffer; use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::arrow::datatypes::{Schema as ArrowSchema, SchemaRef}; use datatypes::arrow::ipc::{root_as_message, writer, MessageHeader};
use datatypes::arrow::error::ArrowError; use datatypes::schema::{Schema, SchemaRef};
use datatypes::arrow::ipc::{convert, reader, root_as_message, writer, MessageHeader};
use flatbuffers::FlatBufferBuilder; use flatbuffers::FlatBufferBuilder;
use prost::bytes::Bytes as ProstBytes; use prost::bytes::Bytes as ProstBytes;
use prost::Message; use prost::Message;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use crate::error; use crate::error::{
use crate::error::{DecodeFlightDataSnafu, InvalidFlightDataSnafu, Result}; ConvertArrowSchemaSnafu, CreateRecordBatchSnafu, DecodeFlightDataSnafu, InvalidFlightDataSnafu,
Result,
};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum FlightMessage { pub enum FlightMessage {
Schema(SchemaRef), Schema(SchemaRef),
RecordBatch(DfRecordBatch), Recordbatch(RecordBatch),
AffectedRows(usize), AffectedRows(usize),
Metrics(String), Metrics(String),
} }
@@ -64,27 +65,16 @@ impl Default for FlightEncoder {
} }
impl FlightEncoder { impl FlightEncoder {
/// Creates new [FlightEncoder] with compression disabled.
pub fn with_compression_disabled() -> Self {
let write_options = writer::IpcWriteOptions::default()
.try_with_compression(None)
.unwrap();
Self {
write_options,
data_gen: writer::IpcDataGenerator::default(),
dictionary_tracker: writer::DictionaryTracker::new(false),
}
}
pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData { pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData {
match flight_message { match flight_message {
FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(), FlightMessage::Schema(schema) => {
FlightMessage::RecordBatch(record_batch) => { SchemaAsIpc::new(schema.arrow_schema(), &self.write_options).into()
}
FlightMessage::Recordbatch(recordbatch) => {
let (encoded_dictionaries, encoded_batch) = self let (encoded_dictionaries, encoded_batch) = self
.data_gen .data_gen
.encoded_batch( .encoded_batch(
&record_batch, recordbatch.df_record_batch(),
&mut self.dictionary_tracker, &mut self.dictionary_tracker,
&self.write_options, &self.write_options,
) )
@@ -134,58 +124,9 @@ impl FlightEncoder {
#[derive(Default)] #[derive(Default)]
pub struct FlightDecoder { pub struct FlightDecoder {
schema: Option<SchemaRef>, schema: Option<SchemaRef>,
schema_bytes: Option<bytes::Bytes>,
} }
impl FlightDecoder { impl FlightDecoder {
/// Build a [FlightDecoder] instance from provided schema bytes.
pub fn try_from_schema_bytes(schema_bytes: &bytes::Bytes) -> Result<Self> {
let arrow_schema = convert::try_schema_from_flatbuffer_bytes(&schema_bytes[..])
.context(error::ArrowSnafu)?;
Ok(Self {
schema: Some(Arc::new(arrow_schema)),
schema_bytes: Some(schema_bytes.clone()),
})
}
pub fn try_decode_record_batch(
&mut self,
data_header: &bytes::Bytes,
data_body: &bytes::Bytes,
) -> Result<DfRecordBatch> {
let schema = self
.schema
.as_ref()
.context(InvalidFlightDataSnafu {
reason: "Should have decoded schema first!",
})?
.clone();
let message = root_as_message(&data_header[..])
.map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
})
.context(error::ArrowSnafu)?;
let result = message
.header_as_record_batch()
.ok_or_else(|| {
ArrowError::ParseError(
"Unable to convert flight data header to a record batch".to_string(),
)
})
.and_then(|batch| {
reader::read_record_batch(
&Buffer::from(data_body.as_ref()),
batch,
schema,
&HashMap::new(),
None,
&message.version(),
)
})
.context(error::ArrowSnafu)?;
Ok(result)
}
pub fn try_decode(&mut self, flight_data: &FlightData) -> Result<FlightMessage> { pub fn try_decode(&mut self, flight_data: &FlightData) -> Result<FlightMessage> {
let message = root_as_message(&flight_data.data_header).map_err(|e| { let message = root_as_message(&flight_data.data_header).map_err(|e| {
InvalidFlightDataSnafu { InvalidFlightDataSnafu {
@@ -211,29 +152,36 @@ impl FlightDecoder {
.fail() .fail()
} }
MessageHeader::Schema => { MessageHeader::Schema => {
let arrow_schema = Arc::new(ArrowSchema::try_from(flight_data).map_err(|e| { let arrow_schema = ArrowSchema::try_from(flight_data).map_err(|e| {
InvalidFlightDataSnafu { InvalidFlightDataSnafu {
reason: e.to_string(), reason: e.to_string(),
} }
.build() .build()
})?); })?;
self.schema = Some(arrow_schema.clone()); let schema =
self.schema_bytes = Some(flight_data.data_header.clone()); Arc::new(Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?);
Ok(FlightMessage::Schema(arrow_schema))
self.schema = Some(schema.clone());
Ok(FlightMessage::Schema(schema))
} }
MessageHeader::RecordBatch => { MessageHeader::RecordBatch => {
let schema = self.schema.clone().context(InvalidFlightDataSnafu { let schema = self.schema.clone().context(InvalidFlightDataSnafu {
reason: "Should have decoded schema first!", reason: "Should have decoded schema first!",
})?; })?;
let arrow_schema = schema.arrow_schema().clone();
let arrow_batch = let arrow_batch =
flight_data_to_arrow_batch(flight_data, schema.clone(), &HashMap::new()) flight_data_to_arrow_batch(flight_data, arrow_schema, &HashMap::new())
.map_err(|e| { .map_err(|e| {
InvalidFlightDataSnafu { InvalidFlightDataSnafu {
reason: e.to_string(), reason: e.to_string(),
} }
.build() .build()
})?; })?;
Ok(FlightMessage::RecordBatch(arrow_batch)) let recordbatch = RecordBatch::try_from_df_record_batch(schema, arrow_batch)
.context(CreateRecordBatchSnafu)?;
Ok(FlightMessage::Recordbatch(recordbatch))
} }
other => { other => {
let name = other.variant_name().unwrap_or("UNKNOWN"); let name = other.variant_name().unwrap_or("UNKNOWN");
@@ -248,22 +196,16 @@ impl FlightDecoder {
pub fn schema(&self) -> Option<&SchemaRef> { pub fn schema(&self) -> Option<&SchemaRef> {
self.schema.as_ref() self.schema.as_ref()
} }
pub fn schema_bytes(&self) -> Option<bytes::Bytes> {
self.schema_bytes.clone()
}
} }
pub fn flight_messages_to_recordbatches( pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<RecordBatches> {
messages: Vec<FlightMessage>,
) -> Result<Vec<DfRecordBatch>> {
if messages.is_empty() { if messages.is_empty() {
Ok(vec![]) Ok(RecordBatches::empty())
} else { } else {
let mut recordbatches = Vec::with_capacity(messages.len() - 1); let mut recordbatches = Vec::with_capacity(messages.len() - 1);
match &messages[0] { let schema = match &messages[0] {
FlightMessage::Schema(_schema) => {} FlightMessage::Schema(schema) => schema.clone(),
_ => { _ => {
return InvalidFlightDataSnafu { return InvalidFlightDataSnafu {
reason: "First Flight Message must be schema!", reason: "First Flight Message must be schema!",
@@ -274,7 +216,7 @@ pub fn flight_messages_to_recordbatches(
for message in messages.into_iter().skip(1) { for message in messages.into_iter().skip(1) {
match message { match message {
FlightMessage::RecordBatch(recordbatch) => recordbatches.push(recordbatch), FlightMessage::Recordbatch(recordbatch) => recordbatches.push(recordbatch),
_ => { _ => {
return InvalidFlightDataSnafu { return InvalidFlightDataSnafu {
reason: "Expect the following Flight Messages are all Recordbatches!", reason: "Expect the following Flight Messages are all Recordbatches!",
@@ -284,7 +226,7 @@ pub fn flight_messages_to_recordbatches(
} }
} }
Ok(recordbatches) RecordBatches::try_new(schema, recordbatches).context(CreateRecordBatchSnafu)
} }
} }
@@ -305,33 +247,38 @@ fn build_none_flight_msg() -> Bytes {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use arrow_flight::utils::batches_to_flight_data; use arrow_flight::utils::batches_to_flight_data;
use datatypes::arrow::array::Int32Array; use datatypes::arrow::datatypes::{DataType, Field};
use datatypes::arrow::datatypes::{DataType, Field, Schema}; use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::Int32Vector;
use super::*; use super::*;
use crate::Error; use crate::Error;
#[test] #[test]
fn test_try_decode() { fn test_try_decode() {
let schema = Arc::new(ArrowSchema::new(vec![Field::new( let arrow_schema = ArrowSchema::new(vec![Field::new("n", DataType::Int32, true)]);
"n", let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap());
DataType::Int32,
true,
)]));
let batch1 = DfRecordBatch::try_new( let batch1 = RecordBatch::new(
schema.clone(), schema.clone(),
vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as _], vec![Arc::new(Int32Vector::from(vec![Some(1), None, Some(3)])) as _],
) )
.unwrap(); .unwrap();
let batch2 = DfRecordBatch::try_new( let batch2 = RecordBatch::new(
schema.clone(), schema.clone(),
vec![Arc::new(Int32Array::from(vec![None, Some(5)])) as _], vec![Arc::new(Int32Vector::from(vec![None, Some(5)])) as _],
) )
.unwrap(); .unwrap();
let flight_data = let flight_data = batches_to_flight_data(
batches_to_flight_data(&schema, vec![batch1.clone(), batch2.clone()]).unwrap(); &arrow_schema,
vec![
batch1.clone().into_df_record_batch(),
batch2.clone().into_df_record_batch(),
],
)
.unwrap();
assert_eq!(flight_data.len(), 3); assert_eq!(flight_data.len(), 3);
let [d1, d2, d3] = flight_data.as_slice() else { let [d1, d2, d3] = flight_data.as_slice() else {
unreachable!() unreachable!()
@@ -357,15 +304,15 @@ mod test {
let _ = decoder.schema.as_ref().unwrap(); let _ = decoder.schema.as_ref().unwrap();
let message = decoder.try_decode(d2).unwrap(); let message = decoder.try_decode(d2).unwrap();
assert!(matches!(message, FlightMessage::RecordBatch(_))); assert!(matches!(message, FlightMessage::Recordbatch(_)));
let FlightMessage::RecordBatch(actual_batch) = message else { let FlightMessage::Recordbatch(actual_batch) = message else {
unreachable!() unreachable!()
}; };
assert_eq!(actual_batch, batch1); assert_eq!(actual_batch, batch1);
let message = decoder.try_decode(d3).unwrap(); let message = decoder.try_decode(d3).unwrap();
assert!(matches!(message, FlightMessage::RecordBatch(_))); assert!(matches!(message, FlightMessage::Recordbatch(_)));
let FlightMessage::RecordBatch(actual_batch) = message else { let FlightMessage::Recordbatch(actual_batch) = message else {
unreachable!() unreachable!()
}; };
assert_eq!(actual_batch, batch2); assert_eq!(actual_batch, batch2);
@@ -373,22 +320,27 @@ mod test {
#[test] #[test]
fn test_flight_messages_to_recordbatches() { fn test_flight_messages_to_recordbatches() {
let schema = Arc::new(Schema::new(vec![Field::new("m", DataType::Int32, true)])); let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
let batch1 = DfRecordBatch::try_new( "m",
ConcreteDataType::int32_datatype(),
true,
)]));
let batch1 = RecordBatch::new(
schema.clone(), schema.clone(),
vec![Arc::new(Int32Array::from(vec![Some(2), None, Some(4)])) as _], vec![Arc::new(Int32Vector::from(vec![Some(2), None, Some(4)])) as _],
) )
.unwrap(); .unwrap();
let batch2 = DfRecordBatch::try_new( let batch2 = RecordBatch::new(
schema.clone(), schema.clone(),
vec![Arc::new(Int32Array::from(vec![None, Some(6)])) as _], vec![Arc::new(Int32Vector::from(vec![None, Some(6)])) as _],
) )
.unwrap(); .unwrap();
let recordbatches = vec![batch1.clone(), batch2.clone()]; let recordbatches =
RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
let m1 = FlightMessage::Schema(schema); let m1 = FlightMessage::Schema(schema);
let m2 = FlightMessage::RecordBatch(batch1); let m2 = FlightMessage::Recordbatch(batch1);
let m3 = FlightMessage::RecordBatch(batch2); let m3 = FlightMessage::Recordbatch(batch2);
let result = flight_messages_to_recordbatches(vec![m2.clone(), m1.clone(), m3.clone()]); let result = flight_messages_to_recordbatches(vec![m2.clone(), m1.clone(), m3.clone()]);
assert!(matches!(result, Err(Error::InvalidFlightData { .. }))); assert!(matches!(result, Err(Error::InvalidFlightData { .. })));
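The updated tests above round-trip record batches through `arrow_flight`'s helper functions before feeding them to the decoder. The sketch below exercises the same upstream utilities (`batches_to_flight_data` / `flight_data_to_arrow_batch`) directly, without GreptimeDB's `FlightEncoder`/`FlightDecoder` wrappers; the single `Int32` column mirrors the test, and crate versions are assumed to match what the diff uses.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_flight::utils::{batches_to_flight_data, flight_data_to_arrow_batch};

fn main() {
    // Build a single-column batch, like the "n" column in the test above.
    let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as ArrayRef],
    )
    .unwrap();

    // Encode: the first FlightData carries the schema, the rest carry record batches.
    let flight_data = batches_to_flight_data(&schema, vec![batch.clone()]).unwrap();
    assert_eq!(flight_data.len(), 2);

    // Decode the batch message back into a RecordBatch and compare.
    let decoded =
        flight_data_to_arrow_batch(&flight_data[1], schema.clone(), &HashMap::new()).unwrap();
    assert_eq!(decoded, batch);
}
```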


@@ -8,7 +8,6 @@ license.workspace = true
testing = [] testing = []
pg_kvbackend = ["dep:tokio-postgres", "dep:backon", "dep:deadpool-postgres", "dep:deadpool"] pg_kvbackend = ["dep:tokio-postgres", "dep:backon", "dep:deadpool-postgres", "dep:deadpool"]
mysql_kvbackend = ["dep:sqlx", "dep:backon"] mysql_kvbackend = ["dep:sqlx", "dep:backon"]
enterprise = []
[lints] [lints]
workspace = true workspace = true
@@ -43,7 +42,6 @@ deadpool = { workspace = true, optional = true }
deadpool-postgres = { workspace = true, optional = true } deadpool-postgres = { workspace = true, optional = true }
derive_builder.workspace = true derive_builder.workspace = true
etcd-client.workspace = true etcd-client.workspace = true
flexbuffers = "25.2"
futures.workspace = true futures.workspace = true
futures-util.workspace = true futures-util.workspace = true
hex.workspace = true hex.workspace = true
@@ -51,7 +49,6 @@ humantime-serde.workspace = true
itertools.workspace = true itertools.workspace = true
lazy_static.workspace = true lazy_static.workspace = true
moka.workspace = true moka.workspace = true
object-store.workspace = true
prometheus.workspace = true prometheus.workspace = true
prost.workspace = true prost.workspace = true
rand.workspace = true rand.workspace = true
@@ -74,7 +71,6 @@ typetag.workspace = true
[dev-dependencies] [dev-dependencies]
chrono.workspace = true chrono.workspace = true
common-procedure = { workspace = true, features = ["testing"] } common-procedure = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
common-wal = { workspace = true, features = ["testing"] } common-wal = { workspace = true, features = ["testing"] }
datatypes.workspace = true datatypes.workspace = true
hyper = { version = "0.14", features = ["full"] } hyper = { version = "0.14", features = ["full"] }


@@ -16,12 +16,9 @@ use std::sync::Arc;
use crate::error::Result; use crate::error::Result;
use crate::flow_name::FlowName; use crate::flow_name::FlowName;
use crate::instruction::{CacheIdent, DropFlow}; use crate::instruction::CacheIdent;
use crate::key::flow::flow_info::FlowInfoKey; use crate::key::flow::flow_info::FlowInfoKey;
use crate::key::flow::flow_name::FlowNameKey; use crate::key::flow::flow_name::FlowNameKey;
use crate::key::flow::flow_route::FlowRouteKey;
use crate::key::flow::flownode_flow::FlownodeFlowKey;
use crate::key::flow::table_flow::TableFlowKey;
use crate::key::schema_name::SchemaNameKey; use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey; use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey; use crate::key::table_name::TableNameKey;
@@ -92,40 +89,9 @@ where
let key: SchemaNameKey = schema_name.into(); let key: SchemaNameKey = schema_name.into();
self.invalidate_key(&key.to_bytes()).await; self.invalidate_key(&key.to_bytes()).await;
} }
CacheIdent::CreateFlow(_) => { CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
// Do nothing // Do nothing
} }
CacheIdent::DropFlow(DropFlow {
flow_id,
source_table_ids,
flow_part2node_id,
}) => {
// invalidate flow route/flownode flow/table flow
let mut keys = Vec::with_capacity(
source_table_ids.len() * flow_part2node_id.len()
+ flow_part2node_id.len() * 2,
);
for table_id in source_table_ids {
for (partition_id, node_id) in flow_part2node_id {
let key =
TableFlowKey::new(*table_id, *node_id, *flow_id, *partition_id)
.to_bytes();
keys.push(key);
}
}
for (partition_id, node_id) in flow_part2node_id {
let key =
FlownodeFlowKey::new(*node_id, *flow_id, *partition_id).to_bytes();
keys.push(key);
let key = FlowRouteKey::new(*flow_id, *partition_id).to_bytes();
keys.push(key);
}
for key in keys {
self.invalidate_key(&key).await;
}
}
CacheIdent::FlowName(FlowName { CacheIdent::FlowName(FlowName {
catalog_name, catalog_name,
flow_name, flow_name,


@@ -21,7 +21,7 @@ use snafu::{ensure, ResultExt};
use strum::AsRefStr; use strum::AsRefStr;
use crate::cache_invalidator::Context; use crate::cache_invalidator::Context;
use crate::ddl::utils::map_to_procedure_error; use crate::ddl::utils::handle_retry_error;
use crate::ddl::DdlContext; use crate::ddl::DdlContext;
use crate::error::{Result, SchemaNotFoundSnafu}; use crate::error::{Result, SchemaNotFoundSnafu};
use crate::instruction::CacheIdent; use crate::instruction::CacheIdent;
@@ -148,7 +148,7 @@ impl Procedure for AlterDatabaseProcedure {
AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await, AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await, AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await,
} }
.map_err(map_to_procedure_error) .map_err(handle_retry_error)
} }
fn dump(&self) -> ProcedureResult<String> { fn dump(&self) -> ProcedureResult<String> {


@@ -32,12 +32,9 @@ use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use strum::AsRefStr; use strum::AsRefStr;
use table::metadata::TableId; use table::metadata::TableId;
use crate::ddl::utils::{ use crate::ddl::utils::{add_peer_context_if_needed, sync_follower_regions};
add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions,
};
use crate::ddl::DdlContext; use crate::ddl::DdlContext;
use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result}; use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue; use crate::key::table_info::TableInfoValue;
use crate::key::table_route::PhysicalTableRouteValue; use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::DeserializedValueWithBytes; use crate::key::DeserializedValueWithBytes;
@@ -69,7 +66,6 @@ impl AlterLogicalTablesProcedure {
physical_table_info: None, physical_table_info: None,
physical_table_route: None, physical_table_route: None,
physical_columns: vec![], physical_columns: vec![],
table_cache_keys_to_invalidate: vec![],
}, },
} }
} }
@@ -199,19 +195,16 @@ impl AlterLogicalTablesProcedure {
self.update_physical_table_metadata().await?; self.update_physical_table_metadata().await?;
self.update_logical_tables_metadata().await?; self.update_logical_tables_metadata().await?;
self.data.build_cache_keys_to_invalidate();
self.data.clear_metadata_fields();
self.data.state = AlterTablesState::InvalidateTableCache; self.data.state = AlterTablesState::InvalidateTableCache;
Ok(Status::executing(true)) Ok(Status::executing(true))
} }
pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> { pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
let to_invalidate = &self.data.table_cache_keys_to_invalidate; let to_invalidate = self.build_table_cache_keys_to_invalidate();
self.context self.context
.cache_invalidator .cache_invalidator
.invalidate(&Default::default(), to_invalidate) .invalidate(&Default::default(), &to_invalidate)
.await?; .await?;
Ok(Status::done()) Ok(Status::done())
} }
@@ -224,6 +217,14 @@ impl Procedure for AlterLogicalTablesProcedure {
} }
async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> { async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
let error_handler = |e: Error| {
if e.is_retry_later() {
common_procedure::Error::retry_later(e)
} else {
common_procedure::Error::external(e)
}
};
let state = &self.data.state; let state = &self.data.state;
let step = state.as_ref(); let step = state.as_ref();
@@ -240,7 +241,7 @@ impl Procedure for AlterLogicalTablesProcedure {
AlterTablesState::UpdateMetadata => self.on_update_metadata().await, AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await, AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
} }
.map_err(map_to_procedure_error) .map_err(error_handler)
} }
fn dump(&self) -> ProcedureResult<String> { fn dump(&self) -> ProcedureResult<String> {
@@ -279,20 +280,6 @@ pub struct AlterTablesData {
physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>, physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
physical_table_route: Option<PhysicalTableRouteValue>, physical_table_route: Option<PhysicalTableRouteValue>,
physical_columns: Vec<ColumnMetadata>, physical_columns: Vec<ColumnMetadata>,
table_cache_keys_to_invalidate: Vec<CacheIdent>,
}
impl AlterTablesData {
/// Clears all data fields except `state` and `table_cache_keys_to_invalidate` after metadata update.
/// This is done to avoid persisting unnecessary data after the update metadata step.
fn clear_metadata_fields(&mut self) {
self.tasks.clear();
self.table_info_values.clear();
self.physical_table_id = 0;
self.physical_table_info = None;
self.physical_table_route = None;
self.physical_columns.clear();
}
} }
#[derive(Debug, Serialize, Deserialize, AsRefStr)] #[derive(Debug, Serialize, Deserialize, AsRefStr)]

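The inline `error_handler` closure in `execute` above chooses between `retry_later` and `external` based on `is_retry_later()`. The sketch below mirrors that decision with hypothetical stand-in error types; none of these are the real `common_procedure` or meta error APIs.

```rust
// Hypothetical stand-ins illustrating the retry-vs-external mapping.
#[derive(Debug)]
enum ProcedureError {
    RetryLater(String),
    External(String),
}

#[derive(Debug)]
struct MetaError {
    retryable: bool,
    msg: String,
}

impl MetaError {
    fn is_retry_later(&self) -> bool {
        self.retryable
    }
}

fn map_error(e: MetaError) -> ProcedureError {
    // Retryable metadata errors are surfaced as RetryLater so the framework re-runs the step;
    // everything else is treated as a terminal, external failure.
    if e.is_retry_later() {
        ProcedureError::RetryLater(e.msg)
    } else {
        ProcedureError::External(e.msg)
    }
}

fn main() {
    let retryable = MetaError { retryable: true, msg: "kv store unavailable".to_string() };
    assert!(matches!(map_error(retryable), ProcedureError::RetryLater(_)));

    let fatal = MetaError { retryable: false, msg: "invalid table metadata".to_string() };
    assert!(matches!(map_error(fatal), ProcedureError::External(_)));
}
```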
Some files were not shown because too many files have changed in this diff.