Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-25 15:40:02 +00:00)

Compare commits (oldrelease ... flow/choos): 55 commits
| SHA1 |
|---|
| e2c7dfcaba |
| 79f584316e |
| 6ab0f0cc5c |
| 8685ceb232 |
| b442414422 |
| 51f2cb1053 |
| fbf50c594e |
| 5739302845 |
| 148d96fc38 |
| e787007eb5 |
| 60acf28f3c |
| 06126147d2 |
| cce1285b16 |
| 4b5ab75312 |
| 56f31d5933 |
| df31f0b9ec |
| 07e84a28a3 |
| f298a110f9 |
| 6a5936468e |
| 49a936e2e1 |
| 41a706c7cd |
| d6e98206b6 |
| 7b4df6343f |
| bb4890cff8 |
| b0ad3f0bb4 |
| 8726bf9f7a |
| 44e75b142d |
| a706edbb73 |
| 0bf07d7f91 |
| b8f9915d47 |
| 6166f2072e |
| 8338aa14d3 |
| a18dc632c8 |
| a9f486e493 |
| 06e8d46ba9 |
| 89661c0626 |
| a3ae2d7b52 |
| 789f585a7f |
| 133f404547 |
| bdd44fd7ec |
| 13ac4d5048 |
| c6448a6ccc |
| 86aae6733d |
| ed1ce8438f |
| 4b921b8425 |
| 1a517ec8ac |
| 21044c7339 |
| 8e1ec2a201 |
| 5ed0a095b6 |
| 3c943be189 |
| eeba466717 |
| 2ff54486d3 |
| 66e2242e46 |
| 489b16ae30 |
| 85d564b0fb |
.github/scripts/create-version.sh (vendored): 14 lines changed

@@ -10,17 +10,17 @@ set -e
function create_version() {
  # Read from envrionment variables.
  if [ -z "$GITHUB_EVENT_NAME" ]; then
-    echo "GITHUB_EVENT_NAME is empty"
+    echo "GITHUB_EVENT_NAME is empty" >&2
    exit 1
  fi

  if [ -z "$NEXT_RELEASE_VERSION" ]; then
-    echo "NEXT_RELEASE_VERSION is empty"
-    exit 1
+    echo "NEXT_RELEASE_VERSION is empty, use version from Cargo.toml" >&2
+    export NEXT_RELEASE_VERSION=$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
  fi

  if [ -z "$NIGHTLY_RELEASE_PREFIX" ]; then
-    echo "NIGHTLY_RELEASE_PREFIX is empty"
+    echo "NIGHTLY_RELEASE_PREFIX is empty" >&2
    exit 1
  fi

@@ -35,7 +35,7 @@ function create_version() {
  # It will be like 'dev-2023080819-f0e7216c'.
  if [ "$NEXT_RELEASE_VERSION" = dev ]; then
    if [ -z "$COMMIT_SHA" ]; then
-      echo "COMMIT_SHA is empty in dev build"
+      echo "COMMIT_SHA is empty in dev build" >&2
      exit 1
    fi
    echo "dev-$(date "+%Y%m%d-%s")-$(echo "$COMMIT_SHA" | cut -c1-8)"

@@ -45,7 +45,7 @@ function create_version() {
  # Note: Only output 'version=xxx' to stdout when everything is ok, so that it can be used in GitHub Actions Outputs.
  if [ "$GITHUB_EVENT_NAME" = push ]; then
    if [ -z "$GITHUB_REF_NAME" ]; then
-      echo "GITHUB_REF_NAME is empty in push event"
+      echo "GITHUB_REF_NAME is empty in push event" >&2
      exit 1
    fi
    echo "$GITHUB_REF_NAME"

@@ -54,7 +54,7 @@ function create_version() {
  elif [ "$GITHUB_EVENT_NAME" = schedule ]; then
    echo "$NEXT_RELEASE_VERSION-$NIGHTLY_RELEASE_PREFIX-$(date "+%Y%m%d")"
  else
-    echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME"
+    echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME" >&2
    exit 1
  fi
}
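The notable behavior change above is the `NEXT_RELEASE_VERSION` fallback: instead of failing, the script now derives the version from `Cargo.toml` and sends diagnostics to stderr so only `version=xxx` reaches stdout. A minimal sketch of that flow, assuming it is run from the repository root; the `COMMIT_SHA` value here is a hypothetical placeholder, not taken from the diff:

```bash
#!/usr/bin/env bash
# Sketch of the new fallback: derive NEXT_RELEASE_VERSION from the workspace
# version in Cargo.toml when it is not provided, then format a dev-style tag
# the same way create_version() does for dev builds.
set -e

# Same extraction pipeline as in the diff above; assumes the repo root as CWD.
NEXT_RELEASE_VERSION=${NEXT_RELEASE_VERSION:-$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)}

# Hypothetical commit SHA, only for illustration.
COMMIT_SHA=${COMMIT_SHA:-e2c7dfcaba0000000000}

echo "NEXT_RELEASE_VERSION=$NEXT_RELEASE_VERSION"                   # e.g. 0.15.0
echo "dev-$(date "+%Y%m%d-%s")-$(echo "$COMMIT_SHA" | cut -c1-8)"   # e.g. dev-20250428-...-e2c7dfca
```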
.github/scripts/update-dev-builder-version.sh (new executable file, vendored): 37 lines

@@ -0,0 +1,37 @@
#!/bin/bash

DEV_BUILDER_IMAGE_TAG=$1

update_dev_builder_version() {
  if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
    echo "Error: Should specify the dev-builder image tag"
    exit 1
  fi

  # Configure Git configs.
  git config --global user.email greptimedb-ci@greptime.com
  git config --global user.name greptimedb-ci

  # Checkout a new branch.
  BRANCH_NAME="ci/update-dev-builder-$(date +%Y%m%d%H%M%S)"
  git checkout -b $BRANCH_NAME

  # Update the dev-builder image tag in the Makefile.
  gsed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile

  # Commit the changes.
  git add Makefile
  git commit -m "ci: update dev-builder image tag"
  git push origin $BRANCH_NAME

  # Create a Pull Request.
  gh pr create \
    --title "ci: update dev-builder image tag" \
    --body "This PR updates the dev-builder image tag" \
    --base main \
    --head $BRANCH_NAME \
    --reviewer zyy17 \
    --reviewer daviderli614
}

update_dev_builder_version
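For reference, a hedged sketch of how this script is invoked by hand; the release workflow further down calls it with the image version produced by the dev-builder build job. It assumes GNU sed is installed as `gsed`, the GitHub CLI (`gh`) is available, and a token with PR permissions is exported; the tag value is the one that appears in the Makefile diff below.

```bash
# Token is a placeholder; gh pr create needs it to authenticate.
export GITHUB_TOKEN="${GITHUB_TOKEN:?set a token with contents and pull-requests write access}"

# Same call shape the workflow uses, with the tag from the Makefile diff below.
./.github/scripts/update-dev-builder-version.sh 2025-04-15-1a517ec8-20250428023155
```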
.github/workflows/develop.yml (vendored): 19 lines changed

@@ -22,6 +22,7 @@ concurrency:

jobs:
  check-typos-and-docs:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    name: Check typos and docs
    runs-on: ubuntu-latest
    steps:

@@ -36,6 +37,7 @@ jobs:
          || (echo "'config/config.md' is not up-to-date, please run 'make config-docs'." && exit 1)

  license-header-check:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    runs-on: ubuntu-latest
    name: Check License Header
    steps:

@@ -45,6 +47,7 @@ jobs:
      - uses: korandoru/hawkeye@v5

  check:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    name: Check
    runs-on: ${{ matrix.os }}
    strategy:

@@ -71,6 +74,7 @@ jobs:
        run: cargo check --locked --workspace --all-targets

  toml:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    name: Toml Check
    runs-on: ubuntu-latest
    timeout-minutes: 60

@@ -85,6 +89,7 @@ jobs:
        run: taplo format --check

  build:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    name: Build GreptimeDB binaries
    runs-on: ${{ matrix.os }}
    strategy:

@@ -127,6 +132,7 @@ jobs:
          version: current

  fuzztest:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    name: Fuzz Test
    needs: build
    runs-on: ubuntu-latest

@@ -183,6 +189,7 @@ jobs:
          max-total-time: 120

  unstable-fuzztest:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    name: Unstable Fuzz Test
    needs: build-greptime-ci
    runs-on: ubuntu-latest

@@ -244,6 +251,7 @@ jobs:
          retention-days: 3

  build-greptime-ci:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    name: Build GreptimeDB binary (profile-CI)
    runs-on: ${{ matrix.os }}
    strategy:

@@ -285,6 +293,7 @@ jobs:
          version: current

  distributed-fuzztest:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    name: Fuzz Test (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
    runs-on: ubuntu-latest
    needs: build-greptime-ci

@@ -416,6 +425,7 @@ jobs:
          docker system prune -f

  distributed-fuzztest-with-chaos:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    name: Fuzz Test with Chaos (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
    runs-on: ubuntu-latest
    needs: build-greptime-ci

@@ -563,6 +573,7 @@ jobs:
          docker system prune -f

  sqlness:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    name: Sqlness Test (${{ matrix.mode.name }})
    needs: build
    runs-on: ${{ matrix.os }}

@@ -609,6 +620,7 @@ jobs:
          retention-days: 3

  fmt:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    name: Rustfmt
    runs-on: ubuntu-latest
    timeout-minutes: 60

@@ -626,6 +638,7 @@ jobs:
        run: make fmt-check

  clippy:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    name: Clippy
    runs-on: ubuntu-latest
    timeout-minutes: 60

@@ -651,6 +664,7 @@ jobs:
        run: make clippy

  conflict-check:
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
    name: Check for conflict
    runs-on: ubuntu-latest
    steps:

@@ -661,7 +675,7 @@ jobs:
        uses: olivernybroe/action-conflict-finder@v4.0

  test:
-    if: github.event_name != 'merge_group'
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' && github.event_name != 'merge_group' }}
    runs-on: ubuntu-22.04-arm
    timeout-minutes: 60
    needs: [conflict-check, clippy, fmt]

@@ -713,7 +727,7 @@ jobs:
      UNITTEST_LOG_DIR: "__unittest_logs"

  coverage:
-    if: github.event_name == 'merge_group'
+    if: ${{ github.repository == 'GreptimeTeam/greptimedb' && github.event_name == 'merge_group' }}
    runs-on: ubuntu-22.04-8-cores
    timeout-minutes: 60
    steps:

@@ -773,6 +787,7 @@ jobs:
      verbose: true

  # compat:
+  #   if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
  #   name: Compatibility Test
  #   needs: build
  #   runs-on: ubuntu-22.04
.github/workflows/nightly-ci.yml (vendored): 6 lines changed

@@ -117,16 +117,16 @@ jobs:
    name: Run clean build on Linux
    runs-on: ubuntu-latest
    if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
-    timeout-minutes: 60
+    timeout-minutes: 45
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
          persist-credentials: false
-      - uses: cachix/install-nix-action@v27
+      - uses: cachix/install-nix-action@v31
        with:
          nix_path: nixpkgs=channel:nixos-24.11
-      - run: nix develop --command cargo build
+      - run: nix develop --command cargo build --bin greptime

  check-status:
    name: Check status
@@ -24,11 +24,19 @@ on:
        description: Release dev-builder-android image
        required: false
        default: false
+      update_dev_builder_image_tag:
+        type: boolean
+        description: Update the DEV_BUILDER_IMAGE_TAG in Makefile and create a PR
+        required: false
+        default: false

jobs:
  release-dev-builder-images:
    name: Release dev builder images
-    if: ${{ inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }} # Only manually trigger this job.
+    # The jobs are triggered by the following events:
+    # 1. Manually triggered workflow_dispatch event
+    # 2. Push event when the PR that modifies the `rust-toolchain.toml` or `docker/dev-builder/**` is merged to main
+    if: ${{ github.event_name == 'push' || inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }}
    runs-on: ubuntu-latest
    outputs:
      version: ${{ steps.set-version.outputs.version }}

@@ -57,9 +65,9 @@ jobs:
          version: ${{ env.VERSION }}
          dockerhub-image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
          dockerhub-image-registry-token: ${{ secrets.DOCKERHUB_TOKEN }}
-          build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image }}
-          build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image }}
-          build-dev-builder-android: ${{ inputs.release_dev_builder_android_image }}
+          build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
+          build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
+          build-dev-builder-android: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}

  release-dev-builder-images-ecr:
    name: Release dev builder images to AWS ECR

@@ -85,7 +93,7 @@ jobs:

      - name: Push dev-builder-ubuntu image
        shell: bash
-        if: ${{ inputs.release_dev_builder_ubuntu_image }}
+        if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
        env:
          IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
          IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}

@@ -106,7 +114,7 @@ jobs:

      - name: Push dev-builder-centos image
        shell: bash
-        if: ${{ inputs.release_dev_builder_centos_image }}
+        if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
        env:
          IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
          IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}

@@ -127,7 +135,7 @@ jobs:

      - name: Push dev-builder-android image
        shell: bash
-        if: ${{ inputs.release_dev_builder_android_image }}
+        if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
        env:
          IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
          IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}

@@ -162,7 +170,7 @@ jobs:

      - name: Push dev-builder-ubuntu image
        shell: bash
-        if: ${{ inputs.release_dev_builder_ubuntu_image }}
+        if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
        env:
          IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
          IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}

@@ -176,7 +184,7 @@ jobs:

      - name: Push dev-builder-centos image
        shell: bash
-        if: ${{ inputs.release_dev_builder_centos_image }}
+        if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
        env:
          IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
          IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}

@@ -190,7 +198,7 @@ jobs:

      - name: Push dev-builder-android image
        shell: bash
-        if: ${{ inputs.release_dev_builder_android_image }}
+        if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
        env:
          IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
          IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}

@@ -201,3 +209,24 @@ jobs:
            quay.io/skopeo/stable:latest \
            copy -a docker://docker.io/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION \
            docker://$ACR_IMAGE_REGISTRY/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION
+
+  update-dev-builder-image-tag:
+    name: Update dev-builder image tag
+    runs-on: ubuntu-latest
+    permissions:
+      contents: write
+      pull-requests: write
+    if: ${{ github.event_name == 'push' || inputs.update_dev_builder_image_tag }}
+    needs: [
+      release-dev-builder-images
+    ]
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v4
+
+      - name: Update dev-builder image tag
+        shell: bash
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+        run: |
+          ./.github/scripts/update-dev-builder-version.sh ${{ needs.release-dev-builder-images.outputs.version }}
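The registry mirroring above relies on `skopeo copy -a` to re-push multi-arch dev-builder images between registries. A hedged, standalone sketch of that pattern; the workflow's own invocation is truncated in this view, so running skopeo via `docker run` and the destination registry are assumptions, and the tag is the one from the Makefile diff below:

```bash
# Mirror a multi-arch dev-builder image from Docker Hub to another registry,
# following the `skopeo copy -a` pattern used in the workflow above.
IMAGE_NAMESPACE=greptime                              # matches the Makefile default; the workflow reads it from vars
IMAGE_VERSION=2025-04-15-1a517ec8-20250428023155      # tag from the Makefile diff
DEST_REGISTRY=registry.example.com                    # hypothetical destination registry

docker run --rm quay.io/skopeo/stable:latest \
  copy -a docker://docker.io/$IMAGE_NAMESPACE/dev-builder-ubuntu:$IMAGE_VERSION \
  docker://$DEST_REGISTRY/$IMAGE_NAMESPACE/dev-builder-ubuntu:$IMAGE_VERSION
```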
.github/workflows/release.yml (vendored): 26 lines changed

@@ -90,8 +90,6 @@ env:

  # The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
  NIGHTLY_RELEASE_PREFIX: nightly
-  # Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
-  NEXT_RELEASE_VERSION: v0.14.0

jobs:
  allocate-runners:

@@ -135,7 +133,6 @@ jobs:
        env:
          GITHUB_EVENT_NAME: ${{ github.event_name }}
          GITHUB_REF_NAME: ${{ github.ref_name }}
-          NEXT_RELEASE_VERSION: ${{ env.NEXT_RELEASE_VERSION }}
          NIGHTLY_RELEASE_PREFIX: ${{ env.NIGHTLY_RELEASE_PREFIX }}

      - name: Allocate linux-amd64 runner

@@ -467,6 +464,29 @@ jobs:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}

+  bump-website-version:
+    name: Bump website version
+    if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
+    needs: [allocate-runners]
+    runs-on: ubuntu-latest
+    # Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
+    permissions:
+      issues: write # Allows the action to create issues for cyborg.
+      contents: write # Allows the action to create a release.
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+          persist-credentials: false
+      - uses: ./.github/actions/setup-cyborg
+      - name: Bump website version
+        working-directory: cyborg
+        run: pnpm tsx bin/bump-website-version.ts
+        env:
+          VERSION: ${{ needs.allocate-runners.outputs.version }}
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+          WEBSITE_REPO_TOKEN: ${{ secrets.WEBSITE_REPO_TOKEN }}
+
  notification:
    if: ${{ github.repository == 'GreptimeTeam/greptimedb' && (github.event_name == 'push' || github.event_name == 'schedule') && always() }}
    name: Send notification to Greptime team
.gitignore (vendored): 1 line changed

@@ -28,6 +28,7 @@ debug/
# Logs
**/__unittest_logs
logs/
+!grafana/dashboards/logs/

# cpython's generated python byte code
**/__pycache__/
Cargo.lock (generated): 1049 lines changed. File diff suppressed because it is too large.
Cargo.toml: 31 lines changed

@@ -68,15 +68,16 @@ members = [
resolver = "2"

[workspace.package]
-version = "0.14.0"
+version = "0.15.0"
edition = "2021"
license = "Apache-2.0"

[workspace.lints]
clippy.print_stdout = "warn"
clippy.print_stderr = "warn"
clippy.dbg_macro = "warn"
clippy.implicit_clone = "warn"
clippy.result_large_err = "allow"
clippy.large_enum_variant = "allow"
+clippy.doc_overindented_list_items = "allow"
rust.unknown_lints = "deny"
rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

@@ -112,15 +113,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "6.1"
-datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
-datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5bbedc6704162afb03478f56ffb629405a4e1220" }
+datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
+datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"

@@ -129,7 +130,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e82b0158cd38d4021edb4e4c0ae77f999051e62f" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17a3550751c8b1e02ec16be40101d5f24dc255c3" }
hex = "0.4"
http = "1"
humantime = "2.1"

@@ -162,7 +163,7 @@ paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.5.1", features = ["ser"] }
-prost = "0.13"
+prost = { version = "0.13", features = ["no-recursion-limit"] }
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"
ratelimit = "0.10"

@@ -191,7 +192,7 @@ simd-json = "0.15"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
-sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "e98e6b322426a9d397a71efef17075966223c089", features = [
+sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "0cf6c04490d59435ee965edd2078e8855bd8471e", features = [
  "visitor",
  "serde",
] } # branch = "v0.54.x"
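Bumping the pinned `rev` of a git dependency (as done above for the datafusion crates, greptime-proto, and sqlparser) only takes effect once the lock file is regenerated, which is what produces the large `Cargo.lock` diff earlier in this listing. A hedged sketch of that workflow; package names follow the workspace dependencies above:

```bash
# After editing the `rev` values in Cargo.toml, refresh the affected entries
# in Cargo.lock so the new revisions are actually resolved.
cargo update -p sqlparser
cargo update -p greptime-proto

# Same check the develop.yml workflow runs to validate the workspace.
cargo check --locked --workspace --all-targets
```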
Makefile: 2 lines changed

@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
-DEV_BUILDER_IMAGE_TAG ?= 2024-12-25-a71b93dd-20250305072908
+DEV_BUILDER_IMAGE_TAG ?= 2025-04-15-1a517ec8-20250428023155
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu
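Because these Makefile variables are declared with `?=`, they can be overridden per invocation without editing the file. A small illustrative sketch, assuming the default `make` target (which the README below uses for building) and treating the override values as placeholders:

```bash
# Override conditional (?=) Makefile variables from the command line.
make \
  IMAGE_NAMESPACE=greptime \
  DEV_BUILDER_IMAGE_TAG=2025-04-15-1a517ec8-20250428023155 \
  BASE_IMAGE=ubuntu
```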
README.md: 199 lines changed

@@ -8,6 +8,8 @@

<h2 align="center">Real-Time & Cloud-Native Observability Database<br/>for metrics, logs, and traces</h2>

+> Delivers sub-second querying at PB scale and exceptional cost efficiency from edge to cloud.
+
<div align="center">
<h3 align="center">
  <a href="https://greptime.com/product/cloud">GreptimeCloud</a> |

@@ -49,74 +51,77 @@
</div>

- [Introduction](#introduction)
-- [**Features: Why GreptimeDB**](#why-greptimedb)
-- [Architecture](https://docs.greptime.com/contributor-guide/overview/#architecture)
-- [Try it for free](#try-greptimedb)
+- [⭐ Key Features](#features)
+- [Quick Comparison](#quick-comparison)
+- [Architecture](#architecture)
+- [Try GreptimeDB](#try-greptimedb)
- [Getting Started](#getting-started)
-- [Project Status](#project-status)
-- [Join the community](#community)
-- [Contributing](#contributing)
+- [Build From Source](#build-from-source)
+- [Tools & Extensions](#tools--extensions)
+- [Project Status](#project-status)
+- [Community](#community)
+- [License](#license)
+- [Commercial Support](#commercial-support)
+- [Contributing](#contributing)
- [Acknowledgement](#acknowledgement)

## Introduction

-**GreptimeDB** is an open-source, cloud-native, unified & cost-effective observability database for **Metrics**, **Logs**, and **Traces**. You can gain real-time insights from Edge to Cloud at Any Scale.
+**GreptimeDB** is an open-source, cloud-native database purpose-built for the unified collection and analysis of observability data (metrics, logs, and traces). Whether you’re operating on the edge, in the cloud, or across hybrid environments, GreptimeDB empowers real-time insights at massive scale — all in one system.

-## News
+## Features

-**[GreptimeDB tops JSONBench's billion-record cold run test!](https://greptime.com/blogs/2025-03-18-jsonbench-greptimedb-performance)**
+| Feature | Description |
+| --------- | ----------- |
+| [Unified Observability Data](https://docs.greptime.com/user-guide/concepts/why-greptimedb) | Store metrics, logs, and traces as timestamped, contextual wide events. Query via [SQL](https://docs.greptime.com/user-guide/query-data/sql), [PromQL](https://docs.greptime.com/user-guide/query-data/promql), and [streaming](https://docs.greptime.com/user-guide/flow-computation/overview). |
+| [High Performance & Cost Effective](https://docs.greptime.com/user-guide/manage-data/data-index) | Written in Rust, with a distributed query engine, [rich indexing](https://docs.greptime.com/user-guide/manage-data/data-index), and optimized columnar storage, delivering sub-second responses at PB scale. |
+| [Cloud-Native Architecture](https://docs.greptime.com/user-guide/concepts/architecture) | Designed for [Kubernetes](https://docs.greptime.com/user-guide/deployments/deploy-on-kubernetes/greptimedb-operator-management), with compute/storage separation, native object storage (AWS S3, Azure Blob, etc.) and seamless cross-cloud access. |
+| [Developer-Friendly](https://docs.greptime.com/user-guide/protocols/overview) | Access via SQL/PromQL interfaces, REST API, MySQL/PostgreSQL protocols, and popular ingestion [protocols](https://docs.greptime.com/user-guide/protocols/overview). |
+| [Flexible Deployment](https://docs.greptime.com/user-guide/deployments/overview) | Deploy anywhere: edge (including ARM/[Android](https://docs.greptime.com/user-guide/deployments/run-on-android)) or cloud, with unified APIs and efficient data sync. |

-## Why GreptimeDB
+Learn more in [Why GreptimeDB](https://docs.greptime.com/user-guide/concepts/why-greptimedb) and [Observability 2.0 and the Database for It](https://greptime.com/blogs/2025-04-25-greptimedb-observability2-new-database).

-Our core developers have been building observability data platforms for years. Based on our best practices, GreptimeDB was born to give you:
+## Quick Comparison

-* **Unified Processing of Observability Data**
+| Feature | GreptimeDB | Traditional TSDB | Log Stores |
+|----------------------------------|-----------------------|--------------------|-----------------|
+| Data Types | Metrics, Logs, Traces | Metrics only | Logs only |
+| Query Language | SQL, PromQL, Streaming| Custom/PromQL | Custom/DSL |
+| Deployment | Edge + Cloud | Cloud/On-prem | Mostly central |
+| Indexing & Performance | PB-Scale, Sub-second | Varies | Varies |
+| Integration | REST, SQL, Common protocols | Varies | Varies |

-  A unified database that treats metrics, logs, and traces as timestamped wide events with context, supporting [SQL](https://docs.greptime.com/user-guide/query-data/sql)/[PromQL](https://docs.greptime.com/user-guide/query-data/promql) queries and [stream processing](https://docs.greptime.com/user-guide/flow-computation/overview) to simplify complex data stacks.
+**Performance:**
+* [GreptimeDB tops JSONBench's billion-record cold run test!](https://greptime.com/blogs/2025-03-18-jsonbench-greptimedb-performance)
+* [TSBS Benchmark](https://github.com/GreptimeTeam/greptimedb/tree/main/docs/benchmarks/tsbs)

-* **High Performance and Cost-effective**
+Read [more benchmark reports](https://docs.greptime.com/user-guide/concepts/features-that-you-concern#how-is-greptimedbs-performance-compared-to-other-solutions).

-  Written in Rust, combines a distributed query engine with [rich indexing](https://docs.greptime.com/user-guide/manage-data/data-index) (inverted, fulltext, skip data, and vector) and optimized columnar storage to deliver sub-second responses on petabyte-scale data and high-cost efficiency.
+## Architecture

-* **Cloud-native Distributed Database**
-
-  Built for [Kubernetes](https://docs.greptime.com/user-guide/deployments/deploy-on-kubernetes/greptimedb-operator-management). GreptimeDB achieves seamless scalability with its [cloud-native architecture](https://docs.greptime.com/user-guide/concepts/architecture) of separated compute and storage, built on object storage (AWS S3, Azure Blob Storage, etc.) while enabling cross-cloud deployment through a unified data access layer.
-
-* **Developer-Friendly**
-
-  Access standardized SQL/PromQL interfaces through built-in web dashboard, REST API, and MySQL/PostgreSQL protocols. Supports widely adopted data ingestion [protocols](https://docs.greptime.com/user-guide/protocols/overview) for seamless migration and integration.
-
-* **Flexible Deployment Options**
-
-  Deploy GreptimeDB anywhere from ARM-based edge devices to cloud environments with unified APIs and bandwidth-efficient data synchronization. Query edge and cloud data seamlessly through identical APIs. [Learn how to run on Android](https://docs.greptime.com/user-guide/deployments/run-on-android/).
-
-For more detailed info please read [Why GreptimeDB](https://docs.greptime.com/user-guide/concepts/why-greptimedb).
+* Read the [architecture](https://docs.greptime.com/contributor-guide/overview/#architecture) document.
+* [DeepWiki](https://deepwiki.com/GreptimeTeam/greptimedb/1-overview) provides an in-depth look at GreptimeDB:
+<img alt="GreptimeDB System Overview" src="docs/architecture.png">

## Try GreptimeDB

### 1. [Live Demo](https://greptime.com/playground)

-Try out the features of GreptimeDB right from your browser.
+Experience GreptimeDB directly in your browser.

### 2. [GreptimeCloud](https://console.greptime.cloud/)

Start instantly with a free cluster.

-### 3. Docker Image
-
-To install GreptimeDB locally, the recommended way is via Docker:
+### 3. Docker (Local Quickstart)

```shell
docker pull greptime/greptimedb
```

Start a GreptimeDB container with:

```shell
docker run -p 127.0.0.1:4000-4003:4000-4003 \
-  -v "$(pwd)/greptimedb:./greptimedb_data" \
+  -v "$(pwd)/greptimedb:/greptimedb_data" \
  --name greptime --rm \
  greptime/greptimedb:latest standalone start \
  --http-addr 0.0.0.0:4000 \

@@ -124,114 +129,90 @@ docker run -p 127.0.0.1:4000-4003:4000-4003 \
  --mysql-addr 0.0.0.0:4002 \
  --postgres-addr 0.0.0.0:4003
```
+Dashboard: [http://localhost:4000/dashboard](http://localhost:4000/dashboard)
+[Full Install Guide](https://docs.greptime.com/getting-started/installation/overview)

-Access the dashboard via `http://localhost:4000/dashboard`.
-
-Read more about [Installation](https://docs.greptime.com/getting-started/installation/overview) on docs.
+**Troubleshooting:**
+* Cannot connect to the database? Ensure that ports `4000`, `4001`, `4002`, and `4003` are not blocked by a firewall or used by other services.
+* Failed to start? Check the container logs with `docker logs greptime` for further details.
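Once the container is up, a quick way to confirm it is reachable is to poke the ports mapped in the `docker run` command above. A hedged sketch; the client tools are assumptions, and depending on your setup the MySQL client may need an explicit user flag:

```bash
# HTTP API / dashboard port (4000).
curl -sfL http://localhost:4000/dashboard >/dev/null && echo "HTTP port 4000 is reachable"

# MySQL protocol port (4002); requires a MySQL-compatible client.
mysql -h 127.0.0.1 -P 4002 -e 'SELECT 1;'
```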
## Getting Started

-* [Quickstart](https://docs.greptime.com/getting-started/quick-start)
-* [User Guide](https://docs.greptime.com/user-guide/overview)
-* [Demos](https://github.com/GreptimeTeam/demo-scene)
-* [FAQ](https://docs.greptime.com/faq-and-others/faq)
+- [Quickstart](https://docs.greptime.com/getting-started/quick-start)
+- [User Guide](https://docs.greptime.com/user-guide/overview)
+- [Demo Scenes](https://github.com/GreptimeTeam/demo-scene)
+- [FAQ](https://docs.greptime.com/faq-and-others/faq)

-## Build
-
-Check the prerequisite:
+## Build From Source
+
+**Prerequisites:**
* [Rust toolchain](https://www.rust-lang.org/tools/install) (nightly)
* [Protobuf compiler](https://grpc.io/docs/protoc-installation/) (>= 3.15)
* C/C++ building essentials, including `gcc`/`g++`/`autoconf` and glibc library (eg. `libc6-dev` on Ubuntu and `glibc-devel` on Fedora)
* Python toolchain (optional): Required only if using some test scripts.

-Build GreptimeDB binary:
-
-```shell
+**Build and Run:**
+```bash
make
```

Run a standalone server:

```shell
cargo run -- standalone start
```
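A quick, hedged way to confirm the prerequisites listed above before running `make`; the version expectations come from that list, and the exact Rust toolchain pin lives in `rust-toolchain.toml`:

```bash
rustc --version    # expect the nightly toolchain pinned in rust-toolchain.toml
cargo --version
protoc --version   # expect libprotoc >= 3.15
cc --version       # C/C++ build essentials (gcc/g++ plus glibc headers)
```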
## Tools & Extensions

-### Kubernetes
-
-- [GreptimeDB Operator](https://github.com/GrepTimeTeam/greptimedb-operator)
-
-### Dashboard
-
-- [The dashboard UI for GreptimeDB](https://github.com/GreptimeTeam/dashboard)
-
-### SDK
-
-- [GreptimeDB Go Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-go)
-- [GreptimeDB Java Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-java)
-- [GreptimeDB C++ Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-cpp)
-- [GreptimeDB Erlang Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-erl)
-- [GreptimeDB Rust Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-rust)
-- [GreptimeDB JavaScript Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-js)
-
-### Grafana Dashboard
-
-Our official Grafana dashboard for monitoring GreptimeDB is available at [grafana](grafana/README.md) directory.
+- **Kubernetes:** [GreptimeDB Operator](https://github.com/GrepTimeTeam/greptimedb-operator)
+- **Helm Charts:** [Greptime Helm Charts](https://github.com/GreptimeTeam/helm-charts)
+- **Dashboard:** [Web UI](https://github.com/GreptimeTeam/dashboard)
+- **SDKs/Ingester:** [Go](https://github.com/GreptimeTeam/greptimedb-ingester-go), [Java](https://github.com/GreptimeTeam/greptimedb-ingester-java), [C++](https://github.com/GreptimeTeam/greptimedb-ingester-cpp), [Erlang](https://github.com/GreptimeTeam/greptimedb-ingester-erl), [Rust](https://github.com/GreptimeTeam/greptimedb-ingester-rust), [JS](https://github.com/GreptimeTeam/greptimedb-ingester-js)
+- **Grafana**: [Official Dashboard](https://github.com/GreptimeTeam/greptimedb/blob/main/grafana/README.md)

## Project Status

-GreptimeDB is currently in Beta. We are targeting GA (General Availability) with v1.0 release by Early 2025.
+> **Status:** Beta.
+> **GA (v1.0):** Targeted for mid 2025.

While in Beta, GreptimeDB is already:

-* Being used in production by early adopters
-* Actively maintained with regular releases, [about version number](https://docs.greptime.com/nightly/reference/about-greptimedb-version)
-* Suitable for testing and evaluation
+- Being used in production by early adopters
+- Stable, actively maintained, with regular releases ([version info](https://docs.greptime.com/nightly/reference/about-greptimedb-version))
+- Suitable for evaluation and pilot deployments

For production use, we recommend using the latest stable release.
[](https://www.star-history.com/#GreptimeTeam/GreptimeDB&Date)

If you find this project useful, a ⭐ would mean a lot to us!
+<img alt="Known Users" src="https://greptime.com/logo/img/users.png"/>

## Community

-Our core team is thrilled to see you participate in any ways you like. When you are stuck, try to
-ask for help by filling an issue with a detailed description of what you were trying to do
-and what went wrong. If you have any questions or if you would like to get involved in our
-community, please check out:
+We invite you to engage and contribute!

-- GreptimeDB Community on [Slack](https://greptime.com/slack)
-- GreptimeDB [GitHub Discussions forum](https://github.com/GreptimeTeam/greptimedb/discussions)
-- Greptime official [website](https://greptime.com)
-
-In addition, you may:
-
-- View our official [Blog](https://greptime.com/blogs/)
-- Connect us with [Linkedin](https://www.linkedin.com/company/greptime/)
-- Follow us on [Twitter](https://twitter.com/greptime)
-
-## Commercial Support
-
-If you are running GreptimeDB OSS in your organization, we offer additional
-enterprise add-ons, installation services, training, and consulting. [Contact
-us](https://greptime.com/contactus) and we will reach out to you with more
-detail of our commercial license.
+- [Slack](https://greptime.com/slack)
+- [Discussions](https://github.com/GreptimeTeam/greptimedb/discussions)
+- [Official Website](https://greptime.com/)
+- [Blog](https://greptime.com/blogs/)
+- [LinkedIn](https://www.linkedin.com/company/greptime/)
+- [Twitter](https://twitter.com/greptime)

## License

-GreptimeDB uses the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0.txt) to strike a balance between
-open contributions and allowing you to use the software however you want.
+GreptimeDB is licensed under the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0.txt).

+## Commercial Support
+
+Running GreptimeDB in your organization?
+We offer enterprise add-ons, services, training, and consulting.
+[Contact us](https://greptime.com/contactus) for details.

## Contributing

-Please refer to [contribution guidelines](CONTRIBUTING.md) and [internal concepts docs](https://docs.greptime.com/contributor-guide/overview.html) for more information.
+- Read our [Contribution Guidelines](https://github.com/GreptimeTeam/greptimedb/blob/main/CONTRIBUTING.md).
+- Explore [Internal Concepts](https://docs.greptime.com/contributor-guide/overview.html) and [DeepWiki](https://deepwiki.com/GreptimeTeam/greptimedb).
+- Pick up a [good first issue](https://github.com/GreptimeTeam/greptimedb/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) and join the #contributors [Slack](https://greptime.com/slack) channel.

## Acknowledgement

-Special thanks to all the contributors who have propelled GreptimeDB forward. For a complete list of contributors, please refer to [AUTHOR.md](AUTHOR.md).
+Special thanks to all contributors! See [AUTHORS.md](https://github.com/GreptimeTeam/greptimedb/blob/main/AUTHOR.md).

-- GreptimeDB uses [Apache Arrow™](https://arrow.apache.org/) as the memory model and [Apache Parquet™](https://parquet.apache.org/) as the persistent file format.
-- GreptimeDB's query engine is powered by [Apache Arrow DataFusion™](https://arrow.apache.org/datafusion/).
-- [Apache OpenDAL™](https://opendal.apache.org) gives GreptimeDB a very general and elegant data access abstraction layer.
-- GreptimeDB's meta service is based on [etcd](https://etcd.io/).
-
-<img alt="Known Users" src="https://greptime.com/logo/img/users.png"/>
+- Uses [Apache Arrow™](https://arrow.apache.org/) (memory model)
+- [Apache Parquet™](https://parquet.apache.org/) (file storage)
+- [Apache Arrow DataFusion™](https://arrow.apache.org/datafusion/) (query engine)
+- [Apache OpenDAL™](https://opendal.apache.org/) (data access abstraction)
+- [etcd](https://etcd.io/) (meta service)
cyborg/bin/bump-website-version.ts (new file): 57 lines

@@ -0,0 +1,57 @@
/*
 * Copyright 2023 Greptime Team
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import * as core from "@actions/core";
import {obtainClient} from "@/common";

async function triggerWorkflow(workflowId: string, version: string) {
  const websiteClient = obtainClient("WEBSITE_REPO_TOKEN")
  try {
    await websiteClient.rest.actions.createWorkflowDispatch({
      owner: "GreptimeTeam",
      repo: "website",
      workflow_id: workflowId,
      ref: "main",
      inputs: {
        version,
      },
    });
    console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
  } catch (error) {
    core.setFailed(`Failed to trigger workflow: ${error.message}`);
  }
}

const version = process.env.VERSION;
if (!version) {
  core.setFailed("VERSION environment variable is required");
  process.exit(1);
}

// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;

if (cleanVersion.includes('nightly')) {
  console.log('Nightly version detected, skipping workflow trigger.');
  process.exit(0);
}

try {
  triggerWorkflow('bump-patch-version.yml', cleanVersion);
} catch (error) {
  core.setFailed(`Error processing version: ${error.message}`);
  process.exit(1);
}
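The release workflow earlier in this listing runs this script from the `cyborg` directory with `VERSION` provided by the allocate-runners job. A hedged sketch of invoking it by hand; the token values are placeholders, and `WEBSITE_REPO_TOKEN` is assumed to be allowed to dispatch workflows in GreptimeTeam/website:

```bash
cd cyborg
export GITHUB_TOKEN="${GITHUB_TOKEN:?set a GitHub token}"                 # placeholder
export WEBSITE_REPO_TOKEN="${WEBSITE_REPO_TOKEN:?set a website repo PAT}" # placeholder

# v0.15.0 mirrors the Cargo.toml version bump in this compare range.
VERSION=v0.15.0 pnpm tsx bin/bump-website-version.ts
```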
docs/architecture.png (new binary file, 173 KiB): binary file not shown.
flake.lock (generated): 18 lines changed

@@ -8,11 +8,11 @@
      "rust-analyzer-src": "rust-analyzer-src"
    },
    "locked": {
-      "lastModified": 1737613896,
-      "narHash": "sha256-ldqXIglq74C7yKMFUzrS9xMT/EVs26vZpOD68Sh7OcU=",
+      "lastModified": 1745735608,
+      "narHash": "sha256-L0jzm815XBFfF2wCFmR+M1CF+beIEFj6SxlqVKF59Ec=",
      "owner": "nix-community",
      "repo": "fenix",
-      "rev": "303a062fdd8e89f233db05868468975d17855d80",
+      "rev": "c39a78eba6ed2a022cc3218db90d485077101496",
      "type": "github"
    },
    "original": {

@@ -41,11 +41,11 @@
    },
    "nixpkgs": {
      "locked": {
-        "lastModified": 1737569578,
-        "narHash": "sha256-6qY0pk2QmUtBT9Mywdvif0i/CLVgpCjMUn6g9vB+f3M=",
+        "lastModified": 1745487689,
+        "narHash": "sha256-FQoi3R0NjQeBAsEOo49b5tbDPcJSMWc3QhhaIi9eddw=",
        "owner": "NixOS",
        "repo": "nixpkgs",
-        "rev": "47addd76727f42d351590c905d9d1905ca895b82",
+        "rev": "5630cf13cceac06cefe9fc607e8dfa8fb342dde3",
        "type": "github"
      },
      "original": {

@@ -65,11 +65,11 @@
    "rust-analyzer-src": {
      "flake": false,
      "locked": {
-        "lastModified": 1737581772,
-        "narHash": "sha256-t1P2Pe3FAX9TlJsCZbmJ3wn+C4qr6aSMypAOu8WNsN0=",
+        "lastModified": 1745694049,
+        "narHash": "sha256-fxvRYH/tS7hGQeg9zCVh5RBcSWT+JGJet7RA8Ss+rC0=",
        "owner": "rust-lang",
        "repo": "rust-analyzer",
-        "rev": "582af7ee9c8d84f5d534272fc7de9f292bd849be",
+        "rev": "d8887c0758bbd2d5f752d5bd405d4491e90e7ed6",
        "type": "github"
      },
      "original": {

@@ -21,7 +21,7 @@
      lib = nixpkgs.lib;
      rustToolchain = fenix.packages.${system}.fromToolchainName {
        name = (lib.importTOML ./rust-toolchain.toml).toolchain.channel;
-        sha256 = "sha256-f/CVA1EC61EWbh0SjaRNhLL0Ypx2ObupbzigZp8NmL4=";
+        sha256 = "sha256-arzEYlWLGGYeOhECHpBxQd2joZ4rPKV3qLNnZ+eql6A=";
      };
    in
    {
@@ -2,30 +2,63 @@

## Overview

-This repository maintains the Grafana dashboards for GreptimeDB. It has two types of dashboards:
+This repository contains Grafana dashboards for visualizing metrics and logs of GreptimeDB instances running in either cluster or standalone mode. **The Grafana version should be greater than 9.0**.

-- `cluster/dashboard.json`: The Grafana dashboard for the GreptimeDB cluster. Read the [dashboard.md](./dashboards/cluster/dashboard.md) for more details.
-- `standalone/dashboard.json`: The Grafana dashboard for the standalone GreptimeDB instance. **It's generated from the `cluster/dashboard.json` by removing the instance filter through the `make dashboards` command**. Read the [dashboard.md](./dashboards/standalone/dashboard.md) for more details.
+We highly recommend using the self-monitoring feature provided by [GreptimeDB Operator](https://github.com/GrepTimeTeam/greptimedb-operator) to automatically collect metrics and logs from your GreptimeDB instances and store them in a dedicated GreptimeDB instance.

-As the rapid development of GreptimeDB, the metrics may be changed, and please feel free to submit your feedback and/or contribution to this dashboard 🤗
+- **Metrics Dashboards**

-**NOTE**:
+  - `dashboards/metrics/cluster/dashboard.json`: The Grafana dashboard for the GreptimeDB cluster. Read the [dashboard.md](./dashboards/metrics/cluster/dashboard.md) for more details.
+  - `dashboards/metrics/standalone/dashboard.json`: The Grafana dashboard for the standalone GreptimeDB instance. **It's generated from the `cluster/dashboard.json` by removing the instance filter through the `make dashboards` command**. Read the [dashboard.md](./dashboards/metrics/standalone/dashboard.md) for more details.

-- The Grafana version should be greater than 9.0.
+- **Logs Dashboard**

-- If you want to modify the dashboards, you only need to modify the `cluster/dashboard.json` and run the `make dashboards` command to generate the `standalone/dashboard.json` and other related files.
+  The `dashboards/logs/dashboard.json` provides a comprehensive Grafana dashboard for visualizing GreptimeDB logs. To utilize this dashboard effectively, you need to collect logs in JSON format from your GreptimeDB instances and store them in a dedicated GreptimeDB instance.

-To maintain the dashboards easily, we use the [`dac`](https://github.com/zyy17/dac) tool to generate the intermediate dashboards and markdown documents:
+  For proper integration, the logs table must adhere to the following schema design with the table name `_gt_logs`:

-- `cluster/dashboard.yaml`: The intermediate dashboard for the GreptimeDB cluster.
-- `standalone/dashboard.yaml`: The intermediate dashboard for the standalone GreptimeDB instance.
+  ```sql
+  CREATE TABLE IF NOT EXISTS `_gt_logs` (
+    `pod_ip` STRING NULL,
+    `namespace` STRING NULL,
+    `cluster` STRING NULL,
+    `file` STRING NULL,
+    `module_path` STRING NULL,
+    `level` STRING NULL,
+    `target` STRING NULL,
+    `role` STRING NULL,
+    `pod` STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),
+    `message` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false'),
+    `err` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false'),
+    `timestamp` TIMESTAMP(9) NOT NULL,
+    TIME INDEX (`timestamp`),
+    PRIMARY KEY (`level`, `target`, `role`)
+  )
+  ENGINE=mito
+  WITH (
+    append_mode = 'true'
+  )
+  ```
+
+## Development
+
+As GreptimeDB evolves rapidly, metrics may change over time. We welcome your feedback and contributions to improve these dashboards 🤗
+
+To modify the metrics dashboards, simply edit the `dashboards/metrics/cluster/dashboard.json` file and run the `make dashboards` command. This will automatically generate the updated `dashboards/metrics/standalone/dashboard.json` and other related files.
+
+For easier dashboard maintenance, we utilize the [`dac`](https://github.com/zyy17/dac) tool to generate human-readable intermediate dashboards and documentation:
+
+- `dashboards/metrics/cluster/dashboard.yaml`: The intermediate dashboard file for the GreptimeDB cluster.
+- `dashboards/metrics/standalone/dashboard.yaml`: The intermediate dashboard file for standalone GreptimeDB instances.

## Data Sources

-There are two data sources for the dashboards to fetch the metrics:
+The following data sources are used to fetch metrics and logs:

-- **Prometheus**: Expose the metrics of GreptimeDB.
-- **Information Schema**: It is the MySQL port of the current monitored instance. The `overview` dashboard will use this datasource to show the information schema of the current instance.
+- **`${metrics}`**: Prometheus data source for providing the GreptimeDB metrics.
+- **`${logs}`**: MySQL data source for providing the GreptimeDB logs.
+- **`${information_schema}`**: MySQL data source for providing the information schema of the current instance and used for the `overview` panel. It is the MySQL port of the current monitored instance.

## Instance Filters

@@ -43,9 +76,9 @@ And the legend will be like: `[{{instance}}]-[{{ pod }}]`.

## Deployment

-### Helm
+### (Recommended) Helm Chart

-If you use the Helm [chart](https://github.com/GreptimeTeam/helm-charts) to deploy a GreptimeDB cluster, you can enable self-monitoring by setting the following values in your Helm chart:
+If you use the [Helm Chart](https://github.com/GreptimeTeam/helm-charts) to deploy a GreptimeDB cluster, you can enable self-monitoring by setting the following values in your Helm chart:

- `monitoring.enabled=true`: Deploys a standalone GreptimeDB instance dedicated to monitoring the cluster;
- `grafana.enabled=true`: Deploys Grafana and automatically imports the monitoring dashboard;

@@ -85,5 +118,5 @@ The standalone GreptimeDB instance will collect metrics from your cluster, and t

3. **Import the dashboards based on your deployment scenario**

-  - **Cluster**: Import the `cluster/dashboard.json` dashboard.
-  - **Standalone**: Import the `standalone/dashboard.json` dashboard.
+  - **Cluster**: Import the `dashboards/metrics/cluster/dashboard.json` dashboard.
+  - **Standalone**: Import the `dashboards/metrics/standalone/dashboard.json` dashboard.
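Since GreptimeDB speaks the MySQL protocol (port 4002 in the standalone quickstart earlier in this listing), the `_gt_logs` table above can be created in the dedicated monitoring instance with a plain MySQL client. A hedged sketch, assuming the schema above is saved as `create_gt_logs.sql` and the monitoring instance listens on localhost:4002:

```bash
# Create the logs table over the MySQL protocol; host/port are assumptions
# matching the standalone quickstart, and auth flags may be needed in your setup.
mysql -h 127.0.0.1 -P 4002 < create_gt_logs.sql

# Quick sanity check that the table exists and is queryable.
mysql -h 127.0.0.1 -P 4002 -e 'SELECT count(*) FROM `_gt_logs`;'
```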
grafana/dashboards/logs/dashboard.json (new file): 292 lines

@@ -0,0 +1,292 @@
{
  "annotations": {
    "list": [
      { "builtIn": 1, "datasource": { "type": "grafana", "uid": "-- Grafana --" }, "enable": true, "hide": true, "iconColor": "rgba(0, 211, 255, 1)", "name": "Annotations & Alerts", "type": "dashboard" }
    ]
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "id": 12,
  "links": [],
  "panels": [
    {
      "datasource": { "default": false, "type": "mysql", "uid": "${datasource}" },
      "fieldConfig": { "defaults": {}, "overrides": [] },
      "gridPos": { "h": 20, "w": 24, "x": 0, "y": 0 },
      "id": 1,
      "options": {
        "dedupStrategy": "none",
        "enableInfiniteScrolling": true,
        "enableLogDetails": true,
        "prettifyLogMessage": false,
        "showCommonLabels": false,
        "showLabels": false,
        "showTime": true,
        "sortOrder": "Descending",
        "wrapLogMessage": false
      },
      "pluginVersion": "11.6.0",
      "targets": [
        {
          "dataset": "greptime_private",
          "datasource": { "type": "mysql", "uid": "${datasource}" },
          "editorMode": "code",
          "format": "table",
          "rawQuery": true,
          "rawSql": "SELECT `timestamp`, CONCAT('[', `level`, ']', ' ', '<', `target`, '>', ' ', `message`),\n `role`,\n `pod`,\n `pod_ip`,\n `namespace`,\n `cluster`,\n `err`,\n `file`,\n `module_path`\nFROM\n `_gt_logs`\nWHERE\n (\n \"$level\" = \"'all'\"\n OR `level` IN ($level)\n ) \n AND (\n \"$role\" = \"'all'\"\n OR `role` IN ($role)\n )\n AND (\n \"$pod\" = \"\"\n OR `pod` = '$pod'\n )\n AND (\n \"$target\" = \"\"\n OR `target` = '$target'\n )\n AND (\n \"$search\" = \"\"\n OR matches_term(`message`, '$search')\n )\n AND (\n \"$exclude\" = \"\"\n OR NOT matches_term(`message`, '$exclude')\n )\n AND $__timeFilter(`timestamp`)\nORDER BY `timestamp` DESC\nLIMIT $limit;\n",
          "refId": "A",
          "sql": {
            "columns": [ { "parameters": [], "type": "function" } ],
            "groupBy": [ { "property": { "type": "string" }, "type": "groupBy" } ],
            "limit": 50
          }
        }
      ],
      "title": "Logs",
      "type": "logs"
    }
  ],
  "preload": false,
  "refresh": "",
  "schemaVersion": 41,
  "tags": [],
  "templating": {
    "list": [
      { "current": { "text": "logs", "value": "P98F38F12DB221A8C" }, "includeAll": false, "name": "datasource", "options": [], "query": "mysql", "refresh": 1, "regex": "", "type": "datasource" },
      {
        "allValue": "'all'",
        "current": { "text": ["$__all"], "value": ["$__all"] },
        "includeAll": true, "label": "level", "multi": true, "name": "level",
        "options": [
          { "selected": false, "text": "INFO", "value": "INFO" },
          { "selected": false, "text": "ERROR", "value": "ERROR" },
          { "selected": false, "text": "WARN", "value": "WARN" },
          { "selected": false, "text": "DEBUG", "value": "DEBUG" },
          { "selected": false, "text": "TRACE", "value": "TRACE" }
        ],
        "query": "INFO,ERROR,WARN,DEBUG,TRACE", "type": "custom"
      },
      {
        "allValue": "'all'",
        "current": { "text": ["$__all"], "value": ["$__all"] },
        "includeAll": true, "label": "role", "multi": true, "name": "role",
        "options": [
          { "selected": false, "text": "datanode", "value": "datanode" },
          { "selected": false, "text": "frontend", "value": "frontend" },
          { "selected": false, "text": "meta", "value": "meta" }
        ],
        "query": "datanode,frontend,meta", "type": "custom"
      },
      { "current": { "text": "", "value": "" }, "label": "pod", "name": "pod", "options": [ { "selected": true, "text": "", "value": "" } ], "query": "", "type": "textbox" },
      { "current": { "text": "", "value": "" }, "label": "target", "name": "target", "options": [ { "selected": true, "text": "", "value": "" } ], "query": "", "type": "textbox" },
      { "current": { "text": "", "value": "" }, "label": "search", "name": "search", "options": [ { "selected": true, "text": "", "value": "" } ], "query": "", "type": "textbox" },
      { "current": { "text": "", "value": "" }, "label": "exclude", "name": "exclude", "options": [ { "selected": true, "text": "", "value": "" } ], "query": "", "type": "textbox" },
      {
        "current": { "text": "2000", "value": "2000" },
        "includeAll": false, "label": "limit", "name": "limit",
        "options": [
          { "selected": true, "text": "2000", "value": "2000" },
          { "selected": false, "text": "5000", "value": "5000" },
          { "selected": false, "text": "8000", "value": "8000" }
        ],
        "query": "2000,5000,8000", "type": "custom"
      }
    ]
  },
  "time": { "from": "now-6h", "to": "now" },
  "timepicker": {},
  "timezone": "browser",
  "title": "GreptimeDB Logs",
  "uid": "edx5veo4rd3wge2",
  "version": 1
}
@@ -1,6 +1,6 @@
#!/usr/bin/env bash

DASHBOARD_DIR=${1:-grafana/dashboards}
DASHBOARD_DIR=${1:-grafana/dashboards/metrics}

check_dashboard_description() {
for dashboard in $(find $DASHBOARD_DIR -name "*.json"); do
@@ -25,7 +25,7 @@ check_dashboard_description() {
check_dashboards_generation() {
./grafana/scripts/gen-dashboards.sh

if [[ -n "$(git diff --name-only grafana/dashboards)" ]]; then
if [[ -n "$(git diff --name-only grafana/dashboards/metrics)" ]]; then
echo "Error: The dashboards are not generated correctly. You should execute the `make dashboards` command."
exit 1
fi

@@ -1,7 +1,7 @@
#! /usr/bin/env bash

CLUSTER_DASHBOARD_DIR=${1:-grafana/dashboards/cluster}
STANDALONE_DASHBOARD_DIR=${2:-grafana/dashboards/standalone}
CLUSTER_DASHBOARD_DIR=${1:-grafana/dashboards/metrics/cluster}
STANDALONE_DASHBOARD_DIR=${2:-grafana/dashboards/metrics/standalone}
DAC_IMAGE=ghcr.io/zyy17/dac:20250423-522bd35

remove_instance_filters() {

@@ -1,2 +1,2 @@
[toolchain]
channel = "nightly-2024-12-25"
channel = "nightly-2025-04-15"

@@ -1050,7 +1050,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
Value::Int64(v) => Some(ValueData::I64Value(v)),
Value::Float32(v) => Some(ValueData::F32Value(*v)),
Value::Float64(v) => Some(ValueData::F64Value(*v)),
Value::String(v) => Some(ValueData::StringValue(v.as_utf8().to_string())),
Value::String(v) => Some(ValueData::StringValue(v.into_string())),
Value::Binary(v) => Some(ValueData::BinaryValue(v.to_vec())),
Value::Date(v) => Some(ValueData::DateValue(v.val())),
Value::Timestamp(v) => Some(match v.unit() {

@@ -36,7 +36,7 @@ pub fn userinfo_by_name(username: Option<String>) -> UserInfoRef {
}

pub fn user_provider_from_option(opt: &String) -> Result<UserProviderRef> {
let (name, content) = opt.split_once(':').context(InvalidConfigSnafu {
let (name, content) = opt.split_once(':').with_context(|| InvalidConfigSnafu {
value: opt.to_string(),
msg: "UserProviderOption must be in format `<option>:<value>`",
})?;
@@ -57,6 +57,24 @@ pub fn user_provider_from_option(opt: &String) -> Result<UserProviderRef> {
}
}

pub fn static_user_provider_from_option(opt: &String) -> Result<StaticUserProvider> {
let (name, content) = opt.split_once(':').with_context(|| InvalidConfigSnafu {
value: opt.to_string(),
msg: "UserProviderOption must be in format `<option>:<value>`",
})?;
match name {
STATIC_USER_PROVIDER => {
let provider = StaticUserProvider::new(content)?;
Ok(provider)
}
_ => InvalidConfigSnafu {
value: name.to_string(),
msg: format!("Invalid UserProviderOption, expect only {STATIC_USER_PROVIDER}"),
}
.fail(),
}
}

type Username<'a> = &'a str;
type HostOrIp<'a> = &'a str;

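The new static_user_provider_from_option above splits the option string on the first ':' and accepts only the static_user_provider scheme. Below is a minimal, self-contained sketch of that parsing step; the "file:/etc/users" value is a hypothetical placeholder, since the value format accepted by StaticUserProvider::new is not shown in this diff.

// Sketch of the `<option>:<value>` split used by the user-provider options above.
// The value part ("file:/etc/users") is illustrative only.
fn parse_provider_option(opt: &str) -> Result<(&str, &str), String> {
    opt.split_once(':')
        .ok_or_else(|| format!("UserProviderOption must be in format `<option>:<value>`, got {opt}"))
}

fn main() {
    let (name, content) = parse_provider_option("static_user_provider:file:/etc/users").unwrap();
    assert_eq!(name, "static_user_provider");
    assert_eq!(content, "file:/etc/users");
}
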
@@ -38,6 +38,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to convert to utf8"))]
FromUtf8 {
#[snafu(source)]
error: std::string::FromUtf8Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Authentication source failure"))]
AuthBackend {
#[snafu(implicit)]
@@ -85,7 +93,7 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::InvalidConfig { .. } => StatusCode::InvalidArguments,
Error::IllegalParam { .. } => StatusCode::InvalidArguments,
Error::IllegalParam { .. } | Error::FromUtf8 { .. } => StatusCode::InvalidArguments,
Error::FileWatch { .. } => StatusCode::InvalidArguments,
Error::InternalState { .. } => StatusCode::Unexpected,
Error::Io { .. } => StatusCode::StorageUnavailable,

@@ -22,10 +22,12 @@ mod user_provider;
pub mod tests;

pub use common::{
auth_mysql, user_provider_from_option, userinfo_by_name, HashedPassword, Identity, Password,
auth_mysql, static_user_provider_from_option, user_provider_from_option, userinfo_by_name,
HashedPassword, Identity, Password,
};
pub use permission::{PermissionChecker, PermissionReq, PermissionResp};
pub use user_info::UserInfo;
pub use user_provider::static_user_provider::StaticUserProvider;
pub use user_provider::UserProvider;

/// pub type alias

@@ -15,15 +15,15 @@
use std::collections::HashMap;

use async_trait::async_trait;
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};

use crate::error::{InvalidConfigSnafu, Result};
use crate::error::{FromUtf8Snafu, InvalidConfigSnafu, Result};
use crate::user_provider::{authenticate_with_credential, load_credential_from_file};
use crate::{Identity, Password, UserInfoRef, UserProvider};

pub(crate) const STATIC_USER_PROVIDER: &str = "static_user_provider";

pub(crate) struct StaticUserProvider {
pub struct StaticUserProvider {
users: HashMap<String, Vec<u8>>,
}

@@ -60,6 +60,18 @@ impl StaticUserProvider {
.fail(),
}
}

/// Return a random username/password pair
/// This is useful for invoking from other components in the cluster
pub fn get_one_user_pwd(&self) -> Result<(String, String)> {
let kv = self.users.iter().next().context(InvalidConfigSnafu {
value: "",
msg: "Expect at least one pair of username and password",
})?;
let username = kv.0;
let pwd = String::from_utf8(kv.1.clone()).context(FromUtf8Snafu)?;
Ok((username.clone(), pwd))
}
}

#[async_trait]

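get_one_user_pwd above simply takes whichever entry the HashMap yields first and decodes the stored password bytes as UTF-8. A standalone sketch of that lookup, assuming the same HashMap<String, Vec<u8>> layout shown above; the user name and password below are made up.

use std::collections::HashMap;

// Sketch of the lookup behind `get_one_user_pwd`: pick an arbitrary entry
// (HashMap iteration order is unspecified) and decode the password as UTF-8.
fn one_user_pwd(users: &HashMap<String, Vec<u8>>) -> Option<(String, String)> {
    let (name, pwd) = users.iter().next()?;
    let pwd = String::from_utf8(pwd.clone()).ok()?;
    Some((name.clone(), pwd))
}

fn main() {
    let mut users = HashMap::new();
    users.insert("admin".to_string(), b"secret".to_vec());
    assert_eq!(one_user_pwd(&users), Some(("admin".into(), "secret".into())));
}
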
@@ -84,12 +84,6 @@ mod tests {
let key1 = "3178510";
let key2 = "4215648";

// have collision
assert_eq!(
oid_map.hasher.hash_one(key1) as u32,
oid_map.hasher.hash_one(key2) as u32
);

// insert them into oid_map
let oid1 = oid_map.get_oid(key1);
let oid2 = oid_map.get_oid(key2);

@@ -51,7 +51,6 @@ opendal = { version = "0.51.1", features = [
query.workspace = true
rand.workspace = true
reqwest.workspace = true
rustyline = "10.1"
serde.workspace = true
serde_json.workspace = true
servers.workspace = true

@@ -1,154 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::error::{Error, InvalidReplCommandSnafu, Result};
|
||||
|
||||
/// Represents the parsed command from the user (which may be over many lines)
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub(crate) enum ReplCommand {
|
||||
Help,
|
||||
UseDatabase { db_name: String },
|
||||
Sql { sql: String },
|
||||
Exit,
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for ReplCommand {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(input: &str) -> Result<Self> {
|
||||
let input = input.trim();
|
||||
if input.is_empty() {
|
||||
return InvalidReplCommandSnafu {
|
||||
reason: "No command specified".to_string(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
// If line ends with ';', it must be treated as a complete input.
|
||||
// However, the opposite is not true.
|
||||
let input_is_completed = input.ends_with(';');
|
||||
|
||||
let input = input.strip_suffix(';').map(|x| x.trim()).unwrap_or(input);
|
||||
let lowercase = input.to_lowercase();
|
||||
match lowercase.as_str() {
|
||||
"help" => Ok(Self::Help),
|
||||
"exit" | "quit" => Ok(Self::Exit),
|
||||
_ => match input.split_once(' ') {
|
||||
Some((maybe_use, database)) if maybe_use.to_lowercase() == "use" => {
|
||||
Ok(Self::UseDatabase {
|
||||
db_name: database.trim().to_string(),
|
||||
})
|
||||
}
|
||||
// Any valid SQL must contain at least one whitespace.
|
||||
Some(_) if input_is_completed => Ok(Self::Sql {
|
||||
sql: input.to_string(),
|
||||
}),
|
||||
_ => InvalidReplCommandSnafu {
|
||||
reason: format!("unknown command '{input}', maybe input is not completed"),
|
||||
}
|
||||
.fail(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReplCommand {
|
||||
pub fn help() -> &'static str {
|
||||
r#"
|
||||
Available commands (case insensitive):
|
||||
- 'help': print this help
|
||||
- 'exit' or 'quit': exit the REPL
|
||||
- 'use <your database name>': switch to another database/schema context
|
||||
- Other typed in text will be treated as SQL.
|
||||
You can enter new line while typing, just remember to end it with ';'.
|
||||
"#
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::error::Error::InvalidReplCommand;
|
||||
|
||||
#[test]
|
||||
fn test_from_str() {
|
||||
fn test_ok(s: &str, expected: ReplCommand) {
|
||||
let actual: ReplCommand = s.try_into().unwrap();
|
||||
assert_eq!(expected, actual, "'{}'", s);
|
||||
}
|
||||
|
||||
fn test_err(s: &str) {
|
||||
let result: Result<ReplCommand> = s.try_into();
|
||||
assert!(matches!(result, Err(InvalidReplCommand { .. })))
|
||||
}
|
||||
|
||||
test_err("");
|
||||
test_err(" ");
|
||||
test_err("\t");
|
||||
|
||||
test_ok("help", ReplCommand::Help);
|
||||
test_ok("help", ReplCommand::Help);
|
||||
test_ok(" help", ReplCommand::Help);
|
||||
test_ok(" help ", ReplCommand::Help);
|
||||
test_ok(" HELP ", ReplCommand::Help);
|
||||
test_ok(" Help; ", ReplCommand::Help);
|
||||
test_ok(" help ; ", ReplCommand::Help);
|
||||
|
||||
test_ok("exit", ReplCommand::Exit);
|
||||
test_ok("exit;", ReplCommand::Exit);
|
||||
test_ok("exit ;", ReplCommand::Exit);
|
||||
test_ok("EXIT", ReplCommand::Exit);
|
||||
|
||||
test_ok("quit", ReplCommand::Exit);
|
||||
test_ok("quit;", ReplCommand::Exit);
|
||||
test_ok("quit ;", ReplCommand::Exit);
|
||||
test_ok("QUIT", ReplCommand::Exit);
|
||||
|
||||
test_ok(
|
||||
"use Foo",
|
||||
ReplCommand::UseDatabase {
|
||||
db_name: "Foo".to_string(),
|
||||
},
|
||||
);
|
||||
test_ok(
|
||||
" use Foo ; ",
|
||||
ReplCommand::UseDatabase {
|
||||
db_name: "Foo".to_string(),
|
||||
},
|
||||
);
|
||||
// ensure that database name is case sensitive
|
||||
test_ok(
|
||||
" use FOO ; ",
|
||||
ReplCommand::UseDatabase {
|
||||
db_name: "FOO".to_string(),
|
||||
},
|
||||
);
|
||||
|
||||
// ensure that we aren't messing with capitalization
|
||||
test_ok(
|
||||
"SELECT * from foo;",
|
||||
ReplCommand::Sql {
|
||||
sql: "SELECT * from foo".to_string(),
|
||||
},
|
||||
);
|
||||
// An input line (that doesn't match any of the cases above) must end with ';' to be treated as valid SQL.
|
||||
test_err("insert blah");
|
||||
test_ok(
|
||||
"insert blah;",
|
||||
ReplCommand::Sql {
|
||||
sql: "insert blah".to_string(),
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -101,9 +101,6 @@ pub enum Error {
error: reqwest::Error,
},

#[snafu(display("Invalid REPL command: {reason}"))]
InvalidReplCommand { reason: String },

#[snafu(display("Failed to parse SQL: {}", sql))]
ParseSql {
sql: String,
@@ -254,7 +251,6 @@ impl ErrorExt for Error {
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
| Error::InvalidReplCommand { .. }
| Error::InitTimezone { .. }
| Error::ConnectEtcd { .. }
| Error::CreateDir { .. }

@@ -1,112 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::borrow::Cow;
|
||||
|
||||
use rustyline::completion::Completer;
|
||||
use rustyline::highlight::{Highlighter, MatchingBracketHighlighter};
|
||||
use rustyline::hint::{Hinter, HistoryHinter};
|
||||
use rustyline::validate::{ValidationContext, ValidationResult, Validator};
|
||||
|
||||
use crate::cmd::ReplCommand;
|
||||
|
||||
pub(crate) struct RustylineHelper {
|
||||
hinter: HistoryHinter,
|
||||
highlighter: MatchingBracketHighlighter,
|
||||
}
|
||||
|
||||
impl Default for RustylineHelper {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
hinter: HistoryHinter {},
|
||||
highlighter: MatchingBracketHighlighter::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl rustyline::Helper for RustylineHelper {}
|
||||
|
||||
impl Validator for RustylineHelper {
|
||||
fn validate(&self, ctx: &mut ValidationContext<'_>) -> rustyline::Result<ValidationResult> {
|
||||
let input = ctx.input();
|
||||
match ReplCommand::try_from(input) {
|
||||
Ok(_) => Ok(ValidationResult::Valid(None)),
|
||||
Err(e) => {
|
||||
if input.trim_end().ends_with(';') {
|
||||
// If line ends with ';', it HAS to be a valid command.
|
||||
Ok(ValidationResult::Invalid(Some(e.to_string())))
|
||||
} else {
|
||||
Ok(ValidationResult::Incomplete)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Hinter for RustylineHelper {
|
||||
type Hint = String;
|
||||
|
||||
fn hint(&self, line: &str, pos: usize, ctx: &rustyline::Context<'_>) -> Option<Self::Hint> {
|
||||
self.hinter.hint(line, pos, ctx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Highlighter for RustylineHelper {
|
||||
fn highlight<'l>(&self, line: &'l str, pos: usize) -> Cow<'l, str> {
|
||||
self.highlighter.highlight(line, pos)
|
||||
}
|
||||
|
||||
fn highlight_prompt<'b, 's: 'b, 'p: 'b>(
|
||||
&'s self,
|
||||
prompt: &'p str,
|
||||
default: bool,
|
||||
) -> Cow<'b, str> {
|
||||
self.highlighter.highlight_prompt(prompt, default)
|
||||
}
|
||||
|
||||
fn highlight_hint<'h>(&self, hint: &'h str) -> Cow<'h, str> {
|
||||
use nu_ansi_term::Style;
|
||||
Cow::Owned(Style::new().dimmed().paint(hint).to_string())
|
||||
}
|
||||
|
||||
fn highlight_candidate<'c>(
|
||||
&self,
|
||||
candidate: &'c str,
|
||||
completion: rustyline::CompletionType,
|
||||
) -> Cow<'c, str> {
|
||||
self.highlighter.highlight_candidate(candidate, completion)
|
||||
}
|
||||
|
||||
fn highlight_char(&self, line: &str, pos: usize) -> bool {
|
||||
self.highlighter.highlight_char(line, pos)
|
||||
}
|
||||
}
|
||||
|
||||
impl Completer for RustylineHelper {
|
||||
type Candidate = String;
|
||||
|
||||
fn complete(
|
||||
&self,
|
||||
line: &str,
|
||||
pos: usize,
|
||||
ctx: &rustyline::Context<'_>,
|
||||
) -> rustyline::Result<(usize, Vec<Self::Candidate>)> {
|
||||
// If there is a hint, use that as the auto-complete when user hits `tab`
|
||||
if let Some(hint) = self.hinter.hint(line, pos, ctx) {
|
||||
Ok((pos, vec![hint]))
|
||||
} else {
|
||||
Ok((0, vec![]))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -13,15 +13,9 @@
// limitations under the License.

mod bench;
pub mod error;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
#[allow(unused)]
mod cmd;
mod export;
mod helper;

// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
mod database;
pub mod error;
mod export;
mod import;

use async_trait::async_trait;

@@ -36,17 +36,17 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
|
||||
use common_query::Output;
|
||||
use common_recordbatch::error::ExternalSnafu;
|
||||
use common_recordbatch::RecordBatchStreamWrapper;
|
||||
use common_telemetry::error;
|
||||
use common_telemetry::tracing_context::W3cTrace;
|
||||
use common_telemetry::{error, warn};
|
||||
use futures::future;
|
||||
use futures_util::{Stream, StreamExt, TryStreamExt};
|
||||
use prost::Message;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use tonic::metadata::{AsciiMetadataKey, MetadataValue};
|
||||
use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap, MetadataValue};
|
||||
use tonic::transport::Channel;
|
||||
|
||||
use crate::error::{
|
||||
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu, InvalidAsciiSnafu,
|
||||
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu,
|
||||
InvalidTonicMetadataValueSnafu, ServerSnafu,
|
||||
};
|
||||
use crate::{from_grpc_response, Client, Result};
|
||||
@@ -165,26 +165,27 @@ impl Database {
|
||||
|
||||
let mut request = tonic::Request::new(request);
|
||||
let metadata = request.metadata_mut();
|
||||
for (key, value) in hints {
|
||||
let key = AsciiMetadataKey::from_bytes(format!("x-greptime-hint-{}", key).as_bytes())
|
||||
.map_err(|_| {
|
||||
InvalidAsciiSnafu {
|
||||
value: key.to_string(),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
let value = value.parse().map_err(|_| {
|
||||
InvalidAsciiSnafu {
|
||||
value: value.to_string(),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
metadata.insert(key, value);
|
||||
}
|
||||
Self::put_hints(metadata, hints)?;
|
||||
|
||||
let response = client.handle(request).await?.into_inner();
|
||||
from_grpc_response(response)
|
||||
}
|
||||
|
||||
fn put_hints(metadata: &mut MetadataMap, hints: &[(&str, &str)]) -> Result<()> {
|
||||
let Some(value) = hints
|
||||
.iter()
|
||||
.map(|(k, v)| format!("{}={}", k, v))
|
||||
.reduce(|a, b| format!("{},{}", a, b))
|
||||
else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let key = AsciiMetadataKey::from_static("x-greptime-hints");
|
||||
let value = AsciiMetadataValue::from_str(&value).context(InvalidTonicMetadataValueSnafu)?;
|
||||
metadata.insert(key, value);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn handle(&self, request: Request) -> Result<u32> {
|
||||
let mut client = make_database_client(&self.client)?.inner;
|
||||
let request = self.to_rpc_request(request);
|
||||
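put_hints above folds all hint pairs into one comma-separated k=v string stored under a single x-greptime-hints metadata key. The string-building step in isolation, as a runnable sketch; the hint names below are placeholders, not keys defined by this diff.

// Sketch of the hint encoding used by `put_hints`: "k1=v1,k2=v2,...".
fn encode_hints(hints: &[(&str, &str)]) -> Option<String> {
    hints
        .iter()
        .map(|(k, v)| format!("{}={}", k, v))
        .reduce(|a, b| format!("{},{}", a, b))
}

fn main() {
    // Hypothetical hint names, purely for illustration.
    let hints = [("append_mode", "true"), ("ttl", "7d")];
    assert_eq!(encode_hints(&hints), Some("append_mode=true,ttl=7d".to_string()));
}
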
@@ -192,6 +193,36 @@ impl Database {
from_grpc_response(response)
}

/// Retry if the connection fails. `max_retries` is the maximum number of retries, so the total
/// wait time is at most `max_retries * GRPC_CONN_TIMEOUT`.
pub async fn handle_with_retry(&self, request: Request, max_retries: u32) -> Result<u32> {
let mut client = make_database_client(&self.client)?.inner;
let mut retries = 0;
let request = self.to_rpc_request(request);
loop {
let raw_response = client.handle(request.clone()).await;
match (raw_response, retries < max_retries) {
(Ok(resp), _) => return from_grpc_response(resp.into_inner()),
(Err(err), true) => {
// determine if the error is retryable
if is_grpc_retryable(&err) {
// retry
retries += 1;
warn!("Retrying {} times with error = {:?}", retries, err);
continue;
}
}
(Err(err), false) => {
error!(
"Failed to send request to grpc handle after {} retries, error = {:?}",
retries, err
);
return Err(err.into());
}
}
}
}

#[inline]
fn to_rpc_request(&self, request: Request) -> GreptimeRequest {
GreptimeRequest {
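The new handle_with_retry above retries only while the error is classified as retryable and the retry budget is not exhausted. Below is a dependency-free sketch of that retry-with-cap shape, with a generic fallible operation standing in for the gRPC call; it is a simplified illustration (non-retryable errors return immediately here), not the client code itself.

// Generic retry-with-cap loop mirroring the structure of `handle_with_retry`.
// `op` stands in for the gRPC call; `retryable` for `is_grpc_retryable`.
fn call_with_retry<T, E: std::fmt::Debug>(
    mut op: impl FnMut() -> Result<T, E>,
    retryable: impl Fn(&E) -> bool,
    max_retries: u32,
) -> Result<T, E> {
    let mut retries = 0;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if retries < max_retries && retryable(&e) => {
                retries += 1;
                eprintln!("retrying ({retries}) after error: {e:?}");
            }
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    let mut attempts = 0;
    let result = call_with_retry(
        || {
            attempts += 1;
            if attempts < 3 { Err("unavailable") } else { Ok(attempts) }
        },
        |e| *e == "unavailable",
        5,
    );
    assert_eq!(result, Ok(3));
}
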
@@ -212,39 +243,49 @@ impl Database {
|
||||
where
|
||||
S: AsRef<str>,
|
||||
{
|
||||
self.do_get(Request::Query(QueryRequest {
|
||||
self.sql_with_hint(sql, &[]).await
|
||||
}
|
||||
|
||||
pub async fn sql_with_hint<S>(&self, sql: S, hints: &[(&str, &str)]) -> Result<Output>
|
||||
where
|
||||
S: AsRef<str>,
|
||||
{
|
||||
let request = Request::Query(QueryRequest {
|
||||
query: Some(Query::Sql(sql.as_ref().to_string())),
|
||||
}))
|
||||
.await
|
||||
});
|
||||
self.do_get(request, hints).await
|
||||
}
|
||||
|
||||
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
|
||||
self.do_get(Request::Query(QueryRequest {
|
||||
let request = Request::Query(QueryRequest {
|
||||
query: Some(Query::LogicalPlan(logical_plan)),
|
||||
}))
|
||||
.await
|
||||
});
|
||||
self.do_get(request, &[]).await
|
||||
}
|
||||
|
||||
pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
|
||||
self.do_get(Request::Ddl(DdlRequest {
|
||||
let request = Request::Ddl(DdlRequest {
|
||||
expr: Some(DdlExpr::CreateTable(expr)),
|
||||
}))
|
||||
.await
|
||||
});
|
||||
self.do_get(request, &[]).await
|
||||
}
|
||||
|
||||
pub async fn alter(&self, expr: AlterTableExpr) -> Result<Output> {
|
||||
self.do_get(Request::Ddl(DdlRequest {
|
||||
let request = Request::Ddl(DdlRequest {
|
||||
expr: Some(DdlExpr::AlterTable(expr)),
|
||||
}))
|
||||
.await
|
||||
});
|
||||
self.do_get(request, &[]).await
|
||||
}
|
||||
|
||||
async fn do_get(&self, request: Request) -> Result<Output> {
|
||||
async fn do_get(&self, request: Request, hints: &[(&str, &str)]) -> Result<Output> {
|
||||
let request = self.to_rpc_request(request);
|
||||
let request = Ticket {
|
||||
ticket: request.encode_to_vec().into(),
|
||||
};
|
||||
|
||||
let mut request = tonic::Request::new(request);
|
||||
Self::put_hints(request.metadata_mut(), hints)?;
|
||||
|
||||
let mut client = self.client.make_flight_client()?;
|
||||
|
||||
let response = client.mut_inner().do_get(request).await.or_else(|e| {
|
||||
@@ -368,6 +409,11 @@ impl Database {
}
}

/// By the gRPC standard, only `Unavailable` is retryable; see: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md#status-codes-and-their-use-in-grpc
pub fn is_grpc_retryable(err: &tonic::Status) -> bool {
matches!(err.code(), tonic::Code::Unavailable)
}

#[derive(Default, Debug, Clone)]
struct FlightContext {
auth_header: Option<AuthHeader>,

@@ -110,13 +110,6 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse ascii string: {}", value))]
|
||||
InvalidAscii {
|
||||
value: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid Tonic metadata value"))]
|
||||
InvalidTonicMetadataValue {
|
||||
#[snafu(source)]
|
||||
@@ -143,10 +136,7 @@ impl ErrorExt for Error {
|
||||
| Error::ConvertFlightData { source, .. }
|
||||
| Error::CreateTlsChannel { source, .. } => source.status_code(),
|
||||
Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected,
|
||||
|
||||
Error::InvalidAscii { .. } | Error::InvalidTonicMetadataValue { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
Error::InvalidTonicMetadataValue { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,9 +15,11 @@
|
||||
#![doc = include_str!("../../../../README.md")]
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
use cmd::datanode::builder::InstanceBuilder;
|
||||
use cmd::error::{InitTlsProviderSnafu, Result};
|
||||
use cmd::options::GlobalOptions;
|
||||
use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
|
||||
use common_base::Plugins;
|
||||
use common_version::version;
|
||||
use servers::install_ring_crypto_provider;
|
||||
|
||||
@@ -102,10 +104,10 @@ async fn main_body() -> Result<()> {
|
||||
async fn start(cli: Command) -> Result<()> {
|
||||
match cli.subcmd {
|
||||
SubCommand::Datanode(cmd) => {
|
||||
cmd.build(cmd.load_options(&cli.global_options)?)
|
||||
.await?
|
||||
.run()
|
||||
.await
|
||||
let opts = cmd.load_options(&cli.global_options)?;
|
||||
let plugins = Plugins::new();
|
||||
let builder = InstanceBuilder::try_new_with_init(opts, plugins).await?;
|
||||
cmd.build_with(builder).await?.run().await
|
||||
}
|
||||
SubCommand::Flownode(cmd) => {
|
||||
cmd.build(cmd.load_options(&cli.global_options)?)
|
||||
|
||||
@@ -58,7 +58,7 @@ impl App for Instance {
|
||||
false
|
||||
}
|
||||
|
||||
async fn stop(&self) -> Result<()> {
|
||||
async fn stop(&mut self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,33 +12,27 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
pub mod builder;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use cache::build_datanode_cache_registry;
|
||||
use catalog::kvbackend::MetaKvBackend;
|
||||
use clap::Parser;
|
||||
use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use common_meta::cache::LayeredCacheRegistryBuilder;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_telemetry::{info, warn};
|
||||
use common_version::{short_version, version};
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
use datanode::datanode::{Datanode, DatanodeBuilder};
|
||||
use datanode::service::DatanodeServiceBuilder;
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use servers::Mode;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use datanode::datanode::Datanode;
|
||||
use meta_client::MetaClientOptions;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::datanode::builder::InstanceBuilder;
|
||||
use crate::error::{
|
||||
LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu,
|
||||
StartDatanodeSnafu,
|
||||
LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
|
||||
};
|
||||
use crate::options::{GlobalOptions, GreptimeOptions};
|
||||
use crate::{log_versions, App};
|
||||
use crate::App;
|
||||
|
||||
pub const APP_NAME: &str = "greptime-datanode";
|
||||
|
||||
@@ -83,7 +77,7 @@ impl App for Instance {
|
||||
self.datanode.start().await.context(StartDatanodeSnafu)
|
||||
}
|
||||
|
||||
async fn stop(&self) -> Result<()> {
|
||||
async fn stop(&mut self) -> Result<()> {
|
||||
self.datanode
|
||||
.shutdown()
|
||||
.await
|
||||
@@ -98,8 +92,8 @@ pub struct Command {
|
||||
}
|
||||
|
||||
impl Command {
|
||||
pub async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
|
||||
self.subcmd.build(opts).await
|
||||
pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
|
||||
self.subcmd.build_with(builder).await
|
||||
}
|
||||
|
||||
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
|
||||
@@ -115,9 +109,12 @@ enum SubCommand {
|
||||
}
|
||||
|
||||
impl SubCommand {
|
||||
async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
|
||||
async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
|
||||
match self {
|
||||
SubCommand::Start(cmd) => cmd.build(opts).await,
|
||||
SubCommand::Start(cmd) => {
|
||||
info!("Building datanode with {:#?}", cmd);
|
||||
builder.build().await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -263,74 +260,6 @@ impl StartCommand {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
|
||||
common_runtime::init_global_runtimes(&opts.runtime);
|
||||
|
||||
let guard = common_telemetry::init_global_logging(
|
||||
APP_NAME,
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
opts.component.node_id.map(|x| x.to_string()),
|
||||
);
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
info!("Datanode start command: {:#?}", self);
|
||||
info!("Datanode options: {:#?}", opts);
|
||||
|
||||
let plugin_opts = opts.plugins;
|
||||
let mut opts = opts.component;
|
||||
opts.grpc.detect_server_addr();
|
||||
let mut plugins = Plugins::new();
|
||||
plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &opts)
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
|
||||
let member_id = opts
|
||||
.node_id
|
||||
.context(MissingConfigSnafu { msg: "'node_id'" })?;
|
||||
|
||||
let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
|
||||
msg: "'meta_client_options'",
|
||||
})?;
|
||||
|
||||
let meta_client = meta_client::create_meta_client(
|
||||
MetaClientType::Datanode { member_id },
|
||||
meta_config,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.context(MetaClientInitSnafu)?;
|
||||
|
||||
let meta_backend = Arc::new(MetaKvBackend {
|
||||
client: meta_client.clone(),
|
||||
});
|
||||
|
||||
// Builds cache registry for datanode.
|
||||
let layered_cache_registry = Arc::new(
|
||||
LayeredCacheRegistryBuilder::default()
|
||||
.add_cache_registry(build_datanode_cache_registry(meta_backend.clone()))
|
||||
.build(),
|
||||
);
|
||||
|
||||
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins, Mode::Distributed)
|
||||
.with_meta_client(meta_client)
|
||||
.with_kv_backend(meta_backend)
|
||||
.with_cache_registry(layered_cache_registry)
|
||||
.build()
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
|
||||
let services = DatanodeServiceBuilder::new(&opts)
|
||||
.with_default_grpc_server(&datanode.region_server())
|
||||
.enable_http_service()
|
||||
.build()
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
datanode.setup_services(services);
|
||||
|
||||
Ok(Instance::new(datanode, guard))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -352,7 +281,6 @@ mod tests {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut file = create_named_temp_file();
|
||||
let toml_str = r#"
|
||||
mode = "distributed"
|
||||
enable_memory_catalog = false
|
||||
node_id = 42
|
||||
|
||||
@@ -379,7 +307,6 @@ mod tests {
|
||||
fn test_read_from_config_file() {
|
||||
let mut file = create_named_temp_file();
|
||||
let toml_str = r#"
|
||||
mode = "distributed"
|
||||
enable_memory_catalog = false
|
||||
node_id = 42
|
||||
|
||||
@@ -545,7 +472,6 @@ mod tests {
|
||||
fn test_config_precedence_order() {
|
||||
let mut file = create_named_temp_file();
|
||||
let toml_str = r#"
|
||||
mode = "distributed"
|
||||
enable_memory_catalog = false
|
||||
node_id = 42
|
||||
rpc_addr = "127.0.0.1:3001"
|
||||
|
||||
137
src/cmd/src/datanode/builder.rs
Normal file
137
src/cmd/src/datanode/builder.rs
Normal file
@@ -0,0 +1,137 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use cache::build_datanode_cache_registry;
|
||||
use catalog::kvbackend::MetaKvBackend;
|
||||
use common_base::Plugins;
|
||||
use common_meta::cache::LayeredCacheRegistryBuilder;
|
||||
use common_telemetry::info;
|
||||
use common_version::{short_version, version};
|
||||
use datanode::datanode::DatanodeBuilder;
|
||||
use datanode::service::DatanodeServiceBuilder;
|
||||
use meta_client::MetaClientType;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::datanode::{DatanodeOptions, Instance, APP_NAME};
|
||||
use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu};
|
||||
use crate::log_versions;
|
||||
|
||||
/// Builder for Datanode instance.
|
||||
pub struct InstanceBuilder {
|
||||
guard: Vec<WorkerGuard>,
|
||||
opts: DatanodeOptions,
|
||||
datanode_builder: DatanodeBuilder,
|
||||
}
|
||||
|
||||
impl InstanceBuilder {
|
||||
/// Try to create a new [InstanceBuilder], and do some initialization work like allocating
|
||||
/// runtime resources, setting up global logging and plugins, etc.
|
||||
pub async fn try_new_with_init(
|
||||
mut opts: DatanodeOptions,
|
||||
mut plugins: Plugins,
|
||||
) -> Result<Self> {
|
||||
let guard = Self::init(&mut opts, &mut plugins).await?;
|
||||
|
||||
let datanode_builder = Self::datanode_builder(&opts, plugins).await?;
|
||||
|
||||
Ok(Self {
|
||||
guard,
|
||||
opts,
|
||||
datanode_builder,
|
||||
})
|
||||
}
|
||||
|
||||
async fn init(opts: &mut DatanodeOptions, plugins: &mut Plugins) -> Result<Vec<WorkerGuard>> {
|
||||
common_runtime::init_global_runtimes(&opts.runtime);
|
||||
|
||||
let dn_opts = &mut opts.component;
|
||||
let guard = common_telemetry::init_global_logging(
|
||||
APP_NAME,
|
||||
&dn_opts.logging,
|
||||
&dn_opts.tracing,
|
||||
dn_opts.node_id.map(|x| x.to_string()),
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
|
||||
dn_opts.grpc.detect_server_addr();
|
||||
|
||||
info!("Initialized Datanode instance with {:#?}", opts);
|
||||
Ok(guard)
|
||||
}
|
||||
|
||||
async fn datanode_builder(opts: &DatanodeOptions, plugins: Plugins) -> Result<DatanodeBuilder> {
|
||||
let dn_opts = &opts.component;
|
||||
|
||||
let member_id = dn_opts
|
||||
.node_id
|
||||
.context(MissingConfigSnafu { msg: "'node_id'" })?;
|
||||
let meta_client_options = dn_opts.meta_client.as_ref().context(MissingConfigSnafu {
|
||||
msg: "meta client options",
|
||||
})?;
|
||||
let client = meta_client::create_meta_client(
|
||||
MetaClientType::Datanode { member_id },
|
||||
meta_client_options,
|
||||
Some(&plugins),
|
||||
)
|
||||
.await
|
||||
.context(MetaClientInitSnafu)?;
|
||||
|
||||
let backend = Arc::new(MetaKvBackend {
|
||||
client: client.clone(),
|
||||
});
|
||||
let mut builder = DatanodeBuilder::new(dn_opts.clone(), plugins.clone(), backend.clone());
|
||||
|
||||
let registry = Arc::new(
|
||||
LayeredCacheRegistryBuilder::default()
|
||||
.add_cache_registry(build_datanode_cache_registry(backend))
|
||||
.build(),
|
||||
);
|
||||
builder
|
||||
.with_cache_registry(registry)
|
||||
.with_meta_client(client.clone());
|
||||
Ok(builder)
|
||||
}
|
||||
|
||||
/// Get the mutable builder for Datanode, in case you want to change some fields before the
|
||||
/// final construction.
|
||||
pub fn mut_datanode_builder(&mut self) -> &mut DatanodeBuilder {
|
||||
&mut self.datanode_builder
|
||||
}
|
||||
|
||||
/// Try to build the Datanode instance.
|
||||
pub async fn build(self) -> Result<Instance> {
|
||||
let mut datanode = self
|
||||
.datanode_builder
|
||||
.build()
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
|
||||
let services = DatanodeServiceBuilder::new(&self.opts.component)
|
||||
.with_default_grpc_server(&datanode.region_server())
|
||||
.enable_http_service()
|
||||
.build()
|
||||
.context(StartDatanodeSnafu)?;
|
||||
datanode.setup_services(services);
|
||||
|
||||
Ok(Instance::new(datanode, self.guard))
|
||||
}
|
||||
}
|
||||
@@ -177,9 +177,6 @@ pub enum Error {
|
||||
source: meta_srv::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid REPL command: {reason}"))]
|
||||
InvalidReplCommand { reason: String },
|
||||
|
||||
#[snafu(display("Failed to parse SQL: {}", sql))]
|
||||
ParseSql {
|
||||
sql: String,
|
||||
@@ -331,7 +328,6 @@ impl ErrorExt for Error {
|
||||
Error::MissingConfig { .. }
|
||||
| Error::LoadLayeredConfig { .. }
|
||||
| Error::IllegalConfig { .. }
|
||||
| Error::InvalidReplCommand { .. }
|
||||
| Error::InitTimezone { .. }
|
||||
| Error::ConnectEtcd { .. }
|
||||
| Error::CreateDir { .. }
|
||||
|
||||
@@ -33,7 +33,8 @@ use common_telemetry::info;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_version::{short_version, version};
|
||||
use flow::{
|
||||
FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker,
|
||||
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
|
||||
FrontendClient, FrontendInvoker,
|
||||
};
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
@@ -82,10 +83,14 @@ impl App for Instance {
|
||||
}
|
||||
|
||||
async fn start(&mut self) -> Result<()> {
|
||||
plugins::start_flownode_plugins(self.flownode.flow_engine().plugins().clone())
|
||||
.await
|
||||
.context(StartFlownodeSnafu)?;
|
||||
|
||||
self.flownode.start().await.context(StartFlownodeSnafu)
|
||||
}
|
||||
|
||||
async fn stop(&self) -> Result<()> {
|
||||
async fn stop(&mut self) -> Result<()> {
|
||||
self.flownode
|
||||
.shutdown()
|
||||
.await
|
||||
@@ -151,6 +156,9 @@ struct StartCommand {
|
||||
/// HTTP request timeout in seconds.
|
||||
#[clap(long)]
|
||||
http_timeout: Option<u64>,
|
||||
/// User provider config for authentication; currently only the static user provider is supported.
|
||||
#[clap(long)]
|
||||
user_provider: Option<String>,
|
||||
}
|
||||
|
||||
impl StartCommand {
|
||||
@@ -214,6 +222,10 @@ impl StartCommand {
|
||||
opts.http.timeout = Duration::from_secs(http_timeout);
|
||||
}
|
||||
|
||||
if let Some(user_provider) = &self.user_provider {
|
||||
opts.user_provider = Some(user_provider.clone());
|
||||
}
|
||||
|
||||
ensure!(
|
||||
opts.node_id.is_some(),
|
||||
MissingConfigSnafu {
|
||||
@@ -238,9 +250,15 @@ impl StartCommand {
|
||||
info!("Flownode start command: {:#?}", self);
|
||||
info!("Flownode options: {:#?}", opts);
|
||||
|
||||
let plugin_opts = opts.plugins;
|
||||
let mut opts = opts.component;
|
||||
opts.grpc.detect_server_addr();
|
||||
|
||||
let mut plugins = Plugins::new();
|
||||
plugins::setup_flownode_plugins(&mut plugins, &plugin_opts, &opts)
|
||||
.await
|
||||
.context(StartFlownodeSnafu)?;
|
||||
|
||||
let member_id = opts
|
||||
.node_id
|
||||
.context(MissingConfigSnafu { msg: "'node_id'" })?;
|
||||
@@ -315,10 +333,12 @@ impl StartCommand {
|
||||
);
|
||||
|
||||
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
|
||||
let frontend_client = FrontendClient::from_meta_client(meta_client.clone());
|
||||
let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
|
||||
let frontend_client =
|
||||
FrontendClient::from_meta_client(meta_client.clone(), flow_auth_header);
|
||||
let flownode_builder = FlownodeBuilder::new(
|
||||
opts.clone(),
|
||||
Plugins::new(),
|
||||
plugins,
|
||||
table_metadata_manager,
|
||||
catalog_manager.clone(),
|
||||
flow_metadata_manager,
|
||||
@@ -331,7 +351,6 @@ impl StartCommand {
|
||||
.with_grpc_server(flownode.flownode_server().clone())
|
||||
.enable_http_service()
|
||||
.build()
|
||||
.await
|
||||
.context(StartFlownodeSnafu)?;
|
||||
flownode.setup_services(services);
|
||||
let flownode = flownode;
|
||||
|
||||
@@ -89,7 +89,7 @@ impl App for Instance {
|
||||
.context(error::StartFrontendSnafu)
|
||||
}
|
||||
|
||||
async fn stop(&self) -> Result<()> {
|
||||
async fn stop(&mut self) -> Result<()> {
|
||||
self.frontend
|
||||
.shutdown()
|
||||
.await
|
||||
@@ -382,7 +382,6 @@ impl StartCommand {
|
||||
|
||||
let servers = Services::new(opts, instance.clone(), plugins)
|
||||
.build()
|
||||
.await
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
|
||||
let frontend = Frontend {
|
||||
@@ -448,8 +447,6 @@ mod tests {
|
||||
fn test_read_from_config_file() {
|
||||
let mut file = create_named_temp_file();
|
||||
let toml_str = r#"
|
||||
mode = "distributed"
|
||||
|
||||
[http]
|
||||
addr = "127.0.0.1:4000"
|
||||
timeout = "0s"
|
||||
@@ -538,8 +535,6 @@ mod tests {
|
||||
fn test_config_precedence_order() {
|
||||
let mut file = create_named_temp_file();
|
||||
let toml_str = r#"
|
||||
mode = "distributed"
|
||||
|
||||
[http]
|
||||
addr = "127.0.0.1:4000"
|
||||
|
||||
|
||||
@@ -74,7 +74,7 @@ pub trait App: Send {
|
||||
true
|
||||
}
|
||||
|
||||
async fn stop(&self) -> Result<()>;
|
||||
async fn stop(&mut self) -> Result<()>;
|
||||
|
||||
async fn run(&mut self) -> Result<()> {
|
||||
info!("Starting app: {}", self.name());
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
@@ -68,7 +69,7 @@ impl App for Instance {
|
||||
self.instance.start().await.context(StartMetaServerSnafu)
|
||||
}
|
||||
|
||||
async fn stop(&self) -> Result<()> {
|
||||
async fn stop(&mut self) -> Result<()> {
|
||||
self.instance
|
||||
.shutdown()
|
||||
.await
|
||||
@@ -131,7 +132,7 @@ impl SubCommand {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Parser)]
|
||||
#[derive(Default, Parser)]
|
||||
pub struct StartCommand {
|
||||
/// The address to bind the gRPC server.
|
||||
#[clap(long, alias = "bind-addr")]
|
||||
@@ -171,6 +172,27 @@ pub struct StartCommand {
|
||||
backend: Option<BackendImpl>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for StartCommand {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("StartCommand")
|
||||
.field("rpc_bind_addr", &self.rpc_bind_addr)
|
||||
.field("rpc_server_addr", &self.rpc_server_addr)
|
||||
.field("store_addrs", &self.sanitize_store_addrs())
|
||||
.field("config_file", &self.config_file)
|
||||
.field("selector", &self.selector)
|
||||
.field("use_memory_store", &self.use_memory_store)
|
||||
.field("enable_region_failover", &self.enable_region_failover)
|
||||
.field("http_addr", &self.http_addr)
|
||||
.field("http_timeout", &self.http_timeout)
|
||||
.field("env_prefix", &self.env_prefix)
|
||||
.field("data_home", &self.data_home)
|
||||
.field("store_key_prefix", &self.store_key_prefix)
|
||||
.field("max_txn_ops", &self.max_txn_ops)
|
||||
.field("backend", &self.backend)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl StartCommand {
|
||||
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
|
||||
let mut opts = MetasrvOptions::load_layered_options(
|
||||
@@ -184,6 +206,15 @@ impl StartCommand {
|
||||
Ok(opts)
|
||||
}
|
||||
|
||||
fn sanitize_store_addrs(&self) -> Option<Vec<String>> {
|
||||
self.store_addrs.as_ref().map(|addrs| {
|
||||
addrs
|
||||
.iter()
|
||||
.map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
|
||||
.collect()
|
||||
})
|
||||
}
|
||||
|
||||
// The precedence order is: cli > config file > environment variables > default values.
|
||||
fn merge_with_cli_options(
|
||||
&self,
|
||||
|
||||
@@ -75,7 +75,6 @@ use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::http::HttpOptions;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
use servers::Mode;
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
@@ -256,8 +255,8 @@ pub struct Instance {
|
||||
|
||||
impl Instance {
|
||||
/// Find the socket addr of a server by its `name`.
|
||||
pub async fn server_addr(&self, name: &str) -> Option<SocketAddr> {
|
||||
self.frontend.server_handlers().addr(name).await
|
||||
pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
|
||||
self.frontend.server_handlers().addr(name)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -294,7 +293,7 @@ impl App for Instance {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop(&self) -> Result<()> {
|
||||
async fn stop(&mut self) -> Result<()> {
|
||||
self.frontend
|
||||
.shutdown()
|
||||
.await
|
||||
@@ -497,12 +496,9 @@ impl StartCommand {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone(), Mode::Standalone)
|
||||
.with_kv_backend(kv_backend.clone())
|
||||
.with_cache_registry(layered_cache_registry.clone())
|
||||
.build()
|
||||
.await
|
||||
.context(error::StartDatanodeSnafu)?;
|
||||
let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
|
||||
builder.with_cache_registry(layered_cache_registry.clone());
|
||||
let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;
|
||||
|
||||
let information_extension = Arc::new(StandaloneInformationExtension::new(
|
||||
datanode.region_server(),
|
||||
@@ -634,7 +630,6 @@ impl StartCommand {
|
||||
|
||||
let servers = Services::new(opts, fe_instance.clone(), plugins)
|
||||
.build()
|
||||
.await
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
|
||||
let frontend = Frontend {
|
||||
@@ -858,8 +853,6 @@ mod tests {
|
||||
fn test_read_from_config_file() {
|
||||
let mut file = create_named_temp_file();
|
||||
let toml_str = r#"
|
||||
mode = "distributed"
|
||||
|
||||
enable_memory_catalog = true
|
||||
|
||||
[wal]
|
||||
@@ -990,8 +983,6 @@ mod tests {
|
||||
fn test_config_precedence_order() {
|
||||
let mut file = create_named_temp_file();
|
||||
let toml_str = r#"
|
||||
mode = "standalone"
|
||||
|
||||
[http]
|
||||
addr = "127.0.0.1:4000"
|
||||
|
||||
|
||||
@@ -111,11 +111,9 @@ mod tests {
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::*;
|
||||
use crate::Mode;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug, Serialize, Deserialize, Default)]
|
||||
struct TestDatanodeConfig {
|
||||
mode: Mode,
|
||||
node_id: Option<u64>,
|
||||
logging: LoggingOptions,
|
||||
meta_client: Option<MetaClientOptions>,
|
||||
@@ -123,19 +121,6 @@ mod tests {
|
||||
storage: StorageConfig,
|
||||
}
|
||||
|
||||
impl Default for TestDatanodeConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
mode: Mode::Distributed,
|
||||
node_id: None,
|
||||
logging: LoggingOptions::default(),
|
||||
meta_client: None,
|
||||
wal: DatanodeWalConfig::default(),
|
||||
storage: StorageConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Configurable for TestDatanodeConfig {
|
||||
fn env_list_keys() -> Option<&'static [&'static str]> {
|
||||
Some(&["meta_client.metasrv_addrs"])
|
||||
@@ -146,7 +131,6 @@ mod tests {
|
||||
fn test_load_layered_options() {
|
||||
let mut file = create_named_temp_file();
|
||||
let toml_str = r#"
|
||||
mode = "distributed"
|
||||
enable_memory_catalog = false
|
||||
rpc_addr = "127.0.0.1:3001"
|
||||
rpc_hostname = "127.0.0.1"
|
||||
|
||||
@@ -26,16 +26,6 @@ pub fn metadata_store_dir(store_dir: &str) -> String {
|
||||
format!("{store_dir}/metadata")
|
||||
}
|
||||
|
||||
/// The Server running mode
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Copy)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum Mode {
|
||||
// The single process mode.
|
||||
Standalone,
|
||||
// The distributed cluster mode.
|
||||
Distributed,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct KvBackendConfig {
|
||||
|
||||
@@ -13,7 +13,7 @@ default = ["geo"]
|
||||
geo = ["geohash", "h3o", "s2", "wkt", "geo-types", "dep:geo"]
|
||||
|
||||
[dependencies]
|
||||
ahash = "0.8"
|
||||
ahash.workspace = true
|
||||
api.workspace = true
|
||||
arc-swap = "1.0"
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -19,4 +19,4 @@ mod uddsketch_state;
|
||||
pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
|
||||
pub(crate) use hll::HllStateType;
|
||||
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
|
||||
pub use uddsketch_state::{UddSketchState, UDDSKETCH_STATE_NAME};
|
||||
pub use uddsketch_state::{UddSketchState, UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME};
|
||||
|
||||
@@ -31,23 +31,28 @@ use datafusion::physical_plan::expressions::Literal;
|
||||
use datafusion::prelude::create_udaf;
|
||||
use datatypes::arrow::array::ArrayRef;
|
||||
use datatypes::arrow::datatypes::{DataType, Float64Type};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uddsketch::{SketchHashKey, UDDSketch};
|
||||
|
||||
pub const UDDSKETCH_STATE_NAME: &str = "uddsketch_state";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub const UDDSKETCH_MERGE_NAME: &str = "uddsketch_merge";
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct UddSketchState {
|
||||
uddsketch: UDDSketch,
|
||||
error_rate: f64,
|
||||
}
|
||||
|
||||
impl UddSketchState {
|
||||
pub fn new(bucket_size: u64, error_rate: f64) -> Self {
|
||||
Self {
|
||||
uddsketch: UDDSketch::new(bucket_size, error_rate),
|
||||
error_rate,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn udf_impl() -> AggregateUDF {
|
||||
pub fn state_udf_impl() -> AggregateUDF {
|
||||
create_udaf(
|
||||
UDDSKETCH_STATE_NAME,
|
||||
vec![DataType::Int64, DataType::Float64, DataType::Float64],
|
||||
@@ -61,18 +66,55 @@ impl UddSketchState {
|
||||
)
|
||||
}
|
||||
|
||||
/// Create a UDF for the `uddsketch_merge` function.
|
||||
///
|
||||
/// `uddsketch_merge` accepts bucket size, error rate, and a binary column of states generated by `uddsketch_state`
|
||||
/// and merges them into a single state.
|
||||
///
|
||||
/// The bucket size and error rate must be the same as the original state.
|
||||
pub fn merge_udf_impl() -> AggregateUDF {
|
||||
create_udaf(
|
||||
UDDSKETCH_MERGE_NAME,
|
||||
vec![DataType::Int64, DataType::Float64, DataType::Binary],
|
||||
Arc::new(DataType::Binary),
|
||||
Volatility::Immutable,
|
||||
Arc::new(|args| {
|
||||
let (bucket_size, error_rate) = downcast_accumulator_args(args)?;
|
||||
Ok(Box::new(UddSketchState::new(bucket_size, error_rate)))
|
||||
}),
|
||||
Arc::new(vec![DataType::Binary]),
|
||||
)
|
||||
}
|
||||
|
||||
fn update(&mut self, value: f64) {
|
||||
self.uddsketch.add_value(value);
|
||||
}
|
||||
|
||||
fn merge(&mut self, raw: &[u8]) {
|
||||
if let Ok(uddsketch) = bincode::deserialize::<UDDSketch>(raw) {
|
||||
if uddsketch.count() != 0 {
|
||||
self.uddsketch.merge_sketch(&uddsketch);
|
||||
fn merge(&mut self, raw: &[u8]) -> DfResult<()> {
|
||||
if let Ok(uddsketch) = bincode::deserialize::<Self>(raw) {
|
||||
if uddsketch.uddsketch.count() != 0 {
|
||||
if self.uddsketch.max_allowed_buckets() != uddsketch.uddsketch.max_allowed_buckets()
|
||||
|| (self.error_rate - uddsketch.error_rate).abs() >= 1e-9
|
||||
{
|
||||
return Err(DataFusionError::Plan(format!(
|
||||
"Merging UDDSketch with different parameters: arguments={:?} vs actual input={:?}",
|
||||
(
|
||||
self.uddsketch.max_allowed_buckets(),
|
||||
self.error_rate
|
||||
),
|
||||
(uddsketch.uddsketch.max_allowed_buckets(), uddsketch.error_rate)
|
||||
)));
|
||||
}
|
||||
self.uddsketch.merge_sketch(&uddsketch.uddsketch);
|
||||
}
|
||||
} else {
|
||||
trace!("Warning: Failed to deserialize UDDSketch from {:?}", raw);
|
||||
return Err(DataFusionError::Plan(
|
||||
"Failed to deserialize UDDSketch from binary".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
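The reworked merge above refuses to combine sketches whose parameters differ, comparing bucket counts exactly and error rates within a small epsilon. A standalone sketch of that compatibility check follows; the struct and field names are placeholders, not the crate's actual types.

// Standalone sketch of the parameter-compatibility check added to `merge`:
// bucket counts must match exactly, error rates within a small epsilon.
#[derive(Debug)]
struct SketchParams {
    max_buckets: u64,
    error_rate: f64,
}

fn compatible(a: &SketchParams, b: &SketchParams) -> Result<(), String> {
    if a.max_buckets != b.max_buckets || (a.error_rate - b.error_rate).abs() >= 1e-9 {
        return Err(format!(
            "Merging UDDSketch with different parameters: {:?} vs {:?}",
            a, b
        ));
    }
    Ok(())
}

fn main() {
    let a = SketchParams { max_buckets: 128, error_rate: 0.01 };
    let b = SketchParams { max_buckets: 128, error_rate: 0.05 };
    assert!(compatible(&a, &b).is_err());
}
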
@@ -113,9 +155,21 @@ fn downcast_accumulator_args(args: AccumulatorArgs) -> DfResult<(u64, f64)> {
|
||||
impl DfAccumulator for UddSketchState {
|
||||
fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> {
|
||||
let array = &values[2]; // the third column is data value
|
||||
let f64_array = as_primitive_array::<Float64Type>(array)?;
|
||||
for v in f64_array.iter().flatten() {
|
||||
self.update(v);
|
||||
match array.data_type() {
|
||||
DataType::Float64 => {
|
||||
let f64_array = as_primitive_array::<Float64Type>(array)?;
|
||||
for v in f64_array.iter().flatten() {
|
||||
self.update(v);
|
||||
}
|
||||
}
|
||||
// a Binary input means this accumulator was instantiated as `uddsketch_merge`
|
||||
DataType::Binary => self.merge_batch(&[array.clone()])?,
|
||||
_ => {
|
||||
return not_impl_err!(
|
||||
"UDDSketch functions do not support data type: {}",
|
||||
array.data_type()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -123,7 +177,7 @@ impl DfAccumulator for UddSketchState {
|
||||
|
||||
fn evaluate(&mut self) -> DfResult<ScalarValue> {
|
||||
Ok(ScalarValue::Binary(Some(
|
||||
bincode::serialize(&self.uddsketch).map_err(|e| {
|
||||
bincode::serialize(&self).map_err(|e| {
|
||||
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
|
||||
})?,
|
||||
)))
|
||||
@@ -150,7 +204,7 @@ impl DfAccumulator for UddSketchState {
|
||||
|
||||
fn state(&mut self) -> DfResult<Vec<ScalarValue>> {
|
||||
Ok(vec![ScalarValue::Binary(Some(
|
||||
bincode::serialize(&self.uddsketch).map_err(|e| {
|
||||
bincode::serialize(&self).map_err(|e| {
|
||||
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
|
||||
})?,
|
||||
))])
|
||||
@@ -160,7 +214,7 @@ impl DfAccumulator for UddSketchState {
|
||||
let array = &states[0];
|
||||
let binary_array = as_binary_array(array)?;
|
||||
for v in binary_array.iter().flatten() {
|
||||
self.merge(v);
|
||||
self.merge(v)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -182,8 +236,8 @@ mod tests {
|
||||
|
||||
let result = state.evaluate().unwrap();
|
||||
if let ScalarValue::Binary(Some(bytes)) = result {
|
||||
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
|
||||
assert_eq!(deserialized.count(), 3);
|
||||
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
|
||||
assert_eq!(deserialized.uddsketch.count(), 3);
|
||||
} else {
|
||||
panic!("Expected binary scalar value");
|
||||
}
|
||||
@@ -201,13 +255,15 @@ mod tests {
|
||||
// Create new state and merge the serialized data
|
||||
let mut new_state = UddSketchState::new(10, 0.01);
|
||||
if let ScalarValue::Binary(Some(bytes)) = &serialized {
|
||||
new_state.merge(bytes);
|
||||
new_state.merge(bytes).unwrap();
|
||||
|
||||
// Verify the merged state matches original by comparing deserialized values
|
||||
let original_sketch: UDDSketch = bincode::deserialize(bytes).unwrap();
|
||||
let original_sketch: UddSketchState = bincode::deserialize(bytes).unwrap();
|
||||
let original_sketch = original_sketch.uddsketch;
|
||||
let new_result = new_state.evaluate().unwrap();
|
||||
if let ScalarValue::Binary(Some(new_bytes)) = new_result {
|
||||
let new_sketch: UDDSketch = bincode::deserialize(&new_bytes).unwrap();
|
||||
let new_sketch: UddSketchState = bincode::deserialize(&new_bytes).unwrap();
|
||||
let new_sketch = new_sketch.uddsketch;
|
||||
assert_eq!(original_sketch.count(), new_sketch.count());
|
||||
assert_eq!(original_sketch.sum(), new_sketch.sum());
|
||||
assert_eq!(original_sketch.mean(), new_sketch.mean());
|
||||
@@ -244,7 +300,8 @@ mod tests {
|
||||
|
||||
let result = state.evaluate().unwrap();
|
||||
if let ScalarValue::Binary(Some(bytes)) = result {
|
||||
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
|
||||
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
|
||||
let deserialized = deserialized.uddsketch;
|
||||
assert_eq!(deserialized.count(), 3);
|
||||
} else {
|
||||
panic!("Expected binary scalar value");
|
||||
@@ -273,7 +330,8 @@ mod tests {
|
||||
|
||||
let result = merged_state.evaluate().unwrap();
|
||||
if let ScalarValue::Binary(Some(bytes)) = result {
|
||||
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
|
||||
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
|
||||
let deserialized = deserialized.uddsketch;
|
||||
assert_eq!(deserialized.count(), 2);
|
||||
} else {
|
||||
panic!("Expected binary scalar value");
|
||||
|
||||
@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;
use std::iter::repeat_n;
use std::sync::Arc;
use std::{fmt, iter};

use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Volatility;
@@ -126,9 +127,10 @@ impl Function for MatchesTermFunction {
        let term = term_column.get_ref(0).as_string().unwrap();
        match term {
            None => {
                return Ok(Arc::new(BooleanVector::from_iter(
                    iter::repeat(None).take(text_column.len()),
                )));
                return Ok(Arc::new(BooleanVector::from_iter(repeat_n(
                    None,
                    text_column.len(),
                ))));
            }
            Some(term) => Some(MatchesTermFinder::new(term)),
        }
@@ -217,7 +219,7 @@ impl MatchesTermFinder {
        }

        let mut pos = 0;
        while let Some(found_pos) = self.finder.find(text[pos..].as_bytes()) {
        while let Some(found_pos) = self.finder.find(&text.as_bytes()[pos..]) {
            let actual_pos = pos + found_pos;

            let prev_ok = self.starts_with_non_alnum
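The `finder.find` change is more than style: `pos` here is a byte offset, and slicing the `&str` first (`text[pos..]`) panics whenever that offset falls inside a multi-byte UTF-8 character, while slicing the byte slice (`&text.as_bytes()[pos..]`) is always valid. A small standalone illustration, plain std with no external crates:

```rust
// Minimal demonstration, assuming `pos` is a byte offset as returned by a
// byte-oriented substring search (e.g. a memmem-style finder).
fn main() {
    let text = "αβ-term"; // 'α' and 'β' are 2 bytes each in UTF-8
    let pos = 1; // a byte offset that is NOT a char boundary

    // Slicing the byte slice is always safe with respect to char boundaries:
    let ok = &text.as_bytes()[pos..];
    assert_eq!(ok.len(), text.len() - 1);

    // Slicing the &str first would panic here:
    // let bad = text[pos..].as_bytes(); // panics: byte index 1 is not a char boundary
    println!("{} bytes remain after offset {}", ok.len(), pos);
}
```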
@@ -13,10 +13,8 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
mod greatest;
|
||||
mod to_unixtime;
|
||||
|
||||
use greatest::GreatestFunction;
|
||||
use to_unixtime::ToUnixtimeFunction;
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
@@ -26,6 +24,5 @@ pub(crate) struct TimestampFunction;
|
||||
impl TimestampFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(ToUnixtimeFunction));
|
||||
registry.register(Arc::new(GreatestFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,328 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::{self};
|
||||
|
||||
use common_query::error::{
|
||||
self, ArrowComputeSnafu, InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
use datafusion::arrow::compute::kernels::cmp::gt;
|
||||
use datatypes::arrow::array::AsArray;
|
||||
use datatypes::arrow::compute::cast;
|
||||
use datatypes::arrow::compute::kernels::zip;
|
||||
use datatypes::arrow::datatypes::{
|
||||
DataType as ArrowDataType, Date32Type, TimeUnit, TimestampMicrosecondType,
|
||||
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
|
||||
};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::types::TimestampType;
|
||||
use datatypes::vectors::{Helper, VectorRef};
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct GreatestFunction;
|
||||
|
||||
const NAME: &str = "greatest";
|
||||
|
||||
macro_rules! gt_time_types {
|
||||
($ty: ident, $columns:expr) => {{
|
||||
let column1 = $columns[0].to_arrow_array();
|
||||
let column2 = $columns[1].to_arrow_array();
|
||||
|
||||
let column1 = column1.as_primitive::<$ty>();
|
||||
let column2 = column2.as_primitive::<$ty>();
|
||||
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
|
||||
|
||||
let result = zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
|
||||
Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)
|
||||
}};
|
||||
}
|
||||
|
||||
impl Function for GreatestFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
ensure!(
|
||||
input_types.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly two, have: {}",
|
||||
input_types.len()
|
||||
)
|
||||
}
|
||||
);
|
||||
|
||||
match &input_types[0] {
|
||||
ConcreteDataType::String(_) => Ok(ConcreteDataType::timestamp_millisecond_datatype()),
|
||||
ConcreteDataType::Date(_) => Ok(ConcreteDataType::date_datatype()),
|
||||
ConcreteDataType::Timestamp(ts_type) => Ok(ConcreteDataType::Timestamp(*ts_type)),
|
||||
_ => UnsupportedInputDataTypeSnafu {
|
||||
function: NAME,
|
||||
datatypes: input_types,
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::uniform(
|
||||
2,
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(),
|
||||
ConcreteDataType::date_datatype(),
|
||||
ConcreteDataType::timestamp_nanosecond_datatype(),
|
||||
ConcreteDataType::timestamp_microsecond_datatype(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
ConcreteDataType::timestamp_second_datatype(),
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly two, have: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
match columns[0].data_type() {
|
||||
ConcreteDataType::String(_) => {
|
||||
let column1 = cast(
|
||||
&columns[0].to_arrow_array(),
|
||||
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
)
|
||||
.context(ArrowComputeSnafu)?;
|
||||
let column1 = column1.as_primitive::<TimestampMillisecondType>();
|
||||
let column2 = cast(
|
||||
&columns[1].to_arrow_array(),
|
||||
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
)
|
||||
.context(ArrowComputeSnafu)?;
|
||||
let column2 = column2.as_primitive::<TimestampMillisecondType>();
|
||||
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
|
||||
let result =
|
||||
zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
|
||||
Ok(Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)?)
|
||||
}
|
||||
ConcreteDataType::Date(_) => gt_time_types!(Date32Type, columns),
|
||||
ConcreteDataType::Timestamp(ts_type) => match ts_type {
|
||||
TimestampType::Second(_) => gt_time_types!(TimestampSecondType, columns),
|
||||
TimestampType::Millisecond(_) => {
|
||||
gt_time_types!(TimestampMillisecondType, columns)
|
||||
}
|
||||
TimestampType::Microsecond(_) => {
|
||||
gt_time_types!(TimestampMicrosecondType, columns)
|
||||
}
|
||||
TimestampType::Nanosecond(_) => {
|
||||
gt_time_types!(TimestampNanosecondType, columns)
|
||||
}
|
||||
},
|
||||
_ => UnsupportedInputDataTypeSnafu {
|
||||
function: NAME,
|
||||
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for GreatestFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "GREATEST")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::{Date, Timestamp};
|
||||
use datatypes::types::{
|
||||
DateType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
|
||||
TimestampSecondType,
|
||||
};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{
|
||||
DateVector, StringVector, TimestampMicrosecondVector, TimestampMillisecondVector,
|
||||
TimestampNanosecondVector, TimestampSecondVector, Vector,
|
||||
};
|
||||
use paste::paste;
|
||||
|
||||
use super::*;
|
||||
#[test]
|
||||
fn test_greatest_takes_string_vector() {
|
||||
let function = GreatestFunction;
|
||||
assert_eq!(
|
||||
function
|
||||
.return_type(&[
|
||||
ConcreteDataType::string_datatype(),
|
||||
ConcreteDataType::string_datatype()
|
||||
])
|
||||
.unwrap(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype()
|
||||
);
|
||||
let columns = vec![
|
||||
Arc::new(StringVector::from(vec![
|
||||
"1970-01-01".to_string(),
|
||||
"2012-12-23".to_string(),
|
||||
])) as _,
|
||||
Arc::new(StringVector::from(vec![
|
||||
"2001-02-01".to_string(),
|
||||
"1999-01-01".to_string(),
|
||||
])) as _,
|
||||
];
|
||||
|
||||
let result = function
|
||||
.eval(&FunctionContext::default(), &columns)
|
||||
.unwrap();
|
||||
let result = result
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondVector>()
|
||||
.unwrap();
|
||||
assert_eq!(result.len(), 2);
|
||||
assert_eq!(
|
||||
result.get(0),
|
||||
Value::Timestamp(Timestamp::from_str("2001-02-01 00:00:00", None).unwrap())
|
||||
);
|
||||
assert_eq!(
|
||||
result.get(1),
|
||||
Value::Timestamp(Timestamp::from_str("2012-12-23 00:00:00", None).unwrap())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_greatest_takes_date_vector() {
|
||||
let function = GreatestFunction;
|
||||
assert_eq!(
|
||||
function
|
||||
.return_type(&[
|
||||
ConcreteDataType::date_datatype(),
|
||||
ConcreteDataType::date_datatype()
|
||||
])
|
||||
.unwrap(),
|
||||
ConcreteDataType::Date(DateType)
|
||||
);
|
||||
|
||||
let columns = vec![
|
||||
Arc::new(DateVector::from_slice(vec![-1, 2])) as _,
|
||||
Arc::new(DateVector::from_slice(vec![0, 1])) as _,
|
||||
];
|
||||
|
||||
let result = function
|
||||
.eval(&FunctionContext::default(), &columns)
|
||||
.unwrap();
|
||||
let result = result.as_any().downcast_ref::<DateVector>().unwrap();
|
||||
assert_eq!(result.len(), 2);
|
||||
assert_eq!(
|
||||
result.get(0),
|
||||
Value::Date(Date::from_str_utc("1970-01-01").unwrap())
|
||||
);
|
||||
assert_eq!(
|
||||
result.get(1),
|
||||
Value::Date(Date::from_str_utc("1970-01-03").unwrap())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_greatest_takes_datetime_vector() {
|
||||
let function = GreatestFunction;
|
||||
assert_eq!(
|
||||
function
|
||||
.return_type(&[
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype()
|
||||
])
|
||||
.unwrap(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype()
|
||||
);
|
||||
|
||||
let columns = vec![
|
||||
Arc::new(TimestampMillisecondVector::from_slice(vec![-1, 2])) as _,
|
||||
Arc::new(TimestampMillisecondVector::from_slice(vec![0, 1])) as _,
|
||||
];
|
||||
|
||||
let result = function
|
||||
.eval(&FunctionContext::default(), &columns)
|
||||
.unwrap();
|
||||
let result = result
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondVector>()
|
||||
.unwrap();
|
||||
assert_eq!(result.len(), 2);
|
||||
assert_eq!(
|
||||
result.get(0),
|
||||
Value::Timestamp(Timestamp::from_str("1970-01-01 00:00:00", None).unwrap())
|
||||
);
|
||||
assert_eq!(
|
||||
result.get(1),
|
||||
Value::Timestamp(Timestamp::from_str("1970-01-01 00:00:00.002", None).unwrap())
|
||||
);
|
||||
}
|
||||
|
||||
macro_rules! test_timestamp {
|
||||
($type: expr,$unit: ident) => {
|
||||
paste! {
|
||||
#[test]
|
||||
fn [<test_greatest_takes_ $unit:lower _vector>]() {
|
||||
let function = GreatestFunction;
|
||||
assert_eq!(
|
||||
function.return_type(&[$type, $type]).unwrap(),
|
||||
ConcreteDataType::Timestamp(TimestampType::$unit([<Timestamp $unit Type>]))
|
||||
);
|
||||
|
||||
let columns = vec![
|
||||
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![-1, 2])) as _,
|
||||
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![0, 1])) as _,
|
||||
];
|
||||
|
||||
let result = function.eval(&FunctionContext::default(), &columns).unwrap();
|
||||
let result = result.as_any().downcast_ref::<[<Timestamp $unit Vector>]>().unwrap();
|
||||
assert_eq!(result.len(), 2);
|
||||
assert_eq!(
|
||||
result.get(0),
|
||||
Value::Timestamp(Timestamp::new(0, TimeUnit::$unit))
|
||||
);
|
||||
assert_eq!(
|
||||
result.get(1),
|
||||
Value::Timestamp(Timestamp::new(2, TimeUnit::$unit))
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test_timestamp!(
|
||||
ConcreteDataType::timestamp_nanosecond_datatype(),
|
||||
Nanosecond
|
||||
);
|
||||
test_timestamp!(
|
||||
ConcreteDataType::timestamp_microsecond_datatype(),
|
||||
Microsecond
|
||||
);
|
||||
test_timestamp!(
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
Millisecond
|
||||
);
|
||||
test_timestamp!(ConcreteDataType::timestamp_second_datatype(), Second);
|
||||
}
|
||||
@@ -115,6 +115,13 @@ impl Function for UddSketchCalcFunction {
                }
            };

            // Check if the sketch is empty, if so, return null
            // This is important to avoid panics when calling estimate_quantile on an empty sketch
            // In practice, this will happen if input is all null
            if sketch.bucket_iter().count() == 0 {
                builder.push_null();
                continue;
            }
            // Compute the estimated quantile from the sketch
            let result = sketch.estimate_quantile(perc);
            builder.push(Some(result));
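The guard above encodes a simple rule: a quantile of an empty sketch is undefined, so the function emits SQL NULL instead of calling into the estimator. A toy version of the same contract, with a hypothetical `SimpleSketch` standing in for the real data structure:

```rust
// Stand-in for a data sketch: an empty sketch has no meaningful quantile,
// so return None (rendered as SQL NULL) instead of a value or a panic.
struct SimpleSketch {
    sorted_values: Vec<f64>,
}

impl SimpleSketch {
    fn estimate_quantile(&self, q: f64) -> Option<f64> {
        if self.sorted_values.is_empty() {
            return None; // mirrors builder.push_null() above
        }
        let idx = ((self.sorted_values.len() - 1) as f64 * q).round() as usize;
        Some(self.sorted_values[idx])
    }
}

fn main() {
    let empty = SimpleSketch { sorted_values: vec![] };
    assert_eq!(empty.estimate_quantile(0.5), None);

    let filled = SimpleSketch { sorted_values: vec![1.0, 2.0, 3.0] };
    assert_eq!(filled.estimate_quantile(0.5), Some(2.0));
}
```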
@@ -192,6 +192,10 @@ impl FlightDecoder {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn schema(&self) -> Option<&SchemaRef> {
|
||||
self.schema.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<RecordBatches> {
|
||||
|
||||
@@ -18,4 +18,5 @@ pub mod flight;
|
||||
pub mod precision;
|
||||
pub mod select;
|
||||
|
||||
pub use arrow_flight::FlightData;
|
||||
pub use error::Error;
|
||||
|
||||
@@ -217,7 +217,9 @@ pub enum Instruction {
    /// Invalidates batch cache.
    InvalidateCaches(Vec<CacheIdent>),
    /// Flushes regions.
    FlushRegion(FlushRegions),
    FlushRegions(FlushRegions),
    /// Flushes a single region.
    FlushRegion(RegionId),
}

/// The reply of [UpgradeRegion].
@@ -248,6 +250,7 @@ pub enum InstructionReply {
    CloseRegion(SimpleReply),
    UpgradeRegion(UpgradeRegionReply),
    DowngradeRegion(DowngradeRegionReply),
    FlushRegion(SimpleReply),
}

impl Display for InstructionReply {
@@ -259,6 +262,7 @@ impl Display for InstructionReply {
            Self::DowngradeRegion(reply) => {
                write!(f, "InstructionReply::DowngradeRegion({})", reply)
            }
            Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
        }
    }
}
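The rename splits one instruction into two: `FlushRegions` carries a batch of region ids and is handled fire-and-forget further down, while the new `FlushRegion` targets a single region and gets a `SimpleReply`. A condensed, hypothetical mirror of that dispatch; the types here are illustrative stand-ins, not the crate's real ones:

```rust
#[derive(Debug)]
struct RegionId(u64);

#[derive(Debug)]
enum Instruction {
    /// Flush many regions, best-effort, no per-region reply expected.
    FlushRegions(Vec<RegionId>),
    /// Flush one region; the caller expects a reply for it.
    FlushRegion(RegionId),
}

fn handle(instruction: Instruction) {
    match instruction {
        Instruction::FlushRegions(ids) => {
            for id in ids {
                println!("flushing region {:?} (no reply expected)", id);
            }
        }
        Instruction::FlushRegion(id) => {
            println!("flushing region {:?} and sending a reply", id);
        }
    }
}

fn main() {
    handle(Instruction::FlushRegions(vec![RegionId(1), RegionId(2)]));
    handle(Instruction::FlushRegion(RegionId(3)));
}
```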
@@ -112,7 +112,7 @@ pub mod test_utils;
|
||||
mod tombstone;
|
||||
pub mod topic_name;
|
||||
pub mod topic_region;
|
||||
pub(crate) mod txn_helper;
|
||||
pub mod txn_helper;
|
||||
pub mod view_info;
|
||||
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
|
||||
@@ -478,10 +478,11 @@ impl TableRouteStorage {
|
||||
))
|
||||
}
|
||||
|
||||
// TODO(LFC): restore its original visibility after some test utility codes are refined
|
||||
/// Builds a update table route transaction,
|
||||
/// it expected the remote value equals the `current_table_route_value`.
|
||||
/// It retrieves the latest value if the comparing failed.
|
||||
pub(crate) fn build_update_txn(
|
||||
pub fn build_update_txn(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
|
||||
|
||||
@@ -25,7 +25,7 @@ pub struct TxnOpGetResponseSet(Vec<KeyValue>);
|
||||
|
||||
impl TxnOpGetResponseSet {
|
||||
/// Returns a filter to consume a [KeyValue] where the key equals `key`.
|
||||
pub(crate) fn filter(key: Vec<u8>) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>> {
|
||||
pub fn filter(key: Vec<u8>) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>> {
|
||||
move |set| {
|
||||
let pos = set.0.iter().position(|kv| kv.key == key);
|
||||
match pos {
|
||||
@@ -36,7 +36,7 @@ impl TxnOpGetResponseSet {
|
||||
}
|
||||
|
||||
/// Returns a decoder to decode bytes to `DeserializedValueWithBytes<T>`.
|
||||
pub(crate) fn decode_with<F, T>(
|
||||
pub fn decode_with<F, T>(
|
||||
mut f: F,
|
||||
) -> impl FnMut(&mut TxnOpGetResponseSet) -> Result<Option<DeserializedValueWithBytes<T>>>
|
||||
where
|
||||
|
||||
@@ -35,7 +35,7 @@ pub mod memory;
|
||||
pub mod rds;
|
||||
pub mod test;
|
||||
pub mod txn;
|
||||
|
||||
pub mod util;
|
||||
pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
|
||||
|
||||
#[async_trait]
|
||||
|
||||
85
src/common/meta/src/kv_backend/util.rs
Normal file
@@ -0,0 +1,85 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Removes sensitive information like passwords from connection strings.
///
/// This function sanitizes connection strings by removing credentials:
/// - For URL format (mysql://user:password@host:port/db): Removes everything before '@'
/// - For parameter format (host=localhost password=secret): Removes the password parameter
/// - For URL format without credentials (mysql://host:port/db): Removes the protocol prefix
///
/// # Arguments
///
/// * `conn_str` - The connection string to sanitize
///
/// # Returns
///
/// A sanitized version of the connection string with sensitive information removed
pub fn sanitize_connection_string(conn_str: &str) -> String {
    // Case 1: URL format with credentials (mysql://user:password@host:port/db)
    // Extract everything after the '@' symbol
    if let Some(at_pos) = conn_str.find('@') {
        return conn_str[at_pos + 1..].to_string();
    }

    // Case 2: Parameter format with password (host=localhost password=secret dbname=mydb)
    // Filter out any parameter that starts with "password="
    if conn_str.contains("password=") {
        return conn_str
            .split_whitespace()
            .filter(|param| !param.starts_with("password="))
            .collect::<Vec<_>>()
            .join(" ");
    }

    // Case 3: URL format without credentials (mysql://host:port/db)
    // Extract everything after the protocol prefix
    if let Some(host_part) = conn_str.split("://").nth(1) {
        return host_part.to_string();
    }

    // Case 4: Already sanitized or unknown format
    // Return as is
    conn_str.to_string()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sanitize_connection_string() {
        // Test URL format with username/password
        let conn_str = "mysql://user:password123@localhost:3306/db";
        assert_eq!(sanitize_connection_string(conn_str), "localhost:3306/db");

        // Test URL format without credentials
        let conn_str = "mysql://localhost:3306/db";
        assert_eq!(sanitize_connection_string(conn_str), "localhost:3306/db");

        // Test parameter format with password
        let conn_str = "host=localhost port=5432 user=postgres password=secret dbname=mydb";
        assert_eq!(
            sanitize_connection_string(conn_str),
            "host=localhost port=5432 user=postgres dbname=mydb"
        );

        // Test parameter format without password
        let conn_str = "host=localhost port=5432 user=postgres dbname=mydb";
        assert_eq!(
            sanitize_connection_string(conn_str),
            "host=localhost port=5432 user=postgres dbname=mydb"
        );
    }
}
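A typical call site, assuming the `sanitize_connection_string` function above is in scope, is to scrub the string just before it reaches a log line or error message; the function name used for the caller below is hypothetical:

```rust
// Hypothetical usage: scrub credentials before the connection string reaches
// any log output.
fn log_backend_target(conn_str: &str) {
    println!(
        "connecting to kv backend at {}",
        sanitize_connection_string(conn_str)
    );
}
```

Note that case 1 strips the scheme along with the credentials (the first test expects "localhost:3306/db"), which is fine for logging but means the sanitized output is not a reconnectable URL.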
@@ -15,8 +15,6 @@
|
||||
#![feature(assert_matches)]
|
||||
#![feature(btree_extract_if)]
|
||||
#![feature(let_chains)]
|
||||
#![feature(extract_if)]
|
||||
#![feature(hash_extract_if)]
|
||||
|
||||
pub mod cache;
|
||||
pub mod cache_invalidator;
|
||||
|
||||
@@ -121,8 +121,8 @@ pub enum FlowNameLock {
}

impl FlowNameLock {
    pub fn new(catalog: &str, table: &str) -> Self {
        Self::Write(format!("{catalog}.{table}"))
    pub fn new(catalog: &str, flow_name: &str) -> Self {
        Self::Write(format!("{catalog}.{flow_name}"))
    }
}
|
||||
|
||||
|
||||
@@ -176,15 +176,12 @@ impl TableRoute {
|
||||
})?
|
||||
.into();
|
||||
|
||||
let leader_peer = peers
|
||||
.get(region_route.leader_peer_index as usize)
|
||||
.cloned()
|
||||
.map(Into::into);
|
||||
let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
|
||||
|
||||
let follower_peers = region_route
|
||||
.follower_peer_indexes
|
||||
.into_iter()
|
||||
.filter_map(|x| peers.get(x as usize).cloned().map(Into::into))
|
||||
.filter_map(|x| peers.get(x as usize).cloned())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
region_routes.push(RegionRoute {
|
||||
|
||||
@@ -18,11 +18,13 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_procedure::local::{acquire_dynamic_key_lock, DynamicKeyLockGuard};
|
||||
use common_procedure::rwlock::KeyRwLock;
|
||||
use common_procedure::store::poison_store::PoisonStore;
|
||||
use common_procedure::test_util::InMemoryPoisonStore;
|
||||
use common_procedure::{
|
||||
Context, ContextProvider, Output, PoisonKey, Procedure, ProcedureId, ProcedureState,
|
||||
ProcedureWithId, Result, Status,
|
||||
ProcedureWithId, Result, Status, StringKey,
|
||||
};
|
||||
|
||||
/// A Mock [ContextProvider].
|
||||
@@ -30,6 +32,7 @@ use common_procedure::{
|
||||
pub struct MockContextProvider {
|
||||
states: HashMap<ProcedureId, ProcedureState>,
|
||||
poison_manager: InMemoryPoisonStore,
|
||||
dynamic_key_lock: Arc<KeyRwLock<String>>,
|
||||
}
|
||||
|
||||
impl MockContextProvider {
|
||||
@@ -38,6 +41,7 @@ impl MockContextProvider {
|
||||
MockContextProvider {
|
||||
states,
|
||||
poison_manager: InMemoryPoisonStore::default(),
|
||||
dynamic_key_lock: Arc::new(KeyRwLock::new()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,6 +62,10 @@ impl ContextProvider for MockContextProvider {
|
||||
.try_put_poison(key.to_string(), procedure_id.to_string())
|
||||
.await
|
||||
}
|
||||
|
||||
async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
|
||||
acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Executes a procedure until it returns [Status::Done].
|
||||
|
||||
@@ -20,6 +20,7 @@ pub mod error;
|
||||
pub mod local;
|
||||
pub mod options;
|
||||
mod procedure;
|
||||
pub mod rwlock;
|
||||
pub mod store;
|
||||
pub mod watcher;
|
||||
|
||||
@@ -28,8 +29,8 @@ pub mod test_util;
|
||||
|
||||
pub use crate::error::{Error, Result};
|
||||
pub use crate::procedure::{
|
||||
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, LockKey, Output, ParseIdError,
|
||||
PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo, ProcedureManager,
|
||||
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
|
||||
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, ContextProviderRef, LockKey,
|
||||
Output, ParseIdError, PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo,
|
||||
ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
|
||||
};
|
||||
pub use crate::watcher::Watcher;
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
// limitations under the License.
|
||||
|
||||
mod runner;
|
||||
mod rwlock;
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
@@ -30,7 +29,6 @@ use snafu::{ensure, OptionExt, ResultExt};
|
||||
use tokio::sync::watch::{self, Receiver, Sender};
|
||||
use tokio::sync::{Mutex as TokioMutex, Notify};
|
||||
|
||||
use self::rwlock::KeyRwLock;
|
||||
use crate::error::{
|
||||
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu,
|
||||
PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
|
||||
@@ -38,11 +36,12 @@ use crate::error::{
|
||||
};
|
||||
use crate::local::runner::Runner;
|
||||
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
|
||||
use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
|
||||
use crate::store::poison_store::PoisonStoreRef;
|
||||
use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
|
||||
use crate::{
|
||||
BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
|
||||
ProcedureState, ProcedureWithId, Watcher,
|
||||
ProcedureState, ProcedureWithId, StringKey, Watcher,
|
||||
};
|
||||
|
||||
/// The expired time of a procedure's metadata.
|
||||
@@ -157,12 +156,80 @@ struct LoadedProcedure {
    step: u32,
}

/// The dynamic lock for procedure execution.
///
/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
/// during execution. They are only held when the procedure specifically needs these keys
/// and are released as soon as the procedure no longer needs them.
/// This allows for more fine-grained concurrency control during procedure execution.
pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;

/// Acquires a dynamic key lock for the given key.
///
/// This function takes a reference to the dynamic key lock and a pointer to the key.
/// It then matches the key type and acquires the appropriate lock.
pub async fn acquire_dynamic_key_lock(
    lock: &DynamicKeyLock,
    key: &StringKey,
) -> DynamicKeyLockGuard {
    match key {
        StringKey::Share(key) => {
            let guard = lock.read(key.to_string()).await;
            DynamicKeyLockGuard {
                guard: Some(OwnedKeyRwLockGuard::from(guard)),
                key: key.to_string(),
                lock: lock.clone(),
            }
        }
        StringKey::Exclusive(key) => {
            let guard = lock.write(key.to_string()).await;
            DynamicKeyLockGuard {
                guard: Some(OwnedKeyRwLockGuard::from(guard)),
                key: key.to_string(),
                lock: lock.clone(),
            }
        }
    }
}
/// A guard for the dynamic key lock.
///
/// This guard is used to release the lock when the procedure no longer needs it.
/// It also ensures that the lock is cleaned up when the guard is dropped.
pub struct DynamicKeyLockGuard {
    guard: Option<OwnedKeyRwLockGuard>,
    key: String,
    lock: DynamicKeyLock,
}

impl Drop for DynamicKeyLockGuard {
    fn drop(&mut self) {
        if let Some(guard) = self.guard.take() {
            drop(guard);
        }
        self.lock.clean_keys(&[self.key.to_string()]);
    }
}
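The guard's `Drop` impl does two jobs: release the per-key RwLock guard, then ask the lock table to forget keys nobody is using anymore so the map does not grow without bound. A minimal, std-only sketch of that pattern; the real code uses tokio's owned guards and `KeyRwLock::clean_keys`, and the names below are illustrative:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

// One RwLock per key; the map entry is removed once the last user is done.
#[derive(Default)]
struct KeyLocks {
    inner: Mutex<HashMap<String, Arc<RwLock<()>>>>,
}

impl KeyLocks {
    fn lock_for(&self, key: &str) -> Arc<RwLock<()>> {
        let mut map = self.inner.lock().unwrap();
        map.entry(key.to_string())
            .or_insert_with(|| Arc::new(RwLock::new(())))
            .clone()
    }

    // Drop the entry if nobody else holds a reference to this key's lock.
    fn clean_key(&self, key: &str) {
        let mut map = self.inner.lock().unwrap();
        if let Some(lock) = map.get(key) {
            if Arc::strong_count(lock) == 1 {
                map.remove(key);
            }
        }
    }
}

fn main() {
    let locks = KeyLocks::default();
    let lock = locks.lock_for("table/1");
    {
        let _write = lock.write().unwrap(); // exclusive section for "table/1"
    }
    drop(lock);
    locks.clean_key("table/1"); // entry is gone now that the Arc is unique
}
```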
|
||||
|
||||
/// Shared context of the manager.
|
||||
pub(crate) struct ManagerContext {
|
||||
/// Procedure loaders. The key is the type name of the procedure which the loader returns.
|
||||
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
|
||||
/// The key lock for the procedure.
|
||||
///
|
||||
/// The lock keys are defined in `Procedure::lock_key()`.
|
||||
/// These locks are acquired before the procedure starts and released after the procedure finishes.
|
||||
/// They ensure exclusive access to resources throughout the entire procedure lifecycle.
|
||||
key_lock: KeyRwLock<String>,
|
||||
/// The dynamic lock for procedure execution.
|
||||
///
|
||||
/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
|
||||
/// during execution. They are only held when the procedure specifically needs these keys
|
||||
/// and are released as soon as the procedure no longer needs them.
|
||||
/// This allows for more fine-grained concurrency control during procedure execution.
|
||||
dynamic_key_lock: DynamicKeyLock,
|
||||
/// Procedures in the manager.
|
||||
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
|
||||
/// Running procedures.
|
||||
running_procedures: Mutex<HashSet<ProcedureId>>,
|
||||
/// Ids and finished time of finished procedures.
|
||||
finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
|
||||
@@ -199,6 +266,10 @@ impl ContextProvider for ManagerContext {
|
||||
let procedure_id = procedure_id.to_string();
|
||||
self.poison_manager.try_put_poison(key, procedure_id).await
|
||||
}
|
||||
|
||||
async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
|
||||
acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
|
||||
}
|
||||
}
|
||||
|
||||
impl ManagerContext {
|
||||
@@ -206,6 +277,7 @@ impl ManagerContext {
|
||||
fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
|
||||
ManagerContext {
|
||||
key_lock: KeyRwLock::new(),
|
||||
dynamic_key_lock: Arc::new(KeyRwLock::new()),
|
||||
loaders: Mutex::new(HashMap::new()),
|
||||
procedures: RwLock::new(HashMap::new()),
|
||||
running_procedures: Mutex::new(HashSet::new()),
|
||||
|
||||
@@ -23,9 +23,9 @@ use snafu::ResultExt;
|
||||
use tokio::time;
|
||||
|
||||
use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
|
||||
use crate::local::rwlock::OwnedKeyRwLockGuard;
|
||||
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
|
||||
use crate::procedure::{Output, StringKey};
|
||||
use crate::rwlock::OwnedKeyRwLockGuard;
|
||||
use crate::store::{ProcedureMessage, ProcedureStore};
|
||||
use crate::{
|
||||
BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
|
||||
@@ -581,6 +581,7 @@ impl Runner {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
@@ -588,13 +589,14 @@ mod tests {
|
||||
use common_error::mock::MockError;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_test_util::temp_dir::create_temp_dir;
|
||||
use futures::future::join_all;
|
||||
use futures_util::future::BoxFuture;
|
||||
use futures_util::FutureExt;
|
||||
use object_store::{EntryMode, ObjectStore};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use super::*;
|
||||
use crate::local::test_util;
|
||||
use crate::local::{test_util, DynamicKeyLockGuard};
|
||||
use crate::procedure::PoisonKeys;
|
||||
use crate::store::proc_path;
|
||||
use crate::test_util::InMemoryPoisonStore;
|
||||
@@ -666,6 +668,10 @@ mod tests {
|
||||
) -> Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
Context {
|
||||
@@ -1674,4 +1680,66 @@ mod tests {
|
||||
// If the procedure is poisoned, the poison key shouldn't be deleted.
|
||||
assert_eq!(procedure_id, ROOT_ID);
|
||||
}
|
||||
|
||||
fn test_procedure_with_dynamic_lock(
|
||||
shared_atomic_value: Arc<AtomicU64>,
|
||||
id: u64,
|
||||
) -> (BoxedProcedure, Arc<ProcedureMeta>) {
|
||||
let exec_fn = move |ctx: Context| {
|
||||
let moved_shared_atomic_value = shared_atomic_value.clone();
|
||||
let moved_ctx = ctx.clone();
|
||||
async move {
|
||||
debug!("Acquiring write lock, id: {}", id);
|
||||
let key = StringKey::Exclusive("test_lock".to_string());
|
||||
let guard = moved_ctx.provider.acquire_lock(&key).await;
|
||||
debug!("Acquired write lock, id: {}", id);
|
||||
let millis = rand::rng().random_range(10..=50);
|
||||
tokio::time::sleep(Duration::from_millis(millis)).await;
|
||||
let value = moved_shared_atomic_value.load(Ordering::Relaxed);
|
||||
moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
|
||||
debug!("Dropping write lock, id: {}", id);
|
||||
drop(guard);
|
||||
|
||||
Ok(Status::done())
|
||||
}
|
||||
.boxed()
|
||||
};
|
||||
|
||||
let adapter = ProcedureAdapter {
|
||||
data: "dynamic_lock".to_string(),
|
||||
lock_key: LockKey::new_exclusive([]),
|
||||
poison_keys: PoisonKeys::new([]),
|
||||
exec_fn,
|
||||
rollback_fn: None,
|
||||
};
|
||||
let meta = adapter.new_meta(ROOT_ID);
|
||||
|
||||
(Box::new(adapter), meta)
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_execute_with_dynamic_lock() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let shared_atomic_value = Arc::new(AtomicU64::new(0));
|
||||
let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
|
||||
let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
|
||||
|
||||
let dir = create_temp_dir("dynamic_lock");
|
||||
let object_store = test_util::new_object_store(&dir);
|
||||
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
|
||||
let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
|
||||
let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
|
||||
let ctx1 = context_with_provider(
|
||||
meta1.id,
|
||||
runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
|
||||
);
|
||||
let ctx2 = context_with_provider(
|
||||
meta2.id,
|
||||
// use same manager ctx as runner1
|
||||
runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
|
||||
);
|
||||
let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
|
||||
join_all(tasks).await;
|
||||
assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ use snafu::{ResultExt, Snafu};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::error::{self, Error, Result};
|
||||
use crate::local::DynamicKeyLockGuard;
|
||||
use crate::watcher::Watcher;
|
||||
|
||||
pub type Output = Arc<dyn Any + Send + Sync>;
|
||||
@@ -144,6 +145,9 @@ pub trait ContextProvider: Send + Sync {
|
||||
/// This method is used to mark a resource as being operated on by a procedure.
|
||||
/// If the poison key already exists with a different value, the operation will fail.
|
||||
async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()>;
|
||||
|
||||
/// Acquires a key lock for the procedure.
|
||||
async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard;
|
||||
}
|
||||
|
||||
/// Reference-counted pointer to [ContextProvider].
|
||||
|
||||
@@ -18,8 +18,18 @@ use std::sync::{Arc, Mutex};
|
||||
|
||||
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
|
||||
|
||||
/// A guard that owns a read or write lock on a key.
|
||||
///
|
||||
/// This enum wraps either a read or write lock guard obtained from a `KeyRwLock`.
|
||||
/// The guard is automatically released when it is dropped.
|
||||
pub enum OwnedKeyRwLockGuard {
|
||||
/// Represents a shared read lock on a key.
|
||||
/// Multiple read locks can be held simultaneously for the same key.
|
||||
Read { _guard: OwnedRwLockReadGuard<()> },
|
||||
|
||||
/// Represents an exclusive write lock on a key.
|
||||
/// Only one write lock can be held at a time for a given key,
|
||||
/// and no read locks can be held simultaneously with a write lock.
|
||||
Write { _guard: OwnedRwLockWriteGuard<()> },
|
||||
}
|
||||
|
||||
@@ -36,7 +46,7 @@ impl From<OwnedRwLockWriteGuard<()>> for OwnedKeyRwLockGuard {
|
||||
}
|
||||
|
||||
/// Locks based on a key, allowing other keys to lock independently.
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Default)]
|
||||
pub struct KeyRwLock<K> {
|
||||
/// The inner map of locks for specific keys.
|
||||
inner: Mutex<HashMap<K, Arc<RwLock<()>>>>,
|
||||
@@ -24,7 +24,7 @@ use datatypes::prelude::*;
|
||||
use datatypes::vectors::{Helper as VectorHelper, VectorRef};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{self, Error, FromScalarValueSnafu, IntoVectorSnafu, Result};
|
||||
use crate::error::{self, FromScalarValueSnafu, IntoVectorSnafu, Result};
|
||||
use crate::prelude::*;
|
||||
|
||||
pub type AggregateFunctionCreatorRef = Arc<dyn AggregateFunctionCreator>;
|
||||
@@ -166,8 +166,7 @@ impl DfAccumulator for DfAccumulatorAdaptor {
|
||||
let output_type = self.creator.output_type()?;
|
||||
let scalar_value = value
|
||||
.try_to_scalar_value(&output_type)
|
||||
.context(error::ToScalarValueSnafu)
|
||||
.map_err(Error::from)?;
|
||||
.context(error::ToScalarValueSnafu)?;
|
||||
Ok(scalar_value)
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ use datafusion_expr::LogicalPlan;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
/// The query request to be handled by the RegionServer (Datanode).
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct QueryRequest {
|
||||
/// The header of this request. Often to store some context of the query. None means all to defaults.
|
||||
pub header: Option<RegionRequestHeader>,
|
||||
|
||||
@@ -43,10 +43,10 @@ use mito2::config::MitoConfig;
|
||||
use mito2::engine::MitoEngine;
|
||||
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
|
||||
use object_store::util::normalize_dir;
|
||||
use query::dummy_catalog::TableProviderFactoryRef;
|
||||
use query::QueryEngineFactory;
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::server::ServerHandlers;
|
||||
use servers::Mode;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::path_utils::{region_dir, WAL_DIR};
|
||||
use store_api::region_engine::{RegionEngineRef, RegionRole};
|
||||
@@ -58,8 +58,8 @@ use tokio::sync::Notify;
|
||||
use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
|
||||
use crate::error::{
|
||||
self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
|
||||
MissingCacheSnafu, MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
|
||||
ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
|
||||
MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
|
||||
ShutdownServerSnafu, StartServerSnafu,
|
||||
};
|
||||
use crate::event_listener::{
|
||||
new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
|
||||
@@ -129,7 +129,7 @@ impl Datanode {
|
||||
self.services = services;
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) -> Result<()> {
|
||||
pub async fn shutdown(&mut self) -> Result<()> {
|
||||
self.services
|
||||
.shutdown_all()
|
||||
.await
|
||||
@@ -157,50 +157,49 @@ impl Datanode {
|
||||
|
||||
pub struct DatanodeBuilder {
|
||||
opts: DatanodeOptions,
|
||||
mode: Mode,
|
||||
table_provider_factory: Option<TableProviderFactoryRef>,
|
||||
plugins: Plugins,
|
||||
meta_client: Option<MetaClientRef>,
|
||||
kv_backend: Option<KvBackendRef>,
|
||||
kv_backend: KvBackendRef,
|
||||
cache_registry: Option<Arc<LayeredCacheRegistry>>,
|
||||
}
|
||||
|
||||
impl DatanodeBuilder {
|
||||
/// `kv_backend` is optional. If absent, the builder will try to build one
|
||||
/// by using the given `opts`
|
||||
pub fn new(opts: DatanodeOptions, plugins: Plugins, mode: Mode) -> Self {
|
||||
pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
|
||||
Self {
|
||||
opts,
|
||||
mode,
|
||||
table_provider_factory: None,
|
||||
plugins,
|
||||
meta_client: None,
|
||||
kv_backend: None,
|
||||
kv_backend,
|
||||
cache_registry: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_meta_client(self, meta_client: MetaClientRef) -> Self {
|
||||
Self {
|
||||
meta_client: Some(meta_client),
|
||||
..self
|
||||
}
|
||||
pub fn options(&self) -> &DatanodeOptions {
|
||||
&self.opts
|
||||
}
|
||||
|
||||
pub fn with_cache_registry(self, cache_registry: Arc<LayeredCacheRegistry>) -> Self {
|
||||
Self {
|
||||
cache_registry: Some(cache_registry),
|
||||
..self
|
||||
}
|
||||
pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
|
||||
self.meta_client = Some(client);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self {
|
||||
Self {
|
||||
kv_backend: Some(kv_backend),
|
||||
..self
|
||||
}
|
||||
pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
|
||||
self.cache_registry = Some(registry);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn kv_backend(&self) -> &KvBackendRef {
|
||||
&self.kv_backend
|
||||
}
|
||||
|
||||
pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
|
||||
self.table_provider_factory = Some(factory);
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn build(mut self) -> Result<Datanode> {
|
||||
let mode = &self.mode;
|
||||
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
|
||||
|
||||
let meta_client = self.meta_client.take();
|
||||
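Further up in this file, the `DatanodeBuilder` setters move from the consuming style (`fn with_x(self) -> Self`) to the mutating style (`fn with_x(&mut self) -> &mut Self`), which is what lets the updated test below call `builder.with_cache_registry(...)` on an existing binding. A tiny comparison of the mutating style, with hypothetical names (not the real builder):

```rust
// Mutating builder style: setters can be applied conditionally on an existing
// binding without re-assigning the builder each time.
#[derive(Default)]
struct Builder {
    cache: Option<String>,
}

impl Builder {
    fn with_cache(&mut self, cache: String) -> &mut Self {
        self.cache = Some(cache);
        self
    }

    fn build(self) -> String {
        self.cache.unwrap_or_else(|| "no-cache".to_string())
    }
}

fn main() {
    let mut builder = Builder::default();
    if std::env::var_os("USE_CACHE").is_some() {
        builder.with_cache("layered".to_string());
    }
    println!("built: {}", builder.build());
}
```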
@@ -210,8 +209,6 @@ impl DatanodeBuilder {
|
||||
// writable upon open.
|
||||
let controlled_by_metasrv = meta_client.is_some();
|
||||
|
||||
let kv_backend = self.kv_backend.take().context(MissingKvBackendSnafu)?;
|
||||
|
||||
// build and initialize region server
|
||||
let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
|
||||
let (tx, rx) = new_region_server_event_channel();
|
||||
@@ -233,7 +230,7 @@ impl DatanodeBuilder {
|
||||
.new_region_server(schema_metadata_manager, region_event_listener)
|
||||
.await?;
|
||||
|
||||
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
|
||||
let datanode_table_manager = DatanodeTableManager::new(self.kv_backend.clone());
|
||||
let table_values = datanode_table_manager
|
||||
.tables(node_id)
|
||||
.try_collect::<Vec<_>>()
|
||||
@@ -273,19 +270,18 @@ impl DatanodeBuilder {
|
||||
None
|
||||
};
|
||||
|
||||
let is_standalone = heartbeat_task.is_none();
|
||||
let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
|
||||
Some(self.opts.storage.data_home.clone()),
|
||||
mode,
|
||||
self.opts.enable_telemetry,
|
||||
is_standalone && self.opts.enable_telemetry,
|
||||
)
|
||||
.await;
|
||||
|
||||
let leases_notifier =
|
||||
if self.opts.require_lease_before_startup && matches!(mode, Mode::Distributed) {
|
||||
Some(Arc::new(Notify::new()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
|
||||
Some(Arc::new(Notify::new()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let export_metrics_task =
|
||||
ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
|
||||
@@ -363,7 +359,11 @@ impl DatanodeBuilder {
|
||||
);
|
||||
let query_engine = query_engine_factory.query_engine();
|
||||
|
||||
let table_provider_factory = Arc::new(DummyTableProviderFactory);
|
||||
let table_provider_factory = self
|
||||
.table_provider_factory
|
||||
.clone()
|
||||
.unwrap_or_else(|| Arc::new(DummyTableProviderFactory));
|
||||
|
||||
let mut region_server = RegionServer::with_table_provider(
|
||||
query_engine,
|
||||
common_runtime::global_runtime(),
|
||||
@@ -398,45 +398,46 @@ impl DatanodeBuilder {
|
||||
schema_metadata_manager: SchemaMetadataManagerRef,
|
||||
plugins: Plugins,
|
||||
) -> Result<Vec<RegionEngineRef>> {
|
||||
let mut engines = vec![];
|
||||
let mut metric_engine_config = opts.region_engine.iter().find_map(|c| match c {
|
||||
RegionEngineConfig::Metric(config) => Some(config.clone()),
|
||||
_ => None,
|
||||
});
|
||||
let mut metric_engine_config = metric_engine::config::EngineConfig::default();
|
||||
let mut mito_engine_config = MitoConfig::default();
|
||||
let mut file_engine_config = file_engine::config::EngineConfig::default();
|
||||
|
||||
for engine in &opts.region_engine {
|
||||
match engine {
|
||||
RegionEngineConfig::Mito(config) => {
|
||||
let mito_engine = Self::build_mito_engine(
|
||||
opts,
|
||||
object_store_manager.clone(),
|
||||
config.clone(),
|
||||
schema_metadata_manager.clone(),
|
||||
plugins.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let metric_engine = MetricEngine::try_new(
|
||||
mito_engine.clone(),
|
||||
metric_engine_config.take().unwrap_or_default(),
|
||||
)
|
||||
.context(BuildMetricEngineSnafu)?;
|
||||
engines.push(Arc::new(mito_engine) as _);
|
||||
engines.push(Arc::new(metric_engine) as _);
|
||||
mito_engine_config = config.clone();
|
||||
}
|
||||
RegionEngineConfig::File(config) => {
|
||||
let engine = FileRegionEngine::new(
|
||||
config.clone(),
|
||||
object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
|
||||
);
|
||||
engines.push(Arc::new(engine) as _);
|
||||
file_engine_config = config.clone();
|
||||
}
|
||||
RegionEngineConfig::Metric(_) => {
|
||||
// Already handled in `build_mito_engine`.
|
||||
RegionEngineConfig::Metric(metric_config) => {
|
||||
metric_engine_config = metric_config.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(engines)
|
||||
|
||||
let mito_engine = Self::build_mito_engine(
|
||||
opts,
|
||||
object_store_manager.clone(),
|
||||
mito_engine_config,
|
||||
schema_metadata_manager.clone(),
|
||||
plugins.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
|
||||
.context(BuildMetricEngineSnafu)?;
|
||||
|
||||
let file_engine = FileRegionEngine::new(
|
||||
file_engine_config,
|
||||
object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
|
||||
);
|
||||
|
||||
Ok(vec![
|
||||
Arc::new(mito_engine) as _,
|
||||
Arc::new(metric_engine) as _,
|
||||
Arc::new(file_engine) as _,
|
||||
])
|
||||
}
|
||||
|
||||
/// Builds [MitoEngine] according to options.
|
||||
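The engine-construction rewrite above changes the shape of the loop: instead of building engines inside the `match` as configs are encountered, it first folds the config list into one config per engine kind (falling back to defaults) and then constructs the mito, metric, and file engines exactly once. A condensed sketch of that two-phase approach, using stand-in types rather than the real configs:

```rust
// Phase 1: collapse a heterogeneous config list into one config per engine
// kind, defaulting anything that was not configured. Phase 2 (not shown)
// would build each engine exactly once from these.
#[derive(Clone, Default, Debug, PartialEq)]
struct MitoConfig { workers: usize }
#[derive(Clone, Default, Debug, PartialEq)]
struct MetricConfig { experimental: bool }

#[derive(Clone)]
enum EngineConfig {
    Mito(MitoConfig),
    Metric(MetricConfig),
}

fn split_configs(configs: &[EngineConfig]) -> (MitoConfig, MetricConfig) {
    let mut mito = MitoConfig::default();
    let mut metric = MetricConfig::default();
    for config in configs {
        match config {
            EngineConfig::Mito(c) => mito = c.clone(),
            EngineConfig::Metric(c) => metric = c.clone(),
        }
    }
    (mito, metric)
}

fn main() {
    let configs = vec![EngineConfig::Metric(MetricConfig { experimental: true })];
    let (mito, metric) = split_configs(&configs);
    assert_eq!(mito, MitoConfig::default()); // missing entries fall back to defaults
    assert!(metric.experimental);
}
```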
@@ -634,7 +635,6 @@ mod tests {
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use mito2::engine::MITO_ENGINE_NAME;
|
||||
use servers::Mode;
|
||||
use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
@@ -670,19 +670,19 @@ mod tests {
|
||||
let kv_backend = Arc::new(MemoryKvBackend::new());
|
||||
let layered_cache_registry = Arc::new(
|
||||
LayeredCacheRegistryBuilder::default()
|
||||
.add_cache_registry(build_datanode_cache_registry(kv_backend))
|
||||
.add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
|
||||
.build(),
|
||||
);
|
||||
|
||||
let builder = DatanodeBuilder::new(
|
||||
let mut builder = DatanodeBuilder::new(
|
||||
DatanodeOptions {
|
||||
node_id: Some(0),
|
||||
..Default::default()
|
||||
},
|
||||
Plugins::default(),
|
||||
Mode::Standalone,
|
||||
)
|
||||
.with_cache_registry(layered_cache_registry);
|
||||
kv_backend,
|
||||
);
|
||||
builder.with_cache_registry(layered_cache_registry);
|
||||
|
||||
let kv = Arc::new(MemoryKvBackend::default()) as _;
|
||||
setup_table_datanode(&kv).await;
|
||||
|
||||
@@ -150,12 +150,6 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Expect KvBackend but not found"))]
|
||||
MissingKvBackend {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid SQL, error: {}", msg))]
|
||||
InvalidSql { msg: String },
|
||||
|
||||
@@ -426,7 +420,6 @@ impl ErrorExt for Error {
|
||||
| MissingRequiredField { .. }
|
||||
| RegionEngineNotFound { .. }
|
||||
| ParseAddr { .. }
|
||||
| MissingKvBackend { .. }
|
||||
| TomlFormat { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
PayloadNotExist { .. }
|
||||
|
||||
@@ -20,7 +20,6 @@ use common_greptimedb_telemetry::{
|
||||
default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask,
|
||||
Mode as VersionReporterMode, TELEMETRY_INTERVAL,
|
||||
};
|
||||
use servers::Mode;
|
||||
|
||||
struct StandaloneGreptimeDBTelemetryCollector {
|
||||
uuid: Option<String>,
|
||||
@@ -55,7 +54,6 @@ impl Collector for StandaloneGreptimeDBTelemetryCollector {
|
||||
|
||||
pub async fn get_greptimedb_telemetry_task(
|
||||
working_home: Option<String>,
|
||||
mode: &Mode,
|
||||
enable: bool,
|
||||
) -> Arc<GreptimeDBTelemetryTask> {
|
||||
if !enable || cfg!(test) || cfg!(debug_assertions) {
|
||||
@@ -64,19 +62,14 @@ pub async fn get_greptimedb_telemetry_task(
|
||||
// Always enable.
|
||||
let should_report = Arc::new(AtomicBool::new(true));
|
||||
|
||||
match mode {
|
||||
Mode::Standalone => Arc::new(GreptimeDBTelemetryTask::enable(
|
||||
TELEMETRY_INTERVAL,
|
||||
Box::new(GreptimeDBTelemetry::new(
|
||||
working_home.clone(),
|
||||
Box::new(StandaloneGreptimeDBTelemetryCollector {
|
||||
uuid: default_get_uuid(&working_home),
|
||||
retry: 0,
|
||||
}),
|
||||
should_report.clone(),
|
||||
)),
|
||||
should_report,
|
||||
let uuid = default_get_uuid(&working_home);
|
||||
Arc::new(GreptimeDBTelemetryTask::enable(
|
||||
TELEMETRY_INTERVAL,
|
||||
Box::new(GreptimeDBTelemetry::new(
|
||||
working_home,
|
||||
Box::new(StandaloneGreptimeDBTelemetryCollector { uuid, retry: 0 }),
|
||||
should_report.clone(),
|
||||
)),
|
||||
Mode::Distributed => Arc::new(GreptimeDBTelemetryTask::disable()),
|
||||
}
|
||||
should_report,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ pub struct RegionHeartbeatResponseHandler {
|
||||
region_server: RegionServer,
|
||||
catchup_tasks: TaskTracker<()>,
|
||||
downgrade_tasks: TaskTracker<()>,
|
||||
flush_tasks: TaskTracker<()>,
|
||||
}
|
||||
|
||||
/// Handler of the instruction.
|
||||
@@ -50,6 +51,7 @@ pub struct HandlerContext {
|
||||
region_server: RegionServer,
|
||||
catchup_tasks: TaskTracker<()>,
|
||||
downgrade_tasks: TaskTracker<()>,
|
||||
flush_tasks: TaskTracker<()>,
|
||||
}
|
||||
|
||||
impl HandlerContext {
|
||||
@@ -63,6 +65,7 @@ impl HandlerContext {
|
||||
region_server,
|
||||
catchup_tasks: TaskTracker::new(),
|
||||
downgrade_tasks: TaskTracker::new(),
|
||||
flush_tasks: TaskTracker::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -74,6 +77,7 @@ impl RegionHeartbeatResponseHandler {
|
||||
region_server,
|
||||
catchup_tasks: TaskTracker::new(),
|
||||
downgrade_tasks: TaskTracker::new(),
|
||||
flush_tasks: TaskTracker::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,8 +99,11 @@ impl RegionHeartbeatResponseHandler {
|
||||
handler_context.handle_upgrade_region_instruction(upgrade_region)
|
||||
})),
|
||||
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
|
||||
Instruction::FlushRegion(flush_regions) => Ok(Box::new(move |handler_context| {
|
||||
handler_context.handle_flush_region_instruction(flush_regions)
|
||||
Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
|
||||
handler_context.handle_flush_regions_instruction(flush_regions)
|
||||
})),
|
||||
Instruction::FlushRegion(flush_region) => Ok(Box::new(move |handler_context| {
|
||||
handler_context.handle_flush_region_instruction(flush_region)
|
||||
})),
|
||||
}
|
||||
}
|
||||
@@ -111,6 +118,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
|
||||
| Some((_, Instruction::CloseRegion { .. }))
|
||||
| Some((_, Instruction::DowngradeRegion { .. }))
|
||||
| Some((_, Instruction::UpgradeRegion { .. }))
|
||||
| Some((_, Instruction::FlushRegion { .. }))
|
||||
)
|
||||
}
|
||||
|
||||
@@ -124,12 +132,14 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
|
||||
let region_server = self.region_server.clone();
|
||||
let catchup_tasks = self.catchup_tasks.clone();
|
||||
let downgrade_tasks = self.downgrade_tasks.clone();
|
||||
let flush_tasks = self.flush_tasks.clone();
|
||||
let handler = Self::build_handler(instruction)?;
|
||||
let _handle = common_runtime::spawn_global(async move {
|
||||
let reply = handler(HandlerContext {
|
||||
region_server,
|
||||
catchup_tasks,
|
||||
downgrade_tasks,
|
||||
flush_tasks,
|
||||
})
|
||||
.await;
|
||||
|
||||
|
||||
@@ -12,16 +12,17 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_meta::instruction::{FlushRegions, InstructionReply};
|
||||
use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply};
|
||||
use common_telemetry::warn;
|
||||
use futures_util::future::BoxFuture;
|
||||
use store_api::region_request::{RegionFlushRequest, RegionRequest};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error;
|
||||
use crate::heartbeat::handler::HandlerContext;
|
||||
|
||||
impl HandlerContext {
|
||||
pub(crate) fn handle_flush_region_instruction(
|
||||
pub(crate) fn handle_flush_regions_instruction(
|
||||
self,
|
||||
flush_regions: FlushRegions,
|
||||
) -> BoxFuture<'static, Option<InstructionReply>> {
|
||||
@@ -49,6 +50,59 @@ impl HandlerContext {
            None
        })
    }

    pub(crate) fn handle_flush_region_instruction(
        self,
        region_id: RegionId,
    ) -> BoxFuture<'static, Option<InstructionReply>> {
        Box::pin(async move {
            let Some(writable) = self.region_server.is_region_leader(region_id) else {
                return Some(InstructionReply::FlushRegion(SimpleReply {
                    result: false,
                    error: Some("Region is not leader".to_string()),
                }));
            };

            if !writable {
                return Some(InstructionReply::FlushRegion(SimpleReply {
                    result: false,
                    error: Some("Region is not writable".to_string()),
                }));
            }

            let region_server_moved = self.region_server.clone();
            let register_result = self
                .flush_tasks
                .try_register(
                    region_id,
                    Box::pin(async move {
                        let request = RegionRequest::Flush(RegionFlushRequest {
                            row_group_size: None,
                        });
                        region_server_moved
                            .handle_request(region_id, request)
                            .await?;
                        Ok(())
                    }),
                )
                .await;
            if register_result.is_busy() {
                warn!("Another flush task is running for the region: {region_id}");
            }
            let mut watcher = register_result.into_watcher();
            let result = self.flush_tasks.wait_until_finish(&mut watcher).await;
            match result {
                Ok(()) => Some(InstructionReply::FlushRegion(SimpleReply {
                    result: true,
                    error: None,
                })),
                Err(err) => Some(InstructionReply::FlushRegion(SimpleReply {
                    result: false,
                    error: Some(format!("{err:?}")),
                })),
            }
        })
    }
}
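The single-region handler above leans on `TaskTracker` for deduplication: `try_register` either starts the flush or reports that one is already in flight, and `wait_until_finish` lets the late caller piggyback on the running task's outcome. A much smaller, synchronous sketch of just the "one in-flight task per region" part; the names are illustrative, and the real tracker is async and also shares the task's result:

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

#[derive(Clone, Default)]
struct SingleFlight {
    running: Arc<Mutex<HashSet<u64>>>,
}

impl SingleFlight {
    /// Returns false (busy) if a task for `region_id` is already running.
    fn try_start(&self, region_id: u64) -> bool {
        self.running.lock().unwrap().insert(region_id)
    }

    fn finish(&self, region_id: u64) {
        self.running.lock().unwrap().remove(&region_id);
    }
}

fn main() {
    let tracker = SingleFlight::default();
    assert!(tracker.try_start(1)); // first flush for region 1 proceeds
    assert!(!tracker.try_start(1)); // a second request observes "busy"
    tracker.finish(1);
    assert!(tracker.try_start(1)); // a new flush can start afterwards
}
```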
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -84,7 +138,7 @@ mod tests {
|
||||
|
||||
let reply = handler_context
|
||||
.clone()
|
||||
.handle_flush_region_instruction(FlushRegions {
|
||||
.handle_flush_regions_instruction(FlushRegions {
|
||||
region_ids: region_ids.clone(),
|
||||
})
|
||||
.await;
|
||||
@@ -94,7 +148,7 @@ mod tests {
|
||||
flushed_region_ids.write().unwrap().clear();
|
||||
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
|
||||
let reply = handler_context
|
||||
.handle_flush_region_instruction(FlushRegions {
|
||||
.handle_flush_regions_instruction(FlushRegions {
|
||||
region_ids: not_found_region_ids.clone(),
|
||||
})
|
||||
.await;
|
||||
|
||||
@@ -144,6 +144,11 @@ impl<T: Send + Sync + Clone + 'static> TaskTracker<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits for a [RegisterResult] and returns a [WaitResult].
|
||||
pub(crate) async fn wait_until_finish(&self, watcher: &mut TaskWatcher<T>) -> Result<T> {
|
||||
wait(watcher).await
|
||||
}
|
||||
|
||||
/// Tries to register a new async task, returns [RegisterResult::Busy] if previous task is running.
|
||||
pub(crate) async fn try_register(
|
||||
&self,
|
||||
|
||||
@@ -62,7 +62,7 @@ impl<'a> DatanodeServiceBuilder<'a> {
        }
    }

-    pub async fn build(mut self) -> Result<ServerHandlers> {
+    pub fn build(mut self) -> Result<ServerHandlers> {
        let handlers = ServerHandlers::default();

        if let Some(grpc_server) = self.grpc_server.take() {
@@ -70,7 +70,7 @@ impl<'a> DatanodeServiceBuilder<'a> {
                addr: &self.opts.grpc.bind_addr,
            })?;
            let handler: ServerHandler = (Box::new(grpc_server), addr);
-            handlers.insert(handler).await;
+            handlers.insert(handler);
        }

        if self.enable_http_service {
@@ -82,7 +82,7 @@ impl<'a> DatanodeServiceBuilder<'a> {
                addr: &self.opts.http.addr,
            })?;
            let handler: ServerHandler = (Box::new(http_server), addr);
-            handlers.insert(handler).await;
+            handlers.insert(handler);
        }

        Ok(handlers)

@@ -20,6 +20,7 @@ use snafu::{ensure, ResultExt};

use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Result};
use crate::types::cast;
use crate::value::Value;
use crate::vectors::operations::VectorOp;
use crate::vectors::{TimestampMillisecondVector, VectorRef};
@@ -178,6 +179,18 @@ impl ColumnDefaultConstraint {
        }
    }

    /// Cast default value to given type
    pub fn cast_to_datatype(&self, data_type: &ConcreteDataType) -> Result<Self> {
        match self {
            ColumnDefaultConstraint::Value(v) => Ok(Self::Value(cast(v.clone(), data_type)?)),
            ColumnDefaultConstraint::Function(expr) => match &expr[..] {
                // no need to cast, since function always require a data_type when need to create default value
                CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN => Ok(self.clone()),
                _ => error::UnsupportedDefaultExprSnafu { expr }.fail(),
            },
        }
    }

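A hypothetical usage sketch of the new cast_to_datatype helper (the scenario and the re-export paths are assumptions, not taken from this diff): re-casting a stored constant default when a column's type is widened, while function defaults such as now() pass through untouched.

// Hypothetical illustration only; assumes these items are re-exported at the paths below.
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::value::Value;

fn widen_default() -> datatypes::error::Result<ColumnDefaultConstraint> {
    let old_default = ColumnDefaultConstraint::Value(Value::Int32(0));
    // Constant defaults are re-cast to the new column type; CURRENT_TIMESTAMP-style
    // function defaults are returned unchanged; anything else is an error.
    old_default.cast_to_datatype(&ConcreteDataType::int64_datatype())
}

fn main() {
    assert!(widen_default().is_ok());
}
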
    /// Only create default vector if it's impure, i.e., it's a function.
    ///
    /// This helps to delay creating constant default values to mito engine while also keeps impure default have consistent values
@@ -253,9 +266,10 @@ fn create_current_timestamp_vector(
    data_type: &ConcreteDataType,
    num_rows: usize,
) -> Result<VectorRef> {
-    let current_timestamp_vector = TimestampMillisecondVector::from_values(
-        std::iter::repeat(util::current_time_millis()).take(num_rows),
-    );
+    let current_timestamp_vector = TimestampMillisecondVector::from_values(std::iter::repeat_n(
+        util::current_time_millis(),
+        num_rows,
+    ));
    if data_type.is_timestamp() {
        current_timestamp_vector.cast(data_type)
    } else {

@@ -198,8 +198,7 @@ impl fmt::Debug for ConstantVector {

impl Serializable for ConstantVector {
    fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
-        std::iter::repeat(self.get(0))
-            .take(self.len())
+        std::iter::repeat_n(self.get(0), self.len())
            .map(serde_json::Value::try_from)
            .collect::<serde_json::Result<_>>()
            .context(SerializeSnafu)

@@ -412,7 +412,7 @@ pub(crate) fn replicate_decimal128(
                // Safety: std::iter::Repeat and std::iter::Take implement TrustedLen.
                builder
                    .mutable_array
-                    .append_trusted_len_iter(std::iter::repeat(data).take(repeat_times));
+                    .append_trusted_len_iter(std::iter::repeat_n(data, repeat_times));
            }
        }
        None => {

@@ -120,9 +120,7 @@ impl fmt::Debug for NullVector {

impl Serializable for NullVector {
    fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
-        Ok(std::iter::repeat(serde_json::Value::Null)
-            .take(self.len())
-            .collect())
+        Ok(std::iter::repeat_n(serde_json::Value::Null, self.len()).collect())
    }
}

@@ -388,7 +388,7 @@ pub(crate) fn replicate_primitive<T: LogicalPrimitiveType>(
                // Safety: std::iter::Repeat and std::iter::Take implement TrustedLen.
                builder
                    .mutable_array
-                    .append_trusted_len_iter(std::iter::repeat(data).take(repeat_times));
+                    .append_trusted_len_iter(std::iter::repeat_n(data, repeat_times));
            }
        }
        None => {

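The vector hunks above all make the same mechanical swap: std::iter::repeat(x).take(n) becomes std::iter::repeat_n(x, n), which yields the value exactly n times (the item must be Clone; repeat_n was stabilized in recent Rust). A tiny standalone check of the equivalence, added here for illustration:

fn main() {
    let a: Vec<i32> = std::iter::repeat(7).take(3).collect();
    let b: Vec<i32> = std::iter::repeat_n(7, 3).collect();
    assert_eq!(a, b); // both are [7, 7, 7]
}
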
@@ -166,7 +166,7 @@ impl ScalarVector for StringVector {
}

pub struct StringVectorBuilder {
-    mutable_array: MutableStringArray,
+    pub mutable_array: MutableStringArray,
}

impl MutableVector for StringVectorBuilder {

@@ -13,9 +13,11 @@ arrow.workspace = true
arrow-schema.workspace = true
async-recursion = "1.0"
async-trait.workspace = true
auth.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
client.workspace = true
common-base.workspace = true
common-config.workspace = true
@@ -39,16 +41,13 @@ datafusion-expr.workspace = true
datafusion-physical-expr.workspace = true
datafusion-substrait.workspace = true
datatypes.workspace = true
dfir_rs = { version = "0.13.0", default-features = false }
enum-as-inner = "0.6.0"
enum_dispatch = "0.3"
futures.workspace = true
get-size2 = "0.1.2"
greptime-proto.workspace = true
# This fork of hydroflow is simply for keeping our dependency in our org, and pin the version
# otherwise it is the same with upstream repo
chrono.workspace = true
http.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
@@ -60,6 +59,7 @@ partition.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true
rand.workspace = true
serde.workspace = true
servers.workspace = true
session.workspace = true

@@ -107,6 +107,7 @@ pub struct FlownodeOptions {
    pub tracing: TracingOptions,
    pub heartbeat: HeartbeatOptions,
    pub query: QueryOptions,
    pub user_provider: Option<String>,
}

impl Default for FlownodeOptions {
@@ -121,6 +122,7 @@ impl Default for FlownodeOptions {
            tracing: TracingOptions::default(),
            heartbeat: HeartbeatOptions::default(),
            query: QueryOptions::default(),
            user_provider: None,
        }
    }
}

@@ -21,6 +21,7 @@ use api::v1::flow::{
};
use api::v1::region::InsertRequests;
use catalog::CatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::error::Result as MetaResult;
@@ -37,11 +38,12 @@ use tokio::sync::{Mutex, RwLock};

use crate::adapter::{CreateFlowArgs, StreamingEngine};
use crate::batching_mode::engine::BatchingEngine;
use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
use crate::engine::FlowEngine;
use crate::error::{
    CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu,
-    InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, SyncCheckTaskSnafu,
-    UnexpectedSnafu,
+    InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu,
+    SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
@@ -62,6 +64,7 @@ pub struct FlowDualEngine {
    flow_metadata_manager: Arc<FlowMetadataManager>,
    catalog_manager: Arc<dyn CatalogManager>,
    check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
    plugins: Plugins,
}

impl FlowDualEngine {
@@ -70,6 +73,7 @@ impl FlowDualEngine {
        batching_engine: Arc<BatchingEngine>,
        flow_metadata_manager: Arc<FlowMetadataManager>,
        catalog_manager: Arc<dyn CatalogManager>,
        plugins: Plugins,
    ) -> Self {
        Self {
            streaming_engine,
@@ -78,9 +82,19 @@ impl FlowDualEngine {
            flow_metadata_manager,
            catalog_manager,
            check_task: Mutex::new(None),
            plugins,
        }
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    /// Determine if the engine is in distributed mode
    pub fn is_distributed(&self) -> bool {
        self.streaming_engine.node_id.is_some()
    }

    pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
        self.streaming_engine.clone()
    }
@@ -89,6 +103,39 @@ impl FlowDualEngine {
        self.batching_engine.clone()
    }

    /// In distributed mode, scan periodically(1s) until available frontend is found, or timeout,
    /// in standalone mode, return immediately
    /// notice here if any frontend appear in cluster info this function will return immediately
    async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
        if !self.is_distributed() {
            return Ok(());
        }
        let frontend_client = self.batching_engine().frontend_client.clone();
        let sleep_duration = std::time::Duration::from_millis(1_000);
        let now = std::time::Instant::now();
        loop {
            let frontend_list = frontend_client.scan_for_frontend().await?;
            if !frontend_list.is_empty() {
                let fe_list = frontend_list
                    .iter()
                    .map(|(_, info)| &info.peer.addr)
                    .collect::<Vec<_>>();
                info!("Available frontend found: {:?}", fe_list);
                return Ok(());
            }
            let elapsed = now.elapsed();
            tokio::time::sleep(sleep_duration).await;
            info!("Waiting for available frontend, elapsed={:?}", elapsed);
            if elapsed >= timeout {
                return NoAvailableFrontendSnafu {
                    timeout,
                    context: "No available frontend found in cluster info",
                }
                .fail();
            }
        }
    }

    /// Try to sync with check task, this is only used in drop flow&flush flow, so a flow id is required
    ///
    /// the need to sync is to make sure flush flow actually get called
@@ -338,18 +385,36 @@ struct ConsistentCheckTask {

impl ConsistentCheckTask {
    async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
        // first do recover flows
        engine.check_flow_consistent(true, false).await?;

-        let inner = engine.clone();
+        let engine = engine.clone();
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        let (trigger_tx, mut trigger_rx) =
            tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
        let handle = common_runtime::spawn_global(async move {
            // first check if available frontend is found
            if let Err(err) = engine
                .wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT)
                .await
            {
                warn!("No frontend is available yet:\n {err:?}");
            }

            // then do recover flows, if failed, always retry
            let mut recover_retry = 0;
            while let Err(err) = engine.check_flow_consistent(true, false).await {
                recover_retry += 1;
                error!(
                    "Failed to recover flows:\n {err:?}, retry {} in {}s",
                    recover_retry,
                    MIN_REFRESH_DURATION.as_secs()
                );
                tokio::time::sleep(MIN_REFRESH_DURATION).await;
            }

            // then do check flows, with configurable allow_create and allow_drop
            let (mut allow_create, mut allow_drop) = (false, false);
            let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
            loop {
-                if let Err(err) = inner.check_flow_consistent(allow_create, allow_drop).await {
+                if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
                    error!(err; "Failed to check flow consistent");
                }
                if let Some(done) = ret_signal.take() {
@@ -534,7 +599,12 @@ impl FlowEngine for FlowDualEngine {
        match flow_type {
            Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
            Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
-            None => Ok(0),
+            None => {
+                warn!(
+                    "Currently flow={flow_id} doesn't exist in flownode, ignore flush_flow request"
+                );
+                Ok(0)
+            }
        }
    }

@@ -19,8 +19,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use common_telemetry::info;
+use dfir_rs::scheduled::graph::Dfir;
use enum_as_inner::EnumAsInner;
-use hydroflow::scheduled::graph::Hydroflow;
use snafu::ensure;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};

@@ -49,9 +49,9 @@ pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) {
    (worker_handle, worker)
}

-/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
+/// ActiveDataflowState is a wrapper around `Dfir` and `DataflowState`
pub(crate) struct ActiveDataflowState<'subgraph> {
-    df: Hydroflow<'subgraph>,
+    df: Dfir<'subgraph>,
    state: DataflowState,
    err_collector: ErrCollector,
}
@@ -59,7 +59,7 @@ pub(crate) struct ActiveDataflowState<'subgraph> {
impl std::fmt::Debug for ActiveDataflowState<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveDataflowState")
-            .field("df", &"<Hydroflow>")
+            .field("df", &"<Dfir>")
            .field("state", &self.state)
            .field("err_collector", &self.err_collector)
            .finish()
@@ -69,7 +69,7 @@ impl std::fmt::Debug for ActiveDataflowState<'_> {
impl Default for ActiveDataflowState<'_> {
    fn default() -> Self {
        ActiveDataflowState {
-            df: Hydroflow::new(),
+            df: Dfir::new(),
            state: DataflowState::default(),
            err_collector: ErrCollector::default(),
        }

@@ -31,4 +31,19 @@ pub const DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(
pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);

/// The minimum duration between two queries execution by batching mode task
-const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
+pub const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);

/// Grpc connection timeout
const GRPC_CONN_TIMEOUT: Duration = Duration::from_secs(5);

/// Grpc max retry number
const GRPC_MAX_RETRIES: u32 = 3;

/// Flow wait for available frontend timeout,
/// if failed to find available frontend after FRONTEND_SCAN_TIMEOUT elapsed, return error
/// which should prevent flownode from starting
pub const FRONTEND_SCAN_TIMEOUT: Duration = Duration::from_secs(30);

/// Frontend activity timeout
/// if frontend is down(not sending heartbeat) for more than FRONTEND_ACTIVITY_TIMEOUT, it will be removed from the list that flownode use to connect
pub const FRONTEND_ACTIVITY_TIMEOUT: Duration = Duration::from_secs(60);

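FRONTEND_ACTIVITY_TIMEOUT is applied further down in the frontend-selection hunk, which filters nodes on last_activity_ts. A minimal standalone sketch of that millisecond arithmetic follows; the helper name is made up for illustration, only the 60-second constant mirrors the code above.

// Hypothetical helper mirroring the liveness filter used when picking a frontend.
use std::time::{Duration, SystemTime};

const FRONTEND_ACTIVITY_TIMEOUT: Duration = Duration::from_secs(60);

/// A frontend is considered alive if it heartbeated within the last 60 seconds.
fn is_recently_active(last_activity_ts_ms: i64) -> bool {
    let now_ms = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis() as i64;
    last_activity_ts_ms + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64 > now_ms
}

fn main() {
    let now_ms = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis() as i64;
    assert!(is_recently_active(now_ms - 10_000)); // heartbeated 10s ago
    assert!(!is_recently_active(now_ms - 300_000)); // silent for 5 minutes
}
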
@@ -49,7 +49,8 @@ use crate::{CreateFlowArgs, Error, FlowId, TableName};
pub struct BatchingEngine {
    tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
    shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
-    frontend_client: Arc<FrontendClient>,
+    /// frontend client for insert request
+    pub(crate) frontend_client: Arc<FrontendClient>,
    flow_metadata_manager: FlowMetadataManagerRef,
    table_meta: TableMetadataManagerRef,
    catalog_manager: CatalogManagerRef,
@@ -302,7 +303,7 @@ impl BatchingEngine {
            })
            .transpose()?;

-        info!(
+        debug!(
            "Flow id={}, found time window expr={}",
            flow_id,
            phy_expr

@@ -15,6 +15,7 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user

use std::sync::{Arc, Weak};
use std::time::SystemTime;

use api::v1::greptime_request::Request;
use api::v1::CreateTableExpr;
@@ -25,14 +26,20 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::warn;
use meta_client::client::MetaClient;
use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};

-use crate::batching_mode::DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT;
-use crate::error::{ExternalSnafu, InvalidRequestSnafu, UnexpectedSnafu};
-use crate::Error;
+use crate::batching_mode::{
+    DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
+    GRPC_MAX_RETRIES,
+};
+use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
+use crate::{Error, FlowAuthHeader};

/// Just like [`GrpcQueryHandler`] but use BoxedError
///
@@ -75,6 +82,7 @@ pub enum FrontendClient {
    Distributed {
        meta_client: Arc<MetaClient>,
        chnl_mgr: ChannelManager,
+        auth: Option<FlowAuthHeader>,
    },
    Standalone {
        /// for the sake of simplicity still use grpc even in standalone mode
@@ -95,13 +103,17 @@ impl FrontendClient {
        )
    }

-    pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
+    pub fn from_meta_client(meta_client: Arc<MetaClient>, auth: Option<FlowAuthHeader>) -> Self {
+        common_telemetry::info!("Frontend client build with auth={:?}", auth);
        Self::Distributed {
            meta_client,
            chnl_mgr: {
-                let cfg = ChannelConfig::new().timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
+                let cfg = ChannelConfig::new()
+                    .connect_timeout(GRPC_CONN_TIMEOUT)
+                    .timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
                ChannelManager::with_config(cfg)
            },
+            auth,
        }
    }

@@ -122,10 +134,24 @@ impl DatabaseWithPeer {
    fn new(database: Database, peer: Peer) -> Self {
        Self { database, peer }
    }

    /// Try sending a "SELECT 1" to the database
    async fn try_select_one(&self) -> Result<(), Error> {
        // notice here use `sql` for `SELECT 1` return 1 row
        let _ = self
            .database
            .sql("SELECT 1")
            .await
            .with_context(|_| InvalidRequestSnafu {
                context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
            })?;
        Ok(())
    }
}

impl FrontendClient {
-    async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
+    /// scan for available frontend from metadata
+    pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
        let Self::Distributed { meta_client, .. } = self else {
            return Ok(vec![]);
        };
@@ -155,8 +181,8 @@ impl FrontendClient {
        Ok(res)
    }

-    /// Get the database with max `last_activity_ts`
-    async fn get_last_active_frontend(
+    /// Get the database with maximum `last_activity_ts`& is able to process query
+    async fn get_latest_active_frontend(
        &self,
        catalog: &str,
        schema: &str,
@@ -164,6 +190,7 @@ impl FrontendClient {
        let Self::Distributed {
            meta_client: _,
            chnl_mgr,
+            auth,
        } = self
        else {
            return UnexpectedSnafu {
@@ -172,22 +199,56 @@ impl FrontendClient {
            .fail();
        };

-        let frontends = self.scan_for_frontend().await?;
-        let mut peer = None;
        let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
        interval.tick().await;
        for retry in 0..GRPC_MAX_RETRIES {
            let mut frontends = self.scan_for_frontend().await?;
            let now_in_ms = SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis() as i64;
            // shuffle the frontends to avoid always pick the same one
            frontends.shuffle(&mut rng());

-        if let Some((_, val)) = frontends.iter().max_by_key(|(_, val)| val.last_activity_ts) {
-            peer = Some(val.peer.clone());
            // found node with maximum last_activity_ts
            for (_, node_info) in frontends
                .iter()
                // filter out frontend that have been down for more than 1 min
                .filter(|(_, node_info)| {
                    node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
                        > now_in_ms
                })
            {
                let addr = &node_info.peer.addr;
                let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
                let database = {
                    let mut db = Database::new(catalog, schema, client);
                    if let Some(auth) = auth {
                        db.set_auth(auth.auth().clone());
                    }
                    db
                };
                let db = DatabaseWithPeer::new(database, node_info.peer.clone());
                match db.try_select_one().await {
                    Ok(_) => return Ok(db),
                    Err(e) => {
                        warn!(
                            "Failed to connect to frontend {} on retry={}: \n{e:?}",
                            addr, retry
                        );
                    }
                }
            }
            // no available frontend
            // sleep and retry
            interval.tick().await;
        }

-        let Some(peer) = peer else {
-            UnexpectedSnafu {
-                reason: format!("No frontend available: {:?}", frontends),
-            }
-            .fail()?
-        };
-        let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![peer.addr.clone()]);
-        let database = Database::new(catalog, schema, client);
-        Ok(DatabaseWithPeer::new(database, peer))
        NoAvailableFrontendSnafu {
            timeout: GRPC_CONN_TIMEOUT,
            context: "No available frontend found that is able to process query",
        }
        .fail()
    }

    pub async fn create(
@@ -217,17 +278,17 @@ impl FrontendClient {
    ) -> Result<u32, Error> {
        match self {
            FrontendClient::Distributed { .. } => {
-                let db = self.get_last_active_frontend(catalog, schema).await?;
+                let db = self.get_latest_active_frontend(catalog, schema).await?;

                *peer_desc = Some(PeerDesc::Dist {
                    peer: db.peer.clone(),
                });

                db.database
-                    .handle(req.clone())
+                    .handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
                    .await
                    .with_context(|_| InvalidRequestSnafu {
-                        context: format!("Failed to handle request: {:?}", req),
+                        context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
                    })
            }
            FrontendClient::Standalone { database_client } => {

@@ -53,6 +53,7 @@ use crate::batching_mode::utils::{
use crate::batching_mode::{
    DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD,
};
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
@@ -191,7 +192,7 @@ impl BatchingTask {
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if let Some(new_query) = self.gen_insert_plan(engine).await? {
-            debug!("Generate new query: {:#?}", new_query);
+            debug!("Generate new query: {}", new_query);
            self.execute_logical_plan(frontend_client, &new_query).await
        } else {
            debug!("Generate no query");
@@ -541,7 +542,10 @@ impl BatchingTask {
            .clone()
            .rewrite(&mut add_auto_column)
            .with_context(|_| DatafusionSnafu {
-                context: format!("Failed to rewrite plan {:?}", self.config.plan),
+                context: format!(
+                    "Failed to rewrite plan:\n {}\n",
+                    self.config.plan
+                ),
            })?
            .data;
        let schema_len = plan.schema().fields().len();
@@ -573,16 +577,19 @@ impl BatchingTask {

            let mut add_filter = AddFilterRewriter::new(expr);
            let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
            // make a not optimized plan for clearer unparse

            let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
                .await?;
-            plan.clone()
+            let rewrite = plan
+                .clone()
                .rewrite(&mut add_filter)
                .and_then(|p| p.data.rewrite(&mut add_auto_column))
                .with_context(|_| DatafusionSnafu {
-                    context: format!("Failed to rewrite plan {plan:?}"),
+                    context: format!("Failed to rewrite plan:\n {}\n", plan),
                })?
-                .data
+                .data;
+            // only apply optimize after complex rewrite is done
+            apply_df_optimizer(rewrite).await?
        };

        Ok(Some((new_plan, schema_len)))

@@ -704,6 +704,28 @@ mod test {
            ),
            "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
        ),
        // complex time window index with where
        (
            "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number in (2, 3, 4) GROUP BY time_window;",
            Timestamp::new(1740394109, TimeUnit::Second),
            (
                "ts".to_string(),
                Some(Timestamp::new(1740394080, TimeUnit::Second)),
                Some(Timestamp::new(1740394140, TimeUnit::Second)),
            ),
            "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE numbers_with_ts.number IN (2, 3, 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
        ),
        // complex time window index with between and
        (
            "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number BETWEEN 2 AND 4 GROUP BY time_window;",
            Timestamp::new(1740394109, TimeUnit::Second),
            (
                "ts".to_string(),
                Some(Timestamp::new(1740394080, TimeUnit::Second)),
                Some(Timestamp::new(1740394140, TimeUnit::Second)),
            ),
            "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE (numbers_with_ts.number BETWEEN 2 AND 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
        ),
        // no time index
        (
            "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",

@@ -138,9 +138,12 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
    fn f_down(&mut self, node: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
        if let LogicalPlan::Aggregate(aggregate) = node {
            self.group_exprs = Some(aggregate.group_expr.iter().cloned().collect());
-            debug!("Group by exprs: {:?}", self.group_exprs);
+            debug!(
+                "FindGroupByFinalName: Get Group by exprs from Aggregate: {:?}",
+                self.group_exprs
+            );
        } else if let LogicalPlan::Distinct(distinct) = node {
-            debug!("Distinct: {:#?}", distinct);
+            debug!("FindGroupByFinalName: Distinct: {}", node);
            match distinct {
                Distinct::All(input) => {
                    if let LogicalPlan::TableScan(table_scan) = &**input {
@@ -162,7 +165,10 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
                    self.group_exprs = Some(distinct_on.on_expr.iter().cloned().collect())
                }
            }
-            debug!("Group by exprs: {:?}", self.group_exprs);
+            debug!(
+                "FindGroupByFinalName: Get Group by exprs from Distinct: {:?}",
+                self.group_exprs
+            );
        }

        Ok(TreeNodeRecursion::Continue)
@@ -342,8 +348,8 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
            }
        } else {
            return Err(DataFusionError::Plan(format!(
-                "Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?} at node {:?}",
-                query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas(), node
+                "Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?}",
+                query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas()
            )));
        }

@@ -406,7 +412,9 @@ mod test {
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use pretty_assertions::assert_eq;
    use query::query_engine::DefaultSerializer;
    use session::context::QueryContext;
    use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

    use super::*;
    use crate::test_utils::create_test_query_engine;
@@ -701,4 +709,18 @@ mod test {
        );
        }
    }

    #[tokio::test]
    async fn test_null_cast() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        let sql = "SELECT NULL::DOUBLE FROM numbers_with_ts";
        let plan = sql_to_df_plan(ctx, query_engine.clone(), sql, false)
            .await
            .unwrap();

        let _sub_plan = DFLogicalSubstraitConvertor {}
            .encode(&plan, DefaultSerializer)
            .unwrap();
    }
}

Some files were not shown because too many files have changed in this diff.