Compare commits

..

3 Commits

Author SHA1 Message Date
discord9
2f8e8be042 test: build a failing example with LogicalPlanBuilder
Signed-off-by: discord9 <discord9@163.com>
2025-08-12 15:19:47 +08:00
discord9
9d30459a58 test: reproduce the panic, still no clue why
Signed-off-by: discord9 <discord9@163.com>
2025-08-11 19:37:48 +08:00
discord9
f1650a78f7 fix?: optimize projection after join
Signed-off-by: discord9 <discord9@163.com>
2025-08-07 19:55:32 +08:00
1438 changed files with 25913 additions and 58938 deletions

View File

@@ -2,7 +2,7 @@
linker = "aarch64-linux-gnu-gcc"
[alias]
sqlness = "run --bin sqlness-runner --target-dir target/sqlness --"
sqlness = "run --bin sqlness-runner --"
[unstable.git]
shallow_index = true

View File

@@ -12,7 +12,7 @@ runs:
steps:
- name: Install Etcd cluster
shell: bash
run: |
run: |
helm upgrade \
--install etcd oci://registry-1.docker.io/bitnamicharts/etcd \
--set replicaCount=${{ inputs.etcd-replicas }} \
@@ -24,9 +24,4 @@ runs:
--set auth.rbac.token.enabled=false \
--set persistence.size=2Gi \
--create-namespace \
--set global.security.allowInsecureImages=true \
--set image.registry=docker.io \
--set image.repository=greptime/etcd \
--set image.tag=3.6.1-debian-12-r3 \
--version 12.0.8 \
-n ${{ inputs.namespace }}

View File

@@ -1,8 +1,3 @@
logging:
level: "info"
format: "json"
filters:
- log_store=debug
meta:
configData: |-
[runtime]

View File

@@ -12,7 +12,7 @@ runs:
steps:
- name: Install Kafka cluster
shell: bash
run: |
run: |
helm upgrade \
--install kafka oci://registry-1.docker.io/bitnamicharts/kafka \
--set controller.replicaCount=${{ inputs.controller-replicas }} \
@@ -23,8 +23,4 @@ runs:
--set listeners.controller.protocol=PLAINTEXT \
--set listeners.client.protocol=PLAINTEXT \
--create-namespace \
--set image.registry=docker.io \
--set image.repository=greptime/kafka \
--set image.tag=3.9.0-debian-12-r1 \
--version 31.0.0 \
-n ${{ inputs.namespace }}

View File

@@ -6,7 +6,9 @@ inputs:
description: "Number of PostgreSQL replicas"
namespace:
default: "postgres-namespace"
description: "The PostgreSQL namespace"
postgres-version:
default: "14.2"
description: "PostgreSQL version"
storage-size:
default: "1Gi"
description: "Storage size for PostgreSQL"
@@ -20,11 +22,7 @@ runs:
helm upgrade \
--install postgresql oci://registry-1.docker.io/bitnamicharts/postgresql \
--set replicaCount=${{ inputs.postgres-replicas }} \
--set global.security.allowInsecureImages=true \
--set image.registry=docker.io \
--set image.repository=greptime/postgresql \
--set image.tag=17.5.0-debian-12-r3 \
--version 16.7.4 \
--set image.tag=${{ inputs.postgres-version }} \
--set persistence.size=${{ inputs.storage-size }} \
--set postgresql.username=greptimedb \
--set postgresql.password=admin \

View File

@@ -35,8 +35,8 @@ HIGHER_VERSION=$(printf "%s\n%s" "$CLEAN_CURRENT" "$CLEAN_LATEST" | sort -V | ta
if [ "$HIGHER_VERSION" = "$CLEAN_CURRENT" ]; then
echo "Current version ($CLEAN_CURRENT) is NEWER than or EQUAL to latest ($CLEAN_LATEST)"
echo "is-current-version-latest=true" >> $GITHUB_OUTPUT
echo "should-push-latest-tag=true" >> $GITHUB_OUTPUT
else
echo "Current version ($CLEAN_CURRENT) is OLDER than latest ($CLEAN_LATEST)"
echo "is-current-version-latest=false" >> $GITHUB_OUTPUT
echo "should-push-latest-tag=false" >> $GITHUB_OUTPUT
fi

View File

@@ -3,14 +3,12 @@
set -e
set -o pipefail
KUBERNETES_VERSION="${KUBERNETES_VERSION:-v1.32.0}"
KUBERNETES_VERSION="${KUBERNETES_VERSION:-v1.24.0}"
ENABLE_STANDALONE_MODE="${ENABLE_STANDALONE_MODE:-true}"
DEFAULT_INSTALL_NAMESPACE=${DEFAULT_INSTALL_NAMESPACE:-default}
GREPTIMEDB_IMAGE_TAG=${GREPTIMEDB_IMAGE_TAG:-latest}
GREPTIME_CHART="https://greptimeteam.github.io/helm-charts/"
ETCD_CHART="oci://registry-1.docker.io/bitnamicharts/etcd"
ETCD_CHART_VERSION="${ETCD_CHART_VERSION:-12.0.8}"
ETCD_IMAGE_TAG="${ETCD_IMAGE_TAG:-3.6.1-debian-12-r3}"
GREPTIME_CHART="https://greptimeteam.github.io/helm-charts/"
# Create a cluster with 1 control-plane node and 5 workers.
function create_kind_cluster() {
@@ -37,16 +35,10 @@ function add_greptime_chart() {
function deploy_etcd_cluster() {
local namespace="$1"
helm upgrade --install etcd "$ETCD_CHART" \
--version "$ETCD_CHART_VERSION" \
--create-namespace \
helm install etcd "$ETCD_CHART" \
--set replicaCount=3 \
--set auth.rbac.create=false \
--set auth.rbac.token.enabled=false \
--set global.security.allowInsecureImages=true \
--set image.registry=docker.io \
--set image.repository=greptime/etcd \
--set image.tag="$ETCD_IMAGE_TAG" \
-n "$namespace"
# Wait for etcd cluster to be ready.
@@ -56,8 +48,7 @@ function deploy_etcd_cluster() {
# Deploy greptimedb-operator.
function deploy_greptimedb_operator() {
# Use the latest chart and image.
helm upgrade --install greptimedb-operator greptime/greptimedb-operator \
--create-namespace \
helm install greptimedb-operator greptime/greptimedb-operator \
--set image.tag=latest \
-n "$DEFAULT_INSTALL_NAMESPACE"
@@ -75,11 +66,9 @@ function deploy_greptimedb_cluster() {
deploy_etcd_cluster "$install_namespace"
helm upgrade --install "$cluster_name" greptime/greptimedb-cluster \
--create-namespace \
helm install "$cluster_name" greptime/greptimedb-cluster \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.backendStorage.etcd.storeKeyPrefix="$cluster_name" \
-n "$install_namespace"
# Wait for greptimedb cluster to be ready.
@@ -112,17 +101,15 @@ function deploy_greptimedb_cluster_with_s3_storage() {
deploy_etcd_cluster "$install_namespace"
helm upgrade --install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
--create-namespace \
helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.backendStorage.etcd.storeKeyPrefix="$cluster_name" \
--set objectStorage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set objectStorage.s3.region="$AWS_REGION" \
--set objectStorage.s3.root="$DATA_ROOT" \
--set objectStorage.credentials.secretName=s3-credentials \
--set objectStorage.credentials.accessKeyId="$AWS_ACCESS_KEY_ID" \
--set objectStorage.credentials.secretAccessKey="$AWS_SECRET_ACCESS_KEY"
--set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set storage.s3.region="$AWS_REGION" \
--set storage.s3.root="$DATA_ROOT" \
--set storage.credentials.secretName=s3-credentials \
--set storage.credentials.accessKeyId="$AWS_ACCESS_KEY_ID" \
--set storage.credentials.secretAccessKey="$AWS_SECRET_ACCESS_KEY"
# Wait for greptimedb cluster to be ready.
while true; do
@@ -147,8 +134,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {
# Deploy standalone greptimedb.
# It will expose cluster service ports as '34000', '34001', '34002', '34003' to local access.
function deploy_standalone_greptimedb() {
helm upgrade --install greptimedb-standalone greptime/greptimedb-standalone \
--create-namespace \
helm install greptimedb-standalone greptime/greptimedb-standalone \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
-n "$DEFAULT_INSTALL_NAMESPACE"

View File

@@ -1,34 +0,0 @@
#!/bin/bash
# This script is used to pull the test dependency images that are stored in public ECR one by one to avoid rate limiting.
set -e
MAX_RETRIES=3
IMAGES=(
"greptime/zookeeper:3.7"
"greptime/kafka:3.9.0-debian-12-r1"
"greptime/etcd:3.6.1-debian-12-r3"
"greptime/minio:2024"
"greptime/mysql:5.7"
)
for image in "${IMAGES[@]}"; do
for ((attempt=1; attempt<=MAX_RETRIES; attempt++)); do
if docker pull "$image"; then
# Successfully pulled the image.
break
else
# Use some simple exponential backoff to avoid rate limiting.
if [ $attempt -lt $MAX_RETRIES ]; then
sleep_seconds=$((attempt * 5))
echo "Attempt $attempt failed for $image, waiting $sleep_seconds seconds"
sleep $sleep_seconds # 5s, 10s delays
else
echo "Failed to pull $image after $MAX_RETRIES attempts"
exit 1
fi
fi
done
done

View File

@@ -21,7 +21,7 @@ update_dev_builder_version() {
# Commit the changes.
git add Makefile
git commit -s -m "ci: update dev-builder image tag"
git commit -m "ci: update dev-builder image tag"
git push origin $BRANCH_NAME
# Create a Pull Request.

View File

@@ -12,7 +12,6 @@ on:
- 'docker/**'
- '.gitignore'
- 'grafana/**'
- 'Makefile'
workflow_dispatch:
name: CI
@@ -618,12 +617,10 @@ jobs:
- uses: actions/checkout@v4
with:
persist-credentials: false
- if: matrix.mode.kafka
name: Setup kafka server
working-directory: tests-integration/fixtures
run: ../../.github/scripts/pull-test-deps-images.sh && docker compose up -d --wait kafka
run: docker compose up -d --wait kafka
- name: Download pre-built binaries
uses: actions/download-artifact@v4
with:
@@ -685,30 +682,6 @@ jobs:
- name: Run cargo clippy
run: make clippy
check-udeps:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check Unused Dependencies
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
with:
persist-credentials: false
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
shared-key: "check-udeps"
cache-all-crates: "true"
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install cargo-udeps
run: cargo install cargo-udeps --locked
- name: Check unused dependencies
run: make check-udeps
conflict-check:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check for conflict
@@ -724,7 +697,7 @@ jobs:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && github.event_name != 'merge_group' }}
runs-on: ubuntu-22.04-arm
timeout-minutes: 60
needs: [conflict-check, clippy, fmt, check-udeps]
needs: [conflict-check, clippy, fmt]
steps:
- uses: actions/checkout@v4
with:
@@ -736,7 +709,7 @@ jobs:
- name: Install toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
cache: false
cache: false
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -746,11 +719,9 @@ jobs:
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install latest nextest release
uses: taiki-e/install-action@nextest
- name: Setup external services
working-directory: tests-integration/fixtures
run: ../../.github/scripts/pull-test-deps-images.sh && docker compose up -d --wait
run: docker compose up -d --wait
- name: Run nextest cases
run: cargo nextest run --workspace -F dashboard -F pg_kvbackend -F mysql_kvbackend
env:
@@ -767,11 +738,8 @@ jobs:
GT_MINIO_ACCESS_KEY: superpower_password
GT_MINIO_REGION: us-west-2
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_TLS_ENDPOINTS: https://127.0.0.1:2378
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres
GT_POSTGRES15_ENDPOINTS: postgres://test_user:test_password@127.0.0.1:5433/postgres
GT_POSTGRES15_SCHEMA: test_schema
GT_MYSQL_ENDPOINTS: mysql://greptimedb:admin@127.0.0.1:3306/mysql
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
@@ -804,11 +772,9 @@ jobs:
uses: taiki-e/install-action@nextest
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Setup external services
working-directory: tests-integration/fixtures
run: ../../.github/scripts/pull-test-deps-images.sh && docker compose up -d --wait
run: docker compose up -d --wait
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend -F mysql_kvbackend
env:
@@ -824,11 +790,8 @@ jobs:
GT_MINIO_ACCESS_KEY: superpower_password
GT_MINIO_REGION: us-west-2
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_TLS_ENDPOINTS: https://127.0.0.1:2378
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres
GT_POSTGRES15_ENDPOINTS: postgres://test_user:test_password@127.0.0.1:5433/postgres
GT_POSTGRES15_SCHEMA: test_schema
GT_MYSQL_ENDPOINTS: mysql://greptimedb:admin@127.0.0.1:3306/mysql
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093

View File

@@ -10,7 +10,6 @@ on:
- 'docker/**'
- '.gitignore'
- 'grafana/**'
- 'Makefile'
push:
branches:
- main
@@ -22,7 +21,6 @@ on:
- 'docker/**'
- '.gitignore'
- 'grafana/**'
- 'Makefile'
workflow_dispatch:
name: CI
@@ -67,12 +65,6 @@ jobs:
steps:
- run: 'echo "No action required"'
check-udeps:
name: Unused Dependencies
runs-on: ubuntu-latest
steps:
- run: 'echo "No action required"'
coverage:
runs-on: ubuntu-latest
steps:

View File

@@ -111,8 +111,7 @@ jobs:
# The 'version' use as the global tag name of the release workflow.
version: ${{ steps.create-version.outputs.version }}
# The 'is-current-version-latest' determines whether to update 'latest' Docker tags and downstream repositories.
is-current-version-latest: ${{ steps.check-version.outputs.is-current-version-latest }}
should-push-latest-tag: ${{ steps.check-version.outputs.should-push-latest-tag }}
steps:
- name: Checkout
uses: actions/checkout@v4
@@ -322,7 +321,7 @@ jobs:
image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
image-registry-password: ${{ secrets.DOCKERHUB_TOKEN }}
version: ${{ needs.allocate-runners.outputs.version }}
push-latest-tag: ${{ needs.allocate-runners.outputs.is-current-version-latest == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
push-latest-tag: ${{ needs.allocate-runners.outputs.should-push-latest-tag == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
- name: Set build image result
id: set-build-image-result
@@ -369,7 +368,7 @@ jobs:
dev-mode: false
upload-to-s3: true
update-version-info: true
push-latest-tag: ${{ needs.allocate-runners.outputs.is-current-version-latest == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
push-latest-tag: ${{ needs.allocate-runners.outputs.should-push-latest-tag == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
publish-github-release:
name: Create GitHub release and upload artifacts
@@ -477,7 +476,7 @@ jobs:
bump-helm-charts-version:
name: Bump helm charts version
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' && needs.allocate-runners.outputs.is-current-version-latest == 'true' }}
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
needs: [allocate-runners, publish-github-release]
runs-on: ubuntu-latest
permissions:
@@ -498,7 +497,7 @@ jobs:
bump-homebrew-greptime-version:
name: Bump homebrew greptime version
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' && needs.allocate-runners.outputs.is-current-version-latest == 'true' }}
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
needs: [allocate-runners, publish-github-release]
runs-on: ubuntu-latest
permissions:

3
.gitignore vendored
View File

@@ -52,9 +52,6 @@ venv/
tests-fuzz/artifacts/
tests-fuzz/corpus/
# cargo-udeps reports
udeps-report.json
# Nix
.direnv
.envrc

View File

@@ -55,18 +55,14 @@ GreptimeDB uses the [Apache 2.0 license](https://github.com/GreptimeTeam/greptim
- To ensure that community is free and confident in its ability to use your contributions, please sign the Contributor License Agreement (CLA) which will be incorporated in the pull request process.
- Make sure all files have proper license header (running `docker run --rm -v $(pwd):/github/workspace ghcr.io/korandoru/hawkeye-native:v3 format` from the project root).
- Make sure all your codes are formatted and follow the [coding style](https://pingcap.github.io/style-guide/rust/) and [style guide](docs/style-guide.md).
- Make sure all unit tests are passed using [nextest](https://nexte.st/index.html) `cargo nextest run --workspace --features pg_kvbackend,mysql_kvbackend` or `make test`.
- Make sure all clippy warnings are fixed (you can check it locally by running `cargo clippy --workspace --all-targets -- -D warnings` or `make clippy`).
- Ensure there are no unused dependencies by running `make check-udeps` (clean them up with `make fix-udeps` if reported).
- If you must keep a target-specific dependency (e.g. under `[target.'cfg(...)'.dev-dependencies]`), add a cargo-udeps ignore entry in the same `Cargo.toml`, for example:
`[package.metadata.cargo-udeps.ignore]` with `development = ["rexpect"]` (or `dependencies`/`build` as appropriate).
- When modifying sample configuration files in `config/`, run `make config-docs` (which requires Docker to be installed) to update the configuration documentation and include it in your commit.
- Make sure all unit tests are passed using [nextest](https://nexte.st/index.html) `cargo nextest run`.
- Make sure all clippy warnings are fixed (you can check it locally by running `cargo clippy --workspace --all-targets -- -D warnings`).
#### `pre-commit` Hooks
You could setup the [`pre-commit`](https://pre-commit.com/#plugins) hooks to run these checks on every commit automatically.
1. Install `pre-commit`
1. Install `pre-commit`
pip install pre-commit
@@ -74,7 +70,7 @@ You could setup the [`pre-commit`](https://pre-commit.com/#plugins) hooks to run
brew install pre-commit
2. Install the `pre-commit` hooks
2. Install the `pre-commit` hooks
$ pre-commit install
pre-commit installed at .git/hooks/pre-commit

5026
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -73,8 +73,8 @@ members = [
resolver = "2"
[workspace.package]
version = "0.18.0"
edition = "2024"
version = "0.17.0"
edition = "2021"
license = "Apache-2.0"
[workspace.lints]
@@ -98,12 +98,11 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.6"
arrow = { version = "56.0", features = ["prettyprint"] }
arrow-array = { version = "56.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "56.0"
arrow-flight = "56.0"
arrow-ipc = { version = "56.0", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "56.0", features = ["serde"] }
arrow = { version = "54.2", features = ["prettyprint"] }
arrow-array = { version = "54.2", default-features = false, features = ["chrono-tz"] }
arrow-flight = "54.2"
arrow-ipc = { version = "54.2", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "54.2", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
# Remember to update axum-extra, axum-macros when updating axum
@@ -122,30 +121,26 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-orc = { git = "https://github.com/GreptimeTeam/datafusion-orc", rev = "a0a5f902158f153119316eaeec868cff3fc8a99d" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-functions-aggregate-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
dotenv = "0.15"
either = "1.15"
etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62df834f0cffda355eba96691fe1a9a332b75a7", features = [
"tls",
"tls-roots",
] }
etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f9836cf8aab30e672f640c6ef4c1cfd2cf9fbc36" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ccfd4da48bc0254ed865e479cd981a3581b02d84" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -156,7 +151,7 @@ itertools = "0.14"
jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a39f3ff08909d60f718639cfba3875", default-features = false }
lazy_static = "1.4"
local-ip-address = "0.6"
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "3b7cd33234358b18ece977bf689dc6fb760f29ab" }
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "1434ecf23a2654025d86188fb5205e7a74b225d3" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" }
mockall = "0.13"
moka = "0.12"
@@ -164,9 +159,9 @@ nalgebra = "0.33"
nix = { version = "0.30.1", default-features = false, features = ["event", "fs", "process"] }
notify = "8.0"
num_cpus = "1.16"
object_store_opendal = "0.54"
object_store_opendal = "0.50"
once_cell = "1.18"
opentelemetry-proto = { version = "0.30", features = [
opentelemetry-proto = { version = "0.27", features = [
"gen-tonic",
"metrics",
"trace",
@@ -175,14 +170,13 @@ opentelemetry-proto = { version = "0.30", features = [
] }
ordered-float = { version = "4.3", features = ["serde"] }
parking_lot = "0.12"
parquet = { version = "56.0", default-features = false, features = ["arrow", "async", "object_store"] }
parquet = { version = "54.2", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
pretty_assertions = "1.4.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.6", features = ["ser"] }
prost = { version = "0.13", features = ["no-recursion-limit"] }
prost-types = "0.13"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"
ratelimit = "0.10"
@@ -194,7 +188,7 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76", features = [
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "8dbd01ed809f5a791833a594e85b144e36e45820", features = [
"transport-tls",
] }
rstest = "0.25"
@@ -207,14 +201,15 @@ sea-query = "0.32"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
serde_with = "3"
shadow-rs = "1.1"
simd-json = "0.15"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "39e4fc94c3c741981f77e9d63b5ce8c02e0a27ea", features = [
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "df6fcca80ce903f5beef7002cd2c1b062e7024f8", features = [
"visitor",
"serde",
] } # branch = "v0.55.x"
] } # branch = "v0.54.x"
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"mysql",
@@ -224,20 +219,20 @@ sqlx = { version = "0.8", features = [
strum = { version = "0.27", features = ["derive"] }
sysinfo = "0.33"
tempfile = "3"
tokio = { version = "1.47", features = ["full"] }
tokio = { version = "1.40", features = ["full"] }
tokio-postgres = "0.7"
tokio-rustls = { version = "0.26.2", default-features = false }
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.13", features = ["tls-ring", "gzip", "zstd"] }
tonic = { version = "0.12", features = ["tls", "gzip", "zstd"] }
tower = "0.5"
tower-http = "0.6"
tracing = "0.1"
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }
typetag = "0.2"
uuid = { version = "1.17", features = ["serde", "v4", "fast-rng"] }
uuid = { version = "1.7", features = ["serde", "v4", "fast-rng"] }
vrl = "0.25"
zstd = "0.13"
# DO_NOT_REMOVE_THIS: END_OF_EXTERNAL_DEPENDENCIES
@@ -296,7 +291,7 @@ mito-codec = { path = "src/mito-codec" }
mito2 = { path = "src/mito2" }
object-store = { path = "src/object-store" }
operator = { path = "src/operator" }
otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "2d64b7c0fa95642028a8205b36fe9ea0b023ec59", features = [
otel-arrow-rust = { git = "https://github.com/open-telemetry/otel-arrow", rev = "5d551412d2a12e689cde4d84c14ef29e36784e51", features = [
"server",
] }
partition = { path = "src/partition" }

View File

@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2025-05-19-f55023f3-20250829091211
DEV_BUILDER_IMAGE_TAG ?= 2025-05-19-b2377d4b-20250520045554
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu
@@ -22,7 +22,7 @@ SQLNESS_OPTS ?=
ETCD_VERSION ?= v3.5.9
ETCD_IMAGE ?= quay.io/coreos/etcd:${ETCD_VERSION}
RETRY_COUNT ?= 3
NEXTEST_OPTS := --retries ${RETRY_COUNT} --features pg_kvbackend,mysql_kvbackend
NEXTEST_OPTS := --retries ${RETRY_COUNT}
BUILD_JOBS ?= $(shell which nproc 1>/dev/null && expr $$(nproc) / 2) # If nproc is not available, we don't set the build jobs.
ifeq ($(BUILD_JOBS), 0) # If the number of cores is less than 2, set the build jobs to 1.
BUILD_JOBS := 1
@@ -193,17 +193,6 @@ clippy: ## Check clippy rules.
fix-clippy: ## Fix clippy violations.
cargo clippy --workspace --all-targets --all-features --fix
.PHONY: check-udeps
check-udeps: ## Check unused dependencies.
cargo udeps --workspace --all-targets
.PHONY: fix-udeps
fix-udeps: ## Remove unused dependencies automatically.
@echo "Running cargo-udeps to find unused dependencies..."
@cargo udeps --workspace --all-targets --output json > udeps-report.json || true
@echo "Removing unused dependencies..."
@python3 scripts/fix-udeps.py udeps-report.json
.PHONY: fmt-check
fmt-check: ## Check code format.
cargo fmt --all -- --check

View File

@@ -41,7 +41,6 @@
| `mysql.addr` | String | `127.0.0.1:4002` | The addr to bind the MySQL server. |
| `mysql.runtime_size` | Integer | `2` | The number of server worker threads. |
| `mysql.keep_alive` | String | `0s` | Server-side keep-alive time.<br/>Set to 0 (default) to disable. |
| `mysql.prepared_stmt_cache_size` | Integer | `10000` | Maximum entries in the MySQL prepared statement cache; default is 10,000. |
| `mysql.tls` | -- | -- | -- |
| `mysql.tls.mode` | String | `disable` | TLS mode, refer to https://www.postgresql.org/docs/current/libpq-ssl.html<br/>- `disable` (default value)<br/>- `prefer`<br/>- `require`<br/>- `verify-ca`<br/>- `verify-full` |
| `mysql.tls.cert_path` | String | Unset | Certificate file path. |
@@ -148,7 +147,7 @@
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `128` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -187,13 +186,12 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4318/v1/traces` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.otlp_headers` | -- | -- | Additional OTLP headers, only valid when using OTLP http |
| `logging.tracing_sample_ratio` | -- | Unset | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `slow_query` | -- | -- | The slow query log options. |
| `slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
@@ -245,22 +243,11 @@
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
| `grpc.tls.key_path` | String | Unset | Private key file path. |
| `grpc.tls.watch` | Bool | `false` | Watch for Certificate and key file change and auto reload.<br/>For now, gRPC tls config does not support auto reload. |
| `internal_grpc` | -- | -- | The internal gRPC server options. Internal gRPC port for nodes inside cluster to access frontend. |
| `internal_grpc.bind_addr` | String | `127.0.0.1:4010` | The address to bind the gRPC server. |
| `internal_grpc.server_addr` | String | `127.0.0.1:4010` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `internal_grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `internal_grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
| `internal_grpc.tls` | -- | -- | internal gRPC server TLS options, see `mysql.tls` section. |
| `internal_grpc.tls.mode` | String | `disable` | TLS mode. |
| `internal_grpc.tls.cert_path` | String | Unset | Certificate file path. |
| `internal_grpc.tls.key_path` | String | Unset | Private key file path. |
| `internal_grpc.tls.watch` | Bool | `false` | Watch for Certificate and key file change and auto reload.<br/>For now, gRPC tls config does not support auto reload. |
| `mysql` | -- | -- | MySQL server options. |
| `mysql.enable` | Bool | `true` | Whether to enable. |
| `mysql.addr` | String | `127.0.0.1:4002` | The addr to bind the MySQL server. |
| `mysql.runtime_size` | Integer | `2` | The number of server worker threads. |
| `mysql.keep_alive` | String | `0s` | Server-side keep-alive time.<br/>Set to 0 (default) to disable. |
| `mysql.prepared_stmt_cache_size` | Integer | `10000` | Maximum entries in the MySQL prepared statement cache; default is 10,000. |
| `mysql.tls` | -- | -- | -- |
| `mysql.tls.mode` | String | `disable` | TLS mode, refer to https://www.postgresql.org/docs/current/libpq-ssl.html<br/>- `disable` (default value)<br/>- `prefer`<br/>- `require`<br/>- `verify-ca`<br/>- `verify-full` |
| `mysql.tls.cert_path` | String | Unset | Certificate file path. |
@@ -306,20 +293,19 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4318/v1/traces` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.otlp_headers` | -- | -- | Additional OTLP headers, only valid when using OTLP http |
| `logging.tracing_sample_ratio` | -- | Unset | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `slow_query` | -- | -- | The slow query log options. |
| `slow_query.enable` | Bool | `true` | Whether to enable slow query log. |
| `slow_query.record_type` | String | `system_table` | The record type of slow queries. It can be `system_table` or `log`.<br/>If `system_table` is selected, the slow queries will be recorded in a system table `greptime_private.slow_queries`.<br/>If `log` is selected, the slow queries will be logged in a log file `greptimedb-slow-queries.*`. |
| `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. |
| `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
| `slow_query.ttl` | String | `90d` | The TTL of the `slow_queries` system table. Default is `90d` when `record_type` is `system_table`. |
| `slow_query.ttl` | String | `30d` | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
| `export_metrics` | -- | -- | The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
@@ -330,8 +316,6 @@
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
| `memory` | -- | -- | The memory options. |
| `memory.enable_heap_profiling` | Bool | `true` | Whether to enable heap profiling activation during startup.<br/>When enabled, heap profiling will be activated if the `MALLOC_CONF` environment variable<br/>is set to "prof:true,prof_active:false". The official image adds this env variable.<br/>Default is true. |
| `event_recorder` | -- | -- | Configuration options for the event recorder. |
| `event_recorder.ttl` | String | `90d` | TTL for the events table that will be used to store the events. Default is `90d`. |
### Metasrv
@@ -343,7 +327,6 @@
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store`<br/>- `mysql_store` |
| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.<br/>**Only used when backend is `postgres_store`.** |
| `meta_schema_name` | String | `greptime_schema` | Optional PostgreSQL schema for metadata table and election table name qualification.<br/>When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public),<br/>set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`.<br/>GreptimeDB will NOT create the schema automatically; please ensure it exists or the user has permission.<br/>**Only used when backend is `postgres_store`.** |
| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend<br/>Only used when backend is `postgres_store`. |
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
@@ -355,7 +338,7 @@
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `backend_tls` | -- | -- | TLS configuration for kv store backend (applicable for etcd, PostgreSQL, and MySQL backends)<br/>When using etcd, PostgreSQL, or MySQL as metadata store, you can configure TLS here |
| `backend_tls` | -- | -- | TLS configuration for kv store backend (only applicable for PostgreSQL/MySQL backends)<br/>When using PostgreSQL or MySQL as metadata store, you can configure TLS here |
| `backend_tls.mode` | String | `prefer` | TLS mode, refer to https://www.postgresql.org/docs/current/libpq-ssl.html<br/>- "disable" - No TLS<br/>- "prefer" (default) - Try TLS, fallback to plain<br/>- "require" - Require TLS<br/>- "verify_ca" - Require TLS and verify CA<br/>- "verify_full" - Require TLS and verify hostname |
| `backend_tls.cert_path` | String | `""` | Path to client certificate file (for client authentication)<br/>Like "/path/to/client.crt" |
| `backend_tls.key_path` | String | `""` | Path to client private key file (for client authentication)<br/>Like "/path/to/client.key" |
@@ -388,33 +371,28 @@
| `datanode.client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |
| `wal` | -- | -- | -- |
| `wal.provider` | String | `raft_engine` | -- |
| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster.<br/><br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)`<br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_prune_interval` | String | `30m` | Interval of automatically WAL pruning.<br/>Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically.<br/>**It's only used when the provider is `kafka`**. |
| `wal.flush_trigger_size` | String | `512MB` | Estimated size threshold to trigger a flush when using Kafka remote WAL.<br/>Since multiple regions may share a Kafka topic, the estimated size is calculated as:<br/> (latest_entry_id - flushed_entry_id) * avg_record_size<br/>MetaSrv triggers a flush for a region when this estimated size exceeds `flush_trigger_size`.<br/>- `latest_entry_id`: The latest entry ID in the topic.<br/>- `flushed_entry_id`: The last flushed entry ID for the region.<br/>Set to "0" to let the system decide the flush trigger size.<br/>**It's only used when the provider is `kafka`**. |
| `wal.checkpoint_trigger_size` | String | `128MB` | Estimated size threshold to trigger a checkpoint when using Kafka remote WAL.<br/>The estimated size is calculated as:<br/> (latest_entry_id - last_checkpoint_entry_id) * avg_record_size<br/>MetaSrv triggers a checkpoint for a region when this estimated size exceeds `checkpoint_trigger_size`.<br/>Set to "0" to let the system decide the checkpoint trigger size.<br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_prune_parallelism` | Integer | `10` | Concurrent task limit for automatically WAL pruning.<br/>**It's only used when the provider is `kafka`**. |
| `wal.num_topics` | Integer | `64` | Number of topics used for remote WAL.<br/>**It's only used when the provider is `kafka`**. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>Only accepts strings that match the following regular expression pattern:<br/>[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*<br/>i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.<br/>**It's only used when the provider is `kafka`**. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_topic_timeout` | String | `30s` | The timeout for creating a Kafka topic.<br/>**It's only used when the provider is `kafka`**. |
| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.auto_prune_interval` | String | `0s` | Interval of automatically WAL pruning.<br/>Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically. |
| `wal.trigger_flush_threshold` | Integer | `0` | The threshold to trigger a flush operation of a region in automatically WAL pruning.<br/>Metasrv will send a flush request to flush the region when:<br/>`trigger_flush_threshold` + `prunable_entry_id` < `max_prunable_entry_id`<br/>where:<br/>- `prunable_entry_id` is the maximum entry id that can be pruned of the region.<br/>- `max_prunable_entry_id` is the maximum prunable entry id among all regions in the same topic.<br/>Set to `0` to disable the flush operation. |
| `wal.auto_prune_parallelism` | Integer | `10` | Concurrent task limit for automatically WAL pruning. |
| `wal.num_topics` | Integer | `64` | Number of topics. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default) |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>Only accepts strings that match the following regular expression pattern:<br/>[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*<br/>i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
| `event_recorder` | -- | -- | Configuration options for the event recorder. |
| `event_recorder.ttl` | String | `90d` | TTL for the events table that will be used to store the events. Default is `90d`. |
| `stats_persistence` | -- | -- | Configuration options for the stats persistence. |
| `stats_persistence.ttl` | String | `0s` | TTL for the stats table that will be used to store the stats.<br/>Set to `0s` to disable stats persistence.<br/>Default is `0s`.<br/>If you want to enable stats persistence, set the TTL to a value greater than 0.<br/>It is recommended to set a small value, e.g., `3h`. |
| `stats_persistence.interval` | String | `10m` | The interval to persist the stats. Default is `10m`.<br/>The minimum value is `10m`, if the value is less than `10m`, it will be overridden to `10m`. |
| `event_recorder.ttl` | String | `30d` | TTL for the events table that will be used to store the events. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4318/v1/traces` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.otlp_headers` | -- | -- | Additional OTLP headers, only valid when using OTLP http |
| `logging.tracing_sample_ratio` | -- | Unset | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
@@ -521,8 +499,6 @@
| `region_engine.mito.worker_channel_size` | Integer | `128` | Request channel size of each worker. |
| `region_engine.mito.worker_request_batch_size` | Integer | `64` | Max batch size for a worker to handle requests. |
| `region_engine.mito.manifest_checkpoint_distance` | Integer | `10` | Number of meta action updated to trigger a new checkpoint for the manifest. |
| `region_engine.mito.experimental_manifest_keep_removed_file_count` | Integer | `256` | Number of removed files to keep in manifest's `removed_files` field before also<br/>remove them from `removed_files`. Mostly for debugging purpose.<br/>If set to 0, it will only use `keep_removed_file_ttl` to decide when to remove files<br/>from `removed_files` field. |
| `region_engine.mito.experimental_manifest_keep_removed_file_ttl` | String | `1h` | How long to keep removed files in the `removed_files` field of manifest<br/>after they are removed from manifest.<br/>files will only be removed from `removed_files` field<br/>if both `keep_removed_file_count` and `keep_removed_file_ttl` is reached. |
| `region_engine.mito.compress_manifest` | Bool | `false` | Whether to compress manifest and checkpoint file by gzip (default false). |
| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
@@ -540,7 +516,7 @@
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `128` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -579,13 +555,12 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4318/v1/traces` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.otlp_headers` | -- | -- | Additional OTLP headers, only valid when using OTLP http |
| `logging.tracing_sample_ratio` | -- | Unset | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
@@ -616,12 +591,6 @@
| `flow.batching_mode.experimental_frontend_activity_timeout` | String | `60s` | Frontend activity timeout<br/>if frontend is down(not sending heartbeat) for more than frontend_activity_timeout,<br/>it will be removed from the list that flownode use to connect |
| `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query |
| `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance |
| `flow.batching_mode.read_preference` | String | `Leader` | Read preference of the Frontend client. |
| `flow.batching_mode.frontend_tls` | -- | -- | -- |
| `flow.batching_mode.frontend_tls.enabled` | Bool | `false` | Whether to enable TLS for client. |
| `flow.batching_mode.frontend_tls.server_ca_cert_path` | String | Unset | Server Certificate file path. |
| `flow.batching_mode.frontend_tls.client_cert_path` | String | Unset | Client Certificate file path. |
| `flow.batching_mode.frontend_tls.client_key_path` | String | Unset | Client Private key file path. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:6800` | The address advertised to the metasrv,<br/>and used for connections from outside the host |
@@ -649,13 +618,12 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4318/v1/traces` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.otlp_headers` | -- | -- | Additional OTLP headers, only valid when using OTLP http |
| `logging.tracing_sample_ratio` | -- | Unset | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |

View File

@@ -409,19 +409,6 @@ worker_request_batch_size = 64
## Number of meta action updated to trigger a new checkpoint for the manifest.
manifest_checkpoint_distance = 10
## Number of removed files to keep in manifest's `removed_files` field before also
## remove them from `removed_files`. Mostly for debugging purpose.
## If set to 0, it will only use `keep_removed_file_ttl` to decide when to remove files
## from `removed_files` field.
experimental_manifest_keep_removed_file_count = 256
## How long to keep removed files in the `removed_files` field of manifest
## after they are removed from manifest.
## files will only be removed from `removed_files` field
## if both `keep_removed_file_count` and `keep_removed_file_ttl` is reached.
experimental_manifest_keep_removed_file_ttl = "1h"
## Whether to compress manifest and checkpoint file by gzip (default false).
compress_manifest = false
@@ -488,7 +475,7 @@ sst_write_buffer_size = "8MB"
parallel_scan_channel_size = 32
## Maximum number of SST files to scan concurrently.
max_concurrent_scan_files = 384
max_concurrent_scan_files = 128
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
@@ -645,7 +632,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4318/v1/traces"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -659,13 +646,6 @@ max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## Additional OTLP headers, only valid when using OTLP http
[logging.otlp_headers]
## @toml2docs:none-default
#Authorization = "Bearer my-token"
## @toml2docs:none-default
#Database = "My database"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

View File

@@ -30,20 +30,6 @@ node_id = 14
#+experimental_max_filter_num_per_query=20
## Time window merge distance
#+experimental_time_window_merge_threshold=3
## Read preference of the Frontend client.
#+read_preference="Leader"
[flow.batching_mode.frontend_tls]
## Whether to enable TLS for client.
#+enabled=false
## Server Certificate file path.
## @toml2docs:none-default
#+server_ca_cert_path=""
## Client Certificate file path.
## @toml2docs:none-default
#+client_cert_path=""
## Client Private key file path.
## @toml2docs:none-default
#+client_key_path=""
## The gRPC server options.
[grpc]
@@ -120,7 +106,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4318/v1/traces"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -134,13 +120,6 @@ max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## Additional OTLP headers, only valid when using OTLP http
[logging.otlp_headers]
## @toml2docs:none-default
#Authorization = "Bearer my-token"
## @toml2docs:none-default
#Database = "My database"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

View File

@@ -79,42 +79,6 @@ key_path = ""
## For now, gRPC tls config does not support auto reload.
watch = false
## The internal gRPC server options. Internal gRPC port for nodes inside cluster to access frontend.
[internal_grpc]
## The address to bind the gRPC server.
bind_addr = "127.0.0.1:4010"
## The address advertised to the metasrv, and used for connections from outside the host.
## If left empty or unset, the server will automatically use the IP address of the first network interface
## on the host, with the same port number as the one specified in `grpc.bind_addr`.
server_addr = "127.0.0.1:4010"
## The number of server worker threads.
runtime_size = 8
## Compression mode for frontend side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
## - `all`: enable all compression.
## Default to `none`
flight_compression = "arrow_ipc"
## internal gRPC server TLS options, see `mysql.tls` section.
[internal_grpc.tls]
## TLS mode.
mode = "disable"
## Certificate file path.
## @toml2docs:none-default
cert_path = ""
## Private key file path.
## @toml2docs:none-default
key_path = ""
## Watch for Certificate and key file change and auto reload.
## For now, gRPC tls config does not support auto reload.
watch = false
## MySQL server options.
[mysql]
## Whether to enable.
@@ -126,8 +90,6 @@ runtime_size = 2
## Server-side keep-alive time.
## Set to 0 (default) to disable.
keep_alive = "0s"
## Maximum entries in the MySQL prepared statement cache; default is 10,000.
prepared_stmt_cache_size = 10000
# MySQL server TLS options.
[mysql.tls]
@@ -259,7 +221,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4318/v1/traces"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -273,13 +235,6 @@ max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## Additional OTLP headers, only valid when using OTLP http
[logging.otlp_headers]
## @toml2docs:none-default
#Authorization = "Bearer my-token"
## @toml2docs:none-default
#Database = "My database"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0
@@ -302,8 +257,8 @@ threshold = "30s"
## The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged.
sample_ratio = 1.0
## The TTL of the `slow_queries` system table. Default is `90d` when `record_type` is `system_table`.
ttl = "90d"
## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
ttl = "30d"
## The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
@@ -333,8 +288,3 @@ headers = { }
## is set to "prof:true,prof_active:false". The official image adds this env variable.
## Default is true.
enable_heap_profiling = true
## Configuration options for the event recorder.
[event_recorder]
## TTL for the events table that will be used to store the events. Default is `90d`.
ttl = "90d"

View File

@@ -23,14 +23,6 @@ backend = "etcd_store"
## **Only used when backend is `postgres_store`.**
meta_table_name = "greptime_metakv"
## Optional PostgreSQL schema for metadata table and election table name qualification.
## When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public),
## set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`.
## GreptimeDB will NOT create the schema automatically; please ensure it exists or the user has permission.
## **Only used when backend is `postgres_store`.**
meta_schema_name = "greptime_schema"
## Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend
## Only used when backend is `postgres_store`.
meta_election_lock_id = 1
@@ -73,8 +65,8 @@ node_max_idle_time = "24hours"
## The number of threads to execute the runtime for global write operations.
#+ compact_rt_size = 4
## TLS configuration for kv store backend (applicable for etcd, PostgreSQL, and MySQL backends)
## When using etcd, PostgreSQL, or MySQL as metadata store, you can configure TLS here
## TLS configuration for kv store backend (only applicable for PostgreSQL/MySQL backends)
## When using PostgreSQL or MySQL as metadata store, you can configure TLS here
[backend_tls]
## TLS mode, refer to https://www.postgresql.org/docs/current/libpq-ssl.html
## - "disable" - No TLS
@@ -184,69 +176,50 @@ tcp_nodelay = true
# - `kafka`: metasrv **have to be** configured with kafka wal config when using kafka wal provider in datanode.
provider = "raft_engine"
# Kafka wal config.
## The broker endpoints of the Kafka cluster.
##
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
## Automatically create topics for WAL.
## Set to `true` to automatically create topics for WAL.
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
## **It's only used when the provider is `kafka`**.
auto_create_topics = true
## Interval of automatically WAL pruning.
## Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically.
## **It's only used when the provider is `kafka`**.
auto_prune_interval = "30m"
auto_prune_interval = "0s"
## Estimated size threshold to trigger a flush when using Kafka remote WAL.
## Since multiple regions may share a Kafka topic, the estimated size is calculated as:
## (latest_entry_id - flushed_entry_id) * avg_record_size
## MetaSrv triggers a flush for a region when this estimated size exceeds `flush_trigger_size`.
## - `latest_entry_id`: The latest entry ID in the topic.
## - `flushed_entry_id`: The last flushed entry ID for the region.
## Set to "0" to let the system decide the flush trigger size.
## **It's only used when the provider is `kafka`**.
flush_trigger_size = "512MB"
## Estimated size threshold to trigger a checkpoint when using Kafka remote WAL.
## The estimated size is calculated as:
## (latest_entry_id - last_checkpoint_entry_id) * avg_record_size
## MetaSrv triggers a checkpoint for a region when this estimated size exceeds `checkpoint_trigger_size`.
## Set to "0" to let the system decide the checkpoint trigger size.
## **It's only used when the provider is `kafka`**.
checkpoint_trigger_size = "128MB"
## The threshold to trigger a flush operation of a region in automatically WAL pruning.
## Metasrv will send a flush request to flush the region when:
## `trigger_flush_threshold` + `prunable_entry_id` < `max_prunable_entry_id`
## where:
## - `prunable_entry_id` is the maximum entry id that can be pruned of the region.
## - `max_prunable_entry_id` is the maximum prunable entry id among all regions in the same topic.
## Set to `0` to disable the flush operation.
trigger_flush_threshold = 0
## Concurrent task limit for automatically WAL pruning.
## **It's only used when the provider is `kafka`**.
auto_prune_parallelism = 10
## Number of topics used for remote WAL.
## **It's only used when the provider is `kafka`**.
## Number of topics.
num_topics = 64
## Topic selector type.
## Available selector types:
## - `round_robin` (default)
## **It's only used when the provider is `kafka`**.
selector_type = "round_robin"
## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
## Only accepts strings that match the following regular expression pattern:
## [a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*
## i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
## **It's only used when the provider is `kafka`**.
topic_name_prefix = "greptimedb_wal_topic"
## Expected number of replicas of each partition.
## **It's only used when the provider is `kafka`**.
replication_factor = 1
## The timeout for creating a Kafka topic.
## **It's only used when the provider is `kafka`**.
## Above which a topic creation operation will be cancelled.
create_topic_timeout = "30s"
# The Kafka SASL configuration.
@@ -269,20 +242,8 @@ create_topic_timeout = "30s"
## Configuration options for the event recorder.
[event_recorder]
## TTL for the events table that will be used to store the events. Default is `90d`.
ttl = "90d"
## Configuration options for the stats persistence.
[stats_persistence]
## TTL for the stats table that will be used to store the stats.
## Set to `0s` to disable stats persistence.
## Default is `0s`.
## If you want to enable stats persistence, set the TTL to a value greater than 0.
## It is recommended to set a small value, e.g., `3h`.
ttl = "0s"
## The interval to persist the stats. Default is `10m`.
## The minimum value is `10m`, if the value is less than `10m`, it will be overridden to `10m`.
interval = "10m"
## TTL for the events table that will be used to store the events.
ttl = "30d"
## The logging options.
[logging]
@@ -297,7 +258,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4318/v1/traces"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -311,14 +272,6 @@ max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## Additional OTLP headers, only valid when using OTLP http
[logging.otlp_headers]
## @toml2docs:none-default
#Authorization = "Bearer my-token"
## @toml2docs:none-default
#Database = "My database"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

View File

@@ -85,8 +85,7 @@ runtime_size = 2
## Server-side keep-alive time.
## Set to 0 (default) to disable.
keep_alive = "0s"
## Maximum entries in the MySQL prepared statement cache; default is 10,000.
prepared_stmt_cache_size= 10000
# MySQL server TLS options.
[mysql.tls]
@@ -567,7 +566,7 @@ sst_write_buffer_size = "8MB"
parallel_scan_channel_size = 32
## Maximum number of SST files to scan concurrently.
max_concurrent_scan_files = 384
max_concurrent_scan_files = 128
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
@@ -724,7 +723,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4318/v1/traces"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -738,13 +737,6 @@ max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## Additional OTLP headers, only valid when using OTLP http
[logging.otlp_headers]
## @toml2docs:none-default
#Authorization = "Bearer my-token"
## @toml2docs:none-default
#Database = "My database"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

View File

@@ -13,8 +13,7 @@ RUN apt-get update && apt-get install -y \
git \
unzip \
build-essential \
pkg-config \
openssh-client
pkg-config
# Install protoc
ARG PROTOBUF_VERSION=29.3

View File

@@ -19,7 +19,7 @@ ARG PROTOBUF_VERSION=29.3
RUN curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOBUF_VERSION}/protoc-${PROTOBUF_VERSION}-linux-x86_64.zip && \
unzip protoc-${PROTOBUF_VERSION}-linux-x86_64.zip -d protoc3;
RUN mv protoc3/bin/* /usr/local/bin/
RUN mv protoc3/include/* /usr/local/include/

View File

@@ -34,48 +34,6 @@ services:
networks:
- greptimedb
etcd-tls:
<<: *etcd_common_settings
container_name: etcd-tls
ports:
- 2378:2378
- 2381:2381
command:
- --name=etcd-tls
- --data-dir=/var/lib/etcd
- --initial-advertise-peer-urls=https://etcd-tls:2381
- --listen-peer-urls=https://0.0.0.0:2381
- --listen-client-urls=https://0.0.0.0:2378
- --advertise-client-urls=https://etcd-tls:2378
- --heartbeat-interval=250
- --election-timeout=1250
- --initial-cluster=etcd-tls=https://etcd-tls:2381
- --initial-cluster-state=new
- --initial-cluster-token=etcd-tls-cluster
- --cert-file=/certs/server.crt
- --key-file=/certs/server-key.pem
- --peer-cert-file=/certs/server.crt
- --peer-key-file=/certs/server-key.pem
- --trusted-ca-file=/certs/ca.crt
- --peer-trusted-ca-file=/certs/ca.crt
- --client-cert-auth
- --peer-client-cert-auth
volumes:
- ./greptimedb-cluster-docker-compose/etcd-tls:/var/lib/etcd
- ./greptimedb-cluster-docker-compose/certs:/certs:ro
environment:
- ETCDCTL_API=3
- ETCDCTL_CACERT=/certs/ca.crt
- ETCDCTL_CERT=/certs/server.crt
- ETCDCTL_KEY=/certs/server-key.pem
healthcheck:
test: [ "CMD", "etcdctl", "--endpoints=https://etcd-tls:2378", "--cacert=/certs/ca.crt", "--cert=/certs/server.crt", "--key=/certs/server-key.pem", "endpoint", "health" ]
interval: 10s
timeout: 5s
retries: 5
networks:
- greptimedb
metasrv:
image: *greptimedb_image
container_name: metasrv

View File

@@ -0,0 +1,72 @@
Currently, our query engine is based on DataFusion, so all aggregate function is executed by DataFusion, through its UDAF interface. You can find DataFusion's UDAF example [here](https://github.com/apache/datafusion/tree/main/datafusion-examples/examples/simple_udaf.rs). Basically, we provide the same way as DataFusion to write aggregate functions: both are centered in a struct called "Accumulator" to accumulates states along the way in aggregation.
However, DataFusion's UDAF implementation has a huge restriction, that it requires user to provide a concrete "Accumulator". Take `Median` aggregate function for example, to aggregate a `u32` datatype column, you have to write a `MedianU32`, and use `SELECT MEDIANU32(x)` in SQL. `MedianU32` cannot be used to aggregate a `i32` datatype column. Or, there's another way: you can use a special type that can hold all kinds of data (like our `Value` enum or Arrow's `ScalarValue`), and `match` all the way up to do aggregate calculations. It might work, though rather tedious. (But I think it's DataFusion's preferred way to write UDAF.)
So is there a way we can make an aggregate function that automatically match the input data's type? For example, a `Median` aggregator that can work on both `u32` column and `i32`? The answer is yes until we find a way to bypass DataFusion's restriction, a restriction that DataFusion simply doesn't pass the input data's type when creating an Accumulator.
> There's an example in `my_sum_udaf_example.rs`, take that as quick start.
# 1. Impl `AggregateFunctionCreator` trait for your accumulator creator.
You must first define a struct that will be used to create your accumulator. For example,
```Rust
#[as_aggr_func_creator]
#[derive(Debug, AggrFuncTypeStore)]
struct MySumAccumulatorCreator {}
```
Attribute macro `#[as_aggr_func_creator]` and derive macro `#[derive(Debug, AggrFuncTypeStore)]` must both be annotated on the struct. They work together to provide a storage of aggregate function's input data types, which are needed for creating generic accumulator later.
> Note that the `as_aggr_func_creator` macro will add fields to the struct, so the struct cannot be defined as an empty struct without field like `struct Foo;`, neither as a new type like `struct Foo(bar)`.
Then impl `AggregateFunctionCreator` trait on it. The definition of the trait is:
```Rust
pub trait AggregateFunctionCreator: Send + Sync + Debug {
fn creator(&self) -> AccumulatorCreatorFunction;
fn output_type(&self) -> ConcreteDataType;
fn state_types(&self) -> Vec<ConcreteDataType>;
}
```
You can use input data's type in methods that return output type and state types (just invoke `input_types()`).
The output type is aggregate function's output data's type. For example, `SUM` aggregate function's output type is `u64` for a `u32` datatype column. The state types are accumulator's internal states' types. Take `AVG` aggregate function on a `i32` column as example, its state types are `i64` (for sum) and `u64` (for count).
The `creator` function is where you define how an accumulator (that will be used in DataFusion) is created. You define "how" to create the accumulator (instead of "what" to create), using the input data's type as arguments. With input datatype known, you can create accumulator generically.
# 2. Impl `Accumulator` trait for your accumulator.
The accumulator is where you store the aggregate calculation states and evaluate a result. You must impl `Accumulator` trait for it. The trait's definition is:
```Rust
pub trait Accumulator: Send + Sync + Debug {
fn state(&self) -> Result<Vec<Value>>;
fn update_batch(&mut self, values: &[VectorRef]) -> Result<()>;
fn merge_batch(&mut self, states: &[VectorRef]) -> Result<()>;
fn evaluate(&self) -> Result<Value>;
}
```
The DataFusion basically executes aggregate like this:
1. Partitioning all input data for aggregate. Create an accumulator for each part.
2. Call `update_batch` on each accumulator with partitioned data, to let you update your aggregate calculation.
3. Call `state` to get each accumulator's internal state, the medial calculation result.
4. Call `merge_batch` to merge all accumulator's internal state to one.
5. Execute `evaluate` on the chosen one to get the final calculation result.
Once you know the meaning of each method, you can easily write your accumulator. You can refer to `Median` accumulator or `SUM` accumulator defined in file `my_sum_udaf_example.rs` for more details.
# 3. Register your aggregate function to our query engine.
You can call `register_aggregate_function` method in query engine to register your aggregate function. To do that, you have to new an instance of struct `AggregateFunctionMeta`. The struct has three fields, first is the name of your aggregate function's name. The function name is case-sensitive due to DataFusion's restriction. We strongly recommend using lowercase for your name. If you have to use uppercase name, wrap your aggregate function with quotation marks. For example, if you define an aggregate function named "my_aggr", you can use "`SELECT MY_AGGR(x)`"; if you define "my_AGGR", you have to use "`SELECT "my_AGGR"(x)`".
The second field is arg_counts ,the count of the arguments. Like accumulator `percentile`, calculating the p_number of the column. We need to input the value of column and the value of p to calculate, and so the count of the arguments is two.
The third field is a function about how to create your accumulator creator that you defined in step 1 above. Create creator, that's a bit intertwined, but it is how we make DataFusion use a newly created aggregate function each time it executes a SQL, preventing the stored input types from affecting each other. The key detail can be starting looking at our `DfContextProviderAdapter` struct's `get_aggregate_meta` method.
# (Optional) 4. Make your aggregate function automatically registered.
If you've written a great aggregate function that wants to let everyone use it, you can make it automatically register to our query engine at start time. It's quick and simple, just refer to the `AggregateFunctions::register` function in `common/function/src/scalars/aggregate/mod.rs`.

View File

@@ -1,112 +0,0 @@
---
Feature Name: Async Index Build
Tracking Issue: https://github.com/GreptimeTeam/greptimedb/issues/6756
Date: 2025-08-16
Author: "SNC123 <sinhco@outlook.com>"
---
# Summary
This RFC proposes an asynchronous index build mechanism in the database, with a configuration option to choose between synchronous and asynchronous modes, aiming to improve flexibility and adapt to different workload requirements.
# Motivation
Currently, index creation is performed synchronously, which may lead to prolonged write suspension and impact business continuity. As data volume grows, the time required for index building increases significantly. An asynchronous solution is urgently needed to enhance user experience and system throughput.
# Details
## Overview
The following table highlights the difference between async and sync index approach:
| Approach | Trigger | Data Source | Additional Index Metadata Installation | Fine-grained `FileMeta` Index |
| :--- | :--- | :--- | :--- | :--- |
| Sync Index | On `write_sst` | Memory (on flush) / Disk (on compact) | Not required(already installed synchronously) | Not required |
| Async Index | 4 trigger types | Disk | Required | Required |
The index build mode (synchronous or asynchronous) can be selected via configuration file.
### Four Trigger Types
This RFC introduces four `IndexBuildType`s to trigger index building:
- **Manual Rebuild**: Triggered by the user via `ADMIN build_index("table_name")`, for scenarios like recovering from failed builds or migrating data. SST files whose `ColumnIndexMetadata` (see below) is already consistent with the `RegionMetadata` will be skipped.
- **Schema Change**: Automatically triggered when the schema of an indexed column is altered.
- **Flush**: Automatically builds indexes for new SST files created by a flush.
- **Compact**: Automatically builds indexes for new SST files created by a compaction.
### Additional Index Metadata Installation
Previously, index information in the in-memory `FileMeta` was updated synchronously. The async approach requires an explicit installation step.
A race condition can occur when compaction and index building run concurrently, leading to:
1. Building an index for a file that is about to be deleted by compaction.
2. Creating an unnecessary index file and an incorrect manifest record.
3. On restart, replaying the manifest could load metadata for a non-existent file.
To prevent this, the system checks if a file's `FileMeta` is in a `compacting` state before updating the manifest. If it is, the installation is aborted.
### Fine-grained `FileMeta` Index
The original `FileMeta` only stored file-level index information. However, manual rebuilds require column-level details to identify files inconsistent with the current DDL. Therefore, the `indexes` field in `FileMeta` is updated as follows:
```rust
struct FileMeta {
...
// From file-level:
// available_indexes: SmallVec<[IndexType; 4]>
// To column-level:
indexes: Vec<ColumnIndexMetadata>,
...
}
pub struct ColumnIndexMetadata {
pub column_id: ColumnId,
pub created_indexes: IndexTypes,
}
```
## Process
The index building process is similar to a flush and is illustrated below:
```mermaid
sequenceDiagram
Region0->>Region0: Triggered by one of 4 conditions, targets specific files
loop For each target file
Region0->>IndexBuildScheduler: Submits an index build task
end
IndexBuildScheduler->>IndexBuildTask: Executes the task
IndexBuildTask->>Storage Interfaces: Reads SST data from disk
IndexBuildTask->>IndexBuildTask: Builds the index file
alt Index file size > 0
IndexBuildTask->>Region0: Sends IndexBuildFinished notification
end
alt File exists in Version and is not compacting
Region0->>Storage Interfaces: Updates manifest and Version
end
```
### Task Triggering and Scheduling
The process starts with one of the four `IndexBuildType` triggers. In `handle_rebuild_index`, the `RegionWorkerLoop` identifies target SSTs from the request or the current region version. It then creates an `IndexBuildTask` for each file and submits it to the `index_build_scheduler`.
Similar to Flush and Compact operations, index build tasks are ultimately dispatched to the LocalScheduler. Resource usage can be adjusted via configuration files. Since asynchronous index tasks are both memory-intensive and IO-intensive but have lower priority, it is recommended to allocate fewer resources to them compared to compaction and flush tasks—for example, limiting them to 1/8 of the CPU cores.
### Index Building and Notification
The scheduled `IndexBuildTask` executes its `index_build` method. It uses an `indexer_builder` to create an `Indexer` that reads SST data and builds the index. If a new index file is created (`IndexOutput.file_size > 0`), the task sends an `IndexBuildFinished` notification back to the `RegionWorkerLoop`.
### Index Metadata Installation
Upon receiving the `IndexBuildFinished` notification in `handle_index_build_finished`, the `RegionWorkerLoop` verifies that the file still exists in the current `version` and is not being compacted. If the check passes, it calls `manifest_ctx.update_manifest` to apply a `RegionEdit` with the new index information, completing the installation.
# Drawbacks
Asynchronous index building may consume extra system resources, potentially affecting overall performance during peak periods.
There may be a delay before the new index becomes available for queries, which could impact certain use cases.
# Unresolved Questions and Future Work
**Resource Management and Throttling**: The resource consumption (CPU, I/O) of background index building can be managed and limited to some extent by configuring a dedicated background thread pool. However, this approach cannot fully eliminate resource contention, especially under heavy workloads or when I/O is highly competitive. Additional throttling mechanisms or dynamic prioritization may still be necessary to avoid impacting foreground operations.
# Alternatives
Instead of being triggered by events like Flush or Compact, index building could be performed in batches during scheduled maintenance windows. This offers predictable resource usage but delays index availability.

View File

@@ -15,6 +15,8 @@
let
pkgs = nixpkgs.legacyPackages.${system};
buildInputs = with pkgs; [
libgit2
libz
];
lib = nixpkgs.lib;
rustToolchain = fenix.packages.${system}.fromToolchainName {

File diff suppressed because it is too large Load Diff

View File

@@ -87,13 +87,6 @@
| Other Request P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, scheme, operation) (rate(opendal_operation_duration_seconds_bucket{instance=~"$datanode", operation!~"read\|write\|list\|Writer::write\|Writer::close\|Reader::read"}[$__rate_interval])))` | `timeseries` | Other Request P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` |
| Opendal traffic | `sum by(instance, pod, scheme, operation) (rate(opendal_operation_bytes_sum{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Total traffic as in bytes by instance and operation | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` |
| OpenDAL errors per Instance | `sum by(instance, pod, scheme, operation, error) (rate(opendal_operation_errors_total{instance=~"$datanode", error!="NotFound"}[$__rate_interval]))` | `timeseries` | OpenDAL error counts per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]` |
# Remote WAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Triggered region flush total | `meta_triggered_region_flush_total` | `timeseries` | Triggered region flush total | `prometheus` | `none` | `{{pod}}-{{topic_name}}` |
| Triggered region checkpoint total | `meta_triggered_region_checkpoint_total` | `timeseries` | Triggered region checkpoint total | `prometheus` | `none` | `{{pod}}-{{topic_name}}` |
| Topic estimated replay size | `meta_topic_estimated_replay_size` | `timeseries` | Topic estimated max replay size | `prometheus` | `bytes` | `{{pod}}-{{topic_name}}` |
| Kafka logstore's bytes traffic | `rate(greptime_logstore_kafka_client_bytes_total[$__rate_interval])` | `timeseries` | Kafka logstore's bytes traffic | `prometheus` | `bytes` | `{{pod}}-{{logstore}}` |
# Metasrv
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
@@ -110,8 +103,6 @@
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
| Reconciliation stats | `greptime_meta_reconciliation_stats` | `timeseries` | Reconciliation stats | `prometheus` | `s` | `{{pod}}-{{table_type}}-{{type}}` |
| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
# Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |

View File

@@ -802,48 +802,6 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]'
- title: Remote WAL
panels:
- title: Triggered region flush total
type: timeseries
description: Triggered region flush total
unit: none
queries:
- expr: meta_triggered_region_flush_total
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{topic_name}}'
- title: Triggered region checkpoint total
type: timeseries
description: Triggered region checkpoint total
unit: none
queries:
- expr: meta_triggered_region_checkpoint_total
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{topic_name}}'
- title: Topic estimated replay size
type: timeseries
description: Topic estimated max replay size
unit: bytes
queries:
- expr: meta_topic_estimated_replay_size
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{topic_name}}'
- title: Kafka logstore's bytes traffic
type: timeseries
description: Kafka logstore's bytes traffic
unit: bytes
queries:
- expr: rate(greptime_logstore_kafka_client_bytes_total[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{logstore}}'
- title: Metasrv
panels:
- title: Region migration datanode
@@ -990,26 +948,6 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: AlterTable-{{step}} p90
- title: Reconciliation stats
type: timeseries
description: Reconciliation stats
unit: s
queries:
- expr: greptime_meta_reconciliation_stats
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{table_type}}-{{type}}'
- title: Reconciliation steps
type: timeseries
description: 'Elapsed of Reconciliation steps '
unit: s
queries:
- expr: histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{procedure_name}}-{{step}}-P90'
- title: Flownode
panels:
- title: Flow Ingest / Output Rate

File diff suppressed because it is too large Load Diff

View File

@@ -87,13 +87,6 @@
| Other Request P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, scheme, operation) (rate(opendal_operation_duration_seconds_bucket{ operation!~"read\|write\|list\|Writer::write\|Writer::close\|Reader::read"}[$__rate_interval])))` | `timeseries` | Other Request P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` |
| Opendal traffic | `sum by(instance, pod, scheme, operation) (rate(opendal_operation_bytes_sum{}[$__rate_interval]))` | `timeseries` | Total traffic as in bytes by instance and operation | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` |
| OpenDAL errors per Instance | `sum by(instance, pod, scheme, operation, error) (rate(opendal_operation_errors_total{ error!="NotFound"}[$__rate_interval]))` | `timeseries` | OpenDAL error counts per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]` |
# Remote WAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Triggered region flush total | `meta_triggered_region_flush_total` | `timeseries` | Triggered region flush total | `prometheus` | `none` | `{{pod}}-{{topic_name}}` |
| Triggered region checkpoint total | `meta_triggered_region_checkpoint_total` | `timeseries` | Triggered region checkpoint total | `prometheus` | `none` | `{{pod}}-{{topic_name}}` |
| Topic estimated replay size | `meta_topic_estimated_replay_size` | `timeseries` | Topic estimated max replay size | `prometheus` | `bytes` | `{{pod}}-{{topic_name}}` |
| Kafka logstore's bytes traffic | `rate(greptime_logstore_kafka_client_bytes_total[$__rate_interval])` | `timeseries` | Kafka logstore's bytes traffic | `prometheus` | `bytes` | `{{pod}}-{{logstore}}` |
# Metasrv
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
@@ -110,8 +103,6 @@
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
| Reconciliation stats | `greptime_meta_reconciliation_stats` | `timeseries` | Reconciliation stats | `prometheus` | `s` | `{{pod}}-{{table_type}}-{{type}}` |
| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
# Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |

View File

@@ -802,48 +802,6 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]'
- title: Remote WAL
panels:
- title: Triggered region flush total
type: timeseries
description: Triggered region flush total
unit: none
queries:
- expr: meta_triggered_region_flush_total
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{topic_name}}'
- title: Triggered region checkpoint total
type: timeseries
description: Triggered region checkpoint total
unit: none
queries:
- expr: meta_triggered_region_checkpoint_total
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{topic_name}}'
- title: Topic estimated replay size
type: timeseries
description: Topic estimated max replay size
unit: bytes
queries:
- expr: meta_topic_estimated_replay_size
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{topic_name}}'
- title: Kafka logstore's bytes traffic
type: timeseries
description: Kafka logstore's bytes traffic
unit: bytes
queries:
- expr: rate(greptime_logstore_kafka_client_bytes_total[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{logstore}}'
- title: Metasrv
panels:
- title: Region migration datanode
@@ -990,26 +948,6 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: AlterTable-{{step}} p90
- title: Reconciliation stats
type: timeseries
description: Reconciliation stats
unit: s
queries:
- expr: greptime_meta_reconciliation_stats
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{table_type}}-{{type}}'
- title: Reconciliation steps
type: timeseries
description: 'Elapsed of Reconciliation steps '
unit: s
queries:
- expr: histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{procedure_name}}-{{step}}-P90'
- title: Flownode
panels:
- title: Flow Ingest / Output Rate

View File

@@ -1,265 +0,0 @@
# Copyright 2023 Greptime Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import re
import sys
def load_udeps_report(report_path):
try:
with open(report_path, "r") as f:
return json.load(f)
except FileNotFoundError:
print(f"Error: Report file '{report_path}' not found.")
return None
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in report file: {e}")
return None
def extract_unused_dependencies(report):
"""
Extract and organize unused dependencies from the cargo-udeps JSON report.
The cargo-udeps report has this structure:
{
"unused_deps": {
"package_name v0.1.0 (/path/to/package)": {
"normal": ["dep1", "dep2"],
"development": ["dev_dep1"],
"build": ["build_dep1"],
"manifest_path": "/path/to/Cargo.toml"
}
}
}
Args:
report (dict): The parsed JSON report from cargo-udeps
Returns:
dict: Organized unused dependencies by package name:
{
"package_name": {
"dependencies": [("dep1", "normal"), ("dev_dep1", "dev")],
"manifest_path": "/path/to/Cargo.toml"
}
}
"""
if not report or "unused_deps" not in report:
return {}
unused_deps = {}
for package_full_name, deps_info in report["unused_deps"].items():
package_name = package_full_name.split(" ")[0]
all_unused = []
if deps_info.get("normal"):
all_unused.extend([(dep, "normal") for dep in deps_info["normal"]])
if deps_info.get("development"):
all_unused.extend([(dep, "dev") for dep in deps_info["development"]])
if deps_info.get("build"):
all_unused.extend([(dep, "build") for dep in deps_info["build"]])
if all_unused:
unused_deps[package_name] = {
"dependencies": all_unused,
"manifest_path": deps_info.get("manifest_path", "unknown"),
}
return unused_deps
def get_section_pattern(dep_type):
"""
Get regex patterns to identify different dependency sections in Cargo.toml.
Args:
dep_type (str): Type of dependency ("normal", "dev", or "build")
Returns:
list: List of regex patterns to match the appropriate section headers
"""
patterns = {
"normal": [r"\[dependencies\]", r"\[dependencies\..*?\]"],
"dev": [r"\[dev-dependencies\]", r"\[dev-dependencies\..*?\]"],
"build": [r"\[build-dependencies\]", r"\[build-dependencies\..*?\]"],
}
return patterns.get(dep_type, [])
def remove_dependency_line(content, dep_name, section_start, section_end):
"""
Remove a dependency line from a specific section of a Cargo.toml file.
Args:
content (str): The entire content of the Cargo.toml file
dep_name (str): Name of the dependency to remove (e.g., "serde", "tokio")
section_start (int): Starting position of the section in the content
section_end (int): Ending position of the section in the content
Returns:
tuple: (new_content, removed) where:
- new_content (str): The modified content with dependency removed
- removed (bool): True if dependency was found and removed, False otherwise
Example input content format:
content = '''
[package]
name = "my-crate"
version = "0.1.0"
[dependencies]
serde = "1.0"
tokio = { version = "1.0", features = ["full"] }
serde_json.workspace = true
[dev-dependencies]
tempfile = "3.0"
'''
# If dep_name = "serde", section_start = start of [dependencies],
# section_end = start of [dev-dependencies], this function will:
# 1. Extract the section: "serde = "1.0"\ntokio = { version = "1.0", features = ["full"] }\nserde_json.workspace = true\n"
# 2. Find and remove the line: "serde = "1.0""
# 3. Return the modified content with that line removed
"""
section_content = content[section_start:section_end]
dep_patterns = [
rf"^{re.escape(dep_name)}\s*=.*$", # e.g., "serde = "1.0""
rf"^{re.escape(dep_name)}\.workspace\s*=.*$", # e.g., "serde_json.workspace = true"
]
for pattern in dep_patterns:
match = re.search(pattern, section_content, re.MULTILINE)
if match:
line_start = section_start + match.start() # Start of the matched line
line_end = section_start + match.end() # End of the matched line
if line_end < len(content) and content[line_end] == "\n":
line_end += 1
return content[:line_start] + content[line_end:], True
return content, False
def remove_dependency_from_toml(file_path, dep_name, dep_type):
"""
Remove a specific dependency from a Cargo.toml file.
Args:
file_path (str): Path to the Cargo.toml file
dep_name (str): Name of the dependency to remove
dep_type (str): Type of dependency ("normal", "dev", or "build")
Returns:
bool: True if dependency was successfully removed, False otherwise
"""
try:
with open(file_path, "r") as f:
content = f.read()
section_patterns = get_section_pattern(dep_type)
if not section_patterns:
return False
for pattern in section_patterns:
section_match = re.search(pattern, content, re.IGNORECASE)
if not section_match:
continue
section_start = section_match.end()
next_section = re.search(r"\n\s*\[", content[section_start:])
section_end = (
section_start + next_section.start() if next_section else len(content)
)
new_content, removed = remove_dependency_line(
content, dep_name, section_start, section_end
)
if removed:
with open(file_path, "w") as f:
f.write(new_content)
return True
return False
except Exception as e:
print(f"Error processing {file_path}: {e}")
return False
def process_unused_dependencies(unused_deps):
"""
Process and remove all unused dependencies from their respective Cargo.toml files.
Args:
unused_deps (dict): Dictionary of unused dependencies organized by package:
{
"package_name": {
"dependencies": [("dep1", "normal"), ("dev_dep1", "dev")],
"manifest_path": "/path/to/Cargo.toml"
}
}
"""
if not unused_deps:
print("No unused dependencies found.")
return
total_removed = 0
total_failed = 0
for package, info in unused_deps.items():
deps = info["dependencies"]
manifest_path = info["manifest_path"]
if not os.path.exists(manifest_path):
print(f"Manifest file not found: {manifest_path}")
total_failed += len(deps)
continue
for dep, dep_type in deps:
if remove_dependency_from_toml(manifest_path, dep, dep_type):
print(f"Removed {dep} from {package}")
total_removed += 1
else:
print(f"Failed to remove {dep} from {package}")
total_failed += 1
print(f"Removed {total_removed} dependencies")
if total_failed > 0:
print(f"Failed to remove {total_failed} dependencies")
def main():
if len(sys.argv) > 1:
report_path = sys.argv[1]
else:
report_path = "udeps-report.json"
report = load_udeps_report(report_path)
if report is None:
sys.exit(1)
unused_deps = extract_unused_dependencies(report)
process_unused_dependencies(unused_deps)
if __name__ == "__main__":
main()

View File

@@ -1,71 +0,0 @@
#!/bin/bash
# Generate TLS certificates for etcd testing
# This script creates certificates for TLS-enabled etcd in testing environments
set -euo pipefail
CERT_DIR="${1:-$(dirname "$0")/../tests-integration/fixtures/etcd-tls-certs}"
DAYS="${2:-365}"
echo "Generating TLS certificates for etcd in ${CERT_DIR}..."
mkdir -p "${CERT_DIR}"
cd "${CERT_DIR}"
echo "Generating CA private key..."
openssl genrsa -out ca-key.pem 2048
echo "Generating CA certificate..."
openssl req -new -x509 -key ca-key.pem -out ca.crt -days "${DAYS}" \
-subj "/C=US/ST=CA/L=SF/O=Greptime/CN=etcd-ca"
# Create server certificate config with Subject Alternative Names
echo "Creating server certificate configuration..."
cat > server.conf << 'EOF'
[req]
distinguished_name = req
[v3_req]
basicConstraints = CA:FALSE
keyUsage = keyEncipherment, dataEncipherment
subjectAltName = @alt_names
[alt_names]
DNS.1 = localhost
DNS.2 = etcd-tls
DNS.3 = 127.0.0.1
IP.1 = 127.0.0.1
IP.2 = ::1
EOF
echo "Generating server private key..."
openssl genrsa -out server-key.pem 2048
echo "Generating server certificate signing request..."
openssl req -new -key server-key.pem -out server.csr \
-subj "/CN=etcd-tls"
echo "Generating server certificate..."
openssl x509 -req -in server.csr -CA ca.crt \
-CAkey ca-key.pem -CAcreateserial -out server.crt \
-days "${DAYS}" -extensions v3_req -extfile server.conf
echo "Generating client private key..."
openssl genrsa -out client-key.pem 2048
echo "Generating client certificate signing request..."
openssl req -new -key client-key.pem -out client.csr \
-subj "/CN=etcd-client"
echo "Generating client certificate..."
openssl x509 -req -in client.csr -CA ca.crt \
-CAkey ca-key.pem -CAcreateserial -out client.crt \
-days "${DAYS}"
echo "Setting proper file permissions..."
chmod 644 ca.crt server.crt client.crt
chmod 600 ca-key.pem server-key.pem client-key.pem
# Clean up intermediate files
rm -f server.csr client.csr server.conf
echo "TLS certificates generated successfully in ${CERT_DIR}"

View File

@@ -19,3 +19,6 @@ paste.workspace = true
prost.workspace = true
serde_json.workspace = true
snafu.workspace = true
[build-dependencies]
tonic-build = "0.11"

View File

@@ -19,8 +19,8 @@ use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use datatypes::prelude::ConcreteDataType;
use snafu::Location;
use snafu::prelude::*;
use snafu::Location;
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -16,15 +16,15 @@ use std::collections::HashSet;
use std::sync::Arc;
use common_base::BitVec;
use common_decimal::Decimal128;
use common_decimal::decimal128::{DECIMAL128_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION};
use common_decimal::Decimal128;
use common_time::time::Time;
use common_time::timestamp::TimeUnit;
use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
use datatypes::prelude::{ConcreteDataType, ValueRef};
use datatypes::scalars::ScalarVector;
use datatypes::types::{
Int8Type, Int16Type, IntervalType, TimeType, TimestampType, UInt8Type, UInt16Type,
Int16Type, Int8Type, IntervalType, TimeType, TimestampType, UInt16Type, UInt8Type,
};
use datatypes::value::{OrderedF32, OrderedF64, Value};
use datatypes::vectors::{
@@ -295,7 +295,7 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
| ConcreteDataType::Struct(_)
| ConcreteDataType::Dictionary(_)
| ConcreteDataType::Duration(_) => {
return error::IntoColumnDataTypeSnafu { from: datatype }.fail();
return error::IntoColumnDataTypeSnafu { from: datatype }.fail()
}
};
let datatype_extension = match column_datatype {

View File

@@ -14,8 +14,6 @@
pub mod column_def;
pub mod helper;
pub mod meta {
pub use greptime_proto::v1::meta::*;
}

View File

@@ -15,9 +15,9 @@
use std::collections::HashMap;
use datatypes::schema::{
COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions,
SkippingIndexType,
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions,
SkippingIndexOptions, SkippingIndexType, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
SKIPPING_INDEX_KEY,
};
use greptime_proto::v1::{
Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,

View File

@@ -1,65 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
/// Create a time index [ColumnSchema] with column's name and datatype.
/// Other fields are left default.
/// Useful when you just want to create a simple [ColumnSchema] without providing much struct fields.
pub fn time_index_column_schema(name: &str, datatype: ColumnDataType) -> ColumnSchema {
ColumnSchema {
column_name: name.to_string(),
datatype: datatype as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
}
}
/// Create a tag [ColumnSchema] with column's name and datatype.
/// Other fields are left default.
/// Useful when you just want to create a simple [ColumnSchema] without providing much struct fields.
pub fn tag_column_schema(name: &str, datatype: ColumnDataType) -> ColumnSchema {
ColumnSchema {
column_name: name.to_string(),
datatype: datatype as i32,
semantic_type: SemanticType::Tag as i32,
..Default::default()
}
}
/// Create a field [ColumnSchema] with column's name and datatype.
/// Other fields are left default.
/// Useful when you just want to create a simple [ColumnSchema] without providing much struct fields.
pub fn field_column_schema(name: &str, datatype: ColumnDataType) -> ColumnSchema {
ColumnSchema {
column_name: name.to_string(),
datatype: datatype as i32,
semantic_type: SemanticType::Field as i32,
..Default::default()
}
}
/// Create a [Row] from [ValueData]s.
/// Useful when you don't want to write much verbose codes.
pub fn row(values: Vec<ValueData>) -> Row {
Row {
values: values
.into_iter()
.map(|x| Value {
value_data: Some(x),
})
.collect::<Vec<_>>(),
}
}

View File

@@ -17,13 +17,13 @@ use std::sync::Arc;
use common_base::secrets::SecretString;
use digest::Digest;
use sha1::Sha1;
use snafu::{OptionExt, ensure};
use snafu::{ensure, OptionExt};
use crate::error::{IllegalParamSnafu, InvalidConfigSnafu, Result, UserPasswordMismatchSnafu};
use crate::user_info::DefaultUserInfo;
use crate::user_provider::static_user_provider::{STATIC_USER_PROVIDER, StaticUserProvider};
use crate::user_provider::static_user_provider::{StaticUserProvider, STATIC_USER_PROVIDER};
use crate::user_provider::watch_file_user_provider::{
WATCH_FILE_USER_PROVIDER, WatchFileUserProvider,
WatchFileUserProvider, WATCH_FILE_USER_PROVIDER,
};
use crate::{UserInfoRef, UserProviderRef};

View File

@@ -22,13 +22,13 @@ mod user_provider;
pub mod tests;
pub use common::{
HashedPassword, Identity, Password, auth_mysql, static_user_provider_from_option,
user_provider_from_option, userinfo_by_name,
auth_mysql, static_user_provider_from_option, user_provider_from_option, userinfo_by_name,
HashedPassword, Identity, Password,
};
pub use permission::{PermissionChecker, PermissionReq, PermissionResp};
pub use user_info::UserInfo;
pub use user_provider::UserProvider;
pub use user_provider::static_user_provider::StaticUserProvider;
pub use user_provider::UserProvider;
/// pub type alias
pub type UserInfoRef = std::sync::Arc<dyn UserInfo>;

View File

@@ -32,7 +32,6 @@ pub enum PermissionReq<'a> {
PromStoreRead,
Otlp,
LogWrite,
BulkInsert,
}
#[derive(Debug)]

View File

@@ -21,7 +21,7 @@ use crate::error::{
UserPasswordMismatchSnafu,
};
use crate::user_info::DefaultUserInfo;
use crate::{Identity, Password, UserInfoRef, UserProvider, auth_mysql};
use crate::{auth_mysql, Identity, Password, UserInfoRef, UserProvider};
pub struct DatabaseAuthInfo<'a> {
pub catalog: &'a str,

View File

@@ -22,7 +22,7 @@ use std::io::BufRead;
use std::path::Path;
use common_base::secrets::ExposeSecret;
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{ensure, OptionExt, ResultExt};
use crate::common::{Identity, Password};
use crate::error::{
@@ -30,7 +30,7 @@ use crate::error::{
UserNotFoundSnafu, UserPasswordMismatchSnafu,
};
use crate::user_info::DefaultUserInfo;
use crate::{UserInfoRef, auth_mysql};
use crate::{auth_mysql, UserInfoRef};
#[async_trait::async_trait]
pub trait UserProvider: Send + Sync {

View File

@@ -102,10 +102,10 @@ pub mod test {
use common_test_util::temp_dir::create_temp_dir;
use crate::UserProvider;
use crate::user_info::DefaultUserInfo;
use crate::user_provider::static_user_provider::StaticUserProvider;
use crate::user_provider::{Identity, Password};
use crate::UserProvider;
async fn test_authenticate(provider: &dyn UserProvider, username: &str, password: &str) {
let re = provider
@@ -143,13 +143,12 @@ pub mod test {
let file = File::create(&file_path);
let file = file.unwrap();
let mut lw = LineWriter::new(file);
assert!(
lw.write_all(
assert!(lw
.write_all(
b"root=123456
admin=654321",
)
.is_ok()
);
.is_ok());
lw.flush().unwrap();
}

View File

@@ -20,7 +20,7 @@ use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use common_telemetry::{info, warn};
use notify::{EventKind, RecursiveMode, Watcher};
use snafu::{ResultExt, ensure};
use snafu::{ensure, ResultExt};
use crate::error::{FileWatchSnafu, InvalidConfigSnafu, Result};
use crate::user_info::DefaultUserInfo;
@@ -133,9 +133,9 @@ pub mod test {
use common_test_util::temp_dir::create_temp_dir;
use tokio::time::sleep;
use crate::UserProvider;
use crate::user_provider::watch_file_user_provider::WatchFileUserProvider;
use crate::user_provider::{Identity, Password};
use crate::UserProvider;
async fn test_authenticate(
provider: &dyn UserProvider,

View File

@@ -19,9 +19,9 @@ use std::time::Duration;
use catalog::kvbackend::new_table_cache;
use common_meta::cache::{
CacheRegistry, CacheRegistryBuilder, LayeredCacheRegistryBuilder, new_schema_cache,
new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, new_table_schema_cache, new_view_info_cache,
new_schema_cache, new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, new_table_schema_cache, new_view_info_cache, CacheRegistry,
CacheRegistryBuilder, LayeredCacheRegistryBuilder,
};
use common_meta::kv_backend::KvBackendRef;
use moka::future::CacheBuilder;

View File

@@ -21,7 +21,6 @@ bytes.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-event-recorder.workspace = true
common-frontend.workspace = true
common-macro.workspace = true
common-meta.workspace = true
@@ -32,7 +31,6 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
common-workload.workspace = true
dashmap.workspace = true
datafusion.workspace = true
datatypes.workspace = true
@@ -49,7 +47,6 @@ prometheus.workspace = true
promql-parser.workspace = true
rand.workspace = true
rustc-hash.workspace = true
serde.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true

View File

@@ -297,20 +297,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to handle query"))]
HandleQuery {
source: common_meta::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to project schema"))]
ProjectSchema {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl Error {
@@ -383,8 +369,6 @@ impl ErrorExt for Error {
Error::FrontendNotFound { .. } | Error::MetaClientMissing { .. } => {
StatusCode::Unexpected
}
Error::HandleQuery { source, .. } => source.status_code(),
Error::ProjectSchema { source, .. } => source.status_code(),
}
}

View File

@@ -14,34 +14,25 @@
use api::v1::meta::ProcedureStatus;
use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
use common_meta::cluster::{ClusterInfo, NodeInfo};
use common_meta::datanode::RegionStat;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::node_manager::DatanodeManagerRef;
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
use common_meta::rpc::procedure;
use common_procedure::{ProcedureInfo, ProcedureState};
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::util::ChainedRecordBatchStream;
use meta_client::MetaClientRef;
use snafu::ResultExt;
use store_api::storage::RegionId;
use crate::error;
use crate::information_schema::{DatanodeInspectRequest, InformationExtension};
use crate::information_schema::InformationExtension;
pub struct DistributedInformationExtension {
meta_client: MetaClientRef,
datanode_manager: DatanodeManagerRef,
}
impl DistributedInformationExtension {
pub fn new(meta_client: MetaClientRef, datanode_manager: DatanodeManagerRef) -> Self {
Self {
meta_client,
datanode_manager,
}
pub fn new(meta_client: MetaClientRef) -> Self {
Self { meta_client }
}
}
@@ -107,39 +98,4 @@ impl InformationExtension for DistributedInformationExtension {
.map_err(BoxedError::new)
.context(crate::error::ListFlowStatsSnafu)
}
async fn inspect_datanode(
&self,
request: DatanodeInspectRequest,
) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
// Aggregate results from all datanodes
let nodes = self
.meta_client
.list_nodes(Some(Role::Datanode))
.await
.map_err(BoxedError::new)
.context(crate::error::ListNodesSnafu)?;
let plan = request
.build_plan()
.context(crate::error::DatafusionSnafu)?;
let mut streams = Vec::with_capacity(nodes.len());
for node in nodes {
let client = self.datanode_manager.datanode(&node.peer).await;
let stream = client
.handle_query(QueryRequest {
plan: plan.clone(),
region_id: RegionId::default(),
header: None,
})
.await
.context(crate::error::HandleQuerySnafu)?;
streams.push(stream);
}
let chained =
ChainedRecordBatchStream::new(streams).context(crate::error::CreateRecordBatchSnafu)?;
Ok(Box::pin(chained))
}
}

View File

@@ -21,4 +21,4 @@ mod table_cache;
pub use builder::KvBackendCatalogManagerBuilder;
pub use manager::KvBackendCatalogManager;
pub use table_cache::{TableCache, TableCacheRef, new_table_cache};
pub use table_cache::{new_table_cache, TableCache, TableCacheRef};

View File

@@ -16,8 +16,8 @@ use std::sync::Arc;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_meta::cache::LayeredCacheRegistryRef;
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use moka::sync::Cache;
@@ -26,8 +26,8 @@ use partition::manager::PartitionRuleManager;
#[cfg(feature = "enterprise")]
use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::manager::{SystemCatalog, CATALOG_CACHE_MAX_CAPACITY};
use crate::kvbackend::KvBackendCatalogManager;
use crate::kvbackend::manager::{CATALOG_CACHE_MAX_CAPACITY, SystemCatalog};
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::pg_catalog::PGCatalogProvider;

View File

@@ -24,12 +24,12 @@ use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::KeyValue;
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_telemetry::debug;
use meta_client::client::MetaClient;
use moka::future::{Cache, CacheBuilder};
@@ -461,17 +461,17 @@ impl KvBackend for MetaKvBackend {
#[cfg(test)]
mod tests {
use std::any::Any;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::KeyValue;
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use dashmap::DashMap;
use super::CachedKvBackend;

View File

@@ -26,12 +26,12 @@ use common_meta::cache::{
LayeredCacheRegistryRef, TableInfoCacheRef, TableNameCacheRef, TableRoute, TableRouteCacheRef,
ViewInfoCacheRef,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use futures_util::stream::BoxStream;
@@ -41,16 +41,14 @@ use partition::manager::PartitionRuleManagerRef;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::TableRef;
use table::dist_table::DistTable;
use table::metadata::{TableId, TableInfoRef};
use table::table::PartitionRules;
use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable};
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table_name::TableName;
use table::TableRef;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::ReceiverStream;
use crate::CatalogManager;
use crate::error::{
CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
@@ -60,8 +58,9 @@ use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::SystemSchemaProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;
/// Access all existing catalog, schema and tables.
///
@@ -133,8 +132,6 @@ impl KvBackendCatalogManager {
{
let mut new_table_info = (*table.table_info()).clone();
let mut phy_part_cols_not_in_logical_table = vec![];
// Remap partition key indices from physical table to logical table
new_table_info.meta.partition_key_indices = physical_table_info_value
.table_info
@@ -151,30 +148,15 @@ impl KvBackendCatalogManager {
.get(physical_index)
.and_then(|physical_column| {
// Find the corresponding index in the logical table schema
let idx = new_table_info
new_table_info
.meta
.schema
.column_index_by_name(physical_column.name.as_str());
if idx.is_none() {
// not all part columns in physical table that are also in logical table
phy_part_cols_not_in_logical_table
.push(physical_column.name.clone());
}
idx
.column_index_by_name(physical_column.name.as_str())
})
})
.collect();
let partition_rules = if !phy_part_cols_not_in_logical_table.is_empty() {
Some(PartitionRules {
extra_phy_cols_not_in_logical_table: phy_part_cols_not_in_logical_table,
})
} else {
None
};
let new_table = DistTable::table_partitioned(Arc::new(new_table_info), partition_rules);
let new_table = DistTable::table(Arc::new(new_table_info));
return Ok(new_table);
}

View File

@@ -20,9 +20,9 @@ use common_meta::instruction::CacheIdent;
use futures::future::BoxFuture;
use moka::future::Cache;
use snafu::OptionExt;
use table::TableRef;
use table::dist_table::DistTable;
use table::table_name::TableName;
use table::TableRef;
pub type TableCacheRef = Arc<TableCache>;

View File

@@ -25,8 +25,8 @@ use common_catalog::consts::{INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME};
use futures::future::BoxFuture;
use futures_util::stream::BoxStream;
use session::context::QueryContext;
use table::TableRef;
use table::metadata::{TableId, TableInfoRef};
use table::TableRef;
use crate::error::Result;

View File

@@ -14,4 +14,4 @@
pub mod manager;
pub use manager::{MemoryCatalogManager, new_memory_catalog_manager};
pub use manager::{new_memory_catalog_manager, MemoryCatalogManager};

View File

@@ -28,8 +28,8 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
use futures_util::stream::BoxStream;
use session::context::QueryContext;
use snafu::OptionExt;
use table::TableRef;
use table::metadata::{TableId, TableInfoRef};
use table::TableRef;
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
use crate::information_schema::InformationSchemaProvider;
@@ -38,7 +38,7 @@ use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, Regis
type SchemaEntries = HashMap<String, HashMap<String, TableRef>>;
/// Simple in-memory list of catalogs used for tests.
/// Simple in-memory list of catalogs
#[derive(Clone)]
pub struct MemoryCatalogManager {
/// Collection of catalogs containing schemas and ultimately Tables
@@ -419,7 +419,7 @@ pub fn new_memory_catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
mod tests {
use common_catalog::consts::*;
use futures_util::TryStreamExt;
use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable};
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use super::*;
@@ -454,18 +454,16 @@ mod tests {
tables[0].table_info().table_id()
);
assert!(
catalog_list
.table(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"not_exists",
None
)
.await
.unwrap()
.is_none()
);
assert!(catalog_list
.table(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"not_exists",
None
)
.await
.unwrap()
.is_none());
}
#[test]
@@ -488,13 +486,11 @@ mod tests {
table: NumbersTable::table(2333),
};
catalog.register_table_sync(register_table_req).unwrap();
assert!(
catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
.await
.unwrap()
.is_some()
);
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
.await
.unwrap()
.is_some());
let deregister_table_req = DeregisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
@@ -502,12 +498,10 @@ mod tests {
table_name: table_name.to_string(),
};
catalog.deregister_table_sync(deregister_table_req).unwrap();
assert!(
catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
.await
.unwrap()
.is_none()
);
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
.await
.unwrap()
.is_none());
}
}

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
@@ -21,17 +21,17 @@ use std::time::{Duration, Instant, UNIX_EPOCH};
use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
use common_base::cancellation::CancellationHandle;
use common_event_recorder::EventRecorderRef;
use common_frontend::selector::{FrontendSelector, MetaClientSelector};
use common_frontend::slow_query_event::SlowQueryEvent;
use common_telemetry::logging::SlowQueriesRecordType;
use common_telemetry::{debug, info, slow, warn};
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use meta_client::MetaClientRef;
use promql_parser::parser::EvalStmt;
use rand::random;
use snafu::{OptionExt, ResultExt, ensure};
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::statement::Statement;
use tokio::sync::mpsc::Sender;
use crate::error;
use crate::metrics::{PROCESS_KILL_COUNT, PROCESS_LIST_COUNT};
@@ -249,8 +249,6 @@ pub struct Ticket {
pub(crate) manager: ProcessManagerRef,
pub(crate) id: ProcessId,
pub cancellation_handle: Arc<CancellationHandle>,
// Keep the handle of the slow query timer to ensure it will trigger the event recording when dropped.
_slow_query_timer: Option<SlowQueryTimer>,
}
@@ -297,37 +295,38 @@ impl Debug for CancellableProcess {
pub struct SlowQueryTimer {
start: Instant,
stmt: QueryStatement,
threshold: Duration,
sample_ratio: f64,
record_type: SlowQueriesRecordType,
recorder: EventRecorderRef,
query_ctx: QueryContextRef,
threshold: Option<Duration>,
sample_ratio: Option<f64>,
tx: Sender<SlowQueryEvent>,
}
impl SlowQueryTimer {
pub fn new(
stmt: QueryStatement,
threshold: Duration,
sample_ratio: f64,
record_type: SlowQueriesRecordType,
recorder: EventRecorderRef,
query_ctx: QueryContextRef,
threshold: Option<Duration>,
sample_ratio: Option<f64>,
tx: Sender<SlowQueryEvent>,
) -> Self {
Self {
start: Instant::now(),
stmt,
query_ctx,
threshold,
sample_ratio,
record_type,
recorder,
tx,
}
}
}
impl SlowQueryTimer {
fn send_slow_query_event(&self, elapsed: Duration) {
fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
let mut slow_query_event = SlowQueryEvent {
cost: elapsed.as_millis() as u64,
threshold: self.threshold.as_millis() as u64,
threshold: threshold.as_millis() as u64,
query: "".to_string(),
query_ctx: self.query_ctx.clone(),
// The following fields are only used for PromQL queries.
is_promql: false,
@@ -364,37 +363,29 @@ impl SlowQueryTimer {
}
}
match self.record_type {
// Send the slow query event to the event recorder to persist it as the system table.
SlowQueriesRecordType::SystemTable => {
self.recorder.record(Box::new(slow_query_event));
}
// Record the slow query in a specific logs file.
SlowQueriesRecordType::Log => {
slow!(
cost = slow_query_event.cost,
threshold = slow_query_event.threshold,
query = slow_query_event.query,
is_promql = slow_query_event.is_promql,
promql_range = slow_query_event.promql_range,
promql_step = slow_query_event.promql_step,
promql_start = slow_query_event.promql_start,
promql_end = slow_query_event.promql_end,
);
}
// Send SlowQueryEvent to the handler.
if let Err(e) = self.tx.try_send(slow_query_event) {
error!(e; "Failed to send slow query event");
}
}
}
impl Drop for SlowQueryTimer {
fn drop(&mut self) {
// Calculate the elaspsed duration since the timer is created.
let elapsed = self.start.elapsed();
if elapsed > self.threshold {
// Only capture a portion of slow queries based on sample_ratio.
// Generate a random number in [0, 1) and compare it with sample_ratio.
if self.sample_ratio >= 1.0 || random::<f64>() <= self.sample_ratio {
self.send_slow_query_event(elapsed);
if let Some(threshold) = self.threshold {
// Calculate the elaspsed duration since the timer is created.
let elapsed = self.start.elapsed();
if elapsed > threshold {
if let Some(ratio) = self.sample_ratio {
// Only capture a portion of slow queries based on sample_ratio.
// Generate a random number in [0, 1) and compare it with sample_ratio.
if ratio >= 1.0 || random::<f64>() <= ratio {
self.send_slow_query_event(elapsed, threshold);
}
} else {
// Captures all slow queries if sample_ratio is not set.
self.send_slow_query_event(elapsed, threshold);
}
}
}
}

View File

@@ -36,26 +36,22 @@ use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME
use common_error::ext::ErrorExt;
use common_meta::cluster::NodeInfo;
use common_meta::datanode::RegionStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureInfo;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::LogicalPlan;
use datatypes::schema::SchemaRef;
use lazy_static::lazy_static;
use paste::paste;
use process_list::InformationSchemaProcessList;
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::storage::{ScanRequest, TableId};
use table::TableRef;
use table::metadata::TableType;
use table::TableRef;
pub use table_names::*;
use views::InformationSchemaViews;
use self::columns::InformationSchemaColumns;
use crate::CatalogManager;
use crate::error::{Error, Result};
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
@@ -73,6 +69,7 @@ pub(crate) use crate::system_schema::predicate::Predicates;
use crate::system_schema::{
SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
};
use crate::CatalogManager;
lazy_static! {
// Memory tables in `information_schema`.
@@ -412,43 +409,8 @@ pub trait InformationExtension {
/// Get the flow statistics. If no flownode is available, return `None`.
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
/// Inspects the datanode.
async fn inspect_datanode(
&self,
request: DatanodeInspectRequest,
) -> std::result::Result<SendableRecordBatchStream, Self::Error>;
}
/// The request to inspect the datanode.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatanodeInspectRequest {
/// Kind to fetch from datanode.
pub kind: DatanodeInspectKind,
/// Pushdown scan configuration (projection/predicate/limit) for the returned stream.
/// This allows server-side filtering to reduce I/O and network costs.
pub scan: ScanRequest,
}
/// The kind of the datanode inspect request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatanodeInspectKind {
/// List SST entries recorded in manifest
SstManifest,
/// List SST entries discovered in storage layer
SstStorage,
}
impl DatanodeInspectRequest {
/// Builds a logical plan for the datanode inspect request.
pub fn build_plan(self) -> std::result::Result<LogicalPlan, DataFusionError> {
match self.kind {
DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
}
}
}
pub struct NoopInformationExtension;
#[async_trait::async_trait]
@@ -470,11 +432,4 @@ impl InformationExtension for NoopInformationExtension {
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
Ok(None)
}
async fn inspect_datanode(
&self,
_request: DatanodeInspectRequest,
) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
Ok(common_recordbatch::RecordBatches::empty().as_stream())
}
}

View File

@@ -18,46 +18,37 @@ use std::time::Duration;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeStatus};
use common_meta::cluster::NodeInfo;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::timestamp::Timestamp;
use common_workload::DatanodeWorkloadType;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::value::Value;
use datatypes::vectors::{
Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
UInt32VectorBuilder, UInt64VectorBuilder,
};
use serde::Serialize;
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::system_schema::information_schema::{CLUSTER_INFO, InformationTable, Predicates};
use crate::system_schema::information_schema::{InformationTable, Predicates, CLUSTER_INFO};
use crate::system_schema::utils;
const PEER_TYPE_FRONTEND: &str = "FRONTEND";
const PEER_TYPE_METASRV: &str = "METASRV";
use crate::CatalogManager;
const PEER_ID: &str = "peer_id";
const PEER_TYPE: &str = "peer_type";
const PEER_ADDR: &str = "peer_addr";
const CPUS: &str = "cpus";
const MEMORY_BYTES: &str = "memory_bytes";
const VERSION: &str = "version";
const GIT_COMMIT: &str = "git_commit";
const START_TIME: &str = "start_time";
const UPTIME: &str = "uptime";
const ACTIVE_TIME: &str = "active_time";
const NODE_STATUS: &str = "node_status";
const INIT_CAPACITY: usize = 42;
@@ -66,14 +57,11 @@ const INIT_CAPACITY: usize = 42;
/// - `peer_id`: the peer server id.
/// - `peer_type`: the peer type, such as `datanode`, `frontend`, `metasrv` etc.
/// - `peer_addr`: the peer gRPC address.
/// - `cpus`: the number of CPUs of the peer.
/// - `memory_bytes`: the memory bytes of the peer.
/// - `version`: the build package version of the peer.
/// - `git_commit`: the build git commit hash of the peer.
/// - `start_time`: the starting time of the peer.
/// - `uptime`: the uptime of the peer.
/// - `active_time`: the time since the last activity of the peer.
/// - `node_status`: the status info of the peer.
///
#[derive(Debug)]
pub(super) struct InformationSchemaClusterInfo {
@@ -94,8 +82,6 @@ impl InformationSchemaClusterInfo {
ColumnSchema::new(PEER_ID, ConcreteDataType::int64_datatype(), false),
ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(CPUS, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(MEMORY_BYTES, ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
@@ -105,7 +91,6 @@ impl InformationSchemaClusterInfo {
),
ColumnSchema::new(UPTIME, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(ACTIVE_TIME, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(NODE_STATUS, ConcreteDataType::string_datatype(), true),
]))
}
@@ -155,14 +140,11 @@ struct InformationSchemaClusterInfoBuilder {
peer_ids: Int64VectorBuilder,
peer_types: StringVectorBuilder,
peer_addrs: StringVectorBuilder,
cpus: UInt32VectorBuilder,
memory_bytes: UInt64VectorBuilder,
versions: StringVectorBuilder,
git_commits: StringVectorBuilder,
start_times: TimestampMillisecondVectorBuilder,
uptimes: StringVectorBuilder,
active_times: StringVectorBuilder,
node_status: StringVectorBuilder,
}
impl InformationSchemaClusterInfoBuilder {
@@ -173,14 +155,11 @@ impl InformationSchemaClusterInfoBuilder {
peer_ids: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
cpus: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
memory_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY),
active_times: StringVectorBuilder::with_capacity(INIT_CAPACITY),
node_status: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}
@@ -197,10 +176,9 @@ impl InformationSchemaClusterInfoBuilder {
fn add_node_info(&mut self, predicates: &Predicates, node_info: NodeInfo) {
let peer_type = node_info.status.role_name();
let peer_id = peer_id(peer_type, node_info.peer.id);
let row = [
(PEER_ID, &Value::from(peer_id)),
(PEER_ID, &Value::from(node_info.peer.id)),
(PEER_TYPE, &Value::from(peer_type)),
(PEER_ADDR, &Value::from(node_info.peer.addr.as_str())),
(VERSION, &Value::from(node_info.version.as_str())),
@@ -211,7 +189,13 @@ impl InformationSchemaClusterInfoBuilder {
return;
}
self.peer_ids.push(Some(peer_id));
if peer_type == "FRONTEND" || peer_type == "METASRV" {
// Always set peer_id to be -1 for frontends and metasrvs
self.peer_ids.push(Some(-1));
} else {
self.peer_ids.push(Some(node_info.peer.id as i64));
}
self.peer_types.push(Some(peer_type));
self.peer_addrs.push(Some(&node_info.peer.addr));
self.versions.push(Some(&node_info.version));
@@ -228,8 +212,6 @@ impl InformationSchemaClusterInfoBuilder {
self.start_times.push(None);
self.uptimes.push(None);
}
self.cpus.push(Some(node_info.cpus));
self.memory_bytes.push(Some(node_info.memory_bytes));
if node_info.last_activity_ts > 0 {
self.active_times.push(Some(
@@ -238,8 +220,6 @@ impl InformationSchemaClusterInfoBuilder {
} else {
self.active_times.push(None);
}
self.node_status
.push(format_node_status(&node_info).as_deref());
}
fn format_duration_since(ts: u64) -> String {
@@ -253,14 +233,11 @@ impl InformationSchemaClusterInfoBuilder {
Arc::new(self.peer_ids.finish()),
Arc::new(self.peer_types.finish()),
Arc::new(self.peer_addrs.finish()),
Arc::new(self.cpus.finish()),
Arc::new(self.memory_bytes.finish()),
Arc::new(self.versions.finish()),
Arc::new(self.git_commits.finish()),
Arc::new(self.start_times.finish()),
Arc::new(self.uptimes.finish()),
Arc::new(self.active_times.finish()),
Arc::new(self.node_status.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
@@ -286,56 +263,3 @@ impl DfPartitionStream for InformationSchemaClusterInfo {
))
}
}
fn peer_id(peer_type: &str, peer_id: u64) -> i64 {
if peer_type == PEER_TYPE_FRONTEND || peer_type == PEER_TYPE_METASRV {
-1
} else {
peer_id as i64
}
}
#[derive(Serialize)]
struct DisplayMetasrvStatus {
is_leader: bool,
}
#[derive(Serialize)]
struct DisplayDatanodeStatus {
workloads: Vec<DatanodeWorkloadType>,
leader_regions: usize,
follower_regions: usize,
}
impl From<&DatanodeStatus> for DisplayDatanodeStatus {
fn from(status: &DatanodeStatus) -> Self {
Self {
workloads: status
.workloads
.types
.iter()
.flat_map(|w| DatanodeWorkloadType::from_i32(*w))
.collect(),
leader_regions: status.leader_regions,
follower_regions: status.follower_regions,
}
}
}
fn format_node_status(node_info: &NodeInfo) -> Option<String> {
match &node_info.status {
NodeStatus::Datanode(datanode_status) => {
serde_json::to_string(&DisplayDatanodeStatus::from(datanode_status)).ok()
}
NodeStatus::Frontend(_) => None,
NodeStatus::Flownode(_) => None,
NodeStatus::Metasrv(metasrv_status) => {
if metasrv_status.is_leader {
serde_json::to_string(&DisplayMetasrvStatus { is_leader: true }).ok()
} else {
None
}
}
NodeStatus::Standalone => None,
}
}

View File

@@ -23,9 +23,9 @@ use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, DataType, MutableVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
@@ -38,12 +38,12 @@ use snafu::{OptionExt, ResultExt};
use sql::statements;
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::information_schema::{COLUMNS, InformationTable};
use crate::system_schema::information_schema::{InformationTable, COLUMNS};
use crate::CatalogManager;
#[derive(Debug)]
pub(super) struct InformationSchemaColumns {

View File

@@ -16,10 +16,10 @@ use std::sync::{Arc, Weak};
use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::key::FlowId;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::FlowId;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
@@ -38,14 +38,14 @@ use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu,
Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::{FLOWS, Predicates};
use crate::information_schema::{Predicates, FLOWS};
use crate::system_schema::information_schema::InformationTable;
use crate::system_schema::utils;
use crate::CatalogManager;
const INIT_CAPACITY: usize = 42;

View File

@@ -89,9 +89,9 @@ pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>
vec![
Arc::new(StringVector::from(vec![build_info.branch.to_string()])),
Arc::new(StringVector::from(vec![build_info.commit.to_string()])),
Arc::new(StringVector::from(vec![
build_info.commit_short.to_string(),
])),
Arc::new(StringVector::from(vec![build_info
.commit_short
.to_string()])),
Arc::new(StringVector::from(vec![build_info.clean.to_string()])),
Arc::new(StringVector::from(vec![build_info.version.to_string()])),
],
@@ -369,9 +369,17 @@ pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>
TRIGGERS => (
vec![
string_column("TRIGGER_NAME"),
ColumnSchema::new("trigger_id", ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new(
"trigger_id",
ConcreteDataType::uint64_datatype(),
false,
),
string_column("TRIGGER_DEFINITION"),
ColumnSchema::new("flownode_id", ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(
"flownode_id",
ConcreteDataType::uint64_datatype(),
true,
),
],
vec![],
),

View File

@@ -20,9 +20,9 @@ use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, FulltextBackend, Schema, SchemaRef};
use datatypes::value::Value;
@@ -31,11 +31,11 @@ use futures_util::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::system_schema::information_schema::{InformationTable, KEY_COLUMN_USAGE, Predicates};
use crate::system_schema::information_schema::{InformationTable, Predicates, KEY_COLUMN_USAGE};
use crate::CatalogManager;
pub const CONSTRAINT_SCHEMA: &str = "constraint_schema";
pub const CONSTRAINT_NAME: &str = "constraint_name";
@@ -277,15 +277,15 @@ impl InformationSchemaKeyColumnUsageBuilder {
constraints.push(CONSTRAINT_NAME_INVERTED_INDEX);
greptime_index_type.push(INDEX_TYPE_INVERTED_INDEX);
}
if let Ok(Some(options)) = column.fulltext_options()
&& options.enable
{
constraints.push(CONSTRAINT_NAME_FULLTEXT_INDEX);
let index_type = match options.backend {
FulltextBackend::Bloom => INDEX_TYPE_FULLTEXT_BLOOM,
FulltextBackend::Tantivy => INDEX_TYPE_FULLTEXT_TANTIVY,
};
greptime_index_type.push(index_type);
if let Ok(Some(options)) = column.fulltext_options() {
if options.enable {
constraints.push(CONSTRAINT_NAME_FULLTEXT_INDEX);
let index_type = match options.backend {
FulltextBackend::Bloom => INDEX_TYPE_FULLTEXT_BLOOM,
FulltextBackend::Tantivy => INDEX_TYPE_FULLTEXT_TANTIVY,
};
greptime_index_type.push(index_type);
}
}
if column.is_skipping_indexed() {
constraints.push(CONSTRAINT_NAME_SKIPPING_INDEX);

View File

@@ -21,9 +21,9 @@ use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMicrosecond;
@@ -39,13 +39,13 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, PartitionManagerNotFoundSnafu,
Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, PARTITIONS, Predicates};
use crate::system_schema::information_schema::{InformationTable, Predicates, PARTITIONS};
use crate::CatalogManager;
const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";

View File

@@ -22,9 +22,9 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::timestamp::Timestamp;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMillisecond;
@@ -33,10 +33,10 @@ use datatypes::vectors::{StringVectorBuilder, TimestampMillisecondVectorBuilder}
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::system_schema::information_schema::{InformationTable, PROCEDURE_INFO, Predicates};
use crate::system_schema::information_schema::{InformationTable, Predicates, PROCEDURE_INFO};
use crate::system_schema::utils;
use crate::CatalogManager;
const PROCEDURE_ID: &str = "procedure_id";
const PROCEDURE_TYPE: &str = "procedure_type";

View File

@@ -23,9 +23,9 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::common::HashMap;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
@@ -35,13 +35,13 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::TableType;
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result,
UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates, REGION_PEERS};
use crate::CatalogManager;
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";

View File

@@ -30,17 +30,16 @@ use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, UInt64VectorB
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::information_schema::Predicates;
use crate::system_schema::information_schema::{InformationTable, REGION_STATISTICS};
use crate::system_schema::utils;
use crate::CatalogManager;
const REGION_ID: &str = "region_id";
const TABLE_ID: &str = "table_id";
const REGION_NUMBER: &str = "region_number";
const REGION_ROWS: &str = "region_rows";
const WRITTEN_BYTES: &str = "written_bytes_since_open";
const DISK_SIZE: &str = "disk_size";
const MEMTABLE_SIZE: &str = "memtable_size";
const MANIFEST_SIZE: &str = "manifest_size";
@@ -58,7 +57,6 @@ const INIT_CAPACITY: usize = 42;
/// - `table_id`: The table id.
/// - `region_number`: The region number.
/// - `region_rows`: The number of rows in region.
/// - `written_bytes_since_open`: The total bytes written of the region since region opened.
/// - `memtable_size`: The memtable size in bytes.
/// - `disk_size`: The approximate disk size in bytes.
/// - `manifest_size`: The manifest size in bytes.
@@ -86,7 +84,6 @@ impl InformationSchemaRegionStatistics {
ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(REGION_NUMBER, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(REGION_ROWS, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(WRITTEN_BYTES, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(DISK_SIZE, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(MEMTABLE_SIZE, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(MANIFEST_SIZE, ConcreteDataType::uint64_datatype(), true),
@@ -150,7 +147,6 @@ struct InformationSchemaRegionStatisticsBuilder {
table_ids: UInt32VectorBuilder,
region_numbers: UInt32VectorBuilder,
region_rows: UInt64VectorBuilder,
written_bytes: UInt64VectorBuilder,
disk_sizes: UInt64VectorBuilder,
memtable_sizes: UInt64VectorBuilder,
manifest_sizes: UInt64VectorBuilder,
@@ -170,7 +166,6 @@ impl InformationSchemaRegionStatisticsBuilder {
table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
region_numbers: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
region_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
written_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
disk_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
memtable_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
manifest_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
@@ -202,7 +197,6 @@ impl InformationSchemaRegionStatisticsBuilder {
(TABLE_ID, &Value::from(region_stat.id.table_id())),
(REGION_NUMBER, &Value::from(region_stat.id.region_number())),
(REGION_ROWS, &Value::from(region_stat.num_rows)),
(WRITTEN_BYTES, &Value::from(region_stat.written_bytes)),
(DISK_SIZE, &Value::from(region_stat.approximate_bytes)),
(MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)),
(MANIFEST_SIZE, &Value::from(region_stat.manifest_size)),
@@ -222,7 +216,6 @@ impl InformationSchemaRegionStatisticsBuilder {
self.region_numbers
.push(Some(region_stat.id.region_number()));
self.region_rows.push(Some(region_stat.num_rows));
self.written_bytes.push(Some(region_stat.written_bytes));
self.disk_sizes.push(Some(region_stat.approximate_bytes));
self.memtable_sizes.push(Some(region_stat.memtable_size));
self.manifest_sizes.push(Some(region_stat.manifest_size));
@@ -239,7 +232,6 @@ impl InformationSchemaRegionStatisticsBuilder {
Arc::new(self.table_ids.finish()),
Arc::new(self.region_numbers.finish()),
Arc::new(self.region_rows.finish()),
Arc::new(self.written_bytes.finish()),
Arc::new(self.disk_sizes.finish()),
Arc::new(self.memtable_sizes.finish()),
Arc::new(self.manifest_sizes.finish()),

View File

@@ -21,9 +21,9 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::util::current_time_millis;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, MutableVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};

View File

@@ -21,9 +21,9 @@ use common_meta::key::schema_name::SchemaNameKey;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
@@ -31,13 +31,13 @@ use datatypes::vectors::StringVectorBuilder;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, TableMetadataManagerSnafu,
UpgradeWeakCatalogManagerRefSnafu,
};
use crate::system_schema::information_schema::{InformationTable, Predicates, SCHEMATA};
use crate::system_schema::utils;
use crate::CatalogManager;
pub const CATALOG_NAME: &str = "catalog_name";
pub const SCHEMA_NAME: &str = "schema_name";

View File

@@ -20,9 +20,9 @@ use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, MutableVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
@@ -32,15 +32,15 @@ use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::information_schema::key_column_usage::{
CONSTRAINT_NAME_PRI, CONSTRAINT_NAME_TIME_INDEX,
};
use crate::information_schema::Predicates;
use crate::system_schema::information_schema::{InformationTable, TABLE_CONSTRAINTS};
use crate::CatalogManager;
/// The `TABLE_CONSTRAINTS` table describes which tables have constraints.
#[derive(Debug)]

View File

@@ -23,26 +23,27 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::error;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
StringVectorBuilder, TimestampSecondVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
StringVectorBuilder, TimestampMicrosecondVectorBuilder, UInt32VectorBuilder,
UInt64VectorBuilder,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES};
use crate::system_schema::utils;
use crate::CatalogManager;
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
@@ -106,17 +107,17 @@ impl InformationSchemaTables {
ColumnSchema::new(AUTO_INCREMENT, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(
CREATE_TIME,
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
true,
),
ColumnSchema::new(
UPDATE_TIME,
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
true,
),
ColumnSchema::new(
CHECK_TIME,
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
true,
),
ColumnSchema::new(TABLE_COLLATION, ConcreteDataType::string_datatype(), true),
@@ -193,9 +194,9 @@ struct InformationSchemaTablesBuilder {
max_index_length: UInt64VectorBuilder,
data_free: UInt64VectorBuilder,
auto_increment: UInt64VectorBuilder,
create_time: TimestampSecondVectorBuilder,
update_time: TimestampSecondVectorBuilder,
check_time: TimestampSecondVectorBuilder,
create_time: TimestampMicrosecondVectorBuilder,
update_time: TimestampMicrosecondVectorBuilder,
check_time: TimestampMicrosecondVectorBuilder,
table_collation: StringVectorBuilder,
checksum: UInt64VectorBuilder,
create_options: StringVectorBuilder,
@@ -230,9 +231,9 @@ impl InformationSchemaTablesBuilder {
max_index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
data_free: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
auto_increment: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
create_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
update_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
check_time: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
create_time: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
update_time: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
check_time: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
table_collation: StringVectorBuilder::with_capacity(INIT_CAPACITY),
checksum: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
create_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
@@ -379,7 +380,7 @@ impl InformationSchemaTablesBuilder {
self.create_options
.push(Some(table_info.meta.options.to_string().as_ref()));
self.create_time
.push(Some(table_info.meta.created_on.timestamp().into()));
.push(Some(table_info.meta.created_on.timestamp_millis().into()));
self.temporary
.push(if matches!(table_type, TableType::Temporary) {

View File

@@ -20,9 +20,9 @@ use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
@@ -32,13 +32,13 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
use crate::CatalogManager;
use crate::error::{
CastManagerSnafu, CreateRecordBatchSnafu, GetViewCacheSnafu, InternalSnafu, Result,
UpgradeWeakCatalogManagerRefSnafu, ViewInfoNotFoundSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates, VIEWS};
use crate::CatalogManager;
const INIT_CAPACITY: usize = 42;
pub const TABLE_CATALOG: &str = "table_catalog";

View File

@@ -21,9 +21,9 @@ use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;

View File

@@ -34,10 +34,10 @@ use table::TableRef;
pub use table_names::*;
use self::pg_namespace::oid_map::{PGNamespaceOidMap, PGNamespaceOidMapRef};
use crate::CatalogManager;
use crate::system_schema::memory_table::MemoryTable;
use crate::system_schema::utils::tables::u32_column;
use crate::system_schema::{SystemSchemaProvider, SystemSchemaProviderInner, SystemTableRef};
use crate::CatalogManager;
lazy_static! {
static ref MEMORY_TABLES: &'static [&'static str] = &[table_names::PG_TYPE];

View File

@@ -32,15 +32,15 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use table::metadata::TableType;
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::SystemTable;
use crate::system_schema::pg_catalog::pg_namespace::oid_map::PGNamespaceOidMapRef;
use crate::system_schema::pg_catalog::{OID_COLUMN_NAME, PG_CLASS, query_ctx};
use crate::system_schema::pg_catalog::{query_ctx, OID_COLUMN_NAME, PG_CLASS};
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;
// === column name ===
pub const RELNAME: &str = "relname";

View File

@@ -29,15 +29,15 @@ use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::SystemTable;
use crate::system_schema::pg_catalog::pg_namespace::oid_map::PGNamespaceOidMapRef;
use crate::system_schema::pg_catalog::{OID_COLUMN_NAME, PG_DATABASE, query_ctx};
use crate::system_schema::pg_catalog::{query_ctx, OID_COLUMN_NAME, PG_DATABASE};
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;
// === column name ===
pub const DATNAME: &str = "datname";

View File

@@ -35,16 +35,16 @@ use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::SystemTable;
use crate::system_schema::pg_catalog::{
OID_COLUMN_NAME, PG_NAMESPACE, PGNamespaceOidMapRef, query_ctx,
query_ctx, PGNamespaceOidMapRef, OID_COLUMN_NAME, PG_NAMESPACE,
};
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;
const NSPNAME: &str = "nspname";
const INIT_CAPACITY: usize = 42;

View File

@@ -133,7 +133,7 @@ impl Predicate {
let Expr::Column(c) = *expr else {
unreachable!();
};
let Expr::Literal(ScalarValue::Utf8(Some(pattern)), _) = *pattern else {
let Expr::Literal(ScalarValue::Utf8(Some(pattern))) = *pattern else {
unreachable!();
};
@@ -148,8 +148,8 @@ impl Predicate {
// left OP right
Expr::BinaryExpr(bin) => match (*bin.left, bin.op, *bin.right) {
// left == right
(Expr::Literal(scalar, _), Operator::Eq, Expr::Column(c))
| (Expr::Column(c), Operator::Eq, Expr::Literal(scalar, _)) => {
(Expr::Literal(scalar), Operator::Eq, Expr::Column(c))
| (Expr::Column(c), Operator::Eq, Expr::Literal(scalar)) => {
let Ok(v) = Value::try_from(scalar) else {
return None;
};
@@ -157,8 +157,8 @@ impl Predicate {
Some(Predicate::Eq(c.name, v))
}
// left != right
(Expr::Literal(scalar, _), Operator::NotEq, Expr::Column(c))
| (Expr::Column(c), Operator::NotEq, Expr::Literal(scalar, _)) => {
(Expr::Literal(scalar), Operator::NotEq, Expr::Column(c))
| (Expr::Column(c), Operator::NotEq, Expr::Literal(scalar)) => {
let Ok(v) = Value::try_from(scalar) else {
return None;
};
@@ -189,7 +189,7 @@ impl Predicate {
let mut values = Vec::with_capacity(list.len());
for scalar in list {
// Safety: checked by `is_all_scalars`
let Expr::Literal(scalar, _) = scalar else {
let Expr::Literal(scalar) = scalar else {
unreachable!();
};
@@ -237,7 +237,7 @@ fn like_utf8(s: &str, pattern: &str, case_insensitive: &bool) -> Option<bool> {
}
fn is_string_literal(expr: &Expr) -> bool {
matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(_)), _))
matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(_))))
}
fn is_column(expr: &Expr) -> bool {
@@ -286,14 +286,14 @@ impl Predicates {
/// Returns true when the values are all [`DfExpr::Literal`].
fn is_all_scalars(list: &[Expr]) -> bool {
list.iter().all(|v| matches!(v, Expr::Literal(_, _)))
list.iter().all(|v| matches!(v, Expr::Literal(_)))
}
#[cfg(test)]
mod tests {
use datafusion::common::Column;
use datafusion::common::{Column, ScalarValue};
use datafusion::logical_expr::expr::InList;
use datafusion::logical_expr::{BinaryExpr, Literal};
use datafusion::logical_expr::BinaryExpr;
use super::*;
@@ -339,22 +339,18 @@ mod tests {
assert!(!p.eval(&wrong_row).unwrap());
assert!(p.eval(&[]).is_none());
assert!(p.eval(&[("c", &a_value)]).is_none());
assert!(
!p.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
.unwrap()
);
assert!(
!p.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
.unwrap()
);
assert!(
p.eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
.is_none()
);
assert!(
!p.eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
.unwrap()
);
assert!(!p
.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
.unwrap());
assert!(!p
.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
.unwrap());
assert!(p
.eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
.is_none());
assert!(!p
.eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
.unwrap());
//Predicate::Or
let p = Predicate::Or(Box::new(p1), Box::new(p2));
@@ -362,22 +358,18 @@ mod tests {
assert!(p.eval(&wrong_row).unwrap());
assert!(p.eval(&[]).is_none());
assert!(p.eval(&[("c", &a_value)]).is_none());
assert!(
!p.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
.unwrap()
);
assert!(
p.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
.unwrap()
);
assert!(
p.eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
.unwrap()
);
assert!(
p.eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
.is_none()
);
assert!(!p
.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
.unwrap());
assert!(p
.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
.unwrap());
assert!(p
.eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
.unwrap());
assert!(p
.eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
.is_none());
}
#[test]
@@ -386,7 +378,7 @@ mod tests {
let expr = Expr::Like(Like {
negated: false,
expr: Box::new(column("a")),
pattern: Box::new("%abc".lit()),
pattern: Box::new(string_literal("%abc")),
case_insensitive: true,
escape_char: None,
});
@@ -413,7 +405,7 @@ mod tests {
let expr = Expr::Like(Like {
negated: false,
expr: Box::new(column("a")),
pattern: Box::new("%abc".lit()),
pattern: Box::new(string_literal("%abc")),
case_insensitive: false,
escape_char: None,
});
@@ -433,7 +425,7 @@ mod tests {
let expr = Expr::Like(Like {
negated: true,
expr: Box::new(column("a")),
pattern: Box::new("%abc".lit()),
pattern: Box::new(string_literal("%abc")),
case_insensitive: true,
escape_char: None,
});
@@ -448,6 +440,10 @@ mod tests {
Expr::Column(Column::from_name(name))
}
fn string_literal(v: &str) -> Expr {
Expr::Literal(ScalarValue::Utf8(Some(v.to_string())))
}
fn match_string_value(v: &Value, expected: &str) -> bool {
matches!(v, Value::String(bs) if bs.as_utf8() == expected)
}
@@ -467,13 +463,13 @@ mod tests {
let expr1 = Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("a")),
op: Operator::Eq,
right: Box::new("a_value".lit()),
right: Box::new(string_literal("a_value")),
});
let expr2 = Expr::BinaryExpr(BinaryExpr {
left: Box::new(column("b")),
op: Operator::NotEq,
right: Box::new("b_value".lit()),
right: Box::new(string_literal("b_value")),
});
(expr1, expr2)
@@ -512,7 +508,7 @@ mod tests {
let inlist_expr = Expr::InList(InList {
expr: Box::new(column("a")),
list: vec!["a1".lit(), "a2".lit()],
list: vec![string_literal("a1"), string_literal("a2")],
negated: false,
});
@@ -522,7 +518,7 @@ mod tests {
let inlist_expr = Expr::InList(InList {
expr: Box::new(column("a")),
list: vec!["a1".lit(), "a2".lit()],
list: vec![string_literal("a1"), string_literal("a2")],
negated: true,
});
let inlist_p = Predicate::from_expr(inlist_expr).unwrap();

View File

@@ -17,10 +17,10 @@ use std::sync::Weak;
use common_meta::key::TableMetadataManagerRef;
use snafu::OptionExt;
use crate::CatalogManager;
use crate::error::{GetInformationExtensionSnafu, Result, UpgradeWeakCatalogManagerRefSnafu};
use crate::information_schema::InformationExtensionRef;
use crate::kvbackend::KvBackendCatalogManager;
use crate::CatalogManager;
pub mod tables;

View File

@@ -17,27 +17,27 @@ use std::sync::Arc;
use bytes::Bytes;
use common_catalog::format_full_table_name;
use common_query::logical_plan::{SubstraitPlanDecoderRef, rename_logical_plan_columns};
use common_query::logical_plan::{rename_logical_plan_columns, SubstraitPlanDecoderRef};
use datafusion::common::{ResolvedTableReference, TableReference};
use datafusion::datasource::view::ViewTable;
use datafusion::datasource::{TableProvider, provider_as_source};
use datafusion::datasource::{provider_as_source, TableProvider};
use datafusion::logical_expr::TableSource;
use itertools::Itertools;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
pub mod dummy_catalog;
use dummy_catalog::DummyCatalogList;
use table::TableRef;
use crate::CatalogManagerRef;
use crate::error::{
CastManagerSnafu, DecodePlanSnafu, GetViewCacheSnafu, ProjectViewColumnsSnafu,
CastManagerSnafu, DatafusionSnafu, DecodePlanSnafu, GetViewCacheSnafu, ProjectViewColumnsSnafu,
QueryAccessDeniedSnafu, Result, TableNotExistSnafu, ViewInfoNotFoundSnafu,
ViewPlanColumnsChangedSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::CatalogManagerRef;
pub struct DfTableSourceProvider {
catalog_manager: CatalogManagerRef,
@@ -199,10 +199,10 @@ impl DfTableSourceProvider {
logical_plan
};
Ok(Arc::new(ViewTable::new(
logical_plan,
Some(view_info.definition.to_string()),
)))
Ok(Arc::new(
ViewTable::try_new(logical_plan, Some(view_info.definition.to_string()))
.context(DatafusionSnafu)?,
))
}
}
@@ -272,7 +272,7 @@ mod tests {
use common_query::logical_plan::SubstraitPlanDecoder;
use datafusion::catalog::CatalogProviderList;
use datafusion::logical_expr::builder::LogicalTableSource;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, col, lit};
use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder};
use crate::information_schema::NoopInformationExtension;

View File

@@ -25,8 +25,8 @@ use datafusion::datasource::TableProvider;
use snafu::OptionExt;
use table::table::adapter::DfTableProviderAdapter;
use crate::CatalogManagerRef;
use crate::error::TableNotExistSnafu;
use crate::CatalogManagerRef;
/// Delegate the resolving requests to the `[CatalogManager]` unconditionally.
#[derive(Clone)]

View File

@@ -66,9 +66,6 @@ pub struct BenchTableMetadataCommand {
#[cfg(feature = "pg_kvbackend")]
#[clap(long)]
postgres_addr: Option<String>,
#[cfg(feature = "pg_kvbackend")]
#[clap(long)]
postgres_schema: Option<String>,
#[cfg(feature = "mysql_kvbackend")]
#[clap(long)]
mysql_addr: Option<String>,

View File

@@ -14,8 +14,8 @@
use std::time::Instant;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use table::table_name::TableName;
use crate::bench::{

View File

@@ -18,9 +18,9 @@ mod import;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::Tool;
use crate::data::export::ExportCommand;
use crate::data::import::ImportCommand;
use crate::Tool;
/// Command for data operations including exporting data from and importing data into GreptimeDB.
#[derive(Subcommand)]

View File

@@ -24,18 +24,18 @@ use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use object_store::layers::LoggingLayer;
use object_store::services::Oss;
use object_store::{ObjectStore, services};
use object_store::{services, ObjectStore};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::database::{DatabaseClient, parse_proxy_opts};
use crate::database::{parse_proxy_opts, DatabaseClient};
use crate::error::{
EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
SchemaNotFoundSnafu,
};
use crate::{Tool, database};
use crate::{database, Tool};
type TableReference = (String, String, String);

View File

@@ -25,9 +25,9 @@ use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::database::{DatabaseClient, parse_proxy_opts};
use crate::database::{parse_proxy_opts, DatabaseClient};
use crate::error::{Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::{Tool, database};
use crate::{database, Tool};
#[derive(Debug, Default, Clone, ValueEnum)]
enum ImportTarget {

View File

@@ -14,15 +14,15 @@
use std::time::Duration;
use base64::Engine;
use base64::engine::general_purpose;
use base64::Engine;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use humantime::format_duration;
use serde_json::Value;
use servers::http::GreptimeQueryOutput;
use servers::http::header::constants::GREPTIME_DB_HEADER_TIMEOUT;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::GreptimeQueryOutput;
use snafu::ResultExt;
use crate::error::{

View File

@@ -21,10 +21,10 @@ mod utils;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::Tool;
use crate::metadata::control::{DelCommand, GetCommand};
use crate::metadata::repair::RepairLogicalTablesCommand;
use crate::metadata::snapshot::SnapshotCommand;
use crate::Tool;
/// Command for managing metadata operations,
/// including saving and restoring metadata snapshots,

View File

@@ -16,12 +16,11 @@ use std::sync::Arc;
use clap::Parser;
use common_error::ext::BoxedError;
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use meta_srv::bootstrap::create_etcd_client_with_tls;
use common_meta::kv_backend::KvBackendRef;
use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;
use servers::tls::{TlsMode, TlsOption};
use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};
@@ -56,30 +55,6 @@ pub(crate) struct StoreConfig {
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
#[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
meta_table_name: String,
/// Optional PostgreSQL schema for metadata table (defaults to current search_path if unset).
#[cfg(feature = "pg_kvbackend")]
#[clap(long)]
meta_schema_name: Option<String>,
/// TLS mode for backend store connections (etcd, PostgreSQL, MySQL)
#[clap(long = "backend-tls-mode", value_enum, default_value = "disable")]
backend_tls_mode: TlsMode,
/// Path to TLS certificate file for backend store connections
#[clap(long = "backend-tls-cert-path", default_value = "")]
backend_tls_cert_path: String,
/// Path to TLS private key file for backend store connections
#[clap(long = "backend-tls-key-path", default_value = "")]
backend_tls_key_path: String,
/// Path to TLS CA certificate file for backend store connections
#[clap(long = "backend-tls-ca-cert-path", default_value = "")]
backend_tls_ca_cert_path: String,
/// Enable watching TLS certificate files for changes
#[clap(long = "backend-tls-watch")]
backend_tls_watch: bool,
}
impl StoreConfig {
@@ -92,18 +67,7 @@ impl StoreConfig {
} else {
let kvbackend = match self.backend {
BackendImpl::EtcdStore => {
let tls_config = if self.backend_tls_mode != TlsMode::Disable {
Some(TlsOption {
mode: self.backend_tls_mode.clone(),
cert_path: self.backend_tls_cert_path.clone(),
key_path: self.backend_tls_key_path.clone(),
ca_cert_path: self.backend_tls_ca_cert_path.clone(),
watch: self.backend_tls_watch,
})
} else {
None
};
let etcd_client = create_etcd_client_with_tls(store_addrs, tls_config.as_ref())
let etcd_client = create_etcd_client(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
@@ -114,10 +78,8 @@ impl StoreConfig {
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs, None)
.await
.map_err(BoxedError::new)?;
let schema_name = self.meta_schema_name.as_deref();
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
pool,
schema_name,
table_name,
max_txn_ops,
)

View File

@@ -18,9 +18,9 @@ mod table;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::Tool;
use crate::metadata::control::del::key::DelKeyCommand;
use crate::metadata::control::del::table::DelTableCommand;
use crate::Tool;
/// The prefix of the tombstone keys.
pub(crate) const CLI_TOMBSTONE_PREFIX: &str = "__cli_tombstone/";

View File

@@ -19,9 +19,9 @@ use common_meta::key::tombstone::TombstoneManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::store::RangeRequest;
use crate::Tool;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::Tool;
/// Delete key-value pairs logically from the metadata store.
#[derive(Debug, Default, Parser)]
@@ -102,8 +102,8 @@ mod tests {
use common_meta::kv_backend::{KvBackend, KvBackendRef};
use common_meta::rpc::store::RangeRequest;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::del::key::KeyDeleter;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::test_utils::put_key;
#[tokio::test]

View File

@@ -18,16 +18,16 @@ use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::ddl::utils::get_region_wal_options;
use common_meta::key::TableMetadataManager;
use common_meta::key::table_name::TableNameManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use store_api::storage::TableId;
use crate::Tool;
use crate::error::{InvalidArgumentsSnafu, TableNotFoundSnafu};
use crate::metadata::common::StoreConfig;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::utils::get_table_id_by_name;
use crate::Tool;
/// Delete table metadata logically from the metadata store.
#[derive(Debug, Default, Parser)]
@@ -183,15 +183,15 @@ mod tests {
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_meta::key::TableMetadataManager;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackend, KvBackendRef};
use common_meta::rpc::store::RangeRequest;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::del::table::TableMetadataDeleter;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::test_utils::prepare_physical_table_metadata;
#[tokio::test]

View File

@@ -19,18 +19,18 @@ use clap::{Parser, Subcommand};
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManager;
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::RangeRequest;
use futures::TryStreamExt;
use crate::Tool;
use crate::error::InvalidArgumentsSnafu;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};
use crate::Tool;
/// Getting metadata from metadata store.
#[derive(Subcommand)]

View File

@@ -31,18 +31,18 @@ use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{RegionRoute, find_leaders};
use common_meta::rpc::router::{find_leaders, RegionRoute};
use common_telemetry::{error, info, warn};
use futures::TryStreamExt;
use snafu::{ResultExt, ensure};
use snafu::{ensure, ResultExt};
use store_api::storage::TableId;
use crate::Tool;
use crate::error::{
InvalidArgumentsSnafu, Result, SendRequestToDatanodeSnafu, TableMetadataSnafu, UnexpectedSnafu,
};
use crate::metadata::common::StoreConfig;
use crate::metadata::utils::{FullTableMetadata, IteratorInput, TableMetadataIterator};
use crate::Tool;
/// Repair metadata of logical tables.
#[derive(Debug, Default, Parser)]
@@ -301,10 +301,7 @@ impl RepairTool {
warn!(
"Sending alter table requests to datanodes for table: {} failed for the datanodes: {:?}",
full_table_name,
failed_peers
.iter()
.map(|(peer, _)| peer.id)
.collect::<Vec<_>>()
failed_peers.iter().map(|(peer, _)| peer.id).collect::<Vec<_>>()
);
let create_table_expr =
@@ -323,7 +320,8 @@ impl RepairTool {
}
info!(
"Region not found for table: {}, datanode: {}, trying to create the logical table on that datanode",
full_table_name, peer.id
full_table_name,
peer.id
);
// If the alter table request fails for any datanode, we attempt to create the table on that datanode

Some files were not shown because too many files have changed in this diff Show More