mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
Compare commits
134 Commits
flow/choos
...
more_metri
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d093d11e62 | ||
|
|
457334e190 | ||
|
|
92f36d96b5 | ||
|
|
e83e1a7699 | ||
|
|
f9f905ae14 | ||
|
|
1d53dd26ae | ||
|
|
01796c9cc0 | ||
|
|
9469a8f8f2 | ||
|
|
2fabe346a1 | ||
|
|
c26138963e | ||
|
|
12648f388a | ||
|
|
2979aa048e | ||
|
|
74222c3070 | ||
|
|
0311db3089 | ||
|
|
e434294a0c | ||
|
|
8d2c1b7f6a | ||
|
|
c50e84095e | ||
|
|
d3d233257d | ||
|
|
fdf32a8f46 | ||
|
|
69870e2762 | ||
|
|
f9f4ac1dca | ||
|
|
99e56af98c | ||
|
|
538b5abaae | ||
|
|
a2b3ad77df | ||
|
|
0eb9e97f79 | ||
|
|
06b1627da5 | ||
|
|
0d4f27a699 | ||
|
|
c4da8bb69d | ||
|
|
0bd8856e2f | ||
|
|
92c5a9f5f4 | ||
|
|
80c5af0ecf | ||
|
|
7afb77fd35 | ||
|
|
0b9af77fe9 | ||
|
|
cbafb6e00b | ||
|
|
744a754246 | ||
|
|
9cd4a2c525 | ||
|
|
180920327b | ||
|
|
ee4f830be6 | ||
|
|
69975f1f71 | ||
|
|
38cac301f2 | ||
|
|
083c22b90a | ||
|
|
fdd164c0fa | ||
|
|
078afb2bd6 | ||
|
|
477e4cc344 | ||
|
|
078d83cec2 | ||
|
|
7705d84d83 | ||
|
|
0d81400bb4 | ||
|
|
1d7ae66e75 | ||
|
|
af6cf999c1 | ||
|
|
54869a1329 | ||
|
|
3104d49434 | ||
|
|
b4d00fb499 | ||
|
|
4ae6df607b | ||
|
|
183e1dc031 | ||
|
|
886c2dba76 | ||
|
|
4e615e8906 | ||
|
|
9afc61f778 | ||
|
|
d22084e90c | ||
|
|
5e9b5d981f | ||
|
|
b01fce95a0 | ||
|
|
9fbcf9b7e7 | ||
|
|
dc3591655e | ||
|
|
aca7ad82b1 | ||
|
|
10fa6d8736 | ||
|
|
92422dafca | ||
|
|
53752e4f6c | ||
|
|
40bfa98d4b | ||
|
|
49986b03d6 | ||
|
|
493440a802 | ||
|
|
77e2fee755 | ||
|
|
b85429c0f1 | ||
|
|
3d942f6763 | ||
|
|
3901863432 | ||
|
|
27e339f628 | ||
|
|
cf2712e6f4 | ||
|
|
4b71e493f7 | ||
|
|
bf496e05cc | ||
|
|
513ca951ee | ||
|
|
791f530a78 | ||
|
|
1de6d8c619 | ||
|
|
a4d0420727 | ||
|
|
fc6300a2ba | ||
|
|
f55af5838c | ||
|
|
5a0da5b6bb | ||
|
|
d5f0006864 | ||
|
|
ede82331b2 | ||
|
|
56e696bd55 | ||
|
|
bc0cdf62ba | ||
|
|
eaf7b4b9dd | ||
|
|
7ae0e150e5 | ||
|
|
43c30b55ae | ||
|
|
153e80450a | ||
|
|
1624dc41c5 | ||
|
|
300262562b | ||
|
|
b2377d4b87 | ||
|
|
8d36ffb4e1 | ||
|
|
955ad644f7 | ||
|
|
c2e3c3d398 | ||
|
|
400229c384 | ||
|
|
cd9b6990bf | ||
|
|
a56e6e04c2 | ||
|
|
d324439014 | ||
|
|
038acda7cd | ||
|
|
a0d89c9ed1 | ||
|
|
3a5534722c | ||
|
|
1010a0c2ad | ||
|
|
f46cdbd66b | ||
|
|
864cc117b3 | ||
|
|
0ea9ab385d | ||
|
|
c7e9485534 | ||
|
|
57b53211d9 | ||
|
|
01076069a3 | ||
|
|
73b4b710cd | ||
|
|
14b655ea57 | ||
|
|
c780746171 | ||
|
|
1f62c3b545 | ||
|
|
5a9023d6b3 | ||
|
|
209f8371f2 | ||
|
|
30f1cbf0bf | ||
|
|
bbb6f8685e | ||
|
|
29540b55ee | ||
|
|
ca1641d1c4 | ||
|
|
b275793b36 | ||
|
|
265b144ca2 | ||
|
|
2ce5631d3c | ||
|
|
36d9346ffc | ||
|
|
36ff36e094 | ||
|
|
9cf5f0e940 | ||
|
|
2a0e9c930d | ||
|
|
787a50631b | ||
|
|
50df275097 | ||
|
|
8dca448baf | ||
|
|
828f69a562 | ||
|
|
04cae4b21e |
2
.github/CODEOWNERS
vendored
2
.github/CODEOWNERS
vendored
@@ -4,7 +4,7 @@
|
||||
|
||||
* @GreptimeTeam/db-approver
|
||||
|
||||
## [Module] Databse Engine
|
||||
## [Module] Database Engine
|
||||
/src/index @zhongzc
|
||||
/src/mito2 @evenyag @v0y4g3r @waynexia
|
||||
/src/query @evenyag
|
||||
|
||||
@@ -52,7 +52,7 @@ runs:
|
||||
uses: ./.github/actions/build-greptime-binary
|
||||
with:
|
||||
base-image: ubuntu
|
||||
features: servers/dashboard,pg_kvbackend,mysql_kvbackend
|
||||
features: servers/dashboard
|
||||
cargo-profile: ${{ inputs.cargo-profile }}
|
||||
artifacts-dir: greptime-linux-${{ inputs.arch }}-${{ inputs.version }}
|
||||
version: ${{ inputs.version }}
|
||||
@@ -70,7 +70,7 @@ runs:
|
||||
if: ${{ inputs.arch == 'amd64' && inputs.dev-mode == 'false' }} # Builds greptime for centos if the host machine is amd64.
|
||||
with:
|
||||
base-image: centos
|
||||
features: servers/dashboard,pg_kvbackend,mysql_kvbackend
|
||||
features: servers/dashboard
|
||||
cargo-profile: ${{ inputs.cargo-profile }}
|
||||
artifacts-dir: greptime-linux-${{ inputs.arch }}-centos-${{ inputs.version }}
|
||||
version: ${{ inputs.version }}
|
||||
|
||||
@@ -64,11 +64,11 @@ inputs:
|
||||
upload-max-retry-times:
|
||||
description: Max retry times for uploading artifacts to S3
|
||||
required: false
|
||||
default: "20"
|
||||
default: "30"
|
||||
upload-retry-timeout:
|
||||
description: Timeout for uploading artifacts to S3
|
||||
required: false
|
||||
default: "30" # minutes
|
||||
default: "120" # minutes
|
||||
runs:
|
||||
using: composite
|
||||
steps:
|
||||
|
||||
@@ -59,7 +59,7 @@ runs:
|
||||
--set base.podTemplate.main.resources.requests.cpu=50m \
|
||||
--set base.podTemplate.main.resources.requests.memory=256Mi \
|
||||
--set base.podTemplate.main.resources.limits.cpu=2000m \
|
||||
--set base.podTemplate.main.resources.limits.memory=2Gi \
|
||||
--set base.podTemplate.main.resources.limits.memory=3Gi \
|
||||
--set frontend.replicas=${{ inputs.frontend-replicas }} \
|
||||
--set datanode.replicas=${{ inputs.datanode-replicas }} \
|
||||
--set meta.replicas=${{ inputs.meta-replicas }} \
|
||||
|
||||
@@ -22,7 +22,6 @@ datanode:
|
||||
[wal]
|
||||
provider = "kafka"
|
||||
broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"]
|
||||
linger = "2ms"
|
||||
overwrite_entry_start_id = true
|
||||
frontend:
|
||||
configData: |-
|
||||
|
||||
5
.github/scripts/create-version.sh
vendored
5
.github/scripts/create-version.sh
vendored
@@ -8,7 +8,7 @@ set -e
|
||||
# - If it's a nightly build, the version is 'nightly-YYYYMMDD-$(git rev-parse --short HEAD)', like 'nightly-20230712-e5b243c'.
|
||||
# create_version ${GIHUB_EVENT_NAME} ${NEXT_RELEASE_VERSION} ${NIGHTLY_RELEASE_PREFIX}
|
||||
function create_version() {
|
||||
# Read from envrionment variables.
|
||||
# Read from environment variables.
|
||||
if [ -z "$GITHUB_EVENT_NAME" ]; then
|
||||
echo "GITHUB_EVENT_NAME is empty" >&2
|
||||
exit 1
|
||||
@@ -16,7 +16,8 @@ function create_version() {
|
||||
|
||||
if [ -z "$NEXT_RELEASE_VERSION" ]; then
|
||||
echo "NEXT_RELEASE_VERSION is empty, use version from Cargo.toml" >&2
|
||||
export NEXT_RELEASE_VERSION=$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
|
||||
# NOTE: Need a `v` prefix for the version string.
|
||||
export NEXT_RELEASE_VERSION=v$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
|
||||
fi
|
||||
|
||||
if [ -z "$NIGHTLY_RELEASE_PREFIX" ]; then
|
||||
|
||||
2
.github/scripts/deploy-greptimedb.sh
vendored
2
.github/scripts/deploy-greptimedb.sh
vendored
@@ -10,7 +10,7 @@ GREPTIMEDB_IMAGE_TAG=${GREPTIMEDB_IMAGE_TAG:-latest}
|
||||
ETCD_CHART="oci://registry-1.docker.io/bitnamicharts/etcd"
|
||||
GREPTIME_CHART="https://greptimeteam.github.io/helm-charts/"
|
||||
|
||||
# Ceate a cluster with 1 control-plane node and 5 workers.
|
||||
# Create a cluster with 1 control-plane node and 5 workers.
|
||||
function create_kind_cluster() {
|
||||
cat <<EOF | kind create cluster --name "${CLUSTER}" --image kindest/node:"$KUBERNETES_VERSION" --config=-
|
||||
kind: Cluster
|
||||
|
||||
@@ -4,7 +4,7 @@ DEV_BUILDER_IMAGE_TAG=$1
|
||||
|
||||
update_dev_builder_version() {
|
||||
if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
|
||||
echo "Error: Should specify the dev-builder image tag"
|
||||
echo "Error: Should specify the dev-builder image tag"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
@@ -17,7 +17,7 @@ update_dev_builder_version() {
|
||||
git checkout -b $BRANCH_NAME
|
||||
|
||||
# Update the dev-builder image tag in the Makefile.
|
||||
gsed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile
|
||||
sed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile
|
||||
|
||||
# Commit the changes.
|
||||
git add Makefile
|
||||
|
||||
46
.github/scripts/update-helm-charts-version.sh
vendored
Executable file
46
.github/scripts/update-helm-charts-version.sh
vendored
Executable file
@@ -0,0 +1,46 @@
|
||||
#!/bin/bash
|
||||
|
||||
set -e
|
||||
|
||||
VERSION=${VERSION}
|
||||
GITHUB_TOKEN=${GITHUB_TOKEN}
|
||||
|
||||
update_helm_charts_version() {
|
||||
# Configure Git configs.
|
||||
git config --global user.email update-helm-charts-version@greptime.com
|
||||
git config --global user.name update-helm-charts-version
|
||||
|
||||
# Clone helm-charts repository.
|
||||
git clone "https://x-access-token:${GITHUB_TOKEN}@github.com/GreptimeTeam/helm-charts.git"
|
||||
cd helm-charts
|
||||
|
||||
# Set default remote for gh CLI
|
||||
gh repo set-default GreptimeTeam/helm-charts
|
||||
|
||||
# Checkout a new branch.
|
||||
BRANCH_NAME="chore/greptimedb-${VERSION}"
|
||||
git checkout -b $BRANCH_NAME
|
||||
|
||||
# Update version.
|
||||
make update-version CHART=greptimedb-cluster VERSION=${VERSION}
|
||||
make update-version CHART=greptimedb-standalone VERSION=${VERSION}
|
||||
|
||||
# Update docs.
|
||||
make docs
|
||||
|
||||
# Commit the changes.
|
||||
git add .
|
||||
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git push origin $BRANCH_NAME
|
||||
|
||||
# Create a Pull Request.
|
||||
gh pr create \
|
||||
--title "chore: Update GreptimeDB version to ${VERSION}" \
|
||||
--body "This PR updates the GreptimeDB version." \
|
||||
--base main \
|
||||
--head $BRANCH_NAME \
|
||||
--reviewer zyy17 \
|
||||
--reviewer daviderli614
|
||||
}
|
||||
|
||||
update_helm_charts_version
|
||||
42
.github/scripts/update-homebrew-greptme-version.sh
vendored
Executable file
42
.github/scripts/update-homebrew-greptme-version.sh
vendored
Executable file
@@ -0,0 +1,42 @@
|
||||
#!/bin/bash
|
||||
|
||||
set -e
|
||||
|
||||
VERSION=${VERSION}
|
||||
GITHUB_TOKEN=${GITHUB_TOKEN}
|
||||
|
||||
update_homebrew_greptime_version() {
|
||||
# Configure Git configs.
|
||||
git config --global user.email update-greptime-version@greptime.com
|
||||
git config --global user.name update-greptime-version
|
||||
|
||||
# Clone helm-charts repository.
|
||||
git clone "https://x-access-token:${GITHUB_TOKEN}@github.com/GreptimeTeam/homebrew-greptime.git"
|
||||
cd homebrew-greptime
|
||||
|
||||
# Set default remote for gh CLI
|
||||
gh repo set-default GreptimeTeam/homebrew-greptime
|
||||
|
||||
# Checkout a new branch.
|
||||
BRANCH_NAME="chore/greptimedb-${VERSION}"
|
||||
git checkout -b $BRANCH_NAME
|
||||
|
||||
# Update version.
|
||||
make update-greptime-version VERSION=${VERSION}
|
||||
|
||||
# Commit the changes.
|
||||
git add .
|
||||
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git push origin $BRANCH_NAME
|
||||
|
||||
# Create a Pull Request.
|
||||
gh pr create \
|
||||
--title "chore: Update GreptimeDB version to ${VERSION}" \
|
||||
--body "This PR updates the GreptimeDB version." \
|
||||
--base main \
|
||||
--head $BRANCH_NAME \
|
||||
--reviewer zyy17 \
|
||||
--reviewer daviderli614
|
||||
}
|
||||
|
||||
update_homebrew_greptime_version
|
||||
2
.github/scripts/upload-artifacts-to-s3.sh
vendored
2
.github/scripts/upload-artifacts-to-s3.sh
vendored
@@ -41,7 +41,7 @@ function upload_artifacts() {
|
||||
# Updates the latest version information in AWS S3 if UPDATE_VERSION_INFO is true.
|
||||
function update_version_info() {
|
||||
if [ "$UPDATE_VERSION_INFO" == "true" ]; then
|
||||
# If it's the officail release(like v1.0.0, v1.0.1, v1.0.2, etc.), update latest-version.txt.
|
||||
# If it's the official release(like v1.0.0, v1.0.1, v1.0.2, etc.), update latest-version.txt.
|
||||
if [[ "$VERSION" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
|
||||
echo "Updating latest-version.txt"
|
||||
echo "$VERSION" > latest-version.txt
|
||||
|
||||
9
.github/workflows/dev-build.yml
vendored
9
.github/workflows/dev-build.yml
vendored
@@ -55,6 +55,11 @@ on:
|
||||
description: Build and push images to DockerHub and ACR
|
||||
required: false
|
||||
default: true
|
||||
upload_artifacts_to_s3:
|
||||
type: boolean
|
||||
description: Whether upload artifacts to s3
|
||||
required: false
|
||||
default: false
|
||||
cargo_profile:
|
||||
type: choice
|
||||
description: The cargo profile to use in building GreptimeDB.
|
||||
@@ -238,7 +243,7 @@ jobs:
|
||||
version: ${{ needs.allocate-runners.outputs.version }}
|
||||
push-latest-tag: false # Don't push the latest tag to registry.
|
||||
dev-mode: true # Only build the standard images.
|
||||
|
||||
|
||||
- name: Echo Docker image tag to step summary
|
||||
run: |
|
||||
echo "## Docker Image Tag" >> $GITHUB_STEP_SUMMARY
|
||||
@@ -281,7 +286,7 @@ jobs:
|
||||
aws-cn-access-key-id: ${{ secrets.AWS_CN_ACCESS_KEY_ID }}
|
||||
aws-cn-secret-access-key: ${{ secrets.AWS_CN_SECRET_ACCESS_KEY }}
|
||||
aws-cn-region: ${{ vars.AWS_RELEASE_BUCKET_REGION }}
|
||||
upload-to-s3: false
|
||||
upload-to-s3: ${{ inputs.upload_artifacts_to_s3 }}
|
||||
dev-mode: true # Only build the standard images(exclude centos images).
|
||||
push-latest-tag: false # Don't push the latest tag to registry.
|
||||
update-version-info: false # Don't update the version info in S3.
|
||||
|
||||
33
.github/workflows/develop.yml
vendored
33
.github/workflows/develop.yml
vendored
@@ -195,6 +195,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 60
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
target: [ "unstable_fuzz_create_table_standalone" ]
|
||||
steps:
|
||||
@@ -222,12 +223,12 @@ jobs:
|
||||
run: |
|
||||
sudo apt update && sudo apt install -y libfuzzer-14-dev
|
||||
cargo install cargo-fuzz cargo-gc-bin --force
|
||||
- name: Download pre-built binariy
|
||||
- name: Download pre-built binary
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
name: bin
|
||||
path: .
|
||||
- name: Unzip bianry
|
||||
- name: Unzip binary
|
||||
run: |
|
||||
tar -xvf ./bin.tar.gz
|
||||
rm ./bin.tar.gz
|
||||
@@ -249,6 +250,11 @@ jobs:
|
||||
name: unstable-fuzz-logs
|
||||
path: /tmp/unstable-greptime/
|
||||
retention-days: 3
|
||||
- name: Describe pods
|
||||
if: failure()
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe pod -n my-greptimedb
|
||||
|
||||
build-greptime-ci:
|
||||
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
|
||||
@@ -275,7 +281,7 @@ jobs:
|
||||
- name: Install cargo-gc-bin
|
||||
shell: bash
|
||||
run: cargo install cargo-gc-bin --force
|
||||
- name: Build greptime bianry
|
||||
- name: Build greptime binary
|
||||
shell: bash
|
||||
# `cargo gc` will invoke `cargo build` with specified args
|
||||
run: cargo gc --profile ci -- --bin greptime --features "pg_kvbackend,mysql_kvbackend"
|
||||
@@ -299,6 +305,7 @@ jobs:
|
||||
needs: build-greptime-ci
|
||||
timeout-minutes: 60
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
target: [ "fuzz_create_table", "fuzz_alter_table", "fuzz_create_database", "fuzz_create_logical_table", "fuzz_alter_logical_table", "fuzz_insert", "fuzz_insert_logical_table" ]
|
||||
mode:
|
||||
@@ -328,9 +335,9 @@ jobs:
|
||||
name: Setup Minio
|
||||
uses: ./.github/actions/setup-minio
|
||||
- if: matrix.mode.kafka
|
||||
name: Setup Kafka cluser
|
||||
name: Setup Kafka cluster
|
||||
uses: ./.github/actions/setup-kafka-cluster
|
||||
- name: Setup Etcd cluser
|
||||
- name: Setup Etcd cluster
|
||||
uses: ./.github/actions/setup-etcd-cluster
|
||||
# Prepares for fuzz tests
|
||||
- uses: arduino/setup-protoc@v3
|
||||
@@ -403,6 +410,11 @@ jobs:
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe nodes
|
||||
- name: Describe pod
|
||||
if: failure()
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe pod -n my-greptimedb
|
||||
- name: Export kind logs
|
||||
if: failure()
|
||||
shell: bash
|
||||
@@ -431,6 +443,7 @@ jobs:
|
||||
needs: build-greptime-ci
|
||||
timeout-minutes: 60
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
target: ["fuzz_migrate_mito_regions", "fuzz_migrate_metric_regions", "fuzz_failover_mito_regions", "fuzz_failover_metric_regions"]
|
||||
mode:
|
||||
@@ -475,9 +488,9 @@ jobs:
|
||||
name: Setup Minio
|
||||
uses: ./.github/actions/setup-minio
|
||||
- if: matrix.mode.kafka
|
||||
name: Setup Kafka cluser
|
||||
name: Setup Kafka cluster
|
||||
uses: ./.github/actions/setup-kafka-cluster
|
||||
- name: Setup Etcd cluser
|
||||
- name: Setup Etcd cluster
|
||||
uses: ./.github/actions/setup-etcd-cluster
|
||||
# Prepares for fuzz tests
|
||||
- uses: arduino/setup-protoc@v3
|
||||
@@ -551,6 +564,11 @@ jobs:
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe nodes
|
||||
- name: Describe pods
|
||||
if: failure()
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe pod -n my-greptimedb
|
||||
- name: Export kind logs
|
||||
if: failure()
|
||||
shell: bash
|
||||
@@ -578,6 +596,7 @@ jobs:
|
||||
needs: build
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
os: [ ubuntu-latest ]
|
||||
mode:
|
||||
|
||||
6
.github/workflows/nightly-ci.yml
vendored
6
.github/workflows/nightly-ci.yml
vendored
@@ -124,9 +124,9 @@ jobs:
|
||||
fetch-depth: 0
|
||||
persist-credentials: false
|
||||
- uses: cachix/install-nix-action@v31
|
||||
with:
|
||||
nix_path: nixpkgs=channel:nixos-24.11
|
||||
- run: nix develop --command cargo build --bin greptime
|
||||
- run: nix develop --command cargo check --bin greptime
|
||||
env:
|
||||
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
|
||||
|
||||
check-status:
|
||||
name: Check status
|
||||
|
||||
84
.github/workflows/release.yml
vendored
84
.github/workflows/release.yml
vendored
@@ -88,7 +88,7 @@ env:
|
||||
# Controls whether to run tests, include unit-test, integration-test and sqlness.
|
||||
DISABLE_RUN_TESTS: ${{ inputs.skip_test || vars.DEFAULT_SKIP_TEST }}
|
||||
|
||||
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
|
||||
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nightly-20230313;
|
||||
NIGHTLY_RELEASE_PREFIX: nightly
|
||||
|
||||
jobs:
|
||||
@@ -124,7 +124,7 @@ jobs:
|
||||
|
||||
# The create-version will create a global variable named 'version' in the global workflows.
|
||||
# - If it's a tag push release, the version is the tag name(${{ github.ref_name }});
|
||||
# - If it's a scheduled release, the version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-$buildTime', like v0.2.0-nigthly-20230313;
|
||||
# - If it's a scheduled release, the version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-$buildTime', like v0.2.0-nightly-20230313;
|
||||
# - If it's a manual release, the version is '${{ env.NEXT_RELEASE_VERSION }}-<short-git-sha>-YYYYMMDDSS', like v0.2.0-e5b243c-2023071245;
|
||||
- name: Create version
|
||||
id: create-version
|
||||
@@ -388,7 +388,7 @@ jobs:
|
||||
|
||||
### Stop runners ###
|
||||
# It's very necessary to split the job of releasing runners into 'stop-linux-amd64-runner' and 'stop-linux-arm64-runner'.
|
||||
# Because we can terminate the specified EC2 instance immediately after the job is finished without uncessary waiting.
|
||||
# Because we can terminate the specified EC2 instance immediately after the job is finished without unnecessary waiting.
|
||||
stop-linux-amd64-runner: # It's always run as the last job in the workflow to make sure that the runner is released.
|
||||
name: Stop linux-amd64 runner
|
||||
# Only run this job when the runner is allocated.
|
||||
@@ -441,10 +441,10 @@ jobs:
|
||||
aws-region: ${{ vars.EC2_RUNNER_REGION }}
|
||||
github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }}
|
||||
|
||||
bump-doc-version:
|
||||
name: Bump doc version
|
||||
bump-downstream-repo-versions:
|
||||
name: Bump downstream repo versions
|
||||
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
|
||||
needs: [allocate-runners]
|
||||
needs: [allocate-runners, publish-github-release]
|
||||
runs-on: ubuntu-latest
|
||||
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
|
||||
permissions:
|
||||
@@ -456,36 +456,58 @@ jobs:
|
||||
fetch-depth: 0
|
||||
persist-credentials: false
|
||||
- uses: ./.github/actions/setup-cyborg
|
||||
- name: Bump doc version
|
||||
- name: Bump downstream repo versions
|
||||
working-directory: cyborg
|
||||
run: pnpm tsx bin/bump-doc-version.ts
|
||||
env:
|
||||
VERSION: ${{ needs.allocate-runners.outputs.version }}
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
|
||||
|
||||
bump-website-version:
|
||||
name: Bump website version
|
||||
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
|
||||
needs: [allocate-runners]
|
||||
runs-on: ubuntu-latest
|
||||
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
|
||||
permissions:
|
||||
issues: write # Allows the action to create issues for cyborg.
|
||||
contents: write # Allows the action to create a release.
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
persist-credentials: false
|
||||
- uses: ./.github/actions/setup-cyborg
|
||||
- name: Bump website version
|
||||
working-directory: cyborg
|
||||
run: pnpm tsx bin/bump-website-version.ts
|
||||
run: pnpm tsx bin/bump-versions.ts
|
||||
env:
|
||||
TARGET_REPOS: website,docs,demo
|
||||
VERSION: ${{ needs.allocate-runners.outputs.version }}
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
WEBSITE_REPO_TOKEN: ${{ secrets.WEBSITE_REPO_TOKEN }}
|
||||
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
|
||||
DEMO_REPO_TOKEN: ${{ secrets.DEMO_REPO_TOKEN }}
|
||||
|
||||
bump-helm-charts-version:
|
||||
name: Bump helm charts version
|
||||
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
|
||||
needs: [allocate-runners, publish-github-release]
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
pull-requests: write
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Bump helm charts version
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.HELM_CHARTS_REPO_TOKEN }}
|
||||
VERSION: ${{ needs.allocate-runners.outputs.version }}
|
||||
run: |
|
||||
./.github/scripts/update-helm-charts-version.sh
|
||||
|
||||
bump-homebrew-greptime-version:
|
||||
name: Bump homebrew greptime version
|
||||
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
|
||||
needs: [allocate-runners, publish-github-release]
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
pull-requests: write
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Bump homebrew greptime version
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.HOMEBREW_GREPTIME_REPO_TOKEN }}
|
||||
VERSION: ${{ needs.allocate-runners.outputs.version }}
|
||||
run: |
|
||||
./.github/scripts/update-homebrew-greptme-version.sh
|
||||
|
||||
notification:
|
||||
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && (github.event_name == 'push' || github.event_name == 'schedule') && always() }}
|
||||
|
||||
3
.github/workflows/semantic-pull-request.yml
vendored
3
.github/workflows/semantic-pull-request.yml
vendored
@@ -14,6 +14,9 @@ concurrency:
|
||||
jobs:
|
||||
check:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
pull-requests: write # Add permissions to modify PRs
|
||||
issues: write
|
||||
timeout-minutes: 10
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -58,3 +58,6 @@ tests-fuzz/corpus/
|
||||
|
||||
## default data home
|
||||
greptimedb_data
|
||||
|
||||
# github
|
||||
!/.github
|
||||
@@ -108,7 +108,7 @@ of what you were trying to do and what went wrong. You can also reach for help i
|
||||
The core team will be thrilled if you would like to participate in any way you like. When you are stuck, try to ask for help by filing an issue, with a detailed description of what you were trying to do and what went wrong. If you have any questions or if you would like to get involved in our community, please check out:
|
||||
|
||||
- [GreptimeDB Community Slack](https://greptime.com/slack)
|
||||
- [GreptimeDB Github Discussions](https://github.com/GreptimeTeam/greptimedb/discussions)
|
||||
- [GreptimeDB GitHub Discussions](https://github.com/GreptimeTeam/greptimedb/discussions)
|
||||
|
||||
Also, see some extra GreptimeDB content:
|
||||
|
||||
|
||||
1059
Cargo.lock
generated
1059
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
32
Cargo.toml
32
Cargo.toml
@@ -30,12 +30,14 @@ members = [
|
||||
"src/common/recordbatch",
|
||||
"src/common/runtime",
|
||||
"src/common/session",
|
||||
"src/common/stat",
|
||||
"src/common/substrait",
|
||||
"src/common/telemetry",
|
||||
"src/common/test-util",
|
||||
"src/common/time",
|
||||
"src/common/version",
|
||||
"src/common/wal",
|
||||
"src/common/workload",
|
||||
"src/datanode",
|
||||
"src/datatypes",
|
||||
"src/file-engine",
|
||||
@@ -78,6 +80,7 @@ clippy.implicit_clone = "warn"
|
||||
clippy.result_large_err = "allow"
|
||||
clippy.large_enum_variant = "allow"
|
||||
clippy.doc_overindented_list_items = "allow"
|
||||
clippy.uninlined_format_args = "allow"
|
||||
rust.unknown_lints = "deny"
|
||||
rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
|
||||
|
||||
@@ -113,15 +116,15 @@ clap = { version = "4.4", features = ["derive"] }
|
||||
config = "0.13.0"
|
||||
crossbeam-utils = "0.8"
|
||||
dashmap = "6.1"
|
||||
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
|
||||
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
|
||||
deadpool = "0.12"
|
||||
deadpool-postgres = "0.14"
|
||||
derive_builder = "0.20"
|
||||
@@ -130,7 +133,7 @@ etcd-client = "0.14"
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17a3550751c8b1e02ec16be40101d5f24dc255c3" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
@@ -146,6 +149,7 @@ meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev =
|
||||
mockall = "0.13"
|
||||
moka = "0.12"
|
||||
nalgebra = "0.33"
|
||||
nix = { version = "0.30.1", default-features = false, features = ["event", "fs", "process"] }
|
||||
notify = "8.0"
|
||||
num_cpus = "1.16"
|
||||
object_store_opendal = "0.50"
|
||||
@@ -162,7 +166,9 @@ parquet = { version = "54.2", default-features = false, features = ["arrow", "as
|
||||
paste = "1.0"
|
||||
pin-project = "1.0"
|
||||
prometheus = { version = "0.13.3", features = ["process"] }
|
||||
promql-parser = { version = "0.5.1", features = ["ser"] }
|
||||
promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "0410e8b459dda7cb222ce9596f8bf3971bd07bd2", features = [
|
||||
"ser",
|
||||
] }
|
||||
prost = { version = "0.13", features = ["no-recursion-limit"] }
|
||||
raft-engine = { version = "0.4.1", default-features = false }
|
||||
rand = "0.9"
|
||||
@@ -175,7 +181,7 @@ reqwest = { version = "0.12", default-features = false, features = [
|
||||
"stream",
|
||||
"multipart",
|
||||
] }
|
||||
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "75535b5ad9bae4a5dbb582c82e44dfd81ec10105", features = [
|
||||
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "8dbd01ed809f5a791833a594e85b144e36e45820", features = [
|
||||
"transport-tls",
|
||||
] }
|
||||
rstest = "0.25"
|
||||
@@ -256,6 +262,7 @@ common-test-util = { path = "src/common/test-util" }
|
||||
common-time = { path = "src/common/time" }
|
||||
common-version = { path = "src/common/version" }
|
||||
common-wal = { path = "src/common/wal" }
|
||||
common-workload = { path = "src/common/workload" }
|
||||
datanode = { path = "src/datanode" }
|
||||
datatypes = { path = "src/datatypes" }
|
||||
file-engine = { path = "src/file-engine" }
|
||||
@@ -282,6 +289,7 @@ query = { path = "src/query" }
|
||||
servers = { path = "src/servers" }
|
||||
session = { path = "src/session" }
|
||||
sql = { path = "src/sql" }
|
||||
stat = { path = "src/common/stat" }
|
||||
store-api = { path = "src/store-api" }
|
||||
substrait = { path = "src/common/substrait" }
|
||||
table = { path = "src/table" }
|
||||
|
||||
2
Makefile
2
Makefile
@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
|
||||
IMAGE_REGISTRY ?= docker.io
|
||||
IMAGE_NAMESPACE ?= greptime
|
||||
IMAGE_TAG ?= latest
|
||||
DEV_BUILDER_IMAGE_TAG ?= 2025-04-15-1a517ec8-20250428023155
|
||||
DEV_BUILDER_IMAGE_TAG ?= 2025-05-19-b2377d4b-20250520045554
|
||||
BUILDX_MULTI_PLATFORM_BUILD ?= false
|
||||
BUILDX_BUILDER_NAME ?= gtbuilder
|
||||
BASE_IMAGE ?= ubuntu
|
||||
|
||||
11
README.md
11
README.md
@@ -121,7 +121,7 @@ docker pull greptime/greptimedb
|
||||
|
||||
```shell
|
||||
docker run -p 127.0.0.1:4000-4003:4000-4003 \
|
||||
-v "$(pwd)/greptimedb:/greptimedb_data" \
|
||||
-v "$(pwd)/greptimedb_data:/greptimedb_data" \
|
||||
--name greptime --rm \
|
||||
greptime/greptimedb:latest standalone start \
|
||||
--http-addr 0.0.0.0:4000 \
|
||||
@@ -129,7 +129,7 @@ docker run -p 127.0.0.1:4000-4003:4000-4003 \
|
||||
--mysql-addr 0.0.0.0:4002 \
|
||||
--postgres-addr 0.0.0.0:4003
|
||||
```
|
||||
Dashboard: [http://localhost:4000/dashboard](http://localhost:4000/dashboard)
|
||||
Dashboard: [http://localhost:4000/dashboard](http://localhost:4000/dashboard)
|
||||
[Full Install Guide](https://docs.greptime.com/getting-started/installation/overview)
|
||||
|
||||
**Troubleshooting:**
|
||||
@@ -167,7 +167,7 @@ cargo run -- standalone start
|
||||
|
||||
## Project Status
|
||||
|
||||
> **Status:** Beta.
|
||||
> **Status:** Beta.
|
||||
> **GA (v1.0):** Targeted for mid 2025.
|
||||
|
||||
- Being used in production by early adopters
|
||||
@@ -197,8 +197,8 @@ GreptimeDB is licensed under the [Apache License 2.0](https://apache.org/license
|
||||
|
||||
## Commercial Support
|
||||
|
||||
Running GreptimeDB in your organization?
|
||||
We offer enterprise add-ons, services, training, and consulting.
|
||||
Running GreptimeDB in your organization?
|
||||
We offer enterprise add-ons, services, training, and consulting.
|
||||
[Contact us](https://greptime.com/contactus) for details.
|
||||
|
||||
## Contributing
|
||||
@@ -215,4 +215,3 @@ Special thanks to all contributors! See [AUTHORS.md](https://github.com/Greptime
|
||||
- [Apache Parquet™](https://parquet.apache.org/) (file storage)
|
||||
- [Apache Arrow DataFusion™](https://arrow.apache.org/datafusion/) (query engine)
|
||||
- [Apache OpenDAL™](https://opendal.apache.org/) (data access abstraction)
|
||||
- [etcd](https://etcd.io/) (meta service)
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
|
||||
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
|
||||
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not valid strings. |
|
||||
| `grpc` | -- | -- | The gRPC server options. |
|
||||
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
@@ -99,7 +100,7 @@
|
||||
| `query` | -- | -- | The query engine options. |
|
||||
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. |
|
||||
| `storage` | -- | -- | The data storage options. |
|
||||
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
|
||||
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
|
||||
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
|
||||
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
|
||||
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
|
||||
@@ -154,6 +155,7 @@
|
||||
| `region_engine.mito.index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
|
||||
| `region_engine.mito.index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
|
||||
| `region_engine.mito.index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |
|
||||
| `region_engine.mito.index.result_cache_size` | String | `128MiB` | Cache size for index result. |
|
||||
| `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. |
|
||||
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
@@ -188,17 +190,18 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `slow_query` | -- | -- | The slow query log options. |
|
||||
| `slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. |
|
||||
| `slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommended to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | Unset | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
@@ -224,10 +227,12 @@
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
|
||||
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
|
||||
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not valid strings. |
|
||||
| `grpc` | -- | -- | The gRPC server options. |
|
||||
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
|
||||
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
|
||||
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
||||
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
||||
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
||||
@@ -288,17 +293,17 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `slow_query` | -- | -- | The slow query log options. |
|
||||
| `slow_query.enable` | Bool | `true` | Whether to enable slow query log. |
|
||||
| `slow_query.record_type` | String | `system_table` | The record type of slow queries. It can be `system_table` or `log`.<br/>If `system_table` is selected, the slow queries will be recorded in a system table `greptime_private.slow_queries`.<br/>If `log` is selected, the slow queries will be logged in a log file `greptimedb-slow-queries.*`. |
|
||||
| `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. |
|
||||
| `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
|
||||
| `slow_query.ttl` | String | `30d` | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
|
||||
| `export_metrics` | -- | -- | The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | Unset | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
@@ -308,12 +313,10 @@
|
||||
|
||||
| Key | Type | Default | Descriptions |
|
||||
| --- | -----| ------- | ----------- |
|
||||
| `data_home` | String | `./greptimedb_data/metasrv/` | The working home directory. |
|
||||
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
|
||||
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
|
||||
| `data_home` | String | `./greptimedb_data` | The working home directory. |
|
||||
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
|
||||
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
|
||||
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store` |
|
||||
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store`<br/>- `mysql_store` |
|
||||
| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.<br/>**Only used when backend is `postgres_store`.** |
|
||||
| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend<br/>Only used when backend is `postgres_store`. |
|
||||
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
|
||||
@@ -325,6 +328,16 @@
|
||||
| `runtime` | -- | -- | The runtime options. |
|
||||
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
|
||||
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
|
||||
| `grpc` | -- | -- | The gRPC server options. |
|
||||
| `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. |
|
||||
| `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
|
||||
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `procedure` | -- | -- | Procedure storage options. |
|
||||
| `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. |
|
||||
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
|
||||
@@ -362,17 +375,11 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics` | -- | -- | The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | Unset | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
@@ -398,6 +405,7 @@
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
|
||||
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
||||
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
|
||||
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
||||
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
||||
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
||||
@@ -440,7 +448,7 @@
|
||||
| `query` | -- | -- | The query engine options. |
|
||||
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. |
|
||||
| `storage` | -- | -- | The data storage options. |
|
||||
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
|
||||
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
|
||||
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
|
||||
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
|
||||
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
|
||||
@@ -495,6 +503,7 @@
|
||||
| `region_engine.mito.index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
|
||||
| `region_engine.mito.index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
|
||||
| `region_engine.mito.index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |
|
||||
| `region_engine.mito.index.result_cache_size` | String | `128MiB` | Cache size for index result. |
|
||||
| `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. |
|
||||
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
@@ -529,17 +538,11 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | Unset | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
@@ -585,9 +588,5 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
|
||||
@@ -44,6 +44,13 @@ runtime_size = 8
|
||||
max_recv_message_size = "512MB"
|
||||
## The maximum send message size for gRPC server.
|
||||
max_send_message_size = "512MB"
|
||||
## Compression mode for datanode side Arrow IPC service. Available options:
|
||||
## - `none`: disable all compression
|
||||
## - `transport`: only enable gRPC transport compression (zstd)
|
||||
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||
## - `all`: enable all compression.
|
||||
## Default to `none`
|
||||
flight_compression = "arrow_ipc"
|
||||
|
||||
## gRPC server TLS options, see `mysql.tls` section.
|
||||
[grpc.tls]
|
||||
@@ -252,7 +259,7 @@ parallelism = 0
|
||||
## The data storage options.
|
||||
[storage]
|
||||
## The working home directory.
|
||||
data_home = "./greptimedb_data/"
|
||||
data_home = "./greptimedb_data"
|
||||
|
||||
## The storage type used to store the data.
|
||||
## - `File`: the data is stored in the local file system.
|
||||
@@ -499,6 +506,9 @@ content_cache_size = "128MiB"
|
||||
## Page size for inverted index content cache.
|
||||
content_cache_page_size = "64KiB"
|
||||
|
||||
## Cache size for index result.
|
||||
result_cache_size = "128MiB"
|
||||
|
||||
## The options for inverted index in Mito engine.
|
||||
[region_engine.mito.inverted_index]
|
||||
|
||||
@@ -632,37 +642,16 @@ max_log_files = 720
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
sample_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## @toml2docs:none-default
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -100,19 +100,6 @@ max_log_files = 720
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
sample_ratio = 1.0
|
||||
|
||||
## The tracing options. Only effect when compiled with `tokio-console` feature.
|
||||
#+ [tracing]
|
||||
## The tokio console address.
|
||||
|
||||
@@ -37,6 +37,12 @@ enable_cors = true
|
||||
## Customize allowed origins for HTTP CORS.
|
||||
## @toml2docs:none-default
|
||||
cors_allowed_origins = ["https://example.com"]
|
||||
## Whether to enable validation for Prometheus remote write requests.
|
||||
## Available options:
|
||||
## - strict: deny invalid UTF-8 strings (default).
|
||||
## - lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).
|
||||
## - unchecked: do not valid strings.
|
||||
prom_validation_mode = "strict"
|
||||
|
||||
## The gRPC server options.
|
||||
[grpc]
|
||||
@@ -48,6 +54,13 @@ bind_addr = "127.0.0.1:4001"
|
||||
server_addr = "127.0.0.1:4001"
|
||||
## The number of server worker threads.
|
||||
runtime_size = 8
|
||||
## Compression mode for frontend side Arrow IPC service. Available options:
|
||||
## - `none`: disable all compression
|
||||
## - `transport`: only enable gRPC transport compression (zstd)
|
||||
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||
## - `all`: enable all compression.
|
||||
## Default to `none`
|
||||
flight_compression = "arrow_ipc"
|
||||
|
||||
## gRPC server TLS options, see `mysql.tls` section.
|
||||
[grpc.tls]
|
||||
@@ -223,36 +236,34 @@ max_log_files = 720
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
[slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
enable = true
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
## The record type of slow queries. It can be `system_table` or `log`.
|
||||
## If `system_table` is selected, the slow queries will be recorded in a system table `greptime_private.slow_queries`.
|
||||
## If `log` is selected, the slow queries will be logged in a log file `greptimedb-slow-queries.*`.
|
||||
record_type = "system_table"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
## The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`.
|
||||
threshold = "30s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged.
|
||||
sample_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
|
||||
ttl = "30d"
|
||||
|
||||
## The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## @toml2docs:none-default
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -1,13 +1,5 @@
|
||||
## The working home directory.
|
||||
data_home = "./greptimedb_data/metasrv/"
|
||||
|
||||
## The bind address of metasrv.
|
||||
bind_addr = "127.0.0.1:3002"
|
||||
|
||||
## The communication server address for the frontend and datanode to connect to metasrv.
|
||||
## If left empty or unset, the server will automatically use the IP address of the first network interface
|
||||
## on the host, with the same port number as the one specified in `bind_addr`.
|
||||
server_addr = "127.0.0.1:3002"
|
||||
data_home = "./greptimedb_data"
|
||||
|
||||
## Store server address default to etcd store.
|
||||
## For postgres store, the format is:
|
||||
@@ -24,6 +16,7 @@ store_key_prefix = ""
|
||||
## - `etcd_store` (default value)
|
||||
## - `memory_store`
|
||||
## - `postgres_store`
|
||||
## - `mysql_store`
|
||||
backend = "etcd_store"
|
||||
|
||||
## Table name in RDS to store metadata. Effect when using a RDS kvbackend.
|
||||
@@ -67,6 +60,32 @@ node_max_idle_time = "24hours"
|
||||
## The number of threads to execute the runtime for global write operations.
|
||||
#+ compact_rt_size = 4
|
||||
|
||||
## The gRPC server options.
|
||||
[grpc]
|
||||
## The address to bind the gRPC server.
|
||||
bind_addr = "127.0.0.1:3002"
|
||||
## The communication server address for the frontend and datanode to connect to metasrv.
|
||||
## If left empty or unset, the server will automatically use the IP address of the first network interface
|
||||
## on the host, with the same port number as the one specified in `bind_addr`.
|
||||
server_addr = "127.0.0.1:3002"
|
||||
## The number of server worker threads.
|
||||
runtime_size = 8
|
||||
## The maximum receive message size for gRPC server.
|
||||
max_recv_message_size = "512MB"
|
||||
## The maximum send message size for gRPC server.
|
||||
max_send_message_size = "512MB"
|
||||
|
||||
## The HTTP server options.
|
||||
[http]
|
||||
## The address to bind the HTTP server.
|
||||
addr = "127.0.0.1:4000"
|
||||
## HTTP request timeout. Set to 0 to disable timeout.
|
||||
timeout = "0s"
|
||||
## HTTP request body limit.
|
||||
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
|
||||
## Set to 0 to disable limit.
|
||||
body_limit = "64MB"
|
||||
|
||||
## Procedure storage options.
|
||||
[procedure]
|
||||
|
||||
@@ -218,37 +237,16 @@ max_log_files = 720
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
sample_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## @toml2docs:none-default
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -43,6 +43,13 @@ enable_cors = true
|
||||
## @toml2docs:none-default
|
||||
cors_allowed_origins = ["https://example.com"]
|
||||
|
||||
## Whether to enable validation for Prometheus remote write requests.
|
||||
## Available options:
|
||||
## - strict: deny invalid UTF-8 strings (default).
|
||||
## - lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).
|
||||
## - unchecked: do not valid strings.
|
||||
prom_validation_mode = "strict"
|
||||
|
||||
## The gRPC server options.
|
||||
[grpc]
|
||||
## The address to bind the gRPC server.
|
||||
@@ -343,7 +350,7 @@ parallelism = 0
|
||||
## The data storage options.
|
||||
[storage]
|
||||
## The working home directory.
|
||||
data_home = "./greptimedb_data/"
|
||||
data_home = "./greptimedb_data"
|
||||
|
||||
## The storage type used to store the data.
|
||||
## - `File`: the data is stored in the local file system.
|
||||
@@ -590,6 +597,9 @@ content_cache_size = "128MiB"
|
||||
## Page size for inverted index content cache.
|
||||
content_cache_page_size = "64KiB"
|
||||
|
||||
## Cache size for index result.
|
||||
result_cache_size = "128MiB"
|
||||
|
||||
## The options for inverted index in Mito engine.
|
||||
[region_engine.mito.inverted_index]
|
||||
|
||||
@@ -724,25 +734,27 @@ max_log_files = 720
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
[slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
#+ enable = false
|
||||
|
||||
## The record type of slow queries. It can be `system_table` or `log`.
|
||||
## @toml2docs:none-default
|
||||
#+ record_type = "system_table"
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
#+ threshold = "10s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
sample_ratio = 1.0
|
||||
#+ sample_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
@@ -753,7 +765,7 @@ write_interval = "30s"
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -1,75 +0,0 @@
|
||||
/*
|
||||
* Copyright 2023 Greptime Team
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import * as core from "@actions/core";
|
||||
import {obtainClient} from "@/common";
|
||||
|
||||
async function triggerWorkflow(workflowId: string, version: string) {
|
||||
const docsClient = obtainClient("DOCS_REPO_TOKEN")
|
||||
try {
|
||||
await docsClient.rest.actions.createWorkflowDispatch({
|
||||
owner: "GreptimeTeam",
|
||||
repo: "docs",
|
||||
workflow_id: workflowId,
|
||||
ref: "main",
|
||||
inputs: {
|
||||
version,
|
||||
},
|
||||
});
|
||||
console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
|
||||
} catch (error) {
|
||||
core.setFailed(`Failed to trigger workflow: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
function determineWorkflow(version: string): [string, string] {
|
||||
// Check if it's a nightly version
|
||||
if (version.includes('nightly')) {
|
||||
return ['bump-nightly-version.yml', version];
|
||||
}
|
||||
|
||||
const parts = version.split('.');
|
||||
|
||||
if (parts.length !== 3) {
|
||||
throw new Error('Invalid version format');
|
||||
}
|
||||
|
||||
// If patch version (last number) is 0, it's a major version
|
||||
// Return only major.minor version
|
||||
if (parts[2] === '0') {
|
||||
return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
|
||||
}
|
||||
|
||||
// Otherwise it's a patch version, use full version
|
||||
return ['bump-patch-version.yml', version];
|
||||
}
|
||||
|
||||
const version = process.env.VERSION;
|
||||
if (!version) {
|
||||
core.setFailed("VERSION environment variable is required");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
// Remove 'v' prefix if exists
|
||||
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
|
||||
|
||||
try {
|
||||
const [workflowId, apiVersion] = determineWorkflow(cleanVersion);
|
||||
triggerWorkflow(workflowId, apiVersion);
|
||||
} catch (error) {
|
||||
core.setFailed(`Error processing version: ${error.message}`);
|
||||
process.exit(1);
|
||||
}
|
||||
156
cyborg/bin/bump-versions.ts
Normal file
156
cyborg/bin/bump-versions.ts
Normal file
@@ -0,0 +1,156 @@
|
||||
/*
|
||||
* Copyright 2023 Greptime Team
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import * as core from "@actions/core";
|
||||
import {obtainClient} from "@/common";
|
||||
|
||||
interface RepoConfig {
|
||||
tokenEnv: string;
|
||||
repo: string;
|
||||
workflowLogic: (version: string) => [string, string] | null;
|
||||
}
|
||||
|
||||
const REPO_CONFIGS: Record<string, RepoConfig> = {
|
||||
website: {
|
||||
tokenEnv: "WEBSITE_REPO_TOKEN",
|
||||
repo: "website",
|
||||
workflowLogic: (version: string) => {
|
||||
// Skip nightly versions for website
|
||||
if (version.includes('nightly')) {
|
||||
console.log('Nightly version detected for website, skipping workflow trigger.');
|
||||
return null;
|
||||
}
|
||||
return ['bump-patch-version.yml', version];
|
||||
}
|
||||
},
|
||||
demo: {
|
||||
tokenEnv: "DEMO_REPO_TOKEN",
|
||||
repo: "demo-scene",
|
||||
workflowLogic: (version: string) => {
|
||||
// Skip nightly versions for demo
|
||||
if (version.includes('nightly')) {
|
||||
console.log('Nightly version detected for demo, skipping workflow trigger.');
|
||||
return null;
|
||||
}
|
||||
return ['bump-patch-version.yml', version];
|
||||
}
|
||||
},
|
||||
docs: {
|
||||
tokenEnv: "DOCS_REPO_TOKEN",
|
||||
repo: "docs",
|
||||
workflowLogic: (version: string) => {
|
||||
// Check if it's a nightly version
|
||||
if (version.includes('nightly')) {
|
||||
return ['bump-nightly-version.yml', version];
|
||||
}
|
||||
|
||||
const parts = version.split('.');
|
||||
if (parts.length !== 3) {
|
||||
throw new Error('Invalid version format');
|
||||
}
|
||||
|
||||
// If patch version (last number) is 0, it's a major version
|
||||
// Return only major.minor version
|
||||
if (parts[2] === '0') {
|
||||
return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
|
||||
}
|
||||
|
||||
// Otherwise it's a patch version, use full version
|
||||
return ['bump-patch-version.yml', version];
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
async function triggerWorkflow(repoConfig: RepoConfig, workflowId: string, version: string) {
|
||||
const client = obtainClient(repoConfig.tokenEnv);
|
||||
try {
|
||||
await client.rest.actions.createWorkflowDispatch({
|
||||
owner: "GreptimeTeam",
|
||||
repo: repoConfig.repo,
|
||||
workflow_id: workflowId,
|
||||
ref: "main",
|
||||
inputs: {
|
||||
version,
|
||||
},
|
||||
});
|
||||
console.log(`Successfully triggered ${workflowId} workflow for ${repoConfig.repo} with version ${version}`);
|
||||
} catch (error) {
|
||||
core.setFailed(`Failed to trigger workflow for ${repoConfig.repo}: ${error.message}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function processRepo(repoName: string, version: string) {
|
||||
const repoConfig = REPO_CONFIGS[repoName];
|
||||
if (!repoConfig) {
|
||||
throw new Error(`Unknown repository: ${repoName}`);
|
||||
}
|
||||
|
||||
try {
|
||||
const workflowResult = repoConfig.workflowLogic(version);
|
||||
if (workflowResult === null) {
|
||||
// Skip this repo (e.g., nightly version for website)
|
||||
return;
|
||||
}
|
||||
|
||||
const [workflowId, apiVersion] = workflowResult;
|
||||
await triggerWorkflow(repoConfig, workflowId, apiVersion);
|
||||
} catch (error) {
|
||||
core.setFailed(`Error processing ${repoName} with version ${version}: ${error.message}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const version = process.env.VERSION;
|
||||
if (!version) {
|
||||
core.setFailed("VERSION environment variable is required");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
// Remove 'v' prefix if exists
|
||||
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
|
||||
|
||||
// Get target repositories from environment variable
|
||||
// Default to both if not specified
|
||||
const targetRepos = process.env.TARGET_REPOS?.split(',').map(repo => repo.trim()) || ['website', 'docs'];
|
||||
|
||||
console.log(`Processing version ${cleanVersion} for repositories: ${targetRepos.join(', ')}`);
|
||||
|
||||
const errors: string[] = [];
|
||||
|
||||
// Process each repository
|
||||
for (const repo of targetRepos) {
|
||||
try {
|
||||
await processRepo(repo, cleanVersion);
|
||||
} catch (error) {
|
||||
errors.push(`${repo}: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (errors.length > 0) {
|
||||
core.setFailed(`Failed to process some repositories: ${errors.join('; ')}`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
console.log('All repositories processed successfully');
|
||||
}
|
||||
|
||||
// Execute main function
|
||||
main().catch((error) => {
|
||||
core.setFailed(`Unexpected error: ${error.message}`);
|
||||
process.exit(1);
|
||||
});
|
||||
@@ -1,57 +0,0 @@
|
||||
/*
|
||||
* Copyright 2023 Greptime Team
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import * as core from "@actions/core";
|
||||
import {obtainClient} from "@/common";
|
||||
|
||||
async function triggerWorkflow(workflowId: string, version: string) {
|
||||
const websiteClient = obtainClient("WEBSITE_REPO_TOKEN")
|
||||
try {
|
||||
await websiteClient.rest.actions.createWorkflowDispatch({
|
||||
owner: "GreptimeTeam",
|
||||
repo: "website",
|
||||
workflow_id: workflowId,
|
||||
ref: "main",
|
||||
inputs: {
|
||||
version,
|
||||
},
|
||||
});
|
||||
console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
|
||||
} catch (error) {
|
||||
core.setFailed(`Failed to trigger workflow: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
const version = process.env.VERSION;
|
||||
if (!version) {
|
||||
core.setFailed("VERSION environment variable is required");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
// Remove 'v' prefix if exists
|
||||
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
|
||||
|
||||
if (cleanVersion.includes('nightly')) {
|
||||
console.log('Nightly version detected, skipping workflow trigger.');
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
try {
|
||||
triggerWorkflow('bump-patch-version.yml', cleanVersion);
|
||||
} catch (error) {
|
||||
core.setFailed(`Error processing version: ${error.message}`);
|
||||
process.exit(1);
|
||||
}
|
||||
@@ -11,6 +11,6 @@ And database will reply with something like:
|
||||
Log Level changed from Some("info") to "trace,flow=debug"%
|
||||
```
|
||||
|
||||
The data is a string in the format of `global_level,module1=level1,module2=level2,...` that follow the same rule of `RUST_LOG`.
|
||||
The data is a string in the format of `global_level,module1=level1,module2=level2,...` that follows the same rule of `RUST_LOG`.
|
||||
|
||||
The module is the module name of the log, and the level is the log level. The log level can be one of the following: `trace`, `debug`, `info`, `warn`, `error`, `off`(case insensitive).
|
||||
@@ -14,7 +14,7 @@ impl SqlQueryHandler for Instance {
|
||||
```
|
||||
|
||||
Normally, when a SQL query arrives at GreptimeDB, the `do_query` method will be called. After some parsing work, the SQL
|
||||
will be feed into `StatementExecutor`:
|
||||
will be fed into `StatementExecutor`:
|
||||
|
||||
```rust
|
||||
// in Frontend Instance:
|
||||
@@ -27,7 +27,7 @@ an example.
|
||||
|
||||
Now, what if the statements should be handled differently for GreptimeDB Standalone and Cluster? You can see there's
|
||||
a `SqlStatementExecutor` field in `StatementExecutor`. Each GreptimeDB Standalone and Cluster has its own implementation
|
||||
of `SqlStatementExecutor`. If you are going to implement the statements differently in the two mode (
|
||||
of `SqlStatementExecutor`. If you are going to implement the statements differently in the two modes (
|
||||
like `CREATE TABLE`), you have to implement them in their own `SqlStatementExecutor`s.
|
||||
|
||||
Summarize as the diagram below:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Profile memory usage of GreptimeDB
|
||||
|
||||
This crate provides an easy approach to dump memory profiling info. A set of ready to use scripts is provided in [docs/how-to/memory-profile-scripts](docs/how-to/memory-profile-scripts).
|
||||
This crate provides an easy approach to dump memory profiling info. A set of ready to use scripts is provided in [docs/how-to/memory-profile-scripts](./memory-profile-scripts/scripts).
|
||||
|
||||
## Prerequisites
|
||||
### jemalloc
|
||||
@@ -44,6 +44,10 @@ Dump memory profiling data through HTTP API:
|
||||
|
||||
```bash
|
||||
curl -X POST localhost:4000/debug/prof/mem > greptime.hprof
|
||||
# or output flamegraph directly
|
||||
curl -X POST "localhost:4000/debug/prof/mem?output=flamegraph" > greptime.svg
|
||||
# or output pprof format
|
||||
curl -X POST "localhost:4000/debug/prof/mem?output=proto" > greptime.pprof
|
||||
```
|
||||
|
||||
You can periodically dump profiling data and compare them to find the delta memory usage.
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
Currently, our query engine is based on DataFusion, so all aggregate function is executed by DataFusion, through its UDAF interface. You can find DataFusion's UDAF example [here](https://github.com/apache/arrow-datafusion/blob/arrow2/datafusion-examples/examples/simple_udaf.rs). Basically, we provide the same way as DataFusion to write aggregate functions: both are centered in a struct called "Accumulator" to accumulates states along the way in aggregation.
|
||||
|
||||
However, DataFusion's UDAF implementation has a huge restriction, that it requires user to provide a concrete "Accumulator". Take `Median` aggregate function for example, to aggregate a `u32` datatype column, you have to write a `MedianU32`, and use `SELECT MEDIANU32(x)` in SQL. `MedianU32` cannot be used to aggregate a `i32` datatype column. Or, there's another way: you can use a special type that can hold all kinds of data (like our `Value` enum or Arrow's `ScalarValue`), and `match` all the way up to do aggregate calculations. It might work, though rather tedious. (But I think it's DataFusion's prefer way to write UDAF.)
|
||||
However, DataFusion's UDAF implementation has a huge restriction, that it requires user to provide a concrete "Accumulator". Take `Median` aggregate function for example, to aggregate a `u32` datatype column, you have to write a `MedianU32`, and use `SELECT MEDIANU32(x)` in SQL. `MedianU32` cannot be used to aggregate a `i32` datatype column. Or, there's another way: you can use a special type that can hold all kinds of data (like our `Value` enum or Arrow's `ScalarValue`), and `match` all the way up to do aggregate calculations. It might work, though rather tedious. (But I think it's DataFusion's preferred way to write UDAF.)
|
||||
|
||||
So is there a way we can make an aggregate function that automatically match the input data's type? For example, a `Median` aggregator that can work on both `u32` column and `i32`? The answer is yes until we found a way to bypassing DataFusion's restriction, a restriction that DataFusion simply don't pass the input data's type when creating an Accumulator.
|
||||
So is there a way we can make an aggregate function that automatically match the input data's type? For example, a `Median` aggregator that can work on both `u32` column and `i32`? The answer is yes until we find a way to bypass DataFusion's restriction, a restriction that DataFusion simply doesn't pass the input data's type when creating an Accumulator.
|
||||
|
||||
> There's an example in `my_sum_udaf_example.rs`, take that as quick start.
|
||||
|
||||
@@ -16,7 +16,7 @@ You must first define a struct that will be used to create your accumulator. For
|
||||
struct MySumAccumulatorCreator {}
|
||||
```
|
||||
|
||||
Attribute macro `#[as_aggr_func_creator]` and derive macro `#[derive(Debug, AggrFuncTypeStore)]` must both annotated on the struct. They work together to provide a storage of aggregate function's input data types, which are needed for creating generic accumulator later.
|
||||
Attribute macro `#[as_aggr_func_creator]` and derive macro `#[derive(Debug, AggrFuncTypeStore)]` must both be annotated on the struct. They work together to provide a storage of aggregate function's input data types, which are needed for creating generic accumulator later.
|
||||
|
||||
> Note that the `as_aggr_func_creator` macro will add fields to the struct, so the struct cannot be defined as an empty struct without field like `struct Foo;`, neither as a new type like `struct Foo(bar)`.
|
||||
|
||||
@@ -32,11 +32,11 @@ pub trait AggregateFunctionCreator: Send + Sync + Debug {
|
||||
|
||||
You can use input data's type in methods that return output type and state types (just invoke `input_types()`).
|
||||
|
||||
The output type is aggregate function's output data's type. For example, `SUM` aggregate function's output type is `u64` for a `u32` datatype column. The state types are accumulator's internal states' types. Take `AVG` aggregate function on a `i32` column as example, it's state types are `i64` (for sum) and `u64` (for count).
|
||||
The output type is aggregate function's output data's type. For example, `SUM` aggregate function's output type is `u64` for a `u32` datatype column. The state types are accumulator's internal states' types. Take `AVG` aggregate function on a `i32` column as example, its state types are `i64` (for sum) and `u64` (for count).
|
||||
|
||||
The `creator` function is where you define how an accumulator (that will be used in DataFusion) is created. You define "how" to create the accumulator (instead of "what" to create), using the input data's type as arguments. With input datatype known, you can create accumulator generically.
|
||||
|
||||
# 2. Impl `Accumulator` trait for you accumulator.
|
||||
# 2. Impl `Accumulator` trait for your accumulator.
|
||||
|
||||
The accumulator is where you store the aggregate calculation states and evaluate a result. You must impl `Accumulator` trait for it. The trait's definition is:
|
||||
|
||||
@@ -49,7 +49,7 @@ pub trait Accumulator: Send + Sync + Debug {
|
||||
}
|
||||
```
|
||||
|
||||
The DataFusion basically execute aggregate like this:
|
||||
The DataFusion basically executes aggregate like this:
|
||||
|
||||
1. Partitioning all input data for aggregate. Create an accumulator for each part.
|
||||
2. Call `update_batch` on each accumulator with partitioned data, to let you update your aggregate calculation.
|
||||
@@ -57,16 +57,16 @@ The DataFusion basically execute aggregate like this:
|
||||
4. Call `merge_batch` to merge all accumulator's internal state to one.
|
||||
5. Execute `evaluate` on the chosen one to get the final calculation result.
|
||||
|
||||
Once you know the meaning of each method, you can easily write your accumulator. You can refer to `Median` accumulator or `SUM` accumulator defined in file `my_sum_udaf_example.rs` for more details.
|
||||
Once you know the meaning of each method, you can easily write your accumulator. You can refer to `Median` accumulator or `SUM` accumulator defined in file `my_sum_udaf_example.rs` for more details.
|
||||
|
||||
# 3. Register your aggregate function to our query engine.
|
||||
|
||||
You can call `register_aggregate_function` method in query engine to register your aggregate function. To do that, you have to new an instance of struct `AggregateFunctionMeta`. The struct has three fields, first is the name of your aggregate function's name. The function name is case-sensitive due to DataFusion's restriction. We strongly recommend using lowercase for your name. If you have to use uppercase name, wrap your aggregate function with quotation marks. For example, if you define an aggregate function named "my_aggr", you can use "`SELECT MY_AGGR(x)`"; if you define "my_AGGR", you have to use "`SELECT "my_AGGR"(x)`".
|
||||
|
||||
The second field is arg_counts ,the count of the arguments. Like accumulator `percentile`, calculating the p_number of the column. We need to input the value of column and the value of p to cacalate, and so the count of the arguments is two.
|
||||
The second field is arg_counts ,the count of the arguments. Like accumulator `percentile`, calculating the p_number of the column. We need to input the value of column and the value of p to calculate, and so the count of the arguments is two.
|
||||
|
||||
The third field is a function about how to create your accumulator creator that you defined in step 1 above. Create creator, that's a bit intertwined, but it is how we make DataFusion use a newly created aggregate function each time it executes a SQL, preventing the stored input types from affecting each other. The key detail can be starting looking at our `DfContextProviderAdapter` struct's `get_aggregate_meta` method.
|
||||
|
||||
# (Optional) 4. Make your aggregate function automatically registered.
|
||||
|
||||
If you've written a great aggregate function that want to let everyone use it, you can make it automatically registered to our query engine at start time. It's quick simple, just refer to the `AggregateFunctions::register` function in `common/function/src/scalars/aggregate/mod.rs`.
|
||||
If you've written a great aggregate function that wants to let everyone use it, you can make it automatically register to our query engine at start time. It's quick and simple, just refer to the `AggregateFunctions::register` function in `common/function/src/scalars/aggregate/mod.rs`.
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
This document introduces how to write fuzz tests in GreptimeDB.
|
||||
|
||||
## What is a fuzz test
|
||||
Fuzz test is tool that leverage deterministic random generation to assist in finding bugs. The goal of fuzz tests is to identify inputs generated by the fuzzer that cause system panics, crashes, or unexpected behaviors to occur. And we are using the [cargo-fuzz](https://github.com/rust-fuzz/cargo-fuzz) to run our fuzz test targets.
|
||||
Fuzz test is tool that leverages deterministic random generation to assist in finding bugs. The goal of fuzz tests is to identify inputs generated by the fuzzer that cause system panics, crashes, or unexpected behaviors to occur. And we are using the [cargo-fuzz](https://github.com/rust-fuzz/cargo-fuzz) to run our fuzz test targets.
|
||||
|
||||
## Why we need them
|
||||
- Find bugs by leveraging random generation
|
||||
|
||||
8
flake.lock
generated
8
flake.lock
generated
@@ -41,16 +41,16 @@
|
||||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1745487689,
|
||||
"narHash": "sha256-FQoi3R0NjQeBAsEOo49b5tbDPcJSMWc3QhhaIi9eddw=",
|
||||
"lastModified": 1748162331,
|
||||
"narHash": "sha256-rqc2RKYTxP3tbjA+PB3VMRQNnjesrT0pEofXQTrMsS8=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "5630cf13cceac06cefe9fc607e8dfa8fb342dde3",
|
||||
"rev": "7c43f080a7f28b2774f3b3f43234ca11661bf334",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "NixOS",
|
||||
"ref": "nixos-24.11",
|
||||
"ref": "nixos-25.05",
|
||||
"repo": "nixpkgs",
|
||||
"type": "github"
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
description = "Development environment flake";
|
||||
|
||||
inputs = {
|
||||
nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.11";
|
||||
nixpkgs.url = "github:NixOS/nixpkgs/nixos-25.05";
|
||||
fenix = {
|
||||
url = "github:nix-community/fenix";
|
||||
inputs.nixpkgs.follows = "nixpkgs";
|
||||
@@ -21,7 +21,7 @@
|
||||
lib = nixpkgs.lib;
|
||||
rustToolchain = fenix.packages.${system}.fromToolchainName {
|
||||
name = (lib.importTOML ./rust-toolchain.toml).toolchain.channel;
|
||||
sha256 = "sha256-arzEYlWLGGYeOhECHpBxQd2joZ4rPKV3qLNnZ+eql6A=";
|
||||
sha256 = "sha256-tJJr8oqX3YD+ohhPK7jlt/7kvKBnBqJVjYtoFr520d4=";
|
||||
};
|
||||
in
|
||||
{
|
||||
@@ -51,6 +51,7 @@
|
||||
];
|
||||
|
||||
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath buildInputs;
|
||||
NIX_HARDENING_ENABLE = "";
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -46,6 +46,7 @@
|
||||
| Ingest Rows per Instance | `sum by(instance, pod)(rate(greptime_table_operator_ingest_rows{instance=~"$frontend"}[$__rate_interval]))` | `timeseries` | Ingestion rate by row as in each frontend | `prometheus` | `rowsps` | `[{{instance}}]-[{{pod}}]` |
|
||||
| Region Call QPS per Instance | `sum by(instance, pod, request_type) (rate(greptime_grpc_region_request_count{instance=~"$frontend"}[$__rate_interval]))` | `timeseries` | Region Call QPS per Instance. | `prometheus` | `ops` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
|
||||
| Region Call P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, request_type) (rate(greptime_grpc_region_request_bucket{instance=~"$frontend"}[$__rate_interval])))` | `timeseries` | Region Call P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
|
||||
| Frontend Handle Bulk Insert Elapsed Time | `sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))`<br/>`histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))` | `timeseries` | Per-stage time for frontend to handle bulk insert requests | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG` |
|
||||
# Mito Engine
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
| --- | --- | --- | --- | --- | --- | --- |
|
||||
@@ -59,7 +60,7 @@
|
||||
| Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
|
||||
| Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
|
||||
| Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` |
|
||||
| Compaction P99 per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
|
||||
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))`<br/>`sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
|
||||
| Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` |
|
||||
| WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` |
|
||||
| Cached Bytes per Instance | `greptime_mito_cache_bytes{instance=~"$datanode"}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` |
|
||||
@@ -67,6 +68,9 @@
|
||||
| WAL sync duration seconds | `histogram_quantile(0.99, sum by(le, type, node, instance, pod) (rate(raft_engine_sync_log_duration_seconds_bucket[$__rate_interval])))` | `timeseries` | Raft engine (local disk) log store sync latency, p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
|
||||
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
|
||||
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
|
||||
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
|
||||
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
|
||||
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
|
||||
# OpenDAL
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
| --- | --- | --- | --- | --- | --- | --- |
|
||||
@@ -84,9 +88,19 @@
|
||||
# Metasrv
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
| --- | --- | --- | --- | --- | --- | --- |
|
||||
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `state-timeline` | Counter of region migration by source and destination | `prometheus` | `none` | `from-datanode-{{datanode_id}}` |
|
||||
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `__auto` |
|
||||
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `__auto` |
|
||||
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `status-history` | Counter of region migration by source and destination | `prometheus` | -- | `from-datanode-{{datanode_id}}` |
|
||||
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `{{pod}}-{{state}}-{{error_type}}` |
|
||||
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `binBps` | `Datanode-{{datanode_id}}-writeload` |
|
||||
| Rate of SQL Executions (RDS) | `rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])` | `timeseries` | Displays the rate of SQL executions processed by the Meta service using the RDS backend. | `prometheus` | `none` | `{{pod}} {{op}} {{type}} {{result}} ` |
|
||||
| SQL Execution Latency (RDS) | `histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))` | `timeseries` | Measures the response time of SQL executions via the RDS backend. | `prometheus` | `ms` | `{{pod}} {{op}} {{type}} {{result}} p90` |
|
||||
| Handler Execution Latency | `histogram_quantile(0.90, sum by(pod, le, name) (
|
||||
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
|
||||
))` | `timeseries` | Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.<br/> | `prometheus` | `s` | `{{pod}} {{name}} p90` |
|
||||
| Heartbeat Packet Size | `histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))` | `timeseries` | Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.<br/> | `prometheus` | `bytes` | `{{pod}}` |
|
||||
| Meta Heartbeat Receive Rate | `rate(greptime_meta_heartbeat_rate[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}` |
|
||||
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
|
||||
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
|
||||
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
|
||||
# Flownode
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
| --- | --- | --- | --- | --- | --- | --- |
|
||||
|
||||
@@ -371,6 +371,21 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{request_type}}]'
|
||||
- title: 'Frontend Handle Bulk Insert Elapsed Time '
|
||||
type: timeseries
|
||||
description: Per-stage time for frontend to handle bulk insert requests
|
||||
unit: s
|
||||
queries:
|
||||
- expr: sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
|
||||
- expr: histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
|
||||
- title: Mito Engine
|
||||
panels:
|
||||
- title: Request OPS per Instance
|
||||
@@ -472,7 +487,7 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{ instance }}]-[{{pod}}]'
|
||||
- title: Compaction P99 per Instance by Stage
|
||||
- title: Compaction Elapsed Time per Instance by Stage
|
||||
type: timeseries
|
||||
description: Compaction latency by stage
|
||||
unit: s
|
||||
@@ -482,6 +497,11 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99'
|
||||
- expr: sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-avg'
|
||||
- title: Compaction P99 per Instance
|
||||
type: timeseries
|
||||
description: Compaction P99 per Instance.
|
||||
@@ -562,6 +582,51 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]'
|
||||
- title: Compaction Input/Output Bytes
|
||||
type: timeseries
|
||||
description: Compaction oinput output bytes
|
||||
unit: bytes
|
||||
queries:
|
||||
- expr: sum by(instance, pod) (greptime_mito_compaction_input_bytes)
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-input'
|
||||
- expr: sum by(instance, pod) (greptime_mito_compaction_output_bytes)
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-output'
|
||||
- title: Region Worker Handle Bulk Insert Requests
|
||||
type: timeseries
|
||||
description: Per-stage elapsed time for region worker to handle bulk insert region requests.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
|
||||
- expr: sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
|
||||
- title: Region Worker Convert Requests
|
||||
type: timeseries
|
||||
description: Per-stage elapsed time for region worker to decode requests.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
|
||||
- expr: sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
|
||||
- title: OpenDAL
|
||||
panels:
|
||||
- title: QPS per Instance
|
||||
@@ -676,9 +741,8 @@ groups:
|
||||
- title: Metasrv
|
||||
panels:
|
||||
- title: Region migration datanode
|
||||
type: state-timeline
|
||||
type: status-history
|
||||
description: Counter of region migration by source and destination
|
||||
unit: none
|
||||
queries:
|
||||
- expr: greptime_meta_region_migration_stat{datanode_type="src"}
|
||||
datasource:
|
||||
@@ -699,17 +763,127 @@ groups:
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: __auto
|
||||
legendFormat: '{{pod}}-{{state}}-{{error_type}}'
|
||||
- title: Datanode load
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: none
|
||||
unit: binBps
|
||||
queries:
|
||||
- expr: greptime_datanode_load
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: __auto
|
||||
legendFormat: Datanode-{{datanode_id}}-writeload
|
||||
- title: Rate of SQL Executions (RDS)
|
||||
type: timeseries
|
||||
description: Displays the rate of SQL executions processed by the Meta service using the RDS backend.
|
||||
unit: none
|
||||
queries:
|
||||
- expr: rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}} {{op}} {{type}} {{result}} '
|
||||
- title: SQL Execution Latency (RDS)
|
||||
type: timeseries
|
||||
description: 'Measures the response time of SQL executions via the RDS backend. '
|
||||
unit: ms
|
||||
queries:
|
||||
- expr: histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}} {{op}} {{type}} {{result}} p90'
|
||||
- title: Handler Execution Latency
|
||||
type: timeseries
|
||||
description: |
|
||||
Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: |-
|
||||
histogram_quantile(0.90, sum by(pod, le, name) (
|
||||
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
|
||||
))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}} {{name}} p90'
|
||||
- title: Heartbeat Packet Size
|
||||
type: timeseries
|
||||
description: |
|
||||
Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.
|
||||
unit: bytes
|
||||
queries:
|
||||
- expr: histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}'
|
||||
- title: Meta Heartbeat Receive Rate
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: rate(greptime_meta_heartbeat_rate[$__rate_interval])
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}'
|
||||
- title: Meta KV Ops Latency
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}-{{op}} p99'
|
||||
- title: Rate of meta KV Ops
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: none
|
||||
queries:
|
||||
- expr: rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}-{{op}} p99'
|
||||
- title: DDL Latency
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateLogicalTables-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateTable-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateView-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateFlow-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: DropTable-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: AlterTable-{{step}} p90
|
||||
- title: Flownode
|
||||
panels:
|
||||
- title: Flow Ingest / Output Rate
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -46,6 +46,7 @@
|
||||
| Ingest Rows per Instance | `sum by(instance, pod)(rate(greptime_table_operator_ingest_rows{}[$__rate_interval]))` | `timeseries` | Ingestion rate by row as in each frontend | `prometheus` | `rowsps` | `[{{instance}}]-[{{pod}}]` |
|
||||
| Region Call QPS per Instance | `sum by(instance, pod, request_type) (rate(greptime_grpc_region_request_count{}[$__rate_interval]))` | `timeseries` | Region Call QPS per Instance. | `prometheus` | `ops` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
|
||||
| Region Call P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, request_type) (rate(greptime_grpc_region_request_bucket{}[$__rate_interval])))` | `timeseries` | Region Call P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
|
||||
| Frontend Handle Bulk Insert Elapsed Time | `sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))`<br/>`histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))` | `timeseries` | Per-stage time for frontend to handle bulk insert requests | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG` |
|
||||
# Mito Engine
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
| --- | --- | --- | --- | --- | --- | --- |
|
||||
@@ -59,7 +60,7 @@
|
||||
| Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
|
||||
| Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
|
||||
| Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` |
|
||||
| Compaction P99 per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
|
||||
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{}[$__rate_interval])))`<br/>`sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
|
||||
| Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` |
|
||||
| WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` |
|
||||
| Cached Bytes per Instance | `greptime_mito_cache_bytes{}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` |
|
||||
@@ -67,6 +68,9 @@
|
||||
| WAL sync duration seconds | `histogram_quantile(0.99, sum by(le, type, node, instance, pod) (rate(raft_engine_sync_log_duration_seconds_bucket[$__rate_interval])))` | `timeseries` | Raft engine (local disk) log store sync latency, p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
|
||||
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
|
||||
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
|
||||
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
|
||||
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
|
||||
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
|
||||
# OpenDAL
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
| --- | --- | --- | --- | --- | --- | --- |
|
||||
@@ -84,9 +88,19 @@
|
||||
# Metasrv
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
| --- | --- | --- | --- | --- | --- | --- |
|
||||
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `state-timeline` | Counter of region migration by source and destination | `prometheus` | `none` | `from-datanode-{{datanode_id}}` |
|
||||
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `__auto` |
|
||||
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `__auto` |
|
||||
| Region migration datanode | `greptime_meta_region_migration_stat{datanode_type="src"}`<br/>`greptime_meta_region_migration_stat{datanode_type="desc"}` | `status-history` | Counter of region migration by source and destination | `prometheus` | -- | `from-datanode-{{datanode_id}}` |
|
||||
| Region migration error | `greptime_meta_region_migration_error` | `timeseries` | Counter of region migration error | `prometheus` | `none` | `{{pod}}-{{state}}-{{error_type}}` |
|
||||
| Datanode load | `greptime_datanode_load` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `binBps` | `Datanode-{{datanode_id}}-writeload` |
|
||||
| Rate of SQL Executions (RDS) | `rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])` | `timeseries` | Displays the rate of SQL executions processed by the Meta service using the RDS backend. | `prometheus` | `none` | `{{pod}} {{op}} {{type}} {{result}} ` |
|
||||
| SQL Execution Latency (RDS) | `histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))` | `timeseries` | Measures the response time of SQL executions via the RDS backend. | `prometheus` | `ms` | `{{pod}} {{op}} {{type}} {{result}} p90` |
|
||||
| Handler Execution Latency | `histogram_quantile(0.90, sum by(pod, le, name) (
|
||||
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
|
||||
))` | `timeseries` | Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.<br/> | `prometheus` | `s` | `{{pod}} {{name}} p90` |
|
||||
| Heartbeat Packet Size | `histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))` | `timeseries` | Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.<br/> | `prometheus` | `bytes` | `{{pod}}` |
|
||||
| Meta Heartbeat Receive Rate | `rate(greptime_meta_heartbeat_rate[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}` |
|
||||
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
|
||||
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
|
||||
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
|
||||
# Flownode
|
||||
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
|
||||
| --- | --- | --- | --- | --- | --- | --- |
|
||||
|
||||
@@ -371,6 +371,21 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{request_type}}]'
|
||||
- title: 'Frontend Handle Bulk Insert Elapsed Time '
|
||||
type: timeseries
|
||||
description: Per-stage time for frontend to handle bulk insert requests
|
||||
unit: s
|
||||
queries:
|
||||
- expr: sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
|
||||
- expr: histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
|
||||
- title: Mito Engine
|
||||
panels:
|
||||
- title: Request OPS per Instance
|
||||
@@ -472,7 +487,7 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{ instance }}]-[{{pod}}]'
|
||||
- title: Compaction P99 per Instance by Stage
|
||||
- title: Compaction Elapsed Time per Instance by Stage
|
||||
type: timeseries
|
||||
description: Compaction latency by stage
|
||||
unit: s
|
||||
@@ -482,6 +497,11 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99'
|
||||
- expr: sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-avg'
|
||||
- title: Compaction P99 per Instance
|
||||
type: timeseries
|
||||
description: Compaction P99 per Instance.
|
||||
@@ -562,6 +582,51 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]'
|
||||
- title: Compaction Input/Output Bytes
|
||||
type: timeseries
|
||||
description: Compaction oinput output bytes
|
||||
unit: bytes
|
||||
queries:
|
||||
- expr: sum by(instance, pod) (greptime_mito_compaction_input_bytes)
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-input'
|
||||
- expr: sum by(instance, pod) (greptime_mito_compaction_output_bytes)
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-output'
|
||||
- title: Region Worker Handle Bulk Insert Requests
|
||||
type: timeseries
|
||||
description: Per-stage elapsed time for region worker to handle bulk insert region requests.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
|
||||
- expr: sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
|
||||
- title: Region Worker Convert Requests
|
||||
type: timeseries
|
||||
description: Per-stage elapsed time for region worker to decode requests.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
|
||||
- expr: sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
|
||||
- title: OpenDAL
|
||||
panels:
|
||||
- title: QPS per Instance
|
||||
@@ -676,9 +741,8 @@ groups:
|
||||
- title: Metasrv
|
||||
panels:
|
||||
- title: Region migration datanode
|
||||
type: state-timeline
|
||||
type: status-history
|
||||
description: Counter of region migration by source and destination
|
||||
unit: none
|
||||
queries:
|
||||
- expr: greptime_meta_region_migration_stat{datanode_type="src"}
|
||||
datasource:
|
||||
@@ -699,17 +763,127 @@ groups:
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: __auto
|
||||
legendFormat: '{{pod}}-{{state}}-{{error_type}}'
|
||||
- title: Datanode load
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: none
|
||||
unit: binBps
|
||||
queries:
|
||||
- expr: greptime_datanode_load
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: __auto
|
||||
legendFormat: Datanode-{{datanode_id}}-writeload
|
||||
- title: Rate of SQL Executions (RDS)
|
||||
type: timeseries
|
||||
description: Displays the rate of SQL executions processed by the Meta service using the RDS backend.
|
||||
unit: none
|
||||
queries:
|
||||
- expr: rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_count[$__rate_interval])
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}} {{op}} {{type}} {{result}} '
|
||||
- title: SQL Execution Latency (RDS)
|
||||
type: timeseries
|
||||
description: 'Measures the response time of SQL executions via the RDS backend. '
|
||||
unit: ms
|
||||
queries:
|
||||
- expr: histogram_quantile(0.90, sum by(pod, op, type, result, le) (rate(greptime_meta_rds_pg_sql_execute_elapsed_ms_bucket[$__rate_interval])))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}} {{op}} {{type}} {{result}} p90'
|
||||
- title: Handler Execution Latency
|
||||
type: timeseries
|
||||
description: |
|
||||
Shows latency of Meta handlers by pod and handler name, useful for monitoring handler performance and detecting latency spikes.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: |-
|
||||
histogram_quantile(0.90, sum by(pod, le, name) (
|
||||
rate(greptime_meta_handler_execute_bucket[$__rate_interval])
|
||||
))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}} {{name}} p90'
|
||||
- title: Heartbeat Packet Size
|
||||
type: timeseries
|
||||
description: |
|
||||
Shows p90 heartbeat message sizes, helping track network usage and identify anomalies in heartbeat payload.
|
||||
unit: bytes
|
||||
queries:
|
||||
- expr: histogram_quantile(0.9, sum by(pod, le) (greptime_meta_heartbeat_stat_memory_size_bucket))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}'
|
||||
- title: Meta Heartbeat Receive Rate
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: rate(greptime_meta_heartbeat_rate[$__rate_interval])
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}'
|
||||
- title: Meta KV Ops Latency
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}-{{op}} p99'
|
||||
- title: Rate of meta KV Ops
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: none
|
||||
queries:
|
||||
- expr: rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '{{pod}}-{{op}} p99'
|
||||
- title: DDL Latency
|
||||
type: timeseries
|
||||
description: Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateLogicalTables-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateTable-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateView-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: CreateFlow-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: DropTable-{{step}} p90
|
||||
- expr: histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: AlterTable-{{step}} p90
|
||||
- title: Flownode
|
||||
panels:
|
||||
- title: Flow Ingest / Output Rate
|
||||
|
||||
@@ -6,7 +6,7 @@ DAC_IMAGE=ghcr.io/zyy17/dac:20250423-522bd35
|
||||
|
||||
remove_instance_filters() {
|
||||
# Remove the instance filters for the standalone dashboards.
|
||||
sed 's/instance=~\\"$datanode\\",//; s/instance=~\\"$datanode\\"//; s/instance=~\\"$frontend\\",//; s/instance=~\\"$frontend\\"//; s/instance=~\\"$metasrv\\",//; s/instance=~\\"$metasrv\\"//; s/instance=~\\"$flownode\\",//; s/instance=~\\"$flownode\\"//;' $CLUSTER_DASHBOARD_DIR/dashboard.json > $STANDALONE_DASHBOARD_DIR/dashboard.json
|
||||
sed -E 's/instance=~\\"(\$datanode|\$frontend|\$metasrv|\$flownode)\\",?//g' "$CLUSTER_DASHBOARD_DIR/dashboard.json" > "$STANDALONE_DASHBOARD_DIR/dashboard.json"
|
||||
}
|
||||
|
||||
generate_intermediate_dashboards_and_docs() {
|
||||
|
||||
@@ -26,6 +26,13 @@ excludes = [
|
||||
"src/common/base/src/secrets.rs",
|
||||
"src/servers/src/repeated_field.rs",
|
||||
"src/servers/src/http/test_helpers.rs",
|
||||
# enterprise
|
||||
"src/common/meta/src/rpc/ddl/trigger.rs",
|
||||
"src/operator/src/expr_helper/trigger.rs",
|
||||
"src/sql/src/statements/create/trigger.rs",
|
||||
"src/sql/src/statements/show/trigger.rs",
|
||||
"src/sql/src/parsers/create_parser/trigger.rs",
|
||||
"src/sql/src/parsers/show_parser/trigger.rs",
|
||||
]
|
||||
|
||||
[properties]
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
[toolchain]
|
||||
channel = "nightly-2025-04-15"
|
||||
channel = "nightly-2025-05-19"
|
||||
|
||||
@@ -5,8 +5,12 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
pg_kvbackend = ["common-meta/pg_kvbackend"]
|
||||
mysql_kvbackend = ["common-meta/mysql_kvbackend"]
|
||||
default = [
|
||||
"pg_kvbackend",
|
||||
"mysql_kvbackend",
|
||||
]
|
||||
pg_kvbackend = ["common-meta/pg_kvbackend", "meta-srv/pg_kvbackend"]
|
||||
mysql_kvbackend = ["common-meta/mysql_kvbackend", "meta-srv/mysql_kvbackend"]
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
@@ -43,11 +47,9 @@ etcd-client.workspace = true
|
||||
futures.workspace = true
|
||||
humantime.workspace = true
|
||||
meta-client.workspace = true
|
||||
meta-srv.workspace = true
|
||||
nu-ansi-term = "0.46"
|
||||
opendal = { version = "0.51.1", features = [
|
||||
"services-fs",
|
||||
"services-s3",
|
||||
] }
|
||||
object-store.workspace = true
|
||||
query.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest.workspace = true
|
||||
|
||||
@@ -58,6 +58,7 @@ where
|
||||
info!("{desc}, average operation cost: {cost:.2} ms");
|
||||
}
|
||||
|
||||
/// Command to benchmark table metadata operations.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct BenchTableMetadataCommand {
|
||||
#[clap(long)]
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::any::Any;
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use object_store::Error as ObjectStoreError;
|
||||
use snafu::{Location, Snafu};
|
||||
|
||||
#[derive(Snafu)]
|
||||
@@ -225,7 +226,7 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: opendal::Error,
|
||||
error: ObjectStoreError,
|
||||
},
|
||||
#[snafu(display("S3 config need be set"))]
|
||||
S3ConfigNotSet {
|
||||
@@ -237,6 +238,24 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("KV backend not set: {}", backend))]
|
||||
KvBackendNotSet {
|
||||
backend: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("Unsupported memory backend"))]
|
||||
UnsupportedMemoryBackend {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("File path invalid: {}", msg))]
|
||||
InvalidFilePath {
|
||||
msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -255,6 +274,8 @@ impl ErrorExt for Error {
|
||||
| Error::ConnectEtcd { .. }
|
||||
| Error::CreateDir { .. }
|
||||
| Error::EmptyResult { .. }
|
||||
| Error::InvalidFilePath { .. }
|
||||
| Error::UnsupportedMemoryBackend { .. }
|
||||
| Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Error::StartProcedureManager { source, .. }
|
||||
@@ -273,8 +294,9 @@ impl ErrorExt for Error {
|
||||
|
||||
Error::Other { source, .. } => source.status_code(),
|
||||
Error::OpenDal { .. } => StatusCode::Internal,
|
||||
Error::S3ConfigNotSet { .. } => StatusCode::InvalidArguments,
|
||||
Error::OutputDirNotSet { .. } => StatusCode::InvalidArguments,
|
||||
Error::S3ConfigNotSet { .. }
|
||||
| Error::OutputDirNotSet { .. }
|
||||
| Error::KvBackendNotSet { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Error::BuildRuntime { source, .. } => source.status_code(),
|
||||
|
||||
|
||||
@@ -19,10 +19,12 @@ use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::{Parser, ValueEnum};
|
||||
use common_base::secrets::{ExposeSecret, SecretString};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_telemetry::{debug, error, info};
|
||||
use opendal::layers::LoggingLayer;
|
||||
use opendal::{services, Operator};
|
||||
use object_store::layers::LoggingLayer;
|
||||
use object_store::services::Oss;
|
||||
use object_store::{services, ObjectStore};
|
||||
use serde_json::Value;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tokio::sync::Semaphore;
|
||||
@@ -48,6 +50,7 @@ enum ExportTarget {
|
||||
All,
|
||||
}
|
||||
|
||||
/// Command for exporting data from the GreptimeDB.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct ExportCommand {
|
||||
/// Server address to connect
|
||||
@@ -110,11 +113,26 @@ pub struct ExportCommand {
|
||||
#[clap(long)]
|
||||
s3: bool,
|
||||
|
||||
/// if both `ddl_local_dir` and remote storage (s3/oss) are set, `ddl_local_dir` will be only used for
|
||||
/// exported SQL files, and the data will be exported to remote storage.
|
||||
///
|
||||
/// Note that `ddl_local_dir` export sql files to **LOCAL** file system, this is useful if export client don't have
|
||||
/// direct access to remote storage.
|
||||
///
|
||||
/// if remote storage is set but `ddl_local_dir` is not set, both SQL&data will be exported to remote storage.
|
||||
#[clap(long)]
|
||||
ddl_local_dir: Option<String>,
|
||||
|
||||
/// The s3 bucket name
|
||||
/// if s3 is set, this is required
|
||||
#[clap(long)]
|
||||
s3_bucket: Option<String>,
|
||||
|
||||
// The s3 root path
|
||||
/// if s3 is set, this is required
|
||||
#[clap(long)]
|
||||
s3_root: Option<String>,
|
||||
|
||||
/// The s3 endpoint
|
||||
/// if s3 is set, this is required
|
||||
#[clap(long)]
|
||||
@@ -134,6 +152,30 @@ pub struct ExportCommand {
|
||||
/// if s3 is set, this is required
|
||||
#[clap(long)]
|
||||
s3_region: Option<String>,
|
||||
|
||||
/// if export data to oss
|
||||
#[clap(long)]
|
||||
oss: bool,
|
||||
|
||||
/// The oss bucket name
|
||||
/// if oss is set, this is required
|
||||
#[clap(long)]
|
||||
oss_bucket: Option<String>,
|
||||
|
||||
/// The oss endpoint
|
||||
/// if oss is set, this is required
|
||||
#[clap(long)]
|
||||
oss_endpoint: Option<String>,
|
||||
|
||||
/// The oss access key id
|
||||
/// if oss is set, this is required
|
||||
#[clap(long)]
|
||||
oss_access_key_id: Option<String>,
|
||||
|
||||
/// The oss access key secret
|
||||
/// if oss is set, this is required
|
||||
#[clap(long)]
|
||||
oss_access_key_secret: Option<String>,
|
||||
}
|
||||
|
||||
impl ExportCommand {
|
||||
@@ -147,7 +189,7 @@ impl ExportCommand {
|
||||
{
|
||||
return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
|
||||
}
|
||||
if !self.s3 && self.output_dir.is_none() {
|
||||
if !self.s3 && !self.oss && self.output_dir.is_none() {
|
||||
return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
|
||||
}
|
||||
let (catalog, schema) =
|
||||
@@ -172,11 +214,32 @@ impl ExportCommand {
|
||||
start_time: self.start_time.clone(),
|
||||
end_time: self.end_time.clone(),
|
||||
s3: self.s3,
|
||||
ddl_local_dir: self.ddl_local_dir.clone(),
|
||||
s3_bucket: self.s3_bucket.clone(),
|
||||
s3_root: self.s3_root.clone(),
|
||||
s3_endpoint: self.s3_endpoint.clone(),
|
||||
s3_access_key: self.s3_access_key.clone(),
|
||||
s3_secret_key: self.s3_secret_key.clone(),
|
||||
// Wrap sensitive values in SecretString
|
||||
s3_access_key: self
|
||||
.s3_access_key
|
||||
.as_ref()
|
||||
.map(|k| SecretString::from(k.clone())),
|
||||
s3_secret_key: self
|
||||
.s3_secret_key
|
||||
.as_ref()
|
||||
.map(|k| SecretString::from(k.clone())),
|
||||
s3_region: self.s3_region.clone(),
|
||||
oss: self.oss,
|
||||
oss_bucket: self.oss_bucket.clone(),
|
||||
oss_endpoint: self.oss_endpoint.clone(),
|
||||
// Wrap sensitive values in SecretString
|
||||
oss_access_key_id: self
|
||||
.oss_access_key_id
|
||||
.as_ref()
|
||||
.map(|k| SecretString::from(k.clone())),
|
||||
oss_access_key_secret: self
|
||||
.oss_access_key_secret
|
||||
.as_ref()
|
||||
.map(|k| SecretString::from(k.clone())),
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -192,21 +255,30 @@ pub struct Export {
|
||||
start_time: Option<String>,
|
||||
end_time: Option<String>,
|
||||
s3: bool,
|
||||
ddl_local_dir: Option<String>,
|
||||
s3_bucket: Option<String>,
|
||||
s3_root: Option<String>,
|
||||
s3_endpoint: Option<String>,
|
||||
s3_access_key: Option<String>,
|
||||
s3_secret_key: Option<String>,
|
||||
// Changed to SecretString for sensitive data
|
||||
s3_access_key: Option<SecretString>,
|
||||
s3_secret_key: Option<SecretString>,
|
||||
s3_region: Option<String>,
|
||||
oss: bool,
|
||||
oss_bucket: Option<String>,
|
||||
oss_endpoint: Option<String>,
|
||||
// Changed to SecretString for sensitive data
|
||||
oss_access_key_id: Option<SecretString>,
|
||||
oss_access_key_secret: Option<SecretString>,
|
||||
}
|
||||
|
||||
impl Export {
|
||||
fn catalog_path(&self) -> PathBuf {
|
||||
if self.s3 {
|
||||
if self.s3 || self.oss {
|
||||
PathBuf::from(&self.catalog)
|
||||
} else if let Some(dir) = &self.output_dir {
|
||||
PathBuf::from(dir).join(&self.catalog)
|
||||
} else {
|
||||
unreachable!("catalog_path: output_dir must be set when not using s3")
|
||||
unreachable!("catalog_path: output_dir must be set when not using remote storage")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -364,7 +436,7 @@ impl Export {
|
||||
let timer = Instant::now();
|
||||
let db_names = self.get_db_names().await?;
|
||||
let db_count = db_names.len();
|
||||
let operator = self.build_operator().await?;
|
||||
let operator = self.build_prefer_fs_operator().await?;
|
||||
|
||||
for schema in db_names {
|
||||
let create_database = self
|
||||
@@ -394,7 +466,7 @@ impl Export {
|
||||
let semaphore = Arc::new(Semaphore::new(self.parallelism));
|
||||
let db_names = self.get_db_names().await?;
|
||||
let db_count = db_names.len();
|
||||
let operator = Arc::new(self.build_operator().await?);
|
||||
let operator = Arc::new(self.build_prefer_fs_operator().await?);
|
||||
let mut tasks = Vec::with_capacity(db_names.len());
|
||||
|
||||
for schema in db_names {
|
||||
@@ -408,7 +480,7 @@ impl Export {
|
||||
.await?;
|
||||
|
||||
// Create directory if needed for file system storage
|
||||
if !export_self.s3 {
|
||||
if !export_self.s3 && !export_self.oss {
|
||||
let db_dir = format!("{}/{}/", export_self.catalog, schema);
|
||||
operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
|
||||
}
|
||||
@@ -451,21 +523,45 @@ impl Export {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn build_operator(&self) -> Result<Operator> {
|
||||
async fn build_operator(&self) -> Result<ObjectStore> {
|
||||
if self.s3 {
|
||||
self.build_s3_operator().await
|
||||
} else if self.oss {
|
||||
self.build_oss_operator().await
|
||||
} else {
|
||||
self.build_fs_operator().await
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_s3_operator(&self) -> Result<Operator> {
|
||||
let mut builder = services::S3::default().root("").bucket(
|
||||
/// build operator with preference for file system
|
||||
async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
|
||||
if (self.s3 || self.oss) && self.ddl_local_dir.is_some() {
|
||||
let root = self.ddl_local_dir.as_ref().unwrap().clone();
|
||||
let op = ObjectStore::new(services::Fs::default().root(&root))
|
||||
.context(OpenDalSnafu)?
|
||||
.layer(LoggingLayer::default())
|
||||
.finish();
|
||||
Ok(op)
|
||||
} else if self.s3 {
|
||||
self.build_s3_operator().await
|
||||
} else if self.oss {
|
||||
self.build_oss_operator().await
|
||||
} else {
|
||||
self.build_fs_operator().await
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_s3_operator(&self) -> Result<ObjectStore> {
|
||||
let mut builder = services::S3::default().bucket(
|
||||
self.s3_bucket
|
||||
.as_ref()
|
||||
.expect("s3_bucket must be provided when s3 is enabled"),
|
||||
);
|
||||
|
||||
if let Some(root) = self.s3_root.as_ref() {
|
||||
builder = builder.root(root);
|
||||
}
|
||||
|
||||
if let Some(endpoint) = self.s3_endpoint.as_ref() {
|
||||
builder = builder.endpoint(endpoint);
|
||||
}
|
||||
@@ -475,27 +571,51 @@ impl Export {
|
||||
}
|
||||
|
||||
if let Some(key_id) = self.s3_access_key.as_ref() {
|
||||
builder = builder.access_key_id(key_id);
|
||||
builder = builder.access_key_id(key_id.expose_secret());
|
||||
}
|
||||
|
||||
if let Some(secret_key) = self.s3_secret_key.as_ref() {
|
||||
builder = builder.secret_access_key(secret_key);
|
||||
builder = builder.secret_access_key(secret_key.expose_secret());
|
||||
}
|
||||
|
||||
let op = Operator::new(builder)
|
||||
let op = ObjectStore::new(builder)
|
||||
.context(OpenDalSnafu)?
|
||||
.layer(LoggingLayer::default())
|
||||
.finish();
|
||||
Ok(op)
|
||||
}
|
||||
|
||||
async fn build_fs_operator(&self) -> Result<Operator> {
|
||||
async fn build_oss_operator(&self) -> Result<ObjectStore> {
|
||||
let mut builder = Oss::default()
|
||||
.bucket(self.oss_bucket.as_ref().expect("oss_bucket must be set"))
|
||||
.endpoint(
|
||||
self.oss_endpoint
|
||||
.as_ref()
|
||||
.expect("oss_endpoint must be set"),
|
||||
);
|
||||
|
||||
// Use expose_secret() to access the actual secret value
|
||||
if let Some(key_id) = self.oss_access_key_id.as_ref() {
|
||||
builder = builder.access_key_id(key_id.expose_secret());
|
||||
}
|
||||
if let Some(secret_key) = self.oss_access_key_secret.as_ref() {
|
||||
builder = builder.access_key_secret(secret_key.expose_secret());
|
||||
}
|
||||
|
||||
let op = ObjectStore::new(builder)
|
||||
.context(OpenDalSnafu)?
|
||||
.layer(LoggingLayer::default())
|
||||
.finish();
|
||||
Ok(op)
|
||||
}
|
||||
|
||||
async fn build_fs_operator(&self) -> Result<ObjectStore> {
|
||||
let root = self
|
||||
.output_dir
|
||||
.as_ref()
|
||||
.context(OutputDirNotSetSnafu)?
|
||||
.clone();
|
||||
let op = Operator::new(services::Fs::default().root(&root))
|
||||
let op = ObjectStore::new(services::Fs::default().root(&root))
|
||||
.context(OpenDalSnafu)?
|
||||
.layer(LoggingLayer::default())
|
||||
.finish();
|
||||
@@ -509,6 +629,7 @@ impl Export {
|
||||
let db_count = db_names.len();
|
||||
let mut tasks = Vec::with_capacity(db_count);
|
||||
let operator = Arc::new(self.build_operator().await?);
|
||||
let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
|
||||
let with_options = build_with_options(&self.start_time, &self.end_time);
|
||||
|
||||
for schema in db_names {
|
||||
@@ -516,12 +637,13 @@ impl Export {
|
||||
let export_self = self.clone();
|
||||
let with_options_clone = with_options.clone();
|
||||
let operator = operator.clone();
|
||||
let fs_first_operator = fs_first_operator.clone();
|
||||
|
||||
tasks.push(async move {
|
||||
let _permit = semaphore_moved.acquire().await.unwrap();
|
||||
|
||||
// Create directory if not using S3
|
||||
if !export_self.s3 {
|
||||
// Create directory if not using remote storage
|
||||
if !export_self.s3 && !export_self.oss {
|
||||
let db_dir = format!("{}/{}/", export_self.catalog, schema);
|
||||
operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
|
||||
}
|
||||
@@ -533,7 +655,11 @@ impl Export {
|
||||
r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
|
||||
export_self.catalog, schema, path, with_options_clone, connection_part
|
||||
);
|
||||
info!("Executing sql: {sql}");
|
||||
|
||||
// Log SQL command but mask sensitive information
|
||||
let safe_sql = export_self.mask_sensitive_sql(&sql);
|
||||
info!("Executing sql: {}", safe_sql);
|
||||
|
||||
export_self.database_client.sql_in_public(&sql).await?;
|
||||
info!(
|
||||
"Finished exporting {}.{} data to {}",
|
||||
@@ -549,7 +675,7 @@ impl Export {
|
||||
let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
|
||||
export_self
|
||||
.write_to_storage(
|
||||
&operator,
|
||||
&fs_first_operator,
|
||||
©_from_path,
|
||||
copy_database_from_sql.into_bytes(),
|
||||
)
|
||||
@@ -573,6 +699,29 @@ impl Export {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mask sensitive information in SQL commands for safe logging
|
||||
fn mask_sensitive_sql(&self, sql: &str) -> String {
|
||||
let mut masked_sql = sql.to_string();
|
||||
|
||||
// Mask S3 credentials
|
||||
if let Some(access_key) = &self.s3_access_key {
|
||||
masked_sql = masked_sql.replace(access_key.expose_secret(), "[REDACTED]");
|
||||
}
|
||||
if let Some(secret_key) = &self.s3_secret_key {
|
||||
masked_sql = masked_sql.replace(secret_key.expose_secret(), "[REDACTED]");
|
||||
}
|
||||
|
||||
// Mask OSS credentials
|
||||
if let Some(access_key_id) = &self.oss_access_key_id {
|
||||
masked_sql = masked_sql.replace(access_key_id.expose_secret(), "[REDACTED]");
|
||||
}
|
||||
if let Some(access_key_secret) = &self.oss_access_key_secret {
|
||||
masked_sql = masked_sql.replace(access_key_secret.expose_secret(), "[REDACTED]");
|
||||
}
|
||||
|
||||
masked_sql
|
||||
}
|
||||
|
||||
fn get_file_path(&self, schema: &str, file_name: &str) -> String {
|
||||
format!("{}/{}/{}", self.catalog, schema, file_name)
|
||||
}
|
||||
@@ -580,8 +729,20 @@ impl Export {
|
||||
fn format_output_path(&self, file_path: &str) -> String {
|
||||
if self.s3 {
|
||||
format!(
|
||||
"s3://{}/{}",
|
||||
"s3://{}{}/{}",
|
||||
self.s3_bucket.as_ref().unwrap_or(&String::new()),
|
||||
if let Some(root) = &self.s3_root {
|
||||
format!("/{}", root)
|
||||
} else {
|
||||
String::new()
|
||||
},
|
||||
file_path
|
||||
)
|
||||
} else if self.oss {
|
||||
format!(
|
||||
"oss://{}/{}/{}",
|
||||
self.oss_bucket.as_ref().unwrap_or(&String::new()),
|
||||
self.catalog,
|
||||
file_path
|
||||
)
|
||||
} else {
|
||||
@@ -595,19 +756,27 @@ impl Export {
|
||||
|
||||
async fn write_to_storage(
|
||||
&self,
|
||||
op: &Operator,
|
||||
op: &ObjectStore,
|
||||
file_path: &str,
|
||||
content: Vec<u8>,
|
||||
) -> Result<()> {
|
||||
op.write(file_path, content).await.context(OpenDalSnafu)
|
||||
op.write(file_path, content)
|
||||
.await
|
||||
.context(OpenDalSnafu)
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
fn get_storage_params(&self, schema: &str) -> (String, String) {
|
||||
if self.s3 {
|
||||
let s3_path = format!(
|
||||
"s3://{}/{}/{}/",
|
||||
"s3://{}{}/{}/{}/",
|
||||
// Safety: s3_bucket is required when s3 is enabled
|
||||
self.s3_bucket.as_ref().unwrap(),
|
||||
if let Some(root) = &self.s3_root {
|
||||
format!("/{}", root)
|
||||
} else {
|
||||
String::new()
|
||||
},
|
||||
self.catalog,
|
||||
schema
|
||||
);
|
||||
@@ -620,15 +789,36 @@ impl Export {
|
||||
};
|
||||
|
||||
// Safety: All s3 options are required
|
||||
// Use expose_secret() to access the actual secret values
|
||||
let connection_options = format!(
|
||||
"ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
|
||||
self.s3_access_key.as_ref().unwrap(),
|
||||
self.s3_secret_key.as_ref().unwrap(),
|
||||
self.s3_access_key.as_ref().unwrap().expose_secret(),
|
||||
self.s3_secret_key.as_ref().unwrap().expose_secret(),
|
||||
self.s3_region.as_ref().unwrap(),
|
||||
endpoint_option
|
||||
);
|
||||
|
||||
(s3_path, format!(" CONNECTION ({})", connection_options))
|
||||
} else if self.oss {
|
||||
let oss_path = format!(
|
||||
"oss://{}/{}/{}/",
|
||||
self.oss_bucket.as_ref().unwrap(),
|
||||
self.catalog,
|
||||
schema
|
||||
);
|
||||
let endpoint_option = if let Some(endpoint) = self.oss_endpoint.as_ref() {
|
||||
format!(", ENDPOINT='{}'", endpoint)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
|
||||
let connection_options = format!(
|
||||
"ACCESS_KEY_ID='{}', ACCESS_KEY_SECRET='{}'{}",
|
||||
self.oss_access_key_id.as_ref().unwrap().expose_secret(),
|
||||
self.oss_access_key_secret.as_ref().unwrap().expose_secret(),
|
||||
endpoint_option
|
||||
);
|
||||
(oss_path, format!(" CONNECTION ({})", connection_options))
|
||||
} else {
|
||||
(
|
||||
self.catalog_path()
|
||||
|
||||
@@ -40,6 +40,7 @@ enum ImportTarget {
|
||||
All,
|
||||
}
|
||||
|
||||
/// Command to import data from a directory into a GreptimeDB instance.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct ImportCommand {
|
||||
/// Server address to connect
|
||||
|
||||
@@ -17,9 +17,10 @@ mod database;
|
||||
pub mod error;
|
||||
mod export;
|
||||
mod import;
|
||||
mod meta_snapshot;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::Parser;
|
||||
use clap::{Parser, Subcommand};
|
||||
use common_error::ext::BoxedError;
|
||||
pub use database::DatabaseClient;
|
||||
use error::Result;
|
||||
@@ -27,6 +28,7 @@ use error::Result;
|
||||
pub use crate::bench::BenchTableMetadataCommand;
|
||||
pub use crate::export::ExportCommand;
|
||||
pub use crate::import::ImportCommand;
|
||||
pub use crate::meta_snapshot::{MetaCommand, MetaInfoCommand, MetaRestoreCommand, MetaSaveCommand};
|
||||
|
||||
#[async_trait]
|
||||
pub trait Tool: Send + Sync {
|
||||
@@ -49,3 +51,19 @@ impl AttachCommand {
|
||||
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
|
||||
}
|
||||
}
|
||||
|
||||
/// Subcommand for data operations like export and import.
|
||||
#[derive(Subcommand)]
|
||||
pub enum DataCommand {
|
||||
Export(ExportCommand),
|
||||
Import(ImportCommand),
|
||||
}
|
||||
|
||||
impl DataCommand {
|
||||
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
DataCommand::Export(cmd) => cmd.build().await,
|
||||
DataCommand::Import(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
459
src/cli/src/meta_snapshot.rs
Normal file
459
src/cli/src/meta_snapshot.rs
Normal file
@@ -0,0 +1,459 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::{Parser, Subcommand};
|
||||
use common_base::secrets::{ExposeSecret, SecretString};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::kv_backend::chroot::ChrootKvBackend;
|
||||
use common_meta::kv_backend::etcd::EtcdStore;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use common_meta::snapshot::MetadataSnapshotManager;
|
||||
use meta_srv::bootstrap::create_etcd_client;
|
||||
use meta_srv::metasrv::BackendImpl;
|
||||
use object_store::services::{Fs, S3};
|
||||
use object_store::ObjectStore;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::error::{
|
||||
InvalidFilePathSnafu, KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu,
|
||||
UnsupportedMemoryBackendSnafu,
|
||||
};
|
||||
use crate::Tool;
|
||||
|
||||
/// Subcommand for metadata snapshot management.
|
||||
#[derive(Subcommand)]
|
||||
pub enum MetaCommand {
|
||||
#[clap(subcommand)]
|
||||
Snapshot(MetaSnapshotCommand),
|
||||
}
|
||||
|
||||
impl MetaCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
MetaCommand::Snapshot(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Subcommand for metadata snapshot operations. such as save, restore and info.
|
||||
#[derive(Subcommand)]
|
||||
pub enum MetaSnapshotCommand {
|
||||
/// Export metadata snapshot tool.
|
||||
Save(MetaSaveCommand),
|
||||
/// Restore metadata snapshot tool.
|
||||
Restore(MetaRestoreCommand),
|
||||
/// Explore metadata from metadata snapshot.
|
||||
Info(MetaInfoCommand),
|
||||
}
|
||||
|
||||
impl MetaSnapshotCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
match self {
|
||||
MetaSnapshotCommand::Save(cmd) => cmd.build().await,
|
||||
MetaSnapshotCommand::Restore(cmd) => cmd.build().await,
|
||||
MetaSnapshotCommand::Info(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Parser)]
|
||||
struct MetaConnection {
|
||||
/// The endpoint of store. one of etcd, pg or mysql.
|
||||
#[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
|
||||
store_addrs: Vec<String>,
|
||||
/// The database backend.
|
||||
#[clap(long, value_enum)]
|
||||
backend: Option<BackendImpl>,
|
||||
#[clap(long, default_value = "")]
|
||||
store_key_prefix: String,
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
#[clap(long,default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
|
||||
meta_table_name: String,
|
||||
#[clap(long, default_value = "128")]
|
||||
max_txn_ops: usize,
|
||||
}
|
||||
|
||||
impl MetaConnection {
|
||||
pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
|
||||
let max_txn_ops = self.max_txn_ops;
|
||||
let store_addrs = &self.store_addrs;
|
||||
if store_addrs.is_empty() {
|
||||
KvBackendNotSetSnafu { backend: "all" }
|
||||
.fail()
|
||||
.map_err(BoxedError::new)
|
||||
} else {
|
||||
let kvbackend = match self.backend {
|
||||
Some(BackendImpl::EtcdStore) => {
|
||||
let etcd_client = create_etcd_client(store_addrs)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
|
||||
}
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
Some(BackendImpl::PostgresStore) => {
|
||||
let table_name = &self.meta_table_name;
|
||||
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
|
||||
pool,
|
||||
table_name,
|
||||
max_txn_ops,
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)?)
|
||||
}
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
Some(BackendImpl::MysqlStore) => {
|
||||
let table_name = &self.meta_table_name;
|
||||
let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
|
||||
pool,
|
||||
table_name,
|
||||
max_txn_ops,
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)?)
|
||||
}
|
||||
Some(BackendImpl::MemoryStore) => UnsupportedMemoryBackendSnafu
|
||||
.fail()
|
||||
.map_err(BoxedError::new),
|
||||
_ => KvBackendNotSetSnafu { backend: "all" }
|
||||
.fail()
|
||||
.map_err(BoxedError::new),
|
||||
};
|
||||
if self.store_key_prefix.is_empty() {
|
||||
kvbackend
|
||||
} else {
|
||||
let chroot_kvbackend =
|
||||
ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
|
||||
Ok(Arc::new(chroot_kvbackend))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(qtang): Abstract a generic s3 config for export import meta snapshot restore
|
||||
#[derive(Debug, Default, Parser)]
|
||||
struct S3Config {
|
||||
/// whether to use s3 as the output directory. default is false.
|
||||
#[clap(long, default_value = "false")]
|
||||
s3: bool,
|
||||
/// The s3 bucket name.
|
||||
#[clap(long)]
|
||||
s3_bucket: Option<String>,
|
||||
/// The s3 region.
|
||||
#[clap(long)]
|
||||
s3_region: Option<String>,
|
||||
/// The s3 access key.
|
||||
#[clap(long)]
|
||||
s3_access_key: Option<SecretString>,
|
||||
/// The s3 secret key.
|
||||
#[clap(long)]
|
||||
s3_secret_key: Option<SecretString>,
|
||||
/// The s3 endpoint. we will automatically use the default s3 decided by the region if not set.
|
||||
#[clap(long)]
|
||||
s3_endpoint: Option<String>,
|
||||
}
|
||||
|
||||
impl S3Config {
|
||||
pub fn build(&self, root: &str) -> Result<Option<ObjectStore>, BoxedError> {
|
||||
if !self.s3 {
|
||||
Ok(None)
|
||||
} else {
|
||||
if self.s3_region.is_none()
|
||||
|| self.s3_access_key.is_none()
|
||||
|| self.s3_secret_key.is_none()
|
||||
|| self.s3_bucket.is_none()
|
||||
{
|
||||
return S3ConfigNotSetSnafu.fail().map_err(BoxedError::new);
|
||||
}
|
||||
// Safety, unwrap is safe because we have checked the options above.
|
||||
let mut config = S3::default()
|
||||
.bucket(self.s3_bucket.as_ref().unwrap())
|
||||
.region(self.s3_region.as_ref().unwrap())
|
||||
.access_key_id(self.s3_access_key.as_ref().unwrap().expose_secret())
|
||||
.secret_access_key(self.s3_secret_key.as_ref().unwrap().expose_secret());
|
||||
|
||||
if !root.is_empty() && root != "." {
|
||||
config = config.root(root);
|
||||
}
|
||||
|
||||
if let Some(endpoint) = &self.s3_endpoint {
|
||||
config = config.endpoint(endpoint);
|
||||
}
|
||||
Ok(Some(
|
||||
ObjectStore::new(config)
|
||||
.context(OpenDalSnafu)
|
||||
.map_err(BoxedError::new)?
|
||||
.finish(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Export metadata snapshot tool.
|
||||
/// This tool is used to export metadata snapshot from etcd, pg or mysql.
|
||||
/// It will dump the metadata snapshot to local file or s3 bucket.
|
||||
/// The snapshot file will be in binary format.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct MetaSaveCommand {
|
||||
/// The connection to the metadata store.
|
||||
#[clap(flatten)]
|
||||
connection: MetaConnection,
|
||||
/// The s3 config.
|
||||
#[clap(flatten)]
|
||||
s3_config: S3Config,
|
||||
/// The name of the target snapshot file. we will add the file extension automatically.
|
||||
#[clap(long, default_value = "metadata_snapshot")]
|
||||
file_name: String,
|
||||
/// The directory to store the snapshot file.
|
||||
/// if target output is s3 bucket, this is the root directory in the bucket.
|
||||
/// if target output is local file, this is the local directory.
|
||||
#[clap(long, default_value = "")]
|
||||
output_dir: String,
|
||||
}
|
||||
|
||||
fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError> {
|
||||
let root = if root.is_empty() { "." } else { root };
|
||||
let object_store = ObjectStore::new(Fs::default().root(root))
|
||||
.context(OpenDalSnafu)
|
||||
.map_err(BoxedError::new)?
|
||||
.finish();
|
||||
Ok(object_store)
|
||||
}
|
||||
|
||||
impl MetaSaveCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
let kvbackend = self.connection.build().await?;
|
||||
let output_dir = &self.output_dir;
|
||||
let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?;
|
||||
if let Some(store) = object_store {
|
||||
let tool = MetaSnapshotTool {
|
||||
inner: MetadataSnapshotManager::new(kvbackend, store),
|
||||
target_file: self.file_name.clone(),
|
||||
};
|
||||
Ok(Box::new(tool))
|
||||
} else {
|
||||
let object_store = create_local_file_object_store(output_dir)?;
|
||||
let tool = MetaSnapshotTool {
|
||||
inner: MetadataSnapshotManager::new(kvbackend, object_store),
|
||||
target_file: self.file_name.clone(),
|
||||
};
|
||||
Ok(Box::new(tool))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MetaSnapshotTool {
|
||||
inner: MetadataSnapshotManager,
|
||||
target_file: String,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for MetaSnapshotTool {
|
||||
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
|
||||
self.inner
|
||||
.dump("", &self.target_file)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Restore metadata snapshot tool.
|
||||
/// This tool is used to restore metadata snapshot from etcd, pg or mysql.
|
||||
/// It will restore the metadata snapshot from local file or s3 bucket.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct MetaRestoreCommand {
|
||||
/// The connection to the metadata store.
|
||||
#[clap(flatten)]
|
||||
connection: MetaConnection,
|
||||
/// The s3 config.
|
||||
#[clap(flatten)]
|
||||
s3_config: S3Config,
|
||||
/// The name of the target snapshot file.
|
||||
#[clap(long, default_value = "metadata_snapshot.metadata.fb")]
|
||||
file_name: String,
|
||||
/// The directory to store the snapshot file.
|
||||
#[clap(long, default_value = ".")]
|
||||
input_dir: String,
|
||||
#[clap(long, default_value = "false")]
|
||||
force: bool,
|
||||
}
|
||||
|
||||
impl MetaRestoreCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
let kvbackend = self.connection.build().await?;
|
||||
let input_dir = &self.input_dir;
|
||||
let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?;
|
||||
if let Some(store) = object_store {
|
||||
let tool = MetaRestoreTool::new(
|
||||
MetadataSnapshotManager::new(kvbackend, store),
|
||||
self.file_name.clone(),
|
||||
self.force,
|
||||
);
|
||||
Ok(Box::new(tool))
|
||||
} else {
|
||||
let object_store = create_local_file_object_store(input_dir)?;
|
||||
let tool = MetaRestoreTool::new(
|
||||
MetadataSnapshotManager::new(kvbackend, object_store),
|
||||
self.file_name.clone(),
|
||||
self.force,
|
||||
);
|
||||
Ok(Box::new(tool))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MetaRestoreTool {
|
||||
inner: MetadataSnapshotManager,
|
||||
source_file: String,
|
||||
force: bool,
|
||||
}
|
||||
|
||||
impl MetaRestoreTool {
|
||||
pub fn new(inner: MetadataSnapshotManager, source_file: String, force: bool) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
source_file,
|
||||
force,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for MetaRestoreTool {
|
||||
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
|
||||
let clean = self
|
||||
.inner
|
||||
.check_target_source_clean()
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
if clean {
|
||||
common_telemetry::info!(
|
||||
"The target source is clean, we will restore the metadata snapshot."
|
||||
);
|
||||
self.inner
|
||||
.restore(&self.source_file)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(())
|
||||
} else if !self.force {
|
||||
common_telemetry::warn!(
|
||||
"The target source is not clean, if you want to restore the metadata snapshot forcefully, please use --force option."
|
||||
);
|
||||
Ok(())
|
||||
} else {
|
||||
common_telemetry::info!("The target source is not clean, We will restore the metadata snapshot with --force.");
|
||||
self.inner
|
||||
.restore(&self.source_file)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Explore metadata from metadata snapshot.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct MetaInfoCommand {
|
||||
/// The s3 config.
|
||||
#[clap(flatten)]
|
||||
s3_config: S3Config,
|
||||
/// The name of the target snapshot file. we will add the file extension automatically.
|
||||
#[clap(long, default_value = "metadata_snapshot")]
|
||||
file_name: String,
|
||||
/// The query string to filter the metadata.
|
||||
#[clap(long, default_value = "*")]
|
||||
inspect_key: String,
|
||||
/// The limit of the metadata to query.
|
||||
#[clap(long)]
|
||||
limit: Option<usize>,
|
||||
}
|
||||
|
||||
pub struct MetaInfoTool {
|
||||
inner: ObjectStore,
|
||||
source_file: String,
|
||||
inspect_key: String,
|
||||
limit: Option<usize>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for MetaInfoTool {
|
||||
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
|
||||
let result = MetadataSnapshotManager::info(
|
||||
&self.inner,
|
||||
&self.source_file,
|
||||
&self.inspect_key,
|
||||
self.limit,
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
for item in result {
|
||||
println!("{}", item);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl MetaInfoCommand {
|
||||
fn decide_object_store_root_for_local_store(
|
||||
file_path: &str,
|
||||
) -> Result<(&str, &str), BoxedError> {
|
||||
let path = Path::new(file_path);
|
||||
let parent = path
|
||||
.parent()
|
||||
.and_then(|p| p.to_str())
|
||||
.context(InvalidFilePathSnafu { msg: file_path })
|
||||
.map_err(BoxedError::new)?;
|
||||
let file_name = path
|
||||
.file_name()
|
||||
.and_then(|f| f.to_str())
|
||||
.context(InvalidFilePathSnafu { msg: file_path })
|
||||
.map_err(BoxedError::new)?;
|
||||
let root = if parent.is_empty() { "." } else { parent };
|
||||
Ok((root, file_name))
|
||||
}
|
||||
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
let object_store = self.s3_config.build("").map_err(BoxedError::new)?;
|
||||
if let Some(store) = object_store {
|
||||
let tool = MetaInfoTool {
|
||||
inner: store,
|
||||
source_file: self.file_name.clone(),
|
||||
inspect_key: self.inspect_key.clone(),
|
||||
limit: self.limit,
|
||||
};
|
||||
Ok(Box::new(tool))
|
||||
} else {
|
||||
let (root, file_name) =
|
||||
Self::decide_object_store_root_for_local_store(&self.file_name)?;
|
||||
let object_store = create_local_file_object_store(root)?;
|
||||
let tool = MetaInfoTool {
|
||||
inner: object_store,
|
||||
source_file: file_name.to_string(),
|
||||
inspect_key: self.inspect_key.clone(),
|
||||
limit: self.limit,
|
||||
};
|
||||
Ok(Box::new(tool))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -25,6 +25,7 @@ common-meta.workspace = true
|
||||
common-query.workspace = true
|
||||
common-recordbatch.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
datatypes.workspace = true
|
||||
enum_dispatch = "0.3"
|
||||
futures.workspace = true
|
||||
futures-util.workspace = true
|
||||
|
||||
@@ -162,14 +162,23 @@ impl Client {
|
||||
.as_bytes() as usize
|
||||
}
|
||||
|
||||
pub fn make_flight_client(&self) -> Result<FlightClient> {
|
||||
pub fn make_flight_client(
|
||||
&self,
|
||||
send_compression: bool,
|
||||
accept_compression: bool,
|
||||
) -> Result<FlightClient> {
|
||||
let (addr, channel) = self.find_channel()?;
|
||||
|
||||
let client = FlightServiceClient::new(channel)
|
||||
let mut client = FlightServiceClient::new(channel)
|
||||
.max_decoding_message_size(self.max_grpc_recv_message_size())
|
||||
.max_encoding_message_size(self.max_grpc_send_message_size())
|
||||
.accept_compressed(CompressionEncoding::Zstd)
|
||||
.send_compressed(CompressionEncoding::Zstd);
|
||||
.max_encoding_message_size(self.max_grpc_send_message_size());
|
||||
// todo(hl): support compression methods.
|
||||
if send_compression {
|
||||
client = client.send_compressed(CompressionEncoding::Zstd);
|
||||
}
|
||||
if accept_compression {
|
||||
client = client.accept_compressed(CompressionEncoding::Zstd);
|
||||
}
|
||||
|
||||
Ok(FlightClient { addr, client })
|
||||
}
|
||||
@@ -178,9 +187,7 @@ impl Client {
|
||||
let (addr, channel) = self.find_channel()?;
|
||||
let client = PbRegionClient::new(channel)
|
||||
.max_decoding_message_size(self.max_grpc_recv_message_size())
|
||||
.max_encoding_message_size(self.max_grpc_send_message_size())
|
||||
.accept_compressed(CompressionEncoding::Zstd)
|
||||
.send_compressed(CompressionEncoding::Zstd);
|
||||
.max_encoding_message_size(self.max_grpc_send_message_size());
|
||||
Ok((addr, client))
|
||||
}
|
||||
|
||||
|
||||
@@ -49,7 +49,16 @@ impl NodeManager for NodeClients {
|
||||
async fn datanode(&self, datanode: &Peer) -> DatanodeRef {
|
||||
let client = self.get_client(datanode).await;
|
||||
|
||||
Arc::new(RegionRequester::new(client))
|
||||
let ChannelConfig {
|
||||
send_compression,
|
||||
accept_compression,
|
||||
..
|
||||
} = self.channel_manager.config();
|
||||
Arc::new(RegionRequester::new(
|
||||
client,
|
||||
*send_compression,
|
||||
*accept_compression,
|
||||
))
|
||||
}
|
||||
|
||||
async fn flownode(&self, flownode: &Peer) -> FlownodeRef {
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::auth_header::AuthScheme;
|
||||
use api::v1::ddl_request::Expr as DdlExpr;
|
||||
@@ -35,7 +36,7 @@ use common_grpc::flight::do_put::DoPutResponse;
|
||||
use common_grpc::flight::{FlightDecoder, FlightMessage};
|
||||
use common_query::Output;
|
||||
use common_recordbatch::error::ExternalSnafu;
|
||||
use common_recordbatch::RecordBatchStreamWrapper;
|
||||
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper};
|
||||
use common_telemetry::tracing_context::W3cTrace;
|
||||
use common_telemetry::{error, warn};
|
||||
use futures::future;
|
||||
@@ -49,7 +50,7 @@ use crate::error::{
|
||||
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu,
|
||||
InvalidTonicMetadataValueSnafu, ServerSnafu,
|
||||
};
|
||||
use crate::{from_grpc_response, Client, Result};
|
||||
use crate::{error, from_grpc_response, Client, Result};
|
||||
|
||||
type FlightDataStream = Pin<Box<dyn Stream<Item = FlightData> + Send>>;
|
||||
|
||||
@@ -286,7 +287,7 @@ impl Database {
|
||||
let mut request = tonic::Request::new(request);
|
||||
Self::put_hints(request.metadata_mut(), hints)?;
|
||||
|
||||
let mut client = self.client.make_flight_client()?;
|
||||
let mut client = self.client.make_flight_client(false, false)?;
|
||||
|
||||
let response = client.mut_inner().do_get(request).await.or_else(|e| {
|
||||
let tonic_code = e.code();
|
||||
@@ -315,7 +316,7 @@ impl Database {
|
||||
let mut flight_message_stream = flight_data_stream.map(move |flight_data| {
|
||||
flight_data
|
||||
.map_err(Error::from)
|
||||
.and_then(|data| decoder.try_decode(data).context(ConvertFlightDataSnafu))
|
||||
.and_then(|data| decoder.try_decode(&data).context(ConvertFlightDataSnafu))
|
||||
});
|
||||
|
||||
let Some(first_flight_message) = flight_message_stream.next().await else {
|
||||
@@ -337,20 +338,30 @@ impl Database {
|
||||
);
|
||||
Ok(Output::new_with_affected_rows(rows))
|
||||
}
|
||||
FlightMessage::Recordbatch(_) | FlightMessage::Metrics(_) => {
|
||||
FlightMessage::RecordBatch(_) | FlightMessage::Metrics(_) => {
|
||||
IllegalFlightMessagesSnafu {
|
||||
reason: "The first flight message cannot be a RecordBatch or Metrics message",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
FlightMessage::Schema(schema) => {
|
||||
let schema = Arc::new(
|
||||
datatypes::schema::Schema::try_from(schema)
|
||||
.context(error::ConvertSchemaSnafu)?,
|
||||
);
|
||||
let schema_cloned = schema.clone();
|
||||
let stream = Box::pin(stream!({
|
||||
while let Some(flight_message) = flight_message_stream.next().await {
|
||||
let flight_message = flight_message
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
match flight_message {
|
||||
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
|
||||
FlightMessage::RecordBatch(arrow_batch) => {
|
||||
yield RecordBatch::try_from_df_record_batch(
|
||||
schema_cloned.clone(),
|
||||
arrow_batch,
|
||||
)
|
||||
}
|
||||
FlightMessage::Metrics(_) => {}
|
||||
FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => {
|
||||
yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)}
|
||||
@@ -398,7 +409,7 @@ impl Database {
|
||||
MetadataValue::from_str(db_to_put).context(InvalidTonicMetadataValueSnafu)?,
|
||||
);
|
||||
|
||||
let mut client = self.client.make_flight_client()?;
|
||||
let mut client = self.client.make_flight_client(false, false)?;
|
||||
let response = client.mut_inner().do_put(request).await?;
|
||||
let response = response
|
||||
.into_inner()
|
||||
|
||||
@@ -117,6 +117,13 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to convert Schema"))]
|
||||
ConvertSchema {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -137,6 +144,7 @@ impl ErrorExt for Error {
|
||||
| Error::CreateTlsChannel { source, .. } => source.status_code(),
|
||||
Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected,
|
||||
Error::InvalidTonicMetadataValue { .. } => StatusCode::InvalidArguments,
|
||||
Error::ConvertSchema { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ use common_meta::error::{self as meta_error, Result as MetaResult};
|
||||
use common_meta::node_manager::Datanode;
|
||||
use common_query::request::QueryRequest;
|
||||
use common_recordbatch::error::ExternalSnafu;
|
||||
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
|
||||
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
|
||||
use common_telemetry::error;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use prost::Message;
|
||||
@@ -46,6 +46,8 @@ use crate::{metrics, Client, Error};
|
||||
#[derive(Debug)]
|
||||
pub struct RegionRequester {
|
||||
client: Client,
|
||||
send_compression: bool,
|
||||
accept_compression: bool,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -55,6 +57,7 @@ impl Datanode for RegionRequester {
|
||||
if err.should_retry() {
|
||||
meta_error::Error::RetryLater {
|
||||
source: BoxedError::new(err),
|
||||
clean_poisons: false,
|
||||
}
|
||||
} else {
|
||||
meta_error::Error::External {
|
||||
@@ -88,12 +91,18 @@ impl Datanode for RegionRequester {
|
||||
}
|
||||
|
||||
impl RegionRequester {
|
||||
pub fn new(client: Client) -> Self {
|
||||
Self { client }
|
||||
pub fn new(client: Client, send_compression: bool, accept_compression: bool) -> Self {
|
||||
Self {
|
||||
client,
|
||||
send_compression,
|
||||
accept_compression,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn do_get_inner(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> {
|
||||
let mut flight_client = self.client.make_flight_client()?;
|
||||
let mut flight_client = self
|
||||
.client
|
||||
.make_flight_client(self.send_compression, self.accept_compression)?;
|
||||
let response = flight_client
|
||||
.mut_inner()
|
||||
.do_get(ticket)
|
||||
@@ -125,7 +134,7 @@ impl RegionRequester {
|
||||
let mut flight_message_stream = flight_data_stream.map(move |flight_data| {
|
||||
flight_data
|
||||
.map_err(Error::from)
|
||||
.and_then(|data| decoder.try_decode(data).context(ConvertFlightDataSnafu))
|
||||
.and_then(|data| decoder.try_decode(&data).context(ConvertFlightDataSnafu))
|
||||
});
|
||||
|
||||
let Some(first_flight_message) = flight_message_stream.next().await else {
|
||||
@@ -146,6 +155,10 @@ impl RegionRequester {
|
||||
|
||||
let tracing_context = TracingContext::from_current_span();
|
||||
|
||||
let schema = Arc::new(
|
||||
datatypes::schema::Schema::try_from(schema).context(error::ConvertSchemaSnafu)?,
|
||||
);
|
||||
let schema_cloned = schema.clone();
|
||||
let stream = Box::pin(stream!({
|
||||
let _span = tracing_context.attach(common_telemetry::tracing::info_span!(
|
||||
"poll_flight_data_stream"
|
||||
@@ -156,7 +169,12 @@ impl RegionRequester {
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
match flight_message {
|
||||
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
|
||||
FlightMessage::RecordBatch(record_batch) => {
|
||||
yield RecordBatch::try_from_df_record_batch(
|
||||
schema_cloned.clone(),
|
||||
record_batch,
|
||||
)
|
||||
}
|
||||
FlightMessage::Metrics(s) => {
|
||||
let m = serde_json::from_str(&s).ok().map(Arc::new);
|
||||
metrics_ref.swap(m);
|
||||
|
||||
@@ -10,7 +10,13 @@ name = "greptime"
|
||||
path = "src/bin/greptime.rs"
|
||||
|
||||
[features]
|
||||
default = ["servers/pprof", "servers/mem-prof"]
|
||||
default = [
|
||||
"servers/pprof",
|
||||
"servers/mem-prof",
|
||||
"meta-srv/pg_kvbackend",
|
||||
"meta-srv/mysql_kvbackend",
|
||||
]
|
||||
enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise"]
|
||||
tokio-console = ["common-telemetry/tokio-console"]
|
||||
|
||||
[lints]
|
||||
@@ -74,6 +80,7 @@ servers.workspace = true
|
||||
session.workspace = true
|
||||
similar-asserts.workspace = true
|
||||
snafu.workspace = true
|
||||
stat.workspace = true
|
||||
store-api.workspace = true
|
||||
substrait.workspace = true
|
||||
table.workspace = true
|
||||
|
||||
@@ -76,6 +76,7 @@ impl Command {
|
||||
&opts,
|
||||
&TracingOptions::default(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
let tool = self.cmd.build().await.context(error::BuildCliSnafu)?;
|
||||
@@ -145,6 +146,7 @@ mod tests {
|
||||
let output_dir = tempfile::tempdir().unwrap();
|
||||
let cli = cli::Command::parse_from([
|
||||
"cli",
|
||||
"data",
|
||||
"export",
|
||||
"--addr",
|
||||
"127.0.0.1:4000",
|
||||
|
||||
@@ -14,12 +14,13 @@
|
||||
|
||||
pub mod builder;
|
||||
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::Parser;
|
||||
use common_config::Configurable;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
|
||||
use common_telemetry::{info, warn};
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
use datanode::datanode::Datanode;
|
||||
@@ -156,6 +157,7 @@ impl StartCommand {
|
||||
.context(LoadLayeredConfigSnafu)?;
|
||||
|
||||
self.merge_with_cli_options(global_options, &mut opts)?;
|
||||
opts.component.sanitize();
|
||||
|
||||
Ok(opts)
|
||||
}
|
||||
@@ -247,6 +249,14 @@ impl StartCommand {
|
||||
raft_engine_config.dir.replace(wal_dir.clone());
|
||||
}
|
||||
|
||||
// If the logging dir is not set, use the default logs dir in the data home.
|
||||
if opts.logging.dir.is_empty() {
|
||||
opts.logging.dir = Path::new(&opts.storage.data_home)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
}
|
||||
|
||||
if let Some(http_addr) = &self.http_addr {
|
||||
opts.http.addr.clone_from(http_addr);
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::datanode::{DatanodeOptions, Instance, APP_NAME};
|
||||
use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu};
|
||||
use crate::log_versions;
|
||||
use crate::{create_resource_limit_metrics, log_versions};
|
||||
|
||||
/// Builder for Datanode instance.
|
||||
pub struct InstanceBuilder {
|
||||
@@ -64,9 +64,11 @@ impl InstanceBuilder {
|
||||
&dn_opts.logging,
|
||||
&dn_opts.tracing,
|
||||
dn_opts.node_id.map(|x| x.to_string()),
|
||||
None,
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)
|
||||
.await
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -21,7 +22,7 @@ use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKv
|
||||
use clap::Parser;
|
||||
use client::client_manager::NodeClients;
|
||||
use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use common_config::{Configurable, DEFAULT_DATA_HOME};
|
||||
use common_grpc::channel_manager::ChannelConfig;
|
||||
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
|
||||
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
|
||||
@@ -30,7 +31,7 @@ use common_meta::heartbeat::handler::HandlerGroupExecutor;
|
||||
use common_meta::key::flow::FlowMetadataManager;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
|
||||
use common_version::{short_version, version};
|
||||
use flow::{
|
||||
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
|
||||
@@ -45,7 +46,7 @@ use crate::error::{
|
||||
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
|
||||
};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{log_versions, App};
|
||||
use crate::{create_resource_limit_metrics, log_versions, App};
|
||||
|
||||
pub const APP_NAME: &str = "greptime-flownode";
|
||||
|
||||
@@ -186,6 +187,14 @@ impl StartCommand {
|
||||
opts.logging.dir.clone_from(dir);
|
||||
}
|
||||
|
||||
// If the logging dir is not set, use the default logs dir in the data home.
|
||||
if opts.logging.dir.is_empty() {
|
||||
opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
}
|
||||
|
||||
if global_options.log_level.is_some() {
|
||||
opts.logging.level.clone_from(&global_options.log_level);
|
||||
}
|
||||
@@ -244,8 +253,11 @@ impl StartCommand {
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
opts.component.node_id.map(|x| x.to_string()),
|
||||
None,
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
info!("Flownode start command: {:#?}", self);
|
||||
info!("Flownode options: {:#?}", opts);
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -22,14 +23,14 @@ use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKv
|
||||
use clap::Parser;
|
||||
use client::client_manager::NodeClients;
|
||||
use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use common_config::{Configurable, DEFAULT_DATA_HOME};
|
||||
use common_grpc::channel_manager::ChannelConfig;
|
||||
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
|
||||
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
|
||||
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
|
||||
use common_meta::heartbeat::handler::HandlerGroupExecutor;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
|
||||
use common_time::timezone::set_default_timezone;
|
||||
use common_version::{short_version, version};
|
||||
use frontend::frontend::Frontend;
|
||||
@@ -37,7 +38,6 @@ use frontend::heartbeat::HeartbeatTask;
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::server::Services;
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use query::stats::StatementStatistics;
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -45,7 +45,7 @@ use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{log_versions, App};
|
||||
use crate::{create_resource_limit_metrics, log_versions, App};
|
||||
|
||||
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
|
||||
|
||||
@@ -195,6 +195,14 @@ impl StartCommand {
|
||||
opts.logging.dir.clone_from(dir);
|
||||
}
|
||||
|
||||
// If the logging dir is not set, use the default logs dir in the data home.
|
||||
if opts.logging.dir.is_empty() {
|
||||
opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
}
|
||||
|
||||
if global_options.log_level.is_some() {
|
||||
opts.logging.level.clone_from(&global_options.log_level);
|
||||
}
|
||||
@@ -269,8 +277,11 @@ impl StartCommand {
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
opts.component.node_id.clone(),
|
||||
opts.component.slow_query.as_ref(),
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
info!("Frontend start command: {:#?}", self);
|
||||
info!("Frontend options: {:#?}", opts);
|
||||
@@ -353,12 +364,16 @@ impl StartCommand {
|
||||
|
||||
// frontend to datanode need not timeout.
|
||||
// Some queries are expected to take long time.
|
||||
let channel_config = ChannelConfig {
|
||||
let mut channel_config = ChannelConfig {
|
||||
timeout: None,
|
||||
tcp_nodelay: opts.datanode.client.tcp_nodelay,
|
||||
connect_timeout: Some(opts.datanode.client.connect_timeout),
|
||||
..Default::default()
|
||||
};
|
||||
if opts.grpc.flight_compression.transport_compression() {
|
||||
channel_config.accept_compression = true;
|
||||
channel_config.send_compression = true;
|
||||
}
|
||||
let client = NodeClients::new(channel_config);
|
||||
|
||||
let instance = FrontendBuilder::new(
|
||||
@@ -368,7 +383,6 @@ impl StartCommand {
|
||||
catalog_manager,
|
||||
Arc::new(client),
|
||||
meta_client,
|
||||
StatementStatistics::new(opts.logging.slow_query.clone()),
|
||||
)
|
||||
.with_plugin(plugins.clone())
|
||||
.with_local_cache_invalidator(layered_cache_registry)
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::{error, info};
|
||||
use stat::{get_cpu_limit, get_memory_limit};
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
@@ -31,6 +32,12 @@ pub mod standalone;
|
||||
lazy_static::lazy_static! {
|
||||
static ref APP_VERSION: prometheus::IntGaugeVec =
|
||||
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap();
|
||||
|
||||
static ref CPU_LIMIT: prometheus::IntGaugeVec =
|
||||
prometheus::register_int_gauge_vec!("greptime_cpu_limit_in_millicores", "cpu limit in millicores", &["app"]).unwrap();
|
||||
|
||||
static ref MEMORY_LIMIT: prometheus::IntGaugeVec =
|
||||
prometheus::register_int_gauge_vec!("greptime_memory_limit_in_bytes", "memory limit in bytes", &["app"]).unwrap();
|
||||
}
|
||||
|
||||
/// wait for the close signal, for unix platform it's SIGINT or SIGTERM
|
||||
@@ -114,6 +121,24 @@ pub fn log_versions(version: &str, short_version: &str, app: &str) {
|
||||
log_env_flags();
|
||||
}
|
||||
|
||||
pub fn create_resource_limit_metrics(app: &str) {
|
||||
if let Some(cpu_limit) = get_cpu_limit() {
|
||||
info!(
|
||||
"GreptimeDB start with cpu limit in millicores: {}",
|
||||
cpu_limit
|
||||
);
|
||||
CPU_LIMIT.with_label_values(&[app]).set(cpu_limit);
|
||||
}
|
||||
|
||||
if let Some(memory_limit) = get_memory_limit() {
|
||||
info!(
|
||||
"GreptimeDB start with memory limit in bytes: {}",
|
||||
memory_limit
|
||||
);
|
||||
MEMORY_LIMIT.with_label_values(&[app]).set(memory_limit);
|
||||
}
|
||||
}
|
||||
|
||||
fn log_env_flags() {
|
||||
info!("command line arguments");
|
||||
for argument in std::env::args() {
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
@@ -20,7 +21,7 @@ use clap::Parser;
|
||||
use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
|
||||
use common_version::{short_version, version};
|
||||
use meta_srv::bootstrap::MetasrvInstance;
|
||||
use meta_srv::metasrv::BackendImpl;
|
||||
@@ -29,7 +30,7 @@ use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{log_versions, App};
|
||||
use crate::{create_resource_limit_metrics, log_versions, App};
|
||||
|
||||
type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;
|
||||
|
||||
@@ -236,12 +237,20 @@ impl StartCommand {
|
||||
tokio_console_addr: global_options.tokio_console_addr.clone(),
|
||||
};
|
||||
|
||||
#[allow(deprecated)]
|
||||
if let Some(addr) = &self.rpc_bind_addr {
|
||||
opts.bind_addr.clone_from(addr);
|
||||
opts.grpc.bind_addr.clone_from(addr);
|
||||
} else if !opts.bind_addr.is_empty() {
|
||||
opts.grpc.bind_addr.clone_from(&opts.bind_addr);
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
if let Some(addr) = &self.rpc_server_addr {
|
||||
opts.server_addr.clone_from(addr);
|
||||
opts.grpc.server_addr.clone_from(addr);
|
||||
} else if !opts.server_addr.is_empty() {
|
||||
opts.grpc.server_addr.clone_from(&opts.server_addr);
|
||||
}
|
||||
|
||||
if let Some(addrs) = &self.store_addrs {
|
||||
@@ -274,6 +283,14 @@ impl StartCommand {
|
||||
opts.data_home.clone_from(data_home);
|
||||
}
|
||||
|
||||
// If the logging dir is not set, use the default logs dir in the data home.
|
||||
if opts.logging.dir.is_empty() {
|
||||
opts.logging.dir = Path::new(&opts.data_home)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
}
|
||||
|
||||
if !self.store_key_prefix.is_empty() {
|
||||
opts.store_key_prefix.clone_from(&self.store_key_prefix)
|
||||
}
|
||||
@@ -300,14 +317,17 @@ impl StartCommand {
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
info!("Metasrv start command: {:#?}", self);
|
||||
|
||||
let plugin_opts = opts.plugins;
|
||||
let mut opts = opts.component;
|
||||
opts.detect_server_addr();
|
||||
opts.grpc.detect_server_addr();
|
||||
|
||||
info!("Metasrv options: {:#?}", opts);
|
||||
|
||||
@@ -351,7 +371,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let options = cmd.load_options(&Default::default()).unwrap().component;
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
|
||||
assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
|
||||
assert_eq!(SelectorType::LoadBased, options.selector);
|
||||
}
|
||||
@@ -384,8 +404,8 @@ mod tests {
|
||||
};
|
||||
|
||||
let options = cmd.load_options(&Default::default()).unwrap().component;
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
|
||||
assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
|
||||
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
|
||||
assert_eq!(SelectorType::LeaseBased, options.selector);
|
||||
assert_eq!("debug", options.logging.level.as_ref().unwrap());
|
||||
@@ -497,10 +517,10 @@ mod tests {
|
||||
let opts = command.load_options(&Default::default()).unwrap().component;
|
||||
|
||||
// Should be read from env, env > default values.
|
||||
assert_eq!(opts.bind_addr, "127.0.0.1:14002");
|
||||
assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");
|
||||
|
||||
// Should be read from config file, config file > env > default values.
|
||||
assert_eq!(opts.server_addr, "127.0.0.1:3002");
|
||||
assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");
|
||||
|
||||
// Should be read from cli, cli > config file > env > default values.
|
||||
assert_eq!(opts.http.addr, "127.0.0.1:14000");
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::{fs, path};
|
||||
|
||||
@@ -35,6 +36,8 @@ use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRe
|
||||
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
|
||||
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
|
||||
use common_meta::ddl_manager::DdlManager;
|
||||
#[cfg(feature = "enterprise")]
|
||||
use common_meta::ddl_manager::TriggerDdlManagerRef;
|
||||
use common_meta::key::flow::flow_state::FlowStat;
|
||||
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
|
||||
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
|
||||
@@ -47,7 +50,9 @@ use common_meta::sequence::SequenceBuilder;
|
||||
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
|
||||
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::{LoggingOptions, TracingOptions};
|
||||
use common_telemetry::logging::{
|
||||
LoggingOptions, SlowQueryOptions, TracingOptions, DEFAULT_LOGGING_DIR,
|
||||
};
|
||||
use common_time::timezone::set_default_timezone;
|
||||
use common_version::{short_version, version};
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
@@ -69,7 +74,7 @@ use frontend::service_config::{
|
||||
};
|
||||
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
|
||||
use mito2::config::MitoConfig;
|
||||
use query::stats::StatementStatistics;
|
||||
use query::options::QueryOptions;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
|
||||
use servers::grpc::GrpcOptions;
|
||||
@@ -81,7 +86,7 @@ use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{Result, StartFlownodeSnafu};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{error, log_versions, App};
|
||||
use crate::{create_resource_limit_metrics, error, log_versions, App};
|
||||
|
||||
pub const APP_NAME: &str = "greptime-standalone";
|
||||
|
||||
@@ -153,6 +158,8 @@ pub struct StandaloneOptions {
|
||||
pub init_regions_in_background: bool,
|
||||
pub init_regions_parallelism: usize,
|
||||
pub max_in_flight_write_bytes: Option<ReadableSize>,
|
||||
pub slow_query: Option<SlowQueryOptions>,
|
||||
pub query: QueryOptions,
|
||||
}
|
||||
|
||||
impl Default for StandaloneOptions {
|
||||
@@ -184,6 +191,8 @@ impl Default for StandaloneOptions {
|
||||
init_regions_in_background: false,
|
||||
init_regions_parallelism: 16,
|
||||
max_in_flight_write_bytes: None,
|
||||
slow_query: Some(SlowQueryOptions::default()),
|
||||
query: QueryOptions::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -223,6 +232,7 @@ impl StandaloneOptions {
|
||||
// Handle the export metrics task run by standalone to frontend for execution
|
||||
export_metrics: cloned_opts.export_metrics,
|
||||
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
|
||||
slow_query: cloned_opts.slow_query,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
@@ -238,6 +248,7 @@ impl StandaloneOptions {
|
||||
grpc: cloned_opts.grpc,
|
||||
init_regions_in_background: cloned_opts.init_regions_in_background,
|
||||
init_regions_parallelism: cloned_opts.init_regions_parallelism,
|
||||
query: cloned_opts.query,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
@@ -399,6 +410,14 @@ impl StartCommand {
|
||||
opts.storage.data_home.clone_from(data_home);
|
||||
}
|
||||
|
||||
// If the logging dir is not set, use the default logs dir in the data home.
|
||||
if opts.logging.dir.is_empty() {
|
||||
opts.logging.dir = Path::new(&opts.storage.data_home)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
}
|
||||
|
||||
if let Some(addr) = &self.rpc_bind_addr {
|
||||
// frontend grpc addr conflict with datanode default grpc addr
|
||||
let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
|
||||
@@ -447,8 +466,11 @@ impl StartCommand {
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
None,
|
||||
opts.component.slow_query.as_ref(),
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
create_resource_limit_metrics(APP_NAME);
|
||||
|
||||
info!("Standalone start command: {:#?}", self);
|
||||
info!("Standalone options: {opts:#?}");
|
||||
@@ -576,6 +598,8 @@ impl StartCommand {
|
||||
flow_id_sequence,
|
||||
));
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
let trigger_ddl_manager: Option<TriggerDdlManagerRef> = plugins.get();
|
||||
let ddl_task_executor = Self::create_ddl_task_executor(
|
||||
procedure_manager.clone(),
|
||||
node_manager.clone(),
|
||||
@@ -584,6 +608,8 @@ impl StartCommand {
|
||||
table_meta_allocator,
|
||||
flow_metadata_manager,
|
||||
flow_meta_allocator,
|
||||
#[cfg(feature = "enterprise")]
|
||||
trigger_ddl_manager,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -594,7 +620,6 @@ impl StartCommand {
|
||||
catalog_manager.clone(),
|
||||
node_manager.clone(),
|
||||
ddl_task_executor.clone(),
|
||||
StatementStatistics::new(opts.logging.slow_query.clone()),
|
||||
)
|
||||
.with_plugin(plugins.clone())
|
||||
.try_build()
|
||||
@@ -649,6 +674,7 @@ impl StartCommand {
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn create_ddl_task_executor(
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
node_manager: NodeManagerRef,
|
||||
@@ -657,6 +683,7 @@ impl StartCommand {
|
||||
table_metadata_allocator: TableMetadataAllocatorRef,
|
||||
flow_metadata_manager: FlowMetadataManagerRef,
|
||||
flow_metadata_allocator: FlowMetadataAllocatorRef,
|
||||
#[cfg(feature = "enterprise")] trigger_ddl_manager: Option<TriggerDdlManagerRef>,
|
||||
) -> Result<ProcedureExecutorRef> {
|
||||
let procedure_executor: ProcedureExecutorRef = Arc::new(
|
||||
DdlManager::try_new(
|
||||
@@ -673,6 +700,8 @@ impl StartCommand {
|
||||
},
|
||||
procedure_manager,
|
||||
true,
|
||||
#[cfg(feature = "enterprise")]
|
||||
trigger_ddl_manager,
|
||||
)
|
||||
.context(error::InitDdlManagerSnafu)?,
|
||||
);
|
||||
|
||||
@@ -12,13 +12,14 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use cmd::options::GreptimeOptions;
|
||||
use cmd::standalone::StandaloneOptions;
|
||||
use common_config::Configurable;
|
||||
use common_config::{Configurable, DEFAULT_DATA_HOME};
|
||||
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
|
||||
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, DEFAULT_OTLP_ENDPOINT};
|
||||
use common_telemetry::logging::{LoggingOptions, DEFAULT_LOGGING_DIR, DEFAULT_OTLP_ENDPOINT};
|
||||
use common_wal::config::raft_engine::RaftEngineConfig;
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
|
||||
@@ -32,6 +33,7 @@ use mito2::config::MitoConfig;
|
||||
use servers::export_metrics::ExportMetricsOption;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::http::HttpOptions;
|
||||
use store_api::path_utils::WAL_DIR;
|
||||
|
||||
#[allow(deprecated)]
|
||||
#[test]
|
||||
@@ -56,13 +58,18 @@ fn test_load_datanode_example_config() {
|
||||
metadata_cache_tti: Duration::from_secs(300),
|
||||
}),
|
||||
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
|
||||
dir: Some("./greptimedb_data/wal".to_string()),
|
||||
dir: Some(
|
||||
Path::new(DEFAULT_DATA_HOME)
|
||||
.join(WAL_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
),
|
||||
sync_period: Some(Duration::from_secs(10)),
|
||||
recovery_parallelism: 2,
|
||||
..Default::default()
|
||||
}),
|
||||
storage: StorageConfig {
|
||||
data_home: "./greptimedb_data/".to_string(),
|
||||
data_home: DEFAULT_DATA_HOME.to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
region_engine: vec![
|
||||
@@ -79,12 +86,16 @@ fn test_load_datanode_example_config() {
|
||||
],
|
||||
logging: LoggingOptions {
|
||||
level: Some("info".to_string()),
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
export_metrics: ExportMetricsOption {
|
||||
self_import: Some(Default::default()),
|
||||
self_import: None,
|
||||
remote_write: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
@@ -121,6 +132,10 @@ fn test_load_frontend_example_config() {
|
||||
}),
|
||||
logging: LoggingOptions {
|
||||
level: Some("info".to_string()),
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
..Default::default()
|
||||
@@ -133,7 +148,7 @@ fn test_load_frontend_example_config() {
|
||||
},
|
||||
},
|
||||
export_metrics: ExportMetricsOption {
|
||||
self_import: Some(Default::default()),
|
||||
self_import: None,
|
||||
remote_write: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
@@ -160,18 +175,20 @@ fn test_load_metasrv_example_config() {
|
||||
let expected = GreptimeOptions::<MetasrvOptions> {
|
||||
component: MetasrvOptions {
|
||||
selector: SelectorType::default(),
|
||||
data_home: "./greptimedb_data/metasrv/".to_string(),
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
data_home: DEFAULT_DATA_HOME.to_string(),
|
||||
grpc: GrpcOptions {
|
||||
bind_addr: "127.0.0.1:3002".to_string(),
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
logging: LoggingOptions {
|
||||
dir: "./greptimedb_data/logs".to_string(),
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
level: Some("info".to_string()),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
slow_query: SlowQueryOptions {
|
||||
enable: false,
|
||||
threshold: None,
|
||||
sample_ratio: None,
|
||||
},
|
||||
..Default::default()
|
||||
},
|
||||
datanode: DatanodeClientOptions {
|
||||
@@ -182,7 +199,7 @@ fn test_load_metasrv_example_config() {
|
||||
},
|
||||
},
|
||||
export_metrics: ExportMetricsOption {
|
||||
self_import: Some(Default::default()),
|
||||
self_import: None,
|
||||
remote_write: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
@@ -203,7 +220,12 @@ fn test_load_standalone_example_config() {
|
||||
component: StandaloneOptions {
|
||||
default_timezone: Some("UTC".to_string()),
|
||||
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
|
||||
dir: Some("./greptimedb_data/wal".to_string()),
|
||||
dir: Some(
|
||||
Path::new(DEFAULT_DATA_HOME)
|
||||
.join(WAL_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
),
|
||||
sync_period: Some(Duration::from_secs(10)),
|
||||
recovery_parallelism: 2,
|
||||
..Default::default()
|
||||
@@ -221,11 +243,15 @@ fn test_load_standalone_example_config() {
|
||||
}),
|
||||
],
|
||||
storage: StorageConfig {
|
||||
data_home: "./greptimedb_data/".to_string(),
|
||||
data_home: DEFAULT_DATA_HOME.to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
logging: LoggingOptions {
|
||||
level: Some("info".to_string()),
|
||||
dir: Path::new(DEFAULT_DATA_HOME)
|
||||
.join(DEFAULT_LOGGING_DIR)
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
..Default::default()
|
||||
|
||||
@@ -26,6 +26,9 @@ pub fn metadata_store_dir(store_dir: &str) -> String {
|
||||
format!("{store_dir}/metadata")
|
||||
}
|
||||
|
||||
/// The default data home directory.
|
||||
pub const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct KvBackendConfig {
|
||||
|
||||
@@ -13,7 +13,9 @@
|
||||
// limitations under the License.
|
||||
|
||||
pub mod fs;
|
||||
pub mod oss;
|
||||
pub mod s3;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
@@ -25,10 +27,12 @@ use url::{ParseError, Url};
|
||||
use self::fs::build_fs_backend;
|
||||
use self::s3::build_s3_backend;
|
||||
use crate::error::{self, Result};
|
||||
use crate::object_store::oss::build_oss_backend;
|
||||
use crate::util::find_dir_and_filename;
|
||||
|
||||
pub const FS_SCHEMA: &str = "FS";
|
||||
pub const S3_SCHEMA: &str = "S3";
|
||||
pub const OSS_SCHEMA: &str = "OSS";
|
||||
|
||||
/// Returns `(schema, Option<host>, path)`
|
||||
pub fn parse_url(url: &str) -> Result<(String, Option<String>, String)> {
|
||||
@@ -64,6 +68,12 @@ pub fn build_backend(url: &str, connection: &HashMap<String, String>) -> Result<
|
||||
})?;
|
||||
Ok(build_s3_backend(&host, &root, connection)?)
|
||||
}
|
||||
OSS_SCHEMA => {
|
||||
let host = host.context(error::EmptyHostPathSnafu {
|
||||
url: url.to_string(),
|
||||
})?;
|
||||
Ok(build_oss_backend(&host, &root, connection)?)
|
||||
}
|
||||
FS_SCHEMA => Ok(build_fs_backend(&root)?),
|
||||
|
||||
_ => error::UnsupportedBackendProtocolSnafu {
|
||||
|
||||
118
src/common/datasource/src/object_store/oss.rs
Normal file
118
src/common/datasource/src/object_store/oss.rs
Normal file
@@ -0,0 +1,118 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use object_store::services::Oss;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
|
||||
const BUCKET: &str = "bucket";
|
||||
const ENDPOINT: &str = "endpoint";
|
||||
const ACCESS_KEY_ID: &str = "access_key_id";
|
||||
const ACCESS_KEY_SECRET: &str = "access_key_secret";
|
||||
const ROOT: &str = "root";
|
||||
const ALLOW_ANONYMOUS: &str = "allow_anonymous";
|
||||
|
||||
/// Check if the key is supported in OSS configuration.
|
||||
pub fn is_supported_in_oss(key: &str) -> bool {
|
||||
[
|
||||
ROOT,
|
||||
ALLOW_ANONYMOUS,
|
||||
BUCKET,
|
||||
ENDPOINT,
|
||||
ACCESS_KEY_ID,
|
||||
ACCESS_KEY_SECRET,
|
||||
]
|
||||
.contains(&key)
|
||||
}
|
||||
|
||||
/// Build an OSS backend using the provided bucket, root, and connection parameters.
|
||||
pub fn build_oss_backend(
|
||||
bucket: &str,
|
||||
root: &str,
|
||||
connection: &HashMap<String, String>,
|
||||
) -> Result<ObjectStore> {
|
||||
let mut builder = Oss::default().bucket(bucket).root(root);
|
||||
|
||||
if let Some(endpoint) = connection.get(ENDPOINT) {
|
||||
builder = builder.endpoint(endpoint);
|
||||
}
|
||||
|
||||
if let Some(access_key_id) = connection.get(ACCESS_KEY_ID) {
|
||||
builder = builder.access_key_id(access_key_id);
|
||||
}
|
||||
|
||||
if let Some(access_key_secret) = connection.get(ACCESS_KEY_SECRET) {
|
||||
builder = builder.access_key_secret(access_key_secret);
|
||||
}
|
||||
|
||||
if let Some(allow_anonymous) = connection.get(ALLOW_ANONYMOUS) {
|
||||
let allow = allow_anonymous.as_str().parse::<bool>().map_err(|e| {
|
||||
error::InvalidConnectionSnafu {
|
||||
msg: format!(
|
||||
"failed to parse the option {}={}, {}",
|
||||
ALLOW_ANONYMOUS, allow_anonymous, e
|
||||
),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
if allow {
|
||||
builder = builder.allow_anonymous();
|
||||
}
|
||||
}
|
||||
|
||||
let op = ObjectStore::new(builder)
|
||||
.context(error::BuildBackendSnafu)?
|
||||
.layer(object_store::layers::LoggingLayer::default())
|
||||
.layer(object_store::layers::TracingLayer)
|
||||
.layer(object_store::layers::build_prometheus_metrics_layer(true))
|
||||
.finish();
|
||||
|
||||
Ok(op)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_is_supported_in_oss() {
|
||||
assert!(is_supported_in_oss(ROOT));
|
||||
assert!(is_supported_in_oss(ALLOW_ANONYMOUS));
|
||||
assert!(is_supported_in_oss(BUCKET));
|
||||
assert!(is_supported_in_oss(ENDPOINT));
|
||||
assert!(is_supported_in_oss(ACCESS_KEY_ID));
|
||||
assert!(is_supported_in_oss(ACCESS_KEY_SECRET));
|
||||
assert!(!is_supported_in_oss("foo"));
|
||||
assert!(!is_supported_in_oss("BAR"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_oss_backend_all_fields_valid() {
|
||||
let mut connection = HashMap::new();
|
||||
connection.insert(
|
||||
ENDPOINT.to_string(),
|
||||
"http://oss-ap-southeast-1.aliyuncs.com".to_string(),
|
||||
);
|
||||
connection.insert(ACCESS_KEY_ID.to_string(), "key_id".to_string());
|
||||
connection.insert(ACCESS_KEY_SECRET.to_string(), "key_secret".to_string());
|
||||
connection.insert(ALLOW_ANONYMOUS.to_string(), "true".to_string());
|
||||
|
||||
let result = build_oss_backend("my-bucket", "my-root", &connection);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
}
|
||||
18
src/common/function/src/aggrs.rs
Normal file
18
src/common/function/src/aggrs.rs
Normal file
@@ -0,0 +1,18 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod approximate;
|
||||
#[cfg(feature = "geo")]
|
||||
pub mod geo;
|
||||
pub mod vector;
|
||||
32
src/common/function/src/aggrs/approximate.rs
Normal file
32
src/common/function/src/aggrs/approximate.rs
Normal file
@@ -0,0 +1,32 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
pub(crate) mod hll;
|
||||
mod uddsketch;
|
||||
|
||||
pub(crate) struct ApproximateFunction;
|
||||
|
||||
impl ApproximateFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
// uddsketch
|
||||
registry.register_aggr(uddsketch::UddSketchState::state_udf_impl());
|
||||
registry.register_aggr(uddsketch::UddSketchState::merge_udf_impl());
|
||||
|
||||
// hll
|
||||
registry.register_aggr(hll::HllState::state_udf_impl());
|
||||
registry.register_aggr(hll::HllState::merge_udf_impl());
|
||||
}
|
||||
}
|
||||
@@ -163,7 +163,7 @@ impl DfAccumulator for UddSketchState {
|
||||
}
|
||||
}
|
||||
// meaning instantiate as `uddsketch_merge`
|
||||
DataType::Binary => self.merge_batch(&[array.clone()])?,
|
||||
DataType::Binary => self.merge_batch(std::slice::from_ref(array))?,
|
||||
_ => {
|
||||
return not_impl_err!(
|
||||
"UDDSketch functions do not support data type: {}",
|
||||
27
src/common/function/src/aggrs/geo.rs
Normal file
27
src/common/function/src/aggrs/geo.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
mod encoding;
|
||||
mod geo_path;
|
||||
|
||||
pub(crate) struct GeoFunction;
|
||||
|
||||
impl GeoFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_aggr(geo_path::GeoPathAccumulator::uadf_impl());
|
||||
registry.register_aggr(encoding::JsonPathAccumulator::uadf_impl());
|
||||
}
|
||||
}
|
||||
@@ -19,9 +19,12 @@ use common_error::status_code::StatusCode;
|
||||
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
|
||||
use common_query::error::{self, InvalidInputStateSnafu, Result};
|
||||
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
|
||||
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
|
||||
use common_query::logical_plan::{
|
||||
create_aggregate_function, Accumulator, AggregateFunctionCreator,
|
||||
};
|
||||
use common_query::prelude::AccumulatorCreatorFunction;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_expr::AggregateUDF;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::{ListValue, Value};
|
||||
use datatypes::vectors::VectorRef;
|
||||
@@ -47,6 +50,16 @@ impl JsonPathAccumulator {
|
||||
timestamp_type,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new `AggregateUDF` for the `json_encode_path` aggregate function.
|
||||
pub fn uadf_impl() -> AggregateUDF {
|
||||
create_aggregate_function(
|
||||
"json_encode_path".to_string(),
|
||||
3,
|
||||
Arc::new(JsonPathEncodeFunctionCreator::default()),
|
||||
)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl Accumulator for JsonPathAccumulator {
|
||||
@@ -47,7 +47,7 @@ impl GeoPathAccumulator {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn udf_impl() -> AggregateUDF {
|
||||
pub fn uadf_impl() -> AggregateUDF {
|
||||
create_udaf(
|
||||
GEO_PATH_NAME,
|
||||
// Input types: lat, lng, timestamp
|
||||
29
src/common/function/src/aggrs/vector.rs
Normal file
29
src/common/function/src/aggrs/vector.rs
Normal file
@@ -0,0 +1,29 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::aggrs::vector::product::VectorProduct;
|
||||
use crate::aggrs::vector::sum::VectorSum;
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
mod product;
|
||||
mod sum;
|
||||
|
||||
pub(crate) struct VectorFunction;
|
||||
|
||||
impl VectorFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_aggr(VectorSum::uadf_impl());
|
||||
registry.register_aggr(VectorProduct::uadf_impl());
|
||||
}
|
||||
}
|
||||
@@ -16,8 +16,11 @@ use std::sync::Arc;
|
||||
|
||||
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
|
||||
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
|
||||
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
|
||||
use common_query::logical_plan::{
|
||||
create_aggregate_function, Accumulator, AggregateFunctionCreator,
|
||||
};
|
||||
use common_query::prelude::AccumulatorCreatorFunction;
|
||||
use datafusion_expr::AggregateUDF;
|
||||
use datatypes::prelude::{ConcreteDataType, Value, *};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use nalgebra::{Const, DVectorView, Dyn, OVector};
|
||||
@@ -75,6 +78,16 @@ impl AggregateFunctionCreator for VectorProductCreator {
|
||||
}
|
||||
|
||||
impl VectorProduct {
|
||||
/// Create a new `AggregateUDF` for the `vec_product` aggregate function.
|
||||
pub fn uadf_impl() -> AggregateUDF {
|
||||
create_aggregate_function(
|
||||
"vec_product".to_string(),
|
||||
1,
|
||||
Arc::new(VectorProductCreator::default()),
|
||||
)
|
||||
.into()
|
||||
}
|
||||
|
||||
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
|
||||
self.product.get_or_insert_with(|| {
|
||||
OVector::from_iterator_generic(Dyn(len), Const::<1>, (0..len).map(|_| 1.0))
|
||||
@@ -16,8 +16,11 @@ use std::sync::Arc;
|
||||
|
||||
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
|
||||
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
|
||||
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
|
||||
use common_query::logical_plan::{
|
||||
create_aggregate_function, Accumulator, AggregateFunctionCreator,
|
||||
};
|
||||
use common_query::prelude::AccumulatorCreatorFunction;
|
||||
use datafusion_expr::AggregateUDF;
|
||||
use datatypes::prelude::{ConcreteDataType, Value, *};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use nalgebra::{Const, DVectorView, Dyn, OVector};
|
||||
@@ -25,6 +28,7 @@ use snafu::ensure;
|
||||
|
||||
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
|
||||
|
||||
/// The accumulator for the `vec_sum` aggregate function.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct VectorSum {
|
||||
sum: Option<OVector<f32, Dyn>>,
|
||||
@@ -74,6 +78,16 @@ impl AggregateFunctionCreator for VectorSumCreator {
|
||||
}
|
||||
|
||||
impl VectorSum {
|
||||
/// Create a new `AggregateUDF` for the `vec_sum` aggregate function.
|
||||
pub fn uadf_impl() -> AggregateUDF {
|
||||
create_aggregate_function(
|
||||
"vec_sum".to_string(),
|
||||
1,
|
||||
Arc::new(VectorSumCreator::default()),
|
||||
)
|
||||
.into()
|
||||
}
|
||||
|
||||
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
|
||||
self.sum
|
||||
.get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>))
|
||||
63
src/common/function/src/function_factory.rs
Normal file
63
src/common/function/src/function_factory.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion_expr::ScalarUDF;
|
||||
|
||||
use crate::function::{FunctionContext, FunctionRef};
|
||||
use crate::scalars::udf::create_udf;
|
||||
|
||||
/// A factory for creating `ScalarUDF` that require a function context.
|
||||
#[derive(Clone)]
|
||||
pub struct ScalarFunctionFactory {
|
||||
name: String,
|
||||
factory: Arc<dyn Fn(FunctionContext) -> ScalarUDF + Send + Sync>,
|
||||
}
|
||||
|
||||
impl ScalarFunctionFactory {
|
||||
/// Returns the name of the function.
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
/// Returns a `ScalarUDF` when given a function context.
|
||||
pub fn provide(&self, ctx: FunctionContext) -> ScalarUDF {
|
||||
(self.factory)(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ScalarUDF> for ScalarFunctionFactory {
|
||||
fn from(df_udf: ScalarUDF) -> Self {
|
||||
let name = df_udf.name().to_string();
|
||||
let func = Arc::new(move |_ctx| df_udf.clone());
|
||||
Self {
|
||||
name,
|
||||
factory: func,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<FunctionRef> for ScalarFunctionFactory {
|
||||
fn from(func: FunctionRef) -> Self {
|
||||
let name = func.name().to_string();
|
||||
let func = Arc::new(move |ctx: FunctionContext| {
|
||||
create_udf(func.clone(), ctx.query_ctx, ctx.state)
|
||||
});
|
||||
Self {
|
||||
name,
|
||||
factory: func,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,11 +16,14 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use datafusion_expr::AggregateUDF;
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::admin::AdminFunction;
|
||||
use crate::function::{AsyncFunctionRef, FunctionRef};
|
||||
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
|
||||
use crate::aggrs::approximate::ApproximateFunction;
|
||||
use crate::aggrs::vector::VectorFunction as VectorAggrFunction;
|
||||
use crate::function::{AsyncFunctionRef, Function, FunctionRef};
|
||||
use crate::function_factory::ScalarFunctionFactory;
|
||||
use crate::scalars::date::DateFunction;
|
||||
use crate::scalars::expression::ExpressionFunction;
|
||||
use crate::scalars::hll_count::HllCalcFunction;
|
||||
@@ -31,18 +34,19 @@ use crate::scalars::matches_term::MatchesTermFunction;
|
||||
use crate::scalars::math::MathFunction;
|
||||
use crate::scalars::timestamp::TimestampFunction;
|
||||
use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
|
||||
use crate::scalars::vector::VectorFunction;
|
||||
use crate::scalars::vector::VectorFunction as VectorScalarFunction;
|
||||
use crate::system::SystemFunction;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct FunctionRegistry {
|
||||
functions: RwLock<HashMap<String, FunctionRef>>,
|
||||
functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
|
||||
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
|
||||
aggregate_functions: RwLock<HashMap<String, AggregateFunctionMetaRef>>,
|
||||
aggregate_functions: RwLock<HashMap<String, AggregateUDF>>,
|
||||
}
|
||||
|
||||
impl FunctionRegistry {
|
||||
pub fn register(&self, func: FunctionRef) {
|
||||
pub fn register(&self, func: impl Into<ScalarFunctionFactory>) {
|
||||
let func = func.into();
|
||||
let _ = self
|
||||
.functions
|
||||
.write()
|
||||
@@ -50,6 +54,10 @@ impl FunctionRegistry {
|
||||
.insert(func.name().to_string(), func);
|
||||
}
|
||||
|
||||
pub fn register_scalar(&self, func: impl Function + 'static) {
|
||||
self.register(Arc::new(func) as FunctionRef);
|
||||
}
|
||||
|
||||
pub fn register_async(&self, func: AsyncFunctionRef) {
|
||||
let _ = self
|
||||
.async_functions
|
||||
@@ -58,6 +66,14 @@ impl FunctionRegistry {
|
||||
.insert(func.name().to_string(), func);
|
||||
}
|
||||
|
||||
pub fn register_aggr(&self, func: AggregateUDF) {
|
||||
let _ = self
|
||||
.aggregate_functions
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(func.name().to_string(), func);
|
||||
}
|
||||
|
||||
pub fn get_async_function(&self, name: &str) -> Option<AsyncFunctionRef> {
|
||||
self.async_functions.read().unwrap().get(name).cloned()
|
||||
}
|
||||
@@ -71,27 +87,16 @@ impl FunctionRegistry {
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
|
||||
let _ = self
|
||||
.aggregate_functions
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(func.name(), func);
|
||||
}
|
||||
|
||||
pub fn get_aggr_function(&self, name: &str) -> Option<AggregateFunctionMetaRef> {
|
||||
self.aggregate_functions.read().unwrap().get(name).cloned()
|
||||
}
|
||||
|
||||
pub fn get_function(&self, name: &str) -> Option<FunctionRef> {
|
||||
#[cfg(test)]
|
||||
pub fn get_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
|
||||
self.functions.read().unwrap().get(name).cloned()
|
||||
}
|
||||
|
||||
pub fn functions(&self) -> Vec<FunctionRef> {
|
||||
pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> {
|
||||
self.functions.read().unwrap().values().cloned().collect()
|
||||
}
|
||||
|
||||
pub fn aggregate_functions(&self) -> Vec<AggregateFunctionMetaRef> {
|
||||
pub fn aggregate_functions(&self) -> Vec<AggregateUDF> {
|
||||
self.aggregate_functions
|
||||
.read()
|
||||
.unwrap()
|
||||
@@ -112,9 +117,6 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
|
||||
UddSketchCalcFunction::register(&function_registry);
|
||||
HllCalcFunction::register(&function_registry);
|
||||
|
||||
// Aggregate functions
|
||||
AggregateFunctions::register(&function_registry);
|
||||
|
||||
// Full text search function
|
||||
MatchesFunction::register(&function_registry);
|
||||
MatchesTermFunction::register(&function_registry);
|
||||
@@ -127,15 +129,21 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
|
||||
JsonFunction::register(&function_registry);
|
||||
|
||||
// Vector related functions
|
||||
VectorFunction::register(&function_registry);
|
||||
VectorScalarFunction::register(&function_registry);
|
||||
VectorAggrFunction::register(&function_registry);
|
||||
|
||||
// Geo functions
|
||||
#[cfg(feature = "geo")]
|
||||
crate::scalars::geo::GeoFunctions::register(&function_registry);
|
||||
#[cfg(feature = "geo")]
|
||||
crate::aggrs::geo::GeoFunction::register(&function_registry);
|
||||
|
||||
// Ip functions
|
||||
IpFunctions::register(&function_registry);
|
||||
|
||||
// Approximate functions
|
||||
ApproximateFunction::register(&function_registry);
|
||||
|
||||
Arc::new(function_registry)
|
||||
});
|
||||
|
||||
@@ -147,12 +155,11 @@ mod tests {
|
||||
#[test]
|
||||
fn test_function_registry() {
|
||||
let registry = FunctionRegistry::default();
|
||||
let func = Arc::new(TestAndFunction);
|
||||
|
||||
assert!(registry.get_function("test_and").is_none());
|
||||
assert!(registry.functions().is_empty());
|
||||
registry.register(func);
|
||||
assert!(registry.scalar_functions().is_empty());
|
||||
registry.register_scalar(TestAndFunction);
|
||||
let _ = registry.get_function("test_and").unwrap();
|
||||
assert_eq!(1, registry.functions().len());
|
||||
assert_eq!(1, registry.scalar_functions().len());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,13 +18,14 @@
|
||||
mod admin;
|
||||
mod flush_flow;
|
||||
mod macros;
|
||||
pub mod scalars;
|
||||
mod system;
|
||||
|
||||
pub mod aggr;
|
||||
pub mod aggrs;
|
||||
pub mod function;
|
||||
pub mod function_factory;
|
||||
pub mod function_registry;
|
||||
pub mod handlers;
|
||||
pub mod helper;
|
||||
pub mod scalars;
|
||||
pub mod state;
|
||||
pub mod utils;
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod aggregate;
|
||||
pub(crate) mod date;
|
||||
pub mod expression;
|
||||
#[cfg(feature = "geo")]
|
||||
|
||||
@@ -1,89 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! # Deprecate Warning:
|
||||
//!
|
||||
//! This module is deprecated and will be removed in the future.
|
||||
//! All UDAF implementation here are not maintained and should
|
||||
//! not be used before they are refactored into the `src/aggr`
|
||||
//! version.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::logical_plan::AggregateFunctionCreatorRef;
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
use crate::scalars::vector::product::VectorProductCreator;
|
||||
use crate::scalars::vector::sum::VectorSumCreator;
|
||||
|
||||
/// A function creates `AggregateFunctionCreator`.
|
||||
/// "Aggregator" *is* AggregatorFunction. Since the later one is long, we named an short alias for it.
|
||||
/// The two names might be used interchangeably.
|
||||
type AggregatorCreatorFunction = Arc<dyn Fn() -> AggregateFunctionCreatorRef + Send + Sync>;
|
||||
|
||||
/// `AggregateFunctionMeta` dynamically creates AggregateFunctionCreator.
|
||||
#[derive(Clone)]
|
||||
pub struct AggregateFunctionMeta {
|
||||
name: String,
|
||||
args_count: u8,
|
||||
creator: AggregatorCreatorFunction,
|
||||
}
|
||||
|
||||
pub type AggregateFunctionMetaRef = Arc<AggregateFunctionMeta>;
|
||||
|
||||
impl AggregateFunctionMeta {
|
||||
pub fn new(name: &str, args_count: u8, creator: AggregatorCreatorFunction) -> Self {
|
||||
Self {
|
||||
name: name.to_string(),
|
||||
args_count,
|
||||
creator,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn name(&self) -> String {
|
||||
self.name.to_string()
|
||||
}
|
||||
|
||||
pub fn args_count(&self) -> u8 {
|
||||
self.args_count
|
||||
}
|
||||
|
||||
pub fn create(&self) -> AggregateFunctionCreatorRef {
|
||||
(self.creator)()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct AggregateFunctions;
|
||||
|
||||
impl AggregateFunctions {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
|
||||
"vec_sum",
|
||||
1,
|
||||
Arc::new(|| Arc::new(VectorSumCreator::default())),
|
||||
)));
|
||||
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
|
||||
"vec_product",
|
||||
1,
|
||||
Arc::new(|| Arc::new(VectorProductCreator::default())),
|
||||
)));
|
||||
|
||||
#[cfg(feature = "geo")]
|
||||
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
|
||||
"json_encode_path",
|
||||
3,
|
||||
Arc::new(|| Arc::new(super::geo::encoding::JsonPathEncodeFunctionCreator::default())),
|
||||
)));
|
||||
}
|
||||
}
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
mod date_add;
|
||||
mod date_format;
|
||||
mod date_sub;
|
||||
@@ -27,8 +26,8 @@ pub(crate) struct DateFunction;
|
||||
|
||||
impl DateFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(DateAddFunction));
|
||||
registry.register(Arc::new(DateSubFunction));
|
||||
registry.register(Arc::new(DateFormatFunction));
|
||||
registry.register_scalar(DateAddFunction);
|
||||
registry.register_scalar(DateSubFunction);
|
||||
registry.register_scalar(DateFormatFunction);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,8 +17,6 @@ mod ctx;
|
||||
mod is_null;
|
||||
mod unary;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use binary::scalar_binary_op;
|
||||
pub use ctx::EvalContext;
|
||||
pub use unary::scalar_unary_op;
|
||||
@@ -30,6 +28,6 @@ pub(crate) struct ExpressionFunction;
|
||||
|
||||
impl ExpressionFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(IsNullFunction));
|
||||
registry.register_scalar(IsNullFunction);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,11 +12,9 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
pub(crate) mod encoding;
|
||||
mod geohash;
|
||||
mod h3;
|
||||
mod helpers;
|
||||
pub(crate) mod helpers;
|
||||
mod measure;
|
||||
mod relation;
|
||||
mod s2;
|
||||
@@ -29,57 +27,57 @@ pub(crate) struct GeoFunctions;
|
||||
impl GeoFunctions {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
// geohash
|
||||
registry.register(Arc::new(geohash::GeohashFunction));
|
||||
registry.register(Arc::new(geohash::GeohashNeighboursFunction));
|
||||
registry.register_scalar(geohash::GeohashFunction);
|
||||
registry.register_scalar(geohash::GeohashNeighboursFunction);
|
||||
|
||||
// h3 index
|
||||
registry.register(Arc::new(h3::H3LatLngToCell));
|
||||
registry.register(Arc::new(h3::H3LatLngToCellString));
|
||||
registry.register_scalar(h3::H3LatLngToCell);
|
||||
registry.register_scalar(h3::H3LatLngToCellString);
|
||||
|
||||
// h3 index inspection
|
||||
registry.register(Arc::new(h3::H3CellBase));
|
||||
registry.register(Arc::new(h3::H3CellIsPentagon));
|
||||
registry.register(Arc::new(h3::H3StringToCell));
|
||||
registry.register(Arc::new(h3::H3CellToString));
|
||||
registry.register(Arc::new(h3::H3CellCenterLatLng));
|
||||
registry.register(Arc::new(h3::H3CellResolution));
|
||||
registry.register_scalar(h3::H3CellBase);
|
||||
registry.register_scalar(h3::H3CellIsPentagon);
|
||||
registry.register_scalar(h3::H3StringToCell);
|
||||
registry.register_scalar(h3::H3CellToString);
|
||||
registry.register_scalar(h3::H3CellCenterLatLng);
|
||||
registry.register_scalar(h3::H3CellResolution);
|
||||
|
||||
// h3 hierarchical grid
|
||||
registry.register(Arc::new(h3::H3CellCenterChild));
|
||||
registry.register(Arc::new(h3::H3CellParent));
|
||||
registry.register(Arc::new(h3::H3CellToChildren));
|
||||
registry.register(Arc::new(h3::H3CellToChildrenSize));
|
||||
registry.register(Arc::new(h3::H3CellToChildPos));
|
||||
registry.register(Arc::new(h3::H3ChildPosToCell));
|
||||
registry.register(Arc::new(h3::H3CellContains));
|
||||
registry.register_scalar(h3::H3CellCenterChild);
|
||||
registry.register_scalar(h3::H3CellParent);
|
||||
registry.register_scalar(h3::H3CellToChildren);
|
||||
registry.register_scalar(h3::H3CellToChildrenSize);
|
||||
registry.register_scalar(h3::H3CellToChildPos);
|
||||
registry.register_scalar(h3::H3ChildPosToCell);
|
||||
registry.register_scalar(h3::H3CellContains);
|
||||
|
||||
// h3 grid traversal
|
||||
registry.register(Arc::new(h3::H3GridDisk));
|
||||
registry.register(Arc::new(h3::H3GridDiskDistances));
|
||||
registry.register(Arc::new(h3::H3GridDistance));
|
||||
registry.register(Arc::new(h3::H3GridPathCells));
|
||||
registry.register_scalar(h3::H3GridDisk);
|
||||
registry.register_scalar(h3::H3GridDiskDistances);
|
||||
registry.register_scalar(h3::H3GridDistance);
|
||||
registry.register_scalar(h3::H3GridPathCells);
|
||||
|
||||
// h3 measurement
|
||||
registry.register(Arc::new(h3::H3CellDistanceSphereKm));
|
||||
registry.register(Arc::new(h3::H3CellDistanceEuclideanDegree));
|
||||
registry.register_scalar(h3::H3CellDistanceSphereKm);
|
||||
registry.register_scalar(h3::H3CellDistanceEuclideanDegree);
|
||||
|
||||
// s2
|
||||
registry.register(Arc::new(s2::S2LatLngToCell));
|
||||
registry.register(Arc::new(s2::S2CellLevel));
|
||||
registry.register(Arc::new(s2::S2CellToToken));
|
||||
registry.register(Arc::new(s2::S2CellParent));
|
||||
registry.register_scalar(s2::S2LatLngToCell);
|
||||
registry.register_scalar(s2::S2CellLevel);
|
||||
registry.register_scalar(s2::S2CellToToken);
|
||||
registry.register_scalar(s2::S2CellParent);
|
||||
|
||||
// spatial data type
|
||||
registry.register(Arc::new(wkt::LatLngToPointWkt));
|
||||
registry.register_scalar(wkt::LatLngToPointWkt);
|
||||
|
||||
// spatial relation
|
||||
registry.register(Arc::new(relation::STContains));
|
||||
registry.register(Arc::new(relation::STWithin));
|
||||
registry.register(Arc::new(relation::STIntersects));
|
||||
registry.register_scalar(relation::STContains);
|
||||
registry.register_scalar(relation::STWithin);
|
||||
registry.register_scalar(relation::STIntersects);
|
||||
|
||||
// spatial measure
|
||||
registry.register(Arc::new(measure::STDistance));
|
||||
registry.register(Arc::new(measure::STDistanceSphere));
|
||||
registry.register(Arc::new(measure::STArea));
|
||||
registry.register_scalar(measure::STDistance);
|
||||
registry.register_scalar(measure::STDistanceSphere);
|
||||
registry.register_scalar(measure::STArea);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ macro_rules! ensure_columns_len {
|
||||
};
|
||||
}
|
||||
|
||||
pub(super) use ensure_columns_len;
|
||||
pub(crate) use ensure_columns_len;
|
||||
|
||||
macro_rules! ensure_columns_n {
|
||||
($columns:ident, $n:literal) => {
|
||||
@@ -58,7 +58,7 @@ macro_rules! ensure_columns_n {
|
||||
};
|
||||
}
|
||||
|
||||
pub(super) use ensure_columns_n;
|
||||
pub(crate) use ensure_columns_n;
|
||||
|
||||
macro_rules! ensure_and_coerce {
|
||||
($compare:expr, $coerce:expr) => {{
|
||||
@@ -72,4 +72,4 @@ macro_rules! ensure_and_coerce {
|
||||
}};
|
||||
}
|
||||
|
||||
pub(super) use ensure_and_coerce;
|
||||
pub(crate) use ensure_and_coerce;
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
use std::fmt;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
@@ -27,7 +26,7 @@ use datatypes::vectors::{BinaryVector, MutableVector, UInt64VectorBuilder, Vecto
|
||||
use hyperloglogplus::HyperLogLog;
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::aggr::HllStateType;
|
||||
use crate::aggrs::approximate::hll::HllStateType;
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
@@ -44,7 +43,7 @@ pub struct HllCalcFunction;
|
||||
|
||||
impl HllCalcFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(HllCalcFunction));
|
||||
registry.register_scalar(HllCalcFunction);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,6 +116,8 @@ impl Function for HllCalcFunction {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::vectors::BinaryVector;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -17,8 +17,6 @@ mod ipv4;
|
||||
mod ipv6;
|
||||
mod range;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use cidr::{Ipv4ToCidr, Ipv6ToCidr};
|
||||
use ipv4::{Ipv4NumToString, Ipv4StringToNum};
|
||||
use ipv6::{Ipv6NumToString, Ipv6StringToNum};
|
||||
@@ -31,15 +29,15 @@ pub(crate) struct IpFunctions;
|
||||
impl IpFunctions {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
// Register IPv4 functions
|
||||
registry.register(Arc::new(Ipv4NumToString));
|
||||
registry.register(Arc::new(Ipv4StringToNum));
|
||||
registry.register(Arc::new(Ipv4ToCidr));
|
||||
registry.register(Arc::new(Ipv4InRange));
|
||||
registry.register_scalar(Ipv4NumToString);
|
||||
registry.register_scalar(Ipv4StringToNum);
|
||||
registry.register_scalar(Ipv4ToCidr);
|
||||
registry.register_scalar(Ipv4InRange);
|
||||
|
||||
// Register IPv6 functions
|
||||
registry.register(Arc::new(Ipv6NumToString));
|
||||
registry.register(Arc::new(Ipv6StringToNum));
|
||||
registry.register(Arc::new(Ipv6ToCidr));
|
||||
registry.register(Arc::new(Ipv6InRange));
|
||||
registry.register_scalar(Ipv6NumToString);
|
||||
registry.register_scalar(Ipv6StringToNum);
|
||||
registry.register_scalar(Ipv6ToCidr);
|
||||
registry.register_scalar(Ipv6InRange);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -468,8 +468,8 @@ mod tests {
|
||||
let empty_values = vec![""];
|
||||
let empty_input = Arc::new(StringVector::from_slice(&empty_values)) as VectorRef;
|
||||
|
||||
let ipv4_result = ipv4_func.eval(&ctx, &[empty_input.clone()]);
|
||||
let ipv6_result = ipv6_func.eval(&ctx, &[empty_input.clone()]);
|
||||
let ipv4_result = ipv4_func.eval(&ctx, std::slice::from_ref(&empty_input));
|
||||
let ipv6_result = ipv6_func.eval(&ctx, std::slice::from_ref(&empty_input));
|
||||
|
||||
assert!(ipv4_result.is_err());
|
||||
assert!(ipv6_result.is_err());
|
||||
@@ -478,7 +478,7 @@ mod tests {
|
||||
let invalid_values = vec!["not an ip", "192.168.1.256", "zzzz::ffff"];
|
||||
let invalid_input = Arc::new(StringVector::from_slice(&invalid_values)) as VectorRef;
|
||||
|
||||
let ipv4_result = ipv4_func.eval(&ctx, &[invalid_input.clone()]);
|
||||
let ipv4_result = ipv4_func.eval(&ctx, std::slice::from_ref(&invalid_input));
|
||||
|
||||
assert!(ipv4_result.is_err());
|
||||
}
|
||||
|
||||
@@ -294,7 +294,7 @@ mod tests {
|
||||
let input = Arc::new(StringVector::from_slice(&values)) as VectorRef;
|
||||
|
||||
// Convert IPv6 addresses to binary
|
||||
let binary_result = to_num.eval(&ctx, &[input.clone()]).unwrap();
|
||||
let binary_result = to_num.eval(&ctx, std::slice::from_ref(&input)).unwrap();
|
||||
|
||||
// Convert binary to hex string representation (for ipv6_num_to_string)
|
||||
let mut hex_strings = Vec::new();
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
pub mod json_get;
|
||||
mod json_is;
|
||||
mod json_path_exists;
|
||||
@@ -33,23 +32,23 @@ pub(crate) struct JsonFunction;
|
||||
|
||||
impl JsonFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(JsonToStringFunction));
|
||||
registry.register(Arc::new(ParseJsonFunction));
|
||||
registry.register_scalar(JsonToStringFunction);
|
||||
registry.register_scalar(ParseJsonFunction);
|
||||
|
||||
registry.register(Arc::new(JsonGetInt));
|
||||
registry.register(Arc::new(JsonGetFloat));
|
||||
registry.register(Arc::new(JsonGetString));
|
||||
registry.register(Arc::new(JsonGetBool));
|
||||
registry.register_scalar(JsonGetInt);
|
||||
registry.register_scalar(JsonGetFloat);
|
||||
registry.register_scalar(JsonGetString);
|
||||
registry.register_scalar(JsonGetBool);
|
||||
|
||||
registry.register(Arc::new(JsonIsNull));
|
||||
registry.register(Arc::new(JsonIsInt));
|
||||
registry.register(Arc::new(JsonIsFloat));
|
||||
registry.register(Arc::new(JsonIsString));
|
||||
registry.register(Arc::new(JsonIsBool));
|
||||
registry.register(Arc::new(JsonIsArray));
|
||||
registry.register(Arc::new(JsonIsObject));
|
||||
registry.register_scalar(JsonIsNull);
|
||||
registry.register_scalar(JsonIsInt);
|
||||
registry.register_scalar(JsonIsFloat);
|
||||
registry.register_scalar(JsonIsString);
|
||||
registry.register_scalar(JsonIsBool);
|
||||
registry.register_scalar(JsonIsArray);
|
||||
registry.register_scalar(JsonIsObject);
|
||||
|
||||
registry.register(Arc::new(json_path_exists::JsonPathExistsFunction));
|
||||
registry.register(Arc::new(json_path_match::JsonPathMatchFunction));
|
||||
registry.register_scalar(json_path_exists::JsonPathExistsFunction);
|
||||
registry.register_scalar(json_path_match::JsonPathMatchFunction);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,11 +38,11 @@ use crate::function_registry::FunctionRegistry;
|
||||
///
|
||||
/// Usage: matches(`<col>`, `<pattern>`) -> boolean
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub(crate) struct MatchesFunction;
|
||||
pub struct MatchesFunction;
|
||||
|
||||
impl MatchesFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(MatchesFunction));
|
||||
registry.register_scalar(MatchesFunction);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ pub struct MatchesTermFunction;
|
||||
|
||||
impl MatchesTermFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(MatchesTermFunction));
|
||||
registry.register_scalar(MatchesTermFunction);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,15 +12,14 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod clamp;
|
||||
pub mod clamp;
|
||||
mod modulo;
|
||||
mod pow;
|
||||
mod rate;
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use clamp::ClampFunction;
|
||||
pub use clamp::{ClampFunction, ClampMaxFunction, ClampMinFunction};
|
||||
use common_query::error::{GeneralDataFusionSnafu, Result};
|
||||
use common_query::prelude::Signature;
|
||||
use datafusion::error::DataFusionError;
|
||||
@@ -39,11 +38,13 @@ pub(crate) struct MathFunction;
|
||||
|
||||
impl MathFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(ModuloFunction));
|
||||
registry.register(Arc::new(PowFunction));
|
||||
registry.register(Arc::new(RateFunction));
|
||||
registry.register(Arc::new(RangeFunction));
|
||||
registry.register(Arc::new(ClampFunction));
|
||||
registry.register_scalar(ModuloFunction);
|
||||
registry.register_scalar(PowFunction);
|
||||
registry.register_scalar(RateFunction);
|
||||
registry.register_scalar(RangeFunction);
|
||||
registry.register_scalar(ClampFunction);
|
||||
registry.register_scalar(ClampMinFunction);
|
||||
registry.register_scalar(ClampMaxFunction);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user