mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
Compare commits
40 Commits
flow/choos
...
ci/update-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b2377d4b87 | ||
|
|
8d36ffb4e1 | ||
|
|
955ad644f7 | ||
|
|
c2e3c3d398 | ||
|
|
400229c384 | ||
|
|
cd9b6990bf | ||
|
|
a56e6e04c2 | ||
|
|
d324439014 | ||
|
|
038acda7cd | ||
|
|
a0d89c9ed1 | ||
|
|
3a5534722c | ||
|
|
1010a0c2ad | ||
|
|
f46cdbd66b | ||
|
|
864cc117b3 | ||
|
|
0ea9ab385d | ||
|
|
c7e9485534 | ||
|
|
57b53211d9 | ||
|
|
01076069a3 | ||
|
|
73b4b710cd | ||
|
|
14b655ea57 | ||
|
|
c780746171 | ||
|
|
1f62c3b545 | ||
|
|
5a9023d6b3 | ||
|
|
209f8371f2 | ||
|
|
30f1cbf0bf | ||
|
|
bbb6f8685e | ||
|
|
29540b55ee | ||
|
|
ca1641d1c4 | ||
|
|
b275793b36 | ||
|
|
265b144ca2 | ||
|
|
2ce5631d3c | ||
|
|
36d9346ffc | ||
|
|
36ff36e094 | ||
|
|
9cf5f0e940 | ||
|
|
2a0e9c930d | ||
|
|
787a50631b | ||
|
|
50df275097 | ||
|
|
8dca448baf | ||
|
|
828f69a562 | ||
|
|
04cae4b21e |
2
.github/CODEOWNERS
vendored
2
.github/CODEOWNERS
vendored
@@ -4,7 +4,7 @@
|
||||
|
||||
* @GreptimeTeam/db-approver
|
||||
|
||||
## [Module] Databse Engine
|
||||
## [Module] Database Engine
|
||||
/src/index @zhongzc
|
||||
/src/mito2 @evenyag @v0y4g3r @waynexia
|
||||
/src/query @evenyag
|
||||
|
||||
2
.github/scripts/create-version.sh
vendored
2
.github/scripts/create-version.sh
vendored
@@ -8,7 +8,7 @@ set -e
|
||||
# - If it's a nightly build, the version is 'nightly-YYYYMMDD-$(git rev-parse --short HEAD)', like 'nightly-20230712-e5b243c'.
|
||||
# create_version ${GIHUB_EVENT_NAME} ${NEXT_RELEASE_VERSION} ${NIGHTLY_RELEASE_PREFIX}
|
||||
function create_version() {
|
||||
# Read from envrionment variables.
|
||||
# Read from environment variables.
|
||||
if [ -z "$GITHUB_EVENT_NAME" ]; then
|
||||
echo "GITHUB_EVENT_NAME is empty" >&2
|
||||
exit 1
|
||||
|
||||
2
.github/scripts/deploy-greptimedb.sh
vendored
2
.github/scripts/deploy-greptimedb.sh
vendored
@@ -10,7 +10,7 @@ GREPTIMEDB_IMAGE_TAG=${GREPTIMEDB_IMAGE_TAG:-latest}
|
||||
ETCD_CHART="oci://registry-1.docker.io/bitnamicharts/etcd"
|
||||
GREPTIME_CHART="https://greptimeteam.github.io/helm-charts/"
|
||||
|
||||
# Ceate a cluster with 1 control-plane node and 5 workers.
|
||||
# Create a cluster with 1 control-plane node and 5 workers.
|
||||
function create_kind_cluster() {
|
||||
cat <<EOF | kind create cluster --name "${CLUSTER}" --image kindest/node:"$KUBERNETES_VERSION" --config=-
|
||||
kind: Cluster
|
||||
|
||||
46
.github/scripts/update-helm-charts-version.sh
vendored
Executable file
46
.github/scripts/update-helm-charts-version.sh
vendored
Executable file
@@ -0,0 +1,46 @@
|
||||
#!/bin/bash
|
||||
|
||||
set -e
|
||||
|
||||
VERSION=${VERSION}
|
||||
GITHUB_TOKEN=${GITHUB_TOKEN}
|
||||
|
||||
update_helm_charts_version() {
|
||||
# Configure Git configs.
|
||||
git config --global user.email update-helm-charts-version@greptime.com
|
||||
git config --global user.name update-helm-charts-version
|
||||
|
||||
# Clone helm-charts repository.
|
||||
git clone "https://x-access-token:${GITHUB_TOKEN}@github.com/GreptimeTeam/helm-charts.git"
|
||||
cd helm-charts
|
||||
|
||||
# Set default remote for gh CLI
|
||||
gh repo set-default GreptimeTeam/helm-charts
|
||||
|
||||
# Checkout a new branch.
|
||||
BRANCH_NAME="chore/greptimedb-${VERSION}"
|
||||
git checkout -b $BRANCH_NAME
|
||||
|
||||
# Update version.
|
||||
make update-version CHART=greptimedb-cluster VERSION=${VERSION}
|
||||
make update-version CHART=greptimedb-standalone VERSION=${VERSION}
|
||||
|
||||
# Update docs.
|
||||
make docs
|
||||
|
||||
# Commit the changes.
|
||||
git add .
|
||||
git commit -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git push origin $BRANCH_NAME
|
||||
|
||||
# Create a Pull Request.
|
||||
gh pr create \
|
||||
--title "chore: Update GreptimeDB version to ${VERSION}" \
|
||||
--body "This PR updates the GreptimeDB version." \
|
||||
--base main \
|
||||
--head $BRANCH_NAME \
|
||||
--reviewer zyy17 \
|
||||
--reviewer daviderli614
|
||||
}
|
||||
|
||||
update_helm_charts_version
|
||||
42
.github/scripts/update-homebrew-greptme-version.sh
vendored
Executable file
42
.github/scripts/update-homebrew-greptme-version.sh
vendored
Executable file
@@ -0,0 +1,42 @@
|
||||
#!/bin/bash
|
||||
|
||||
set -e
|
||||
|
||||
VERSION=${VERSION}
|
||||
GITHUB_TOKEN=${GITHUB_TOKEN}
|
||||
|
||||
update_homebrew_greptime_version() {
|
||||
# Configure Git configs.
|
||||
git config --global user.email update-greptime-version@greptime.com
|
||||
git config --global user.name update-greptime-version
|
||||
|
||||
# Clone helm-charts repository.
|
||||
git clone "https://x-access-token:${GITHUB_TOKEN}@github.com/GreptimeTeam/homebrew-greptime.git"
|
||||
cd homebrew-greptime
|
||||
|
||||
# Set default remote for gh CLI
|
||||
gh repo set-default GreptimeTeam/homebrew-greptime
|
||||
|
||||
# Checkout a new branch.
|
||||
BRANCH_NAME="chore/greptimedb-${VERSION}"
|
||||
git checkout -b $BRANCH_NAME
|
||||
|
||||
# Update version.
|
||||
make update-greptime-version VERSION=${VERSION}
|
||||
|
||||
# Commit the changes.
|
||||
git add .
|
||||
git commit -m "chore: Update GreptimeDB version to ${VERSION}"
|
||||
git push origin $BRANCH_NAME
|
||||
|
||||
# Create a Pull Request.
|
||||
gh pr create \
|
||||
--title "chore: Update GreptimeDB version to ${VERSION}" \
|
||||
--body "This PR updates the GreptimeDB version." \
|
||||
--base main \
|
||||
--head $BRANCH_NAME \
|
||||
--reviewer zyy17 \
|
||||
--reviewer daviderli614
|
||||
}
|
||||
|
||||
update_homebrew_greptime_version
|
||||
2
.github/scripts/upload-artifacts-to-s3.sh
vendored
2
.github/scripts/upload-artifacts-to-s3.sh
vendored
@@ -41,7 +41,7 @@ function upload_artifacts() {
|
||||
# Updates the latest version information in AWS S3 if UPDATE_VERSION_INFO is true.
|
||||
function update_version_info() {
|
||||
if [ "$UPDATE_VERSION_INFO" == "true" ]; then
|
||||
# If it's the officail release(like v1.0.0, v1.0.1, v1.0.2, etc.), update latest-version.txt.
|
||||
# If it's the official release(like v1.0.0, v1.0.1, v1.0.2, etc.), update latest-version.txt.
|
||||
if [[ "$VERSION" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
|
||||
echo "Updating latest-version.txt"
|
||||
echo "$VERSION" > latest-version.txt
|
||||
|
||||
14
.github/workflows/develop.yml
vendored
14
.github/workflows/develop.yml
vendored
@@ -222,12 +222,12 @@ jobs:
|
||||
run: |
|
||||
sudo apt update && sudo apt install -y libfuzzer-14-dev
|
||||
cargo install cargo-fuzz cargo-gc-bin --force
|
||||
- name: Download pre-built binariy
|
||||
- name: Download pre-built binary
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
name: bin
|
||||
path: .
|
||||
- name: Unzip bianry
|
||||
- name: Unzip binary
|
||||
run: |
|
||||
tar -xvf ./bin.tar.gz
|
||||
rm ./bin.tar.gz
|
||||
@@ -275,7 +275,7 @@ jobs:
|
||||
- name: Install cargo-gc-bin
|
||||
shell: bash
|
||||
run: cargo install cargo-gc-bin --force
|
||||
- name: Build greptime bianry
|
||||
- name: Build greptime binary
|
||||
shell: bash
|
||||
# `cargo gc` will invoke `cargo build` with specified args
|
||||
run: cargo gc --profile ci -- --bin greptime --features "pg_kvbackend,mysql_kvbackend"
|
||||
@@ -328,9 +328,9 @@ jobs:
|
||||
name: Setup Minio
|
||||
uses: ./.github/actions/setup-minio
|
||||
- if: matrix.mode.kafka
|
||||
name: Setup Kafka cluser
|
||||
name: Setup Kafka cluster
|
||||
uses: ./.github/actions/setup-kafka-cluster
|
||||
- name: Setup Etcd cluser
|
||||
- name: Setup Etcd cluster
|
||||
uses: ./.github/actions/setup-etcd-cluster
|
||||
# Prepares for fuzz tests
|
||||
- uses: arduino/setup-protoc@v3
|
||||
@@ -475,9 +475,9 @@ jobs:
|
||||
name: Setup Minio
|
||||
uses: ./.github/actions/setup-minio
|
||||
- if: matrix.mode.kafka
|
||||
name: Setup Kafka cluser
|
||||
name: Setup Kafka cluster
|
||||
uses: ./.github/actions/setup-kafka-cluster
|
||||
- name: Setup Etcd cluser
|
||||
- name: Setup Etcd cluster
|
||||
uses: ./.github/actions/setup-etcd-cluster
|
||||
# Prepares for fuzz tests
|
||||
- uses: arduino/setup-protoc@v3
|
||||
|
||||
2
.github/workflows/nightly-ci.yml
vendored
2
.github/workflows/nightly-ci.yml
vendored
@@ -127,6 +127,8 @@ jobs:
|
||||
with:
|
||||
nix_path: nixpkgs=channel:nixos-24.11
|
||||
- run: nix develop --command cargo build --bin greptime
|
||||
env:
|
||||
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
|
||||
|
||||
check-status:
|
||||
name: Check status
|
||||
|
||||
54
.github/workflows/release.yml
vendored
54
.github/workflows/release.yml
vendored
@@ -88,7 +88,7 @@ env:
|
||||
# Controls whether to run tests, include unit-test, integration-test and sqlness.
|
||||
DISABLE_RUN_TESTS: ${{ inputs.skip_test || vars.DEFAULT_SKIP_TEST }}
|
||||
|
||||
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
|
||||
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nightly-20230313;
|
||||
NIGHTLY_RELEASE_PREFIX: nightly
|
||||
|
||||
jobs:
|
||||
@@ -124,7 +124,7 @@ jobs:
|
||||
|
||||
# The create-version will create a global variable named 'version' in the global workflows.
|
||||
# - If it's a tag push release, the version is the tag name(${{ github.ref_name }});
|
||||
# - If it's a scheduled release, the version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-$buildTime', like v0.2.0-nigthly-20230313;
|
||||
# - If it's a scheduled release, the version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-$buildTime', like v0.2.0-nightly-20230313;
|
||||
# - If it's a manual release, the version is '${{ env.NEXT_RELEASE_VERSION }}-<short-git-sha>-YYYYMMDDSS', like v0.2.0-e5b243c-2023071245;
|
||||
- name: Create version
|
||||
id: create-version
|
||||
@@ -388,7 +388,7 @@ jobs:
|
||||
|
||||
### Stop runners ###
|
||||
# It's very necessary to split the job of releasing runners into 'stop-linux-amd64-runner' and 'stop-linux-arm64-runner'.
|
||||
# Because we can terminate the specified EC2 instance immediately after the job is finished without uncessary waiting.
|
||||
# Because we can terminate the specified EC2 instance immediately after the job is finished without unnecessary waiting.
|
||||
stop-linux-amd64-runner: # It's always run as the last job in the workflow to make sure that the runner is released.
|
||||
name: Stop linux-amd64 runner
|
||||
# Only run this job when the runner is allocated.
|
||||
@@ -444,7 +444,7 @@ jobs:
|
||||
bump-doc-version:
|
||||
name: Bump doc version
|
||||
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
|
||||
needs: [allocate-runners]
|
||||
needs: [allocate-runners, publish-github-release]
|
||||
runs-on: ubuntu-latest
|
||||
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
|
||||
permissions:
|
||||
@@ -466,8 +466,8 @@ jobs:
|
||||
|
||||
bump-website-version:
|
||||
name: Bump website version
|
||||
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
|
||||
needs: [allocate-runners]
|
||||
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
|
||||
needs: [allocate-runners, publish-github-release]
|
||||
runs-on: ubuntu-latest
|
||||
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
|
||||
permissions:
|
||||
@@ -487,6 +487,48 @@ jobs:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
WEBSITE_REPO_TOKEN: ${{ secrets.WEBSITE_REPO_TOKEN }}
|
||||
|
||||
bump-helm-charts-version:
|
||||
name: Bump helm charts version
|
||||
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
|
||||
needs: [allocate-runners, publish-github-release]
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
pull-requests: write
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Bump helm charts version
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.HELM_CHARTS_REPO_TOKEN }}
|
||||
VERSION: ${{ needs.allocate-runners.outputs.version }}
|
||||
run: |
|
||||
./.github/scripts/update-helm-charts-version.sh
|
||||
|
||||
bump-homebrew-greptime-version:
|
||||
name: Bump homebrew greptime version
|
||||
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
|
||||
needs: [allocate-runners, publish-github-release]
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
pull-requests: write
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Bump homebrew greptime version
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.HOMEBREW_GREPTIME_REPO_TOKEN }}
|
||||
VERSION: ${{ needs.allocate-runners.outputs.version }}
|
||||
run: |
|
||||
./.github/scripts/update-homebrew-greptme-version.sh
|
||||
|
||||
notification:
|
||||
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && (github.event_name == 'push' || github.event_name == 'schedule') && always() }}
|
||||
name: Send notification to Greptime team
|
||||
|
||||
2
.github/workflows/semantic-pull-request.yml
vendored
2
.github/workflows/semantic-pull-request.yml
vendored
@@ -14,6 +14,8 @@ concurrency:
|
||||
jobs:
|
||||
check:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
pull-requests: write # Add permissions to modify PRs
|
||||
timeout-minutes: 10
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -58,3 +58,6 @@ tests-fuzz/corpus/
|
||||
|
||||
## default data home
|
||||
greptimedb_data
|
||||
|
||||
# github
|
||||
!/.github
|
||||
151
Cargo.lock
generated
151
Cargo.lock
generated
@@ -173,9 +173,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.98"
|
||||
version = "1.0.89"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
|
||||
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
|
||||
|
||||
[[package]]
|
||||
name = "anymap2"
|
||||
@@ -1571,7 +1571,7 @@ dependencies = [
|
||||
"partition",
|
||||
"paste",
|
||||
"prometheus",
|
||||
"rustc-hash 2.1.1",
|
||||
"rustc-hash 2.0.0",
|
||||
"serde_json",
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
@@ -1593,9 +1593,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.2.20"
|
||||
version = "1.1.24"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "04da6a0d40b948dfc4fa8f5bbf402b0fc1a64a28dbf7d12ffd683550f2c1b63a"
|
||||
checksum = "812acba72f0a070b003d3697490d2b55b837230ae7c6c6497f05cc2ddbb8d938"
|
||||
dependencies = [
|
||||
"jobserver",
|
||||
"libc",
|
||||
@@ -2285,8 +2285,11 @@ dependencies = [
|
||||
name = "common-mem-prof"
|
||||
version = "0.15.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"mappings",
|
||||
"pprof_util",
|
||||
"snafu 0.8.5",
|
||||
"tempfile",
|
||||
"tikv-jemalloc-ctl",
|
||||
@@ -2319,6 +2322,7 @@ dependencies = [
|
||||
"common-telemetry",
|
||||
"common-time",
|
||||
"common-wal",
|
||||
"common-workload",
|
||||
"datafusion-common",
|
||||
"datafusion-expr",
|
||||
"datatypes",
|
||||
@@ -2590,6 +2594,15 @@ dependencies = [
|
||||
"toml 0.8.19",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "common-workload"
|
||||
version = "0.15.0"
|
||||
dependencies = [
|
||||
"api",
|
||||
"common-telemetry",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.5.0"
|
||||
@@ -2871,9 +2884,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.15"
|
||||
version = "0.5.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
|
||||
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
@@ -2930,9 +2943,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "csv"
|
||||
version = "1.3.1"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf"
|
||||
checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe"
|
||||
dependencies = [
|
||||
"csv-core",
|
||||
"itoa",
|
||||
@@ -3517,6 +3530,7 @@ dependencies = [
|
||||
"common-time",
|
||||
"common-version",
|
||||
"common-wal",
|
||||
"common-workload",
|
||||
"dashmap",
|
||||
"datafusion",
|
||||
"datafusion-common",
|
||||
@@ -3979,6 +3993,25 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_filter"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0"
|
||||
dependencies = [
|
||||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.11.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f"
|
||||
dependencies = [
|
||||
"env_filter",
|
||||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equator"
|
||||
version = "0.2.2"
|
||||
@@ -4312,6 +4345,7 @@ dependencies = [
|
||||
"prometheus",
|
||||
"prost 0.13.5",
|
||||
"query",
|
||||
"rand 0.9.0",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"servers",
|
||||
@@ -4423,6 +4457,7 @@ dependencies = [
|
||||
"promql-parser",
|
||||
"prost 0.13.5",
|
||||
"query",
|
||||
"rand 0.9.0",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"servers",
|
||||
@@ -4820,7 +4855,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17a3550751c8b1e02ec16be40101d5f24dc255c3#17a3550751c8b1e02ec16be40101d5f24dc255c3"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7668a882d57ca6a2333146e0574b8f0c9d5008ae#7668a882d57ca6a2333146e0574b8f0c9d5008ae"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"serde",
|
||||
@@ -5648,6 +5683,28 @@ dependencies = [
|
||||
"str_stack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inferno"
|
||||
version = "0.12.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2094aecddc672e902cd773bad7071542f63641e01e9187c3bba4b43005e837e9"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"clap 4.5.19",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-utils",
|
||||
"dashmap",
|
||||
"env_logger",
|
||||
"indexmap 2.9.0",
|
||||
"itoa",
|
||||
"log",
|
||||
"num-format",
|
||||
"once_cell",
|
||||
"quick-xml 0.37.5",
|
||||
"rgb",
|
||||
"str_stack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "influxdb_line_protocol"
|
||||
version = "0.1.0"
|
||||
@@ -6359,9 +6416,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.27"
|
||||
version = "0.4.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
|
||||
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
|
||||
|
||||
[[package]]
|
||||
name = "log-query"
|
||||
@@ -6553,6 +6610,19 @@ dependencies = [
|
||||
"thiserror 1.0.64",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mappings"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e434981a332777c2b3062652d16a55f8e74fa78e6b1882633f0d77399c84fc2a"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"pprof_util",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "match_cfg"
|
||||
version = "0.1.0"
|
||||
@@ -6714,6 +6784,7 @@ dependencies = [
|
||||
"common-time",
|
||||
"common-version",
|
||||
"common-wal",
|
||||
"common-workload",
|
||||
"dashmap",
|
||||
"datatypes",
|
||||
"deadpool",
|
||||
@@ -8651,7 +8722,7 @@ dependencies = [
|
||||
"cfg-if",
|
||||
"criterion 0.5.1",
|
||||
"findshlibs",
|
||||
"inferno",
|
||||
"inferno 0.11.21",
|
||||
"libc",
|
||||
"log",
|
||||
"nix 0.26.4",
|
||||
@@ -8668,6 +8739,21 @@ dependencies = [
|
||||
"thiserror 1.0.64",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pprof_util"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9fa015c78eed2130951e22c58d2095849391e73817ab2e74f71b0b9f63dd8416"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"backtrace",
|
||||
"flate2",
|
||||
"inferno 0.12.2",
|
||||
"num",
|
||||
"paste",
|
||||
"prost 0.13.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.20"
|
||||
@@ -8886,8 +8972,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "promql-parser"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "60d851f6523a8215e2fbf86b6cef4548433f8b76092e9ffb607105de52ae63fd"
|
||||
source = "git+https://github.com/GreptimeTeam/promql-parser.git?rev=0410e8b459dda7cb222ce9596f8bf3971bd07bd2#0410e8b459dda7cb222ce9596f8bf3971bd07bd2"
|
||||
dependencies = [
|
||||
"cfgrammar",
|
||||
"chrono",
|
||||
@@ -8897,6 +8982,7 @@ dependencies = [
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"unescaper",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -9252,6 +9338,15 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.37.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn"
|
||||
version = "0.11.5"
|
||||
@@ -9262,7 +9357,7 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
"quinn-proto",
|
||||
"quinn-udp",
|
||||
"rustc-hash 2.1.1",
|
||||
"rustc-hash 2.0.0",
|
||||
"rustls",
|
||||
"socket2",
|
||||
"thiserror 1.0.64",
|
||||
@@ -9279,7 +9374,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"rand 0.8.5",
|
||||
"ring",
|
||||
"rustc-hash 2.1.1",
|
||||
"rustc-hash 2.0.0",
|
||||
"rustls",
|
||||
"slab",
|
||||
"thiserror 1.0.64",
|
||||
@@ -9529,9 +9624,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.11.1"
|
||||
version = "1.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
|
||||
checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
@@ -9713,14 +9808,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.17.14"
|
||||
version = "0.17.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
|
||||
checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"cfg-if",
|
||||
"getrandom 0.2.15",
|
||||
"libc",
|
||||
"spin",
|
||||
"untrusted",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
@@ -9814,6 +9910,7 @@ dependencies = [
|
||||
"hmac",
|
||||
"pbkdf2",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"stringprep",
|
||||
@@ -9822,8 +9919,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rskafka"
|
||||
version = "0.5.0"
|
||||
source = "git+https://github.com/influxdata/rskafka.git?rev=75535b5ad9bae4a5dbb582c82e44dfd81ec10105#75535b5ad9bae4a5dbb582c82e44dfd81ec10105"
|
||||
version = "0.6.0"
|
||||
source = "git+https://github.com/influxdata/rskafka.git?rev=8dbd01ed809f5a791833a594e85b144e36e45820#8dbd01ed809f5a791833a594e85b144e36e45820"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"chrono",
|
||||
@@ -9833,11 +9930,11 @@ dependencies = [
|
||||
"integer-encoding 4.0.2",
|
||||
"lz4",
|
||||
"parking_lot 0.12.3",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.0",
|
||||
"rsasl",
|
||||
"rustls",
|
||||
"snap",
|
||||
"thiserror 1.0.64",
|
||||
"thiserror 2.0.12",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tracing",
|
||||
@@ -9992,9 +10089,9 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "2.1.1"
|
||||
version = "2.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
|
||||
checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152"
|
||||
|
||||
[[package]]
|
||||
name = "rustc_version"
|
||||
|
||||
11
Cargo.toml
11
Cargo.toml
@@ -36,6 +36,7 @@ members = [
|
||||
"src/common/time",
|
||||
"src/common/version",
|
||||
"src/common/wal",
|
||||
"src/common/workload",
|
||||
"src/datanode",
|
||||
"src/datatypes",
|
||||
"src/file-engine",
|
||||
@@ -78,6 +79,7 @@ clippy.implicit_clone = "warn"
|
||||
clippy.result_large_err = "allow"
|
||||
clippy.large_enum_variant = "allow"
|
||||
clippy.doc_overindented_list_items = "allow"
|
||||
clippy.uninlined_format_args = "allow"
|
||||
rust.unknown_lints = "deny"
|
||||
rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
|
||||
|
||||
@@ -130,7 +132,7 @@ etcd-client = "0.14"
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17a3550751c8b1e02ec16be40101d5f24dc255c3" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7668a882d57ca6a2333146e0574b8f0c9d5008ae" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
@@ -162,7 +164,9 @@ parquet = { version = "54.2", default-features = false, features = ["arrow", "as
|
||||
paste = "1.0"
|
||||
pin-project = "1.0"
|
||||
prometheus = { version = "0.13.3", features = ["process"] }
|
||||
promql-parser = { version = "0.5.1", features = ["ser"] }
|
||||
promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "0410e8b459dda7cb222ce9596f8bf3971bd07bd2", features = [
|
||||
"ser",
|
||||
] }
|
||||
prost = { version = "0.13", features = ["no-recursion-limit"] }
|
||||
raft-engine = { version = "0.4.1", default-features = false }
|
||||
rand = "0.9"
|
||||
@@ -175,7 +179,7 @@ reqwest = { version = "0.12", default-features = false, features = [
|
||||
"stream",
|
||||
"multipart",
|
||||
] }
|
||||
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "75535b5ad9bae4a5dbb582c82e44dfd81ec10105", features = [
|
||||
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "8dbd01ed809f5a791833a594e85b144e36e45820", features = [
|
||||
"transport-tls",
|
||||
] }
|
||||
rstest = "0.25"
|
||||
@@ -256,6 +260,7 @@ common-test-util = { path = "src/common/test-util" }
|
||||
common-time = { path = "src/common/time" }
|
||||
common-version = { path = "src/common/version" }
|
||||
common-wal = { path = "src/common/wal" }
|
||||
common-workload = { path = "src/common/workload" }
|
||||
datanode = { path = "src/datanode" }
|
||||
datatypes = { path = "src/datatypes" }
|
||||
file-engine = { path = "src/file-engine" }
|
||||
|
||||
@@ -215,4 +215,3 @@ Special thanks to all contributors! See [AUTHORS.md](https://github.com/Greptime
|
||||
- [Apache Parquet™](https://parquet.apache.org/) (file storage)
|
||||
- [Apache Arrow DataFusion™](https://arrow.apache.org/datafusion/) (query engine)
|
||||
- [Apache OpenDAL™](https://opendal.apache.org/) (data access abstraction)
|
||||
- [etcd](https://etcd.io/) (meta service)
|
||||
|
||||
@@ -154,6 +154,7 @@
|
||||
| `region_engine.mito.index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
|
||||
| `region_engine.mito.index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
|
||||
| `region_engine.mito.index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |
|
||||
| `region_engine.mito.index.result_cache_size` | String | `128MiB` | Cache size for index result. |
|
||||
| `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. |
|
||||
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
@@ -188,10 +189,11 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `slow_query` | -- | -- | The slow query log options. |
|
||||
| `slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. |
|
||||
| `slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
@@ -288,10 +290,12 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `slow_query` | -- | -- | The slow query log options. |
|
||||
| `slow_query.enable` | Bool | `true` | Whether to enable slow query log. |
|
||||
| `slow_query.record_type` | String | `system_table` | The record type of slow queries. It can be `system_table` or `log`.<br/>If `system_table` is selected, the slow queries will be recorded in a system table `greptime_private.slow_queries`.<br/>If `log` is selected, the slow queries will be logged in a log file `greptimedb-slow-queries.*`. |
|
||||
| `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. |
|
||||
| `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
|
||||
| `slow_query.ttl` | String | `30d` | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
@@ -362,10 +366,6 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
@@ -495,6 +495,7 @@
|
||||
| `region_engine.mito.index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
|
||||
| `region_engine.mito.index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
|
||||
| `region_engine.mito.index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |
|
||||
| `region_engine.mito.index.result_cache_size` | String | `128MiB` | Cache size for index result. |
|
||||
| `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. |
|
||||
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
@@ -529,10 +530,6 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
@@ -585,9 +582,5 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
|
||||
@@ -499,6 +499,9 @@ content_cache_size = "128MiB"
|
||||
## Page size for inverted index content cache.
|
||||
content_cache_page_size = "64KiB"
|
||||
|
||||
## Cache size for index result.
|
||||
result_cache_size = "128MiB"
|
||||
|
||||
## The options for inverted index in Mito engine.
|
||||
[region_engine.mito.inverted_index]
|
||||
|
||||
@@ -632,19 +635,6 @@ max_log_files = 720
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
sample_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
@@ -100,19 +100,6 @@ max_log_files = 720
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
sample_ratio = 1.0
|
||||
|
||||
## The tracing options. Only effect when compiled with `tokio-console` feature.
|
||||
#+ [tracing]
|
||||
## The tokio console address.
|
||||
|
||||
@@ -223,18 +223,24 @@ max_log_files = 720
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
[slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
enable = true
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
## The record type of slow queries. It can be `system_table` or `log`.
|
||||
## If `system_table` is selected, the slow queries will be recorded in a system table `greptime_private.slow_queries`.
|
||||
## If `log` is selected, the slow queries will be logged in a log file `greptimedb-slow-queries.*`.
|
||||
record_type = "system_table"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
## The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`.
|
||||
threshold = "30s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged.
|
||||
sample_ratio = 1.0
|
||||
|
||||
## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
|
||||
ttl = "30d"
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
@@ -218,19 +218,6 @@ max_log_files = 720
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
sample_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
@@ -590,6 +590,9 @@ content_cache_size = "128MiB"
|
||||
## Page size for inverted index content cache.
|
||||
content_cache_page_size = "64KiB"
|
||||
|
||||
## Cache size for index result.
|
||||
result_cache_size = "128MiB"
|
||||
|
||||
## The options for inverted index in Mito engine.
|
||||
[region_engine.mito.inverted_index]
|
||||
|
||||
@@ -724,17 +727,21 @@ max_log_files = 720
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
[slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
#+ enable = false
|
||||
|
||||
## The record type of slow queries. It can be `system_table` or `log`.
|
||||
## @toml2docs:none-default
|
||||
#+ record_type = "system_table"
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
#+ threshold = "10s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
sample_ratio = 1.0
|
||||
#+ sample_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
|
||||
@@ -11,6 +11,6 @@ And database will reply with something like:
|
||||
Log Level changed from Some("info") to "trace,flow=debug"%
|
||||
```
|
||||
|
||||
The data is a string in the format of `global_level,module1=level1,module2=level2,...` that follow the same rule of `RUST_LOG`.
|
||||
The data is a string in the format of `global_level,module1=level1,module2=level2,...` that follows the same rule of `RUST_LOG`.
|
||||
|
||||
The module is the module name of the log, and the level is the log level. The log level can be one of the following: `trace`, `debug`, `info`, `warn`, `error`, `off`(case insensitive).
|
||||
@@ -14,7 +14,7 @@ impl SqlQueryHandler for Instance {
|
||||
```
|
||||
|
||||
Normally, when a SQL query arrives at GreptimeDB, the `do_query` method will be called. After some parsing work, the SQL
|
||||
will be feed into `StatementExecutor`:
|
||||
will be fed into `StatementExecutor`:
|
||||
|
||||
```rust
|
||||
// in Frontend Instance:
|
||||
@@ -27,7 +27,7 @@ an example.
|
||||
|
||||
Now, what if the statements should be handled differently for GreptimeDB Standalone and Cluster? You can see there's
|
||||
a `SqlStatementExecutor` field in `StatementExecutor`. Each GreptimeDB Standalone and Cluster has its own implementation
|
||||
of `SqlStatementExecutor`. If you are going to implement the statements differently in the two mode (
|
||||
of `SqlStatementExecutor`. If you are going to implement the statements differently in the two modes (
|
||||
like `CREATE TABLE`), you have to implement them in their own `SqlStatementExecutor`s.
|
||||
|
||||
Summarize as the diagram below:
|
||||
|
||||
@@ -44,6 +44,10 @@ Dump memory profiling data through HTTP API:
|
||||
|
||||
```bash
|
||||
curl -X POST localhost:4000/debug/prof/mem > greptime.hprof
|
||||
# or output flamegraph directly
|
||||
curl -X POST "localhost:4000/debug/prof/mem?output=flamegraph" > greptime.svg
|
||||
# or output pprof format
|
||||
curl -X POST "localhost:4000/debug/prof/mem?output=proto" > greptime.pprof
|
||||
```
|
||||
|
||||
You can periodically dump profiling data and compare them to find the delta memory usage.
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
Currently, our query engine is based on DataFusion, so all aggregate function is executed by DataFusion, through its UDAF interface. You can find DataFusion's UDAF example [here](https://github.com/apache/arrow-datafusion/blob/arrow2/datafusion-examples/examples/simple_udaf.rs). Basically, we provide the same way as DataFusion to write aggregate functions: both are centered in a struct called "Accumulator" to accumulates states along the way in aggregation.
|
||||
|
||||
However, DataFusion's UDAF implementation has a huge restriction, that it requires user to provide a concrete "Accumulator". Take `Median` aggregate function for example, to aggregate a `u32` datatype column, you have to write a `MedianU32`, and use `SELECT MEDIANU32(x)` in SQL. `MedianU32` cannot be used to aggregate a `i32` datatype column. Or, there's another way: you can use a special type that can hold all kinds of data (like our `Value` enum or Arrow's `ScalarValue`), and `match` all the way up to do aggregate calculations. It might work, though rather tedious. (But I think it's DataFusion's prefer way to write UDAF.)
|
||||
However, DataFusion's UDAF implementation has a huge restriction, that it requires user to provide a concrete "Accumulator". Take `Median` aggregate function for example, to aggregate a `u32` datatype column, you have to write a `MedianU32`, and use `SELECT MEDIANU32(x)` in SQL. `MedianU32` cannot be used to aggregate a `i32` datatype column. Or, there's another way: you can use a special type that can hold all kinds of data (like our `Value` enum or Arrow's `ScalarValue`), and `match` all the way up to do aggregate calculations. It might work, though rather tedious. (But I think it's DataFusion's preferred way to write UDAF.)
|
||||
|
||||
So is there a way we can make an aggregate function that automatically match the input data's type? For example, a `Median` aggregator that can work on both `u32` column and `i32`? The answer is yes until we found a way to bypassing DataFusion's restriction, a restriction that DataFusion simply don't pass the input data's type when creating an Accumulator.
|
||||
So is there a way we can make an aggregate function that automatically match the input data's type? For example, a `Median` aggregator that can work on both `u32` column and `i32`? The answer is yes until we find a way to bypass DataFusion's restriction, a restriction that DataFusion simply doesn't pass the input data's type when creating an Accumulator.
|
||||
|
||||
> There's an example in `my_sum_udaf_example.rs`, take that as quick start.
|
||||
|
||||
@@ -16,7 +16,7 @@ You must first define a struct that will be used to create your accumulator. For
|
||||
struct MySumAccumulatorCreator {}
|
||||
```
|
||||
|
||||
Attribute macro `#[as_aggr_func_creator]` and derive macro `#[derive(Debug, AggrFuncTypeStore)]` must both annotated on the struct. They work together to provide a storage of aggregate function's input data types, which are needed for creating generic accumulator later.
|
||||
Attribute macro `#[as_aggr_func_creator]` and derive macro `#[derive(Debug, AggrFuncTypeStore)]` must both be annotated on the struct. They work together to provide a storage of aggregate function's input data types, which are needed for creating generic accumulator later.
|
||||
|
||||
> Note that the `as_aggr_func_creator` macro will add fields to the struct, so the struct cannot be defined as an empty struct without field like `struct Foo;`, neither as a new type like `struct Foo(bar)`.
|
||||
|
||||
@@ -32,11 +32,11 @@ pub trait AggregateFunctionCreator: Send + Sync + Debug {
|
||||
|
||||
You can use input data's type in methods that return output type and state types (just invoke `input_types()`).
|
||||
|
||||
The output type is aggregate function's output data's type. For example, `SUM` aggregate function's output type is `u64` for a `u32` datatype column. The state types are accumulator's internal states' types. Take `AVG` aggregate function on a `i32` column as example, it's state types are `i64` (for sum) and `u64` (for count).
|
||||
The output type is aggregate function's output data's type. For example, `SUM` aggregate function's output type is `u64` for a `u32` datatype column. The state types are accumulator's internal states' types. Take `AVG` aggregate function on a `i32` column as example, its state types are `i64` (for sum) and `u64` (for count).
|
||||
|
||||
The `creator` function is where you define how an accumulator (that will be used in DataFusion) is created. You define "how" to create the accumulator (instead of "what" to create), using the input data's type as arguments. With input datatype known, you can create accumulator generically.
|
||||
|
||||
# 2. Impl `Accumulator` trait for you accumulator.
|
||||
# 2. Impl `Accumulator` trait for your accumulator.
|
||||
|
||||
The accumulator is where you store the aggregate calculation states and evaluate a result. You must impl `Accumulator` trait for it. The trait's definition is:
|
||||
|
||||
@@ -49,7 +49,7 @@ pub trait Accumulator: Send + Sync + Debug {
|
||||
}
|
||||
```
|
||||
|
||||
The DataFusion basically execute aggregate like this:
|
||||
The DataFusion basically executes aggregate like this:
|
||||
|
||||
1. Partitioning all input data for aggregate. Create an accumulator for each part.
|
||||
2. Call `update_batch` on each accumulator with partitioned data, to let you update your aggregate calculation.
|
||||
@@ -57,16 +57,16 @@ The DataFusion basically execute aggregate like this:
|
||||
4. Call `merge_batch` to merge all accumulator's internal state to one.
|
||||
5. Execute `evaluate` on the chosen one to get the final calculation result.
|
||||
|
||||
Once you know the meaning of each method, you can easily write your accumulator. You can refer to `Median` accumulator or `SUM` accumulator defined in file `my_sum_udaf_example.rs` for more details.
|
||||
Once you know the meaning of each method, you can easily write your accumulator. You can refer to `Median` accumulator or `SUM` accumulator defined in file `my_sum_udaf_example.rs` for more details.
|
||||
|
||||
# 3. Register your aggregate function to our query engine.
|
||||
|
||||
You can call `register_aggregate_function` method in query engine to register your aggregate function. To do that, you have to new an instance of struct `AggregateFunctionMeta`. The struct has three fields, first is the name of your aggregate function's name. The function name is case-sensitive due to DataFusion's restriction. We strongly recommend using lowercase for your name. If you have to use uppercase name, wrap your aggregate function with quotation marks. For example, if you define an aggregate function named "my_aggr", you can use "`SELECT MY_AGGR(x)`"; if you define "my_AGGR", you have to use "`SELECT "my_AGGR"(x)`".
|
||||
|
||||
The second field is arg_counts ,the count of the arguments. Like accumulator `percentile`, calculating the p_number of the column. We need to input the value of column and the value of p to cacalate, and so the count of the arguments is two.
|
||||
The second field is arg_counts ,the count of the arguments. Like accumulator `percentile`, calculating the p_number of the column. We need to input the value of column and the value of p to calculate, and so the count of the arguments is two.
|
||||
|
||||
The third field is a function about how to create your accumulator creator that you defined in step 1 above. Create creator, that's a bit intertwined, but it is how we make DataFusion use a newly created aggregate function each time it executes a SQL, preventing the stored input types from affecting each other. The key detail can be starting looking at our `DfContextProviderAdapter` struct's `get_aggregate_meta` method.
|
||||
|
||||
# (Optional) 4. Make your aggregate function automatically registered.
|
||||
|
||||
If you've written a great aggregate function that want to let everyone use it, you can make it automatically registered to our query engine at start time. It's quick simple, just refer to the `AggregateFunctions::register` function in `common/function/src/scalars/aggregate/mod.rs`.
|
||||
If you've written a great aggregate function that wants to let everyone use it, you can make it automatically register to our query engine at start time. It's quick and simple, just refer to the `AggregateFunctions::register` function in `common/function/src/scalars/aggregate/mod.rs`.
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
This document introduces how to write fuzz tests in GreptimeDB.
|
||||
|
||||
## What is a fuzz test
|
||||
Fuzz test is tool that leverage deterministic random generation to assist in finding bugs. The goal of fuzz tests is to identify inputs generated by the fuzzer that cause system panics, crashes, or unexpected behaviors to occur. And we are using the [cargo-fuzz](https://github.com/rust-fuzz/cargo-fuzz) to run our fuzz test targets.
|
||||
Fuzz test is tool that leverages deterministic random generation to assist in finding bugs. The goal of fuzz tests is to identify inputs generated by the fuzzer that cause system panics, crashes, or unexpected behaviors to occur. And we are using the [cargo-fuzz](https://github.com/rust-fuzz/cargo-fuzz) to run our fuzz test targets.
|
||||
|
||||
## Why we need them
|
||||
- Find bugs by leveraging random generation
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
lib = nixpkgs.lib;
|
||||
rustToolchain = fenix.packages.${system}.fromToolchainName {
|
||||
name = (lib.importTOML ./rust-toolchain.toml).toolchain.channel;
|
||||
sha256 = "sha256-arzEYlWLGGYeOhECHpBxQd2joZ4rPKV3qLNnZ+eql6A=";
|
||||
sha256 = "sha256-tJJr8oqX3YD+ohhPK7jlt/7kvKBnBqJVjYtoFr520d4=";
|
||||
};
|
||||
in
|
||||
{
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
[toolchain]
|
||||
channel = "nightly-2025-04-15"
|
||||
channel = "nightly-2025-05-19"
|
||||
|
||||
@@ -110,11 +110,26 @@ pub struct ExportCommand {
|
||||
#[clap(long)]
|
||||
s3: bool,
|
||||
|
||||
/// if both `s3_ddl_local_dir` and `s3` are set, `s3_ddl_local_dir` will be only used for
|
||||
/// exported SQL files, and the data will be exported to s3.
|
||||
///
|
||||
/// Note that `s3_ddl_local_dir` export sql files to **LOCAL** file system, this is useful if export client don't have
|
||||
/// direct access to s3.
|
||||
///
|
||||
/// if `s3` is set but `s3_ddl_local_dir` is not set, both SQL&data will be exported to s3.
|
||||
#[clap(long)]
|
||||
s3_ddl_local_dir: Option<String>,
|
||||
|
||||
/// The s3 bucket name
|
||||
/// if s3 is set, this is required
|
||||
#[clap(long)]
|
||||
s3_bucket: Option<String>,
|
||||
|
||||
// The s3 root path
|
||||
/// if s3 is set, this is required
|
||||
#[clap(long)]
|
||||
s3_root: Option<String>,
|
||||
|
||||
/// The s3 endpoint
|
||||
/// if s3 is set, this is required
|
||||
#[clap(long)]
|
||||
@@ -172,7 +187,9 @@ impl ExportCommand {
|
||||
start_time: self.start_time.clone(),
|
||||
end_time: self.end_time.clone(),
|
||||
s3: self.s3,
|
||||
s3_ddl_local_dir: self.s3_ddl_local_dir.clone(),
|
||||
s3_bucket: self.s3_bucket.clone(),
|
||||
s3_root: self.s3_root.clone(),
|
||||
s3_endpoint: self.s3_endpoint.clone(),
|
||||
s3_access_key: self.s3_access_key.clone(),
|
||||
s3_secret_key: self.s3_secret_key.clone(),
|
||||
@@ -192,7 +209,9 @@ pub struct Export {
|
||||
start_time: Option<String>,
|
||||
end_time: Option<String>,
|
||||
s3: bool,
|
||||
s3_ddl_local_dir: Option<String>,
|
||||
s3_bucket: Option<String>,
|
||||
s3_root: Option<String>,
|
||||
s3_endpoint: Option<String>,
|
||||
s3_access_key: Option<String>,
|
||||
s3_secret_key: Option<String>,
|
||||
@@ -364,7 +383,7 @@ impl Export {
|
||||
let timer = Instant::now();
|
||||
let db_names = self.get_db_names().await?;
|
||||
let db_count = db_names.len();
|
||||
let operator = self.build_operator().await?;
|
||||
let operator = self.build_prefer_fs_operator().await?;
|
||||
|
||||
for schema in db_names {
|
||||
let create_database = self
|
||||
@@ -394,7 +413,7 @@ impl Export {
|
||||
let semaphore = Arc::new(Semaphore::new(self.parallelism));
|
||||
let db_names = self.get_db_names().await?;
|
||||
let db_count = db_names.len();
|
||||
let operator = Arc::new(self.build_operator().await?);
|
||||
let operator = Arc::new(self.build_prefer_fs_operator().await?);
|
||||
let mut tasks = Vec::with_capacity(db_names.len());
|
||||
|
||||
for schema in db_names {
|
||||
@@ -459,13 +478,34 @@ impl Export {
|
||||
}
|
||||
}
|
||||
|
||||
/// build operator with preference for file system
|
||||
async fn build_prefer_fs_operator(&self) -> Result<Operator> {
|
||||
// is under s3 mode and s3_ddl_dir is set, use it as root
|
||||
if self.s3 && self.s3_ddl_local_dir.is_some() {
|
||||
let root = self.s3_ddl_local_dir.as_ref().unwrap().clone();
|
||||
let op = Operator::new(services::Fs::default().root(&root))
|
||||
.context(OpenDalSnafu)?
|
||||
.layer(LoggingLayer::default())
|
||||
.finish();
|
||||
Ok(op)
|
||||
} else if self.s3 {
|
||||
self.build_s3_operator().await
|
||||
} else {
|
||||
self.build_fs_operator().await
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_s3_operator(&self) -> Result<Operator> {
|
||||
let mut builder = services::S3::default().root("").bucket(
|
||||
let mut builder = services::S3::default().bucket(
|
||||
self.s3_bucket
|
||||
.as_ref()
|
||||
.expect("s3_bucket must be provided when s3 is enabled"),
|
||||
);
|
||||
|
||||
if let Some(root) = self.s3_root.as_ref() {
|
||||
builder = builder.root(root);
|
||||
}
|
||||
|
||||
if let Some(endpoint) = self.s3_endpoint.as_ref() {
|
||||
builder = builder.endpoint(endpoint);
|
||||
}
|
||||
@@ -509,6 +549,7 @@ impl Export {
|
||||
let db_count = db_names.len();
|
||||
let mut tasks = Vec::with_capacity(db_count);
|
||||
let operator = Arc::new(self.build_operator().await?);
|
||||
let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
|
||||
let with_options = build_with_options(&self.start_time, &self.end_time);
|
||||
|
||||
for schema in db_names {
|
||||
@@ -516,6 +557,7 @@ impl Export {
|
||||
let export_self = self.clone();
|
||||
let with_options_clone = with_options.clone();
|
||||
let operator = operator.clone();
|
||||
let fs_first_operator = fs_first_operator.clone();
|
||||
|
||||
tasks.push(async move {
|
||||
let _permit = semaphore_moved.acquire().await.unwrap();
|
||||
@@ -549,7 +591,7 @@ impl Export {
|
||||
let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
|
||||
export_self
|
||||
.write_to_storage(
|
||||
&operator,
|
||||
&fs_first_operator,
|
||||
©_from_path,
|
||||
copy_database_from_sql.into_bytes(),
|
||||
)
|
||||
@@ -580,8 +622,13 @@ impl Export {
|
||||
fn format_output_path(&self, file_path: &str) -> String {
|
||||
if self.s3 {
|
||||
format!(
|
||||
"s3://{}/{}",
|
||||
"s3://{}{}/{}",
|
||||
self.s3_bucket.as_ref().unwrap_or(&String::new()),
|
||||
if let Some(root) = &self.s3_root {
|
||||
format!("/{}", root)
|
||||
} else {
|
||||
String::new()
|
||||
},
|
||||
file_path
|
||||
)
|
||||
} else {
|
||||
@@ -605,9 +652,14 @@ impl Export {
|
||||
fn get_storage_params(&self, schema: &str) -> (String, String) {
|
||||
if self.s3 {
|
||||
let s3_path = format!(
|
||||
"s3://{}/{}/{}/",
|
||||
"s3://{}{}/{}/{}/",
|
||||
// Safety: s3_bucket is required when s3 is enabled
|
||||
self.s3_bucket.as_ref().unwrap(),
|
||||
if let Some(root) = &self.s3_root {
|
||||
format!("/{}", root)
|
||||
} else {
|
||||
String::new()
|
||||
},
|
||||
self.catalog,
|
||||
schema
|
||||
);
|
||||
|
||||
@@ -315,7 +315,7 @@ impl Database {
|
||||
let mut flight_message_stream = flight_data_stream.map(move |flight_data| {
|
||||
flight_data
|
||||
.map_err(Error::from)
|
||||
.and_then(|data| decoder.try_decode(data).context(ConvertFlightDataSnafu))
|
||||
.and_then(|data| decoder.try_decode(&data).context(ConvertFlightDataSnafu))
|
||||
});
|
||||
|
||||
let Some(first_flight_message) = flight_message_stream.next().await else {
|
||||
|
||||
@@ -125,7 +125,7 @@ impl RegionRequester {
|
||||
let mut flight_message_stream = flight_data_stream.map(move |flight_data| {
|
||||
flight_data
|
||||
.map_err(Error::from)
|
||||
.and_then(|data| decoder.try_decode(data).context(ConvertFlightDataSnafu))
|
||||
.and_then(|data| decoder.try_decode(&data).context(ConvertFlightDataSnafu))
|
||||
});
|
||||
|
||||
let Some(first_flight_message) = flight_message_stream.next().await else {
|
||||
|
||||
@@ -76,6 +76,7 @@ impl Command {
|
||||
&opts,
|
||||
&TracingOptions::default(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
let tool = self.cmd.build().await.context(error::BuildCliSnafu)?;
|
||||
|
||||
@@ -156,6 +156,7 @@ impl StartCommand {
|
||||
.context(LoadLayeredConfigSnafu)?;
|
||||
|
||||
self.merge_with_cli_options(global_options, &mut opts)?;
|
||||
opts.component.sanitize();
|
||||
|
||||
Ok(opts)
|
||||
}
|
||||
|
||||
@@ -64,6 +64,7 @@ impl InstanceBuilder {
|
||||
&dn_opts.logging,
|
||||
&dn_opts.tracing,
|
||||
dn_opts.node_id.map(|x| x.to_string()),
|
||||
None,
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
@@ -244,6 +244,7 @@ impl StartCommand {
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
opts.component.node_id.map(|x| x.to_string()),
|
||||
None,
|
||||
);
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
|
||||
@@ -37,7 +37,6 @@ use frontend::heartbeat::HeartbeatTask;
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::server::Services;
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use query::stats::StatementStatistics;
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -269,6 +268,7 @@ impl StartCommand {
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
opts.component.node_id.clone(),
|
||||
opts.component.slow_query.as_ref(),
|
||||
);
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
@@ -368,7 +368,6 @@ impl StartCommand {
|
||||
catalog_manager,
|
||||
Arc::new(client),
|
||||
meta_client,
|
||||
StatementStatistics::new(opts.logging.slow_query.clone()),
|
||||
)
|
||||
.with_plugin(plugins.clone())
|
||||
.with_local_cache_invalidator(layered_cache_registry)
|
||||
|
||||
@@ -300,6 +300,7 @@ impl StartCommand {
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ use common_meta::sequence::SequenceBuilder;
|
||||
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
|
||||
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::{LoggingOptions, TracingOptions};
|
||||
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
|
||||
use common_time::timezone::set_default_timezone;
|
||||
use common_version::{short_version, version};
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
@@ -69,7 +69,6 @@ use frontend::service_config::{
|
||||
};
|
||||
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
|
||||
use mito2::config::MitoConfig;
|
||||
use query::stats::StatementStatistics;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
|
||||
use servers::grpc::GrpcOptions;
|
||||
@@ -153,6 +152,7 @@ pub struct StandaloneOptions {
|
||||
pub init_regions_in_background: bool,
|
||||
pub init_regions_parallelism: usize,
|
||||
pub max_in_flight_write_bytes: Option<ReadableSize>,
|
||||
pub slow_query: Option<SlowQueryOptions>,
|
||||
}
|
||||
|
||||
impl Default for StandaloneOptions {
|
||||
@@ -184,6 +184,7 @@ impl Default for StandaloneOptions {
|
||||
init_regions_in_background: false,
|
||||
init_regions_parallelism: 16,
|
||||
max_in_flight_write_bytes: None,
|
||||
slow_query: Some(SlowQueryOptions::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -223,6 +224,7 @@ impl StandaloneOptions {
|
||||
// Handle the export metrics task run by standalone to frontend for execution
|
||||
export_metrics: cloned_opts.export_metrics,
|
||||
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
|
||||
slow_query: cloned_opts.slow_query,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
@@ -447,6 +449,7 @@ impl StartCommand {
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
None,
|
||||
opts.component.slow_query.as_ref(),
|
||||
);
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
@@ -594,7 +597,6 @@ impl StartCommand {
|
||||
catalog_manager.clone(),
|
||||
node_manager.clone(),
|
||||
ddl_task_executor.clone(),
|
||||
StatementStatistics::new(opts.logging.slow_query.clone()),
|
||||
)
|
||||
.with_plugin(plugins.clone())
|
||||
.try_build()
|
||||
|
||||
@@ -18,7 +18,7 @@ use cmd::options::GreptimeOptions;
|
||||
use cmd::standalone::StandaloneOptions;
|
||||
use common_config::Configurable;
|
||||
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
|
||||
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, DEFAULT_OTLP_ENDPOINT};
|
||||
use common_telemetry::logging::{LoggingOptions, DEFAULT_OTLP_ENDPOINT};
|
||||
use common_wal::config::raft_engine::RaftEngineConfig;
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
|
||||
@@ -167,11 +167,6 @@ fn test_load_metasrv_example_config() {
|
||||
level: Some("info".to_string()),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
slow_query: SlowQueryOptions {
|
||||
enable: false,
|
||||
threshold: None,
|
||||
sample_ratio: None,
|
||||
},
|
||||
..Default::default()
|
||||
},
|
||||
datanode: DatanodeClientOptions {
|
||||
|
||||
@@ -163,7 +163,7 @@ impl DfAccumulator for UddSketchState {
|
||||
}
|
||||
}
|
||||
// meaning instantiate as `uddsketch_merge`
|
||||
DataType::Binary => self.merge_batch(&[array.clone()])?,
|
||||
DataType::Binary => self.merge_batch(std::slice::from_ref(array))?,
|
||||
_ => {
|
||||
return not_impl_err!(
|
||||
"UDDSketch functions do not support data type: {}",
|
||||
|
||||
@@ -468,8 +468,8 @@ mod tests {
|
||||
let empty_values = vec![""];
|
||||
let empty_input = Arc::new(StringVector::from_slice(&empty_values)) as VectorRef;
|
||||
|
||||
let ipv4_result = ipv4_func.eval(&ctx, &[empty_input.clone()]);
|
||||
let ipv6_result = ipv6_func.eval(&ctx, &[empty_input.clone()]);
|
||||
let ipv4_result = ipv4_func.eval(&ctx, std::slice::from_ref(&empty_input));
|
||||
let ipv6_result = ipv6_func.eval(&ctx, std::slice::from_ref(&empty_input));
|
||||
|
||||
assert!(ipv4_result.is_err());
|
||||
assert!(ipv6_result.is_err());
|
||||
@@ -478,7 +478,7 @@ mod tests {
|
||||
let invalid_values = vec!["not an ip", "192.168.1.256", "zzzz::ffff"];
|
||||
let invalid_input = Arc::new(StringVector::from_slice(&invalid_values)) as VectorRef;
|
||||
|
||||
let ipv4_result = ipv4_func.eval(&ctx, &[invalid_input.clone()]);
|
||||
let ipv4_result = ipv4_func.eval(&ctx, std::slice::from_ref(&invalid_input));
|
||||
|
||||
assert!(ipv4_result.is_err());
|
||||
}
|
||||
|
||||
@@ -294,7 +294,7 @@ mod tests {
|
||||
let input = Arc::new(StringVector::from_slice(&values)) as VectorRef;
|
||||
|
||||
// Convert IPv6 addresses to binary
|
||||
let binary_result = to_num.eval(&ctx, &[input.clone()]).unwrap();
|
||||
let binary_result = to_num.eval(&ctx, std::slice::from_ref(&input)).unwrap();
|
||||
|
||||
// Convert binary to hex string representation (for ipv6_num_to_string)
|
||||
let mut hex_strings = Vec::new();
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod clamp;
|
||||
pub mod clamp;
|
||||
mod modulo;
|
||||
mod pow;
|
||||
mod rate;
|
||||
@@ -20,7 +20,7 @@ mod rate;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use clamp::ClampFunction;
|
||||
pub use clamp::{ClampFunction, ClampMaxFunction, ClampMinFunction};
|
||||
use common_query::error::{GeneralDataFusionSnafu, Result};
|
||||
use common_query::prelude::Signature;
|
||||
use datafusion::error::DataFusionError;
|
||||
@@ -44,6 +44,8 @@ impl MathFunction {
|
||||
registry.register(Arc::new(RateFunction));
|
||||
registry.register(Arc::new(RangeFunction));
|
||||
registry.register(Arc::new(ClampFunction));
|
||||
registry.register(Arc::new(ClampMinFunction));
|
||||
registry.register(Arc::new(ClampMaxFunction));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -155,6 +155,182 @@ fn clamp_impl<T: LogicalPrimitiveType, const CLAMP_MIN: bool, const CLAMP_MAX: b
|
||||
Ok(Arc::new(PrimitiveVector::<T>::from(result)))
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct ClampMinFunction;
|
||||
|
||||
const CLAMP_MIN_NAME: &str = "clamp_min";
|
||||
|
||||
impl Function for ClampMinFunction {
|
||||
fn name(&self) -> &str {
|
||||
CLAMP_MIN_NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(input_types[0].clone())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
// input, min
|
||||
Signature::uniform(2, ConcreteDataType::numerics(), Volatility::Immutable)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly 2, have: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
ensure!(
|
||||
columns[0].data_type().is_numeric(),
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The first arg's type is not numeric, have: {}",
|
||||
columns[0].data_type()
|
||||
),
|
||||
}
|
||||
);
|
||||
ensure!(
|
||||
columns[0].data_type() == columns[1].data_type(),
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"Arguments don't have identical types: {}, {}",
|
||||
columns[0].data_type(),
|
||||
columns[1].data_type()
|
||||
),
|
||||
}
|
||||
);
|
||||
ensure!(
|
||||
columns[1].len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The second arg (min) should be scalar, have: {:?}",
|
||||
columns[1]
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
with_match_primitive_type_id!(columns[0].data_type().logical_type_id(), |$S| {
|
||||
let input_array = columns[0].to_arrow_array();
|
||||
let input = input_array
|
||||
.as_any()
|
||||
.downcast_ref::<PrimitiveArray<<$S as LogicalPrimitiveType>::ArrowPrimitive>>()
|
||||
.unwrap();
|
||||
|
||||
let min = TryAsPrimitive::<$S>::try_as_primitive(&columns[1].get(0))
|
||||
.with_context(|| {
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: "The second arg (min) should not be none",
|
||||
}
|
||||
})?;
|
||||
// For clamp_min, max is effectively infinity, so we don't use it in the clamp_impl logic.
|
||||
// We pass a default/dummy value for max.
|
||||
let max_dummy = <$S as LogicalPrimitiveType>::Native::default();
|
||||
|
||||
clamp_impl::<$S, true, false>(input, min, max_dummy)
|
||||
},{
|
||||
unreachable!()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ClampMinFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", CLAMP_MIN_NAME.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct ClampMaxFunction;
|
||||
|
||||
const CLAMP_MAX_NAME: &str = "clamp_max";
|
||||
|
||||
impl Function for ClampMaxFunction {
|
||||
fn name(&self) -> &str {
|
||||
CLAMP_MAX_NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(input_types[0].clone())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
// input, max
|
||||
Signature::uniform(2, ConcreteDataType::numerics(), Volatility::Immutable)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly 2, have: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
ensure!(
|
||||
columns[0].data_type().is_numeric(),
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The first arg's type is not numeric, have: {}",
|
||||
columns[0].data_type()
|
||||
),
|
||||
}
|
||||
);
|
||||
ensure!(
|
||||
columns[0].data_type() == columns[1].data_type(),
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"Arguments don't have identical types: {}, {}",
|
||||
columns[0].data_type(),
|
||||
columns[1].data_type()
|
||||
),
|
||||
}
|
||||
);
|
||||
ensure!(
|
||||
columns[1].len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The second arg (max) should be scalar, have: {:?}",
|
||||
columns[1]
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
with_match_primitive_type_id!(columns[0].data_type().logical_type_id(), |$S| {
|
||||
let input_array = columns[0].to_arrow_array();
|
||||
let input = input_array
|
||||
.as_any()
|
||||
.downcast_ref::<PrimitiveArray<<$S as LogicalPrimitiveType>::ArrowPrimitive>>()
|
||||
.unwrap();
|
||||
|
||||
let max = TryAsPrimitive::<$S>::try_as_primitive(&columns[1].get(0))
|
||||
.with_context(|| {
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: "The second arg (max) should not be none",
|
||||
}
|
||||
})?;
|
||||
// For clamp_max, min is effectively -infinity, so we don't use it in the clamp_impl logic.
|
||||
// We pass a default/dummy value for min.
|
||||
let min_dummy = <$S as LogicalPrimitiveType>::Native::default();
|
||||
|
||||
clamp_impl::<$S, false, true>(input, min_dummy, max)
|
||||
},{
|
||||
unreachable!()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ClampMaxFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", CLAMP_MAX_NAME.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
@@ -394,4 +570,134 @@ mod test {
|
||||
let result = func.eval(&FunctionContext::default(), args.as_slice());
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clamp_min_i64() {
|
||||
let inputs = [
|
||||
(
|
||||
vec![Some(-3), Some(-2), Some(-1), Some(0), Some(1), Some(2)],
|
||||
-1,
|
||||
vec![Some(-1), Some(-1), Some(-1), Some(0), Some(1), Some(2)],
|
||||
),
|
||||
(
|
||||
vec![Some(-3), None, Some(-1), None, None, Some(2)],
|
||||
-2,
|
||||
vec![Some(-2), None, Some(-1), None, None, Some(2)],
|
||||
),
|
||||
];
|
||||
|
||||
let func = ClampMinFunction;
|
||||
for (in_data, min, expected) in inputs {
|
||||
let args = [
|
||||
Arc::new(Int64Vector::from(in_data)) as _,
|
||||
Arc::new(Int64Vector::from_vec(vec![min])) as _,
|
||||
];
|
||||
let result = func
|
||||
.eval(&FunctionContext::default(), args.as_slice())
|
||||
.unwrap();
|
||||
let expected: VectorRef = Arc::new(Int64Vector::from(expected));
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clamp_max_i64() {
|
||||
let inputs = [
|
||||
(
|
||||
vec![Some(-3), Some(-2), Some(-1), Some(0), Some(1), Some(2)],
|
||||
1,
|
||||
vec![Some(-3), Some(-2), Some(-1), Some(0), Some(1), Some(1)],
|
||||
),
|
||||
(
|
||||
vec![Some(-3), None, Some(-1), None, None, Some(2)],
|
||||
0,
|
||||
vec![Some(-3), None, Some(-1), None, None, Some(0)],
|
||||
),
|
||||
];
|
||||
|
||||
let func = ClampMaxFunction;
|
||||
for (in_data, max, expected) in inputs {
|
||||
let args = [
|
||||
Arc::new(Int64Vector::from(in_data)) as _,
|
||||
Arc::new(Int64Vector::from_vec(vec![max])) as _,
|
||||
];
|
||||
let result = func
|
||||
.eval(&FunctionContext::default(), args.as_slice())
|
||||
.unwrap();
|
||||
let expected: VectorRef = Arc::new(Int64Vector::from(expected));
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clamp_min_f64() {
|
||||
let inputs = [(
|
||||
vec![Some(-3.0), Some(-2.0), Some(-1.0), Some(0.0), Some(1.0)],
|
||||
-1.0,
|
||||
vec![Some(-1.0), Some(-1.0), Some(-1.0), Some(0.0), Some(1.0)],
|
||||
)];
|
||||
|
||||
let func = ClampMinFunction;
|
||||
for (in_data, min, expected) in inputs {
|
||||
let args = [
|
||||
Arc::new(Float64Vector::from(in_data)) as _,
|
||||
Arc::new(Float64Vector::from_vec(vec![min])) as _,
|
||||
];
|
||||
let result = func
|
||||
.eval(&FunctionContext::default(), args.as_slice())
|
||||
.unwrap();
|
||||
let expected: VectorRef = Arc::new(Float64Vector::from(expected));
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clamp_max_f64() {
|
||||
let inputs = [(
|
||||
vec![Some(-3.0), Some(-2.0), Some(-1.0), Some(0.0), Some(1.0)],
|
||||
0.0,
|
||||
vec![Some(-3.0), Some(-2.0), Some(-1.0), Some(0.0), Some(0.0)],
|
||||
)];
|
||||
|
||||
let func = ClampMaxFunction;
|
||||
for (in_data, max, expected) in inputs {
|
||||
let args = [
|
||||
Arc::new(Float64Vector::from(in_data)) as _,
|
||||
Arc::new(Float64Vector::from_vec(vec![max])) as _,
|
||||
];
|
||||
let result = func
|
||||
.eval(&FunctionContext::default(), args.as_slice())
|
||||
.unwrap();
|
||||
let expected: VectorRef = Arc::new(Float64Vector::from(expected));
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clamp_min_type_not_match() {
|
||||
let input = vec![Some(-3.0), Some(-2.0), Some(-1.0), Some(0.0), Some(1.0)];
|
||||
let min = -1;
|
||||
|
||||
let func = ClampMinFunction;
|
||||
let args = [
|
||||
Arc::new(Float64Vector::from(input)) as _,
|
||||
Arc::new(Int64Vector::from_vec(vec![min])) as _,
|
||||
];
|
||||
let result = func.eval(&FunctionContext::default(), args.as_slice());
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clamp_max_type_not_match() {
|
||||
let input = vec![Some(-3.0), Some(-2.0), Some(-1.0), Some(0.0), Some(1.0)];
|
||||
let max = 1;
|
||||
|
||||
let func = ClampMaxFunction;
|
||||
let args = [
|
||||
Arc::new(Float64Vector::from(input)) as _,
|
||||
Arc::new(Int64Vector::from_vec(vec![max])) as _,
|
||||
];
|
||||
let result = func.eval(&FunctionContext::default(), args.as_slice());
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ impl fmt::Display for RateFunction {
|
||||
|
||||
impl Function for RateFunction {
|
||||
fn name(&self) -> &str {
|
||||
"prom_rate"
|
||||
"rate"
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
@@ -82,7 +82,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_rate_function() {
|
||||
let rate = RateFunction;
|
||||
assert_eq!("prom_rate", rate.name());
|
||||
assert_eq!("rate", rate.name());
|
||||
assert_eq!(
|
||||
ConcreteDataType::float64_datatype(),
|
||||
rate.return_type(&[]).unwrap()
|
||||
|
||||
@@ -127,7 +127,7 @@ pub struct FlightDecoder {
|
||||
}
|
||||
|
||||
impl FlightDecoder {
|
||||
pub fn try_decode(&mut self, flight_data: FlightData) -> Result<FlightMessage> {
|
||||
pub fn try_decode(&mut self, flight_data: &FlightData) -> Result<FlightMessage> {
|
||||
let message = root_as_message(&flight_data.data_header).map_err(|e| {
|
||||
InvalidFlightDataSnafu {
|
||||
reason: e.to_string(),
|
||||
@@ -136,7 +136,7 @@ impl FlightDecoder {
|
||||
})?;
|
||||
match message.header_type() {
|
||||
MessageHeader::NONE => {
|
||||
let metadata = FlightMetadata::decode(flight_data.app_metadata)
|
||||
let metadata = FlightMetadata::decode(flight_data.app_metadata.clone())
|
||||
.context(DecodeFlightDataSnafu)?;
|
||||
if let Some(AffectedRows { value }) = metadata.affected_rows {
|
||||
return Ok(FlightMessage::AffectedRows(value as _));
|
||||
@@ -152,7 +152,7 @@ impl FlightDecoder {
|
||||
.fail()
|
||||
}
|
||||
MessageHeader::Schema => {
|
||||
let arrow_schema = ArrowSchema::try_from(&flight_data).map_err(|e| {
|
||||
let arrow_schema = ArrowSchema::try_from(flight_data).map_err(|e| {
|
||||
InvalidFlightDataSnafu {
|
||||
reason: e.to_string(),
|
||||
}
|
||||
@@ -172,7 +172,7 @@ impl FlightDecoder {
|
||||
let arrow_schema = schema.arrow_schema().clone();
|
||||
|
||||
let arrow_batch =
|
||||
flight_data_to_arrow_batch(&flight_data, arrow_schema, &HashMap::new())
|
||||
flight_data_to_arrow_batch(flight_data, arrow_schema, &HashMap::new())
|
||||
.map_err(|e| {
|
||||
InvalidFlightDataSnafu {
|
||||
reason: e.to_string(),
|
||||
@@ -287,14 +287,14 @@ mod test {
|
||||
let decoder = &mut FlightDecoder::default();
|
||||
assert!(decoder.schema.is_none());
|
||||
|
||||
let result = decoder.try_decode(d2.clone());
|
||||
let result = decoder.try_decode(d2);
|
||||
assert!(matches!(result, Err(Error::InvalidFlightData { .. })));
|
||||
assert!(result
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("Should have decoded schema first!"));
|
||||
|
||||
let message = decoder.try_decode(d1.clone()).unwrap();
|
||||
let message = decoder.try_decode(d1).unwrap();
|
||||
assert!(matches!(message, FlightMessage::Schema(_)));
|
||||
let FlightMessage::Schema(decoded_schema) = message else {
|
||||
unreachable!()
|
||||
@@ -303,14 +303,14 @@ mod test {
|
||||
|
||||
let _ = decoder.schema.as_ref().unwrap();
|
||||
|
||||
let message = decoder.try_decode(d2.clone()).unwrap();
|
||||
let message = decoder.try_decode(d2).unwrap();
|
||||
assert!(matches!(message, FlightMessage::Recordbatch(_)));
|
||||
let FlightMessage::Recordbatch(actual_batch) = message else {
|
||||
unreachable!()
|
||||
};
|
||||
assert_eq!(actual_batch, batch1);
|
||||
|
||||
let message = decoder.try_decode(d3.clone()).unwrap();
|
||||
let message = decoder.try_decode(d3).unwrap();
|
||||
assert!(matches!(message, FlightMessage::Recordbatch(_)));
|
||||
let FlightMessage::Recordbatch(actual_batch) = message else {
|
||||
unreachable!()
|
||||
|
||||
@@ -8,6 +8,7 @@ license.workspace = true
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1"
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
snafu.workspace = true
|
||||
@@ -16,6 +17,11 @@ tokio.workspace = true
|
||||
|
||||
[target.'cfg(not(windows))'.dependencies]
|
||||
tikv-jemalloc-ctl = { version = "0.6", features = ["use_std", "stats"] }
|
||||
jemalloc-pprof-utils = { version = "0.7", package = "pprof_util", features = [
|
||||
"flamegraph",
|
||||
"symbolize",
|
||||
] } # for parsing jemalloc prof dump
|
||||
jemalloc-pprof-mappings = { version = "0.7", package = "mappings" } # for get the name of functions in the prof dump
|
||||
|
||||
[target.'cfg(not(windows))'.dependencies.tikv-jemalloc-sys]
|
||||
features = ["stats", "profiling", "unprefixed_malloc_on_supported_platforms"]
|
||||
|
||||
@@ -30,12 +30,25 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Memory profiling is not supported"))]
|
||||
ProfilingNotSupported,
|
||||
|
||||
#[snafu(display("Failed to parse jeheap profile: {}", err))]
|
||||
ParseJeHeap {
|
||||
#[snafu(source)]
|
||||
err: anyhow::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to dump profile data to flamegraph: {}", err))]
|
||||
Flamegraph {
|
||||
#[snafu(source)]
|
||||
err: anyhow::Error,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
Error::Internal { source } => source.status_code(),
|
||||
Error::ParseJeHeap { .. } | Error::Flamegraph { .. } => StatusCode::Internal,
|
||||
Error::ProfilingNotSupported => StatusCode::Unsupported,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,16 +15,19 @@
|
||||
mod error;
|
||||
|
||||
use std::ffi::{c_char, CString};
|
||||
use std::io::BufReader;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use error::{
|
||||
BuildTempPathSnafu, DumpProfileDataSnafu, OpenTempFileSnafu, ProfilingNotEnabledSnafu,
|
||||
ReadOptProfSnafu,
|
||||
};
|
||||
use jemalloc_pprof_mappings::MAPPINGS;
|
||||
use jemalloc_pprof_utils::{parse_jeheap, FlamegraphOptions, StackProfile};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::error::{FlamegraphSnafu, ParseJeHeapSnafu, Result};
|
||||
|
||||
const PROF_DUMP: &[u8] = b"prof.dump\0";
|
||||
const OPT_PROF: &[u8] = b"opt.prof\0";
|
||||
@@ -70,6 +73,26 @@ pub async fn dump_profile() -> Result<Vec<u8>> {
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
async fn dump_profile_to_stack_profile() -> Result<StackProfile> {
|
||||
let profile = dump_profile().await?;
|
||||
let profile = BufReader::new(profile.as_slice());
|
||||
parse_jeheap(profile, MAPPINGS.as_deref()).context(ParseJeHeapSnafu)
|
||||
}
|
||||
|
||||
pub async fn dump_pprof() -> Result<Vec<u8>> {
|
||||
let profile = dump_profile_to_stack_profile().await?;
|
||||
let pprof = profile.to_pprof(("inuse_space", "bytes"), ("space", "bytes"), None);
|
||||
Ok(pprof)
|
||||
}
|
||||
|
||||
pub async fn dump_flamegraph() -> Result<Vec<u8>> {
|
||||
let profile = dump_profile_to_stack_profile().await?;
|
||||
let mut opts = FlamegraphOptions::default();
|
||||
opts.title = "inuse_space".to_string();
|
||||
opts.count_name = "bytes".to_string();
|
||||
let flamegraph = profile.to_flamegraph(&mut opts).context(FlamegraphSnafu)?;
|
||||
Ok(flamegraph)
|
||||
}
|
||||
fn is_prof_enabled() -> Result<bool> {
|
||||
// safety: OPT_PROF variable, if present, is always a boolean value.
|
||||
Ok(unsafe { tikv_jemalloc_ctl::raw::read::<bool>(OPT_PROF).context(ReadOptProfSnafu)? })
|
||||
|
||||
@@ -17,9 +17,19 @@ pub mod error;
|
||||
#[cfg(not(windows))]
|
||||
mod jemalloc;
|
||||
#[cfg(not(windows))]
|
||||
pub use jemalloc::dump_profile;
|
||||
pub use jemalloc::{dump_flamegraph, dump_pprof, dump_profile};
|
||||
|
||||
#[cfg(windows)]
|
||||
pub async fn dump_profile() -> error::Result<Vec<u8>> {
|
||||
error::ProfilingNotSupportedSnafu.fail()
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
pub async fn dump_pprof() -> error::Result<Vec<u8>> {
|
||||
error::ProfilingNotSupportedSnafu.fail()
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
pub async fn dump_flamegraph() -> error::Result<Vec<u8>> {
|
||||
error::ProfilingNotSupportedSnafu.fail()
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ common-recordbatch.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
common-time.workspace = true
|
||||
common-wal.workspace = true
|
||||
common-workload.workspace = true
|
||||
datafusion-common.workspace = true
|
||||
datafusion-expr.workspace = true
|
||||
datatypes.workspace = true
|
||||
|
||||
168
src/common/meta/src/cache/flow/table_flownode.rs
vendored
168
src/common/meta/src/cache/flow/table_flownode.rs
vendored
@@ -24,21 +24,39 @@ use crate::cache::{CacheContainer, Initializer};
|
||||
use crate::error::Result;
|
||||
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
|
||||
use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
|
||||
use crate::key::{FlowId, FlowPartitionId};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::peer::Peer;
|
||||
use crate::FlownodeId;
|
||||
|
||||
type FlownodeSet = Arc<HashMap<FlownodeId, Peer>>;
|
||||
/// Flow id&flow partition key
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct FlowIdent {
|
||||
pub flow_id: FlowId,
|
||||
pub partition_id: FlowPartitionId,
|
||||
}
|
||||
|
||||
impl FlowIdent {
|
||||
pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
|
||||
Self {
|
||||
flow_id,
|
||||
partition_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// cache for TableFlowManager, the table_id part is in the outer cache
|
||||
/// include flownode_id, flow_id, partition_id mapping to Peer
|
||||
type FlownodeFlowSet = Arc<HashMap<FlowIdent, Peer>>;
|
||||
|
||||
pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;
|
||||
|
||||
/// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping.
|
||||
pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeSet, CacheIdent>;
|
||||
pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeFlowSet, CacheIdent>;
|
||||
|
||||
/// Constructs a [TableFlownodeSetCache].
|
||||
pub fn new_table_flownode_set_cache(
|
||||
name: String,
|
||||
cache: Cache<TableId, FlownodeSet>,
|
||||
cache: Cache<TableId, FlownodeFlowSet>,
|
||||
kv_backend: KvBackendRef,
|
||||
) -> TableFlownodeSetCache {
|
||||
let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
|
||||
@@ -47,7 +65,7 @@ pub fn new_table_flownode_set_cache(
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
|
||||
}
|
||||
|
||||
fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {
|
||||
fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeFlowSet> {
|
||||
Arc::new(move |&table_id| {
|
||||
let table_flow_manager = table_flow_manager.clone();
|
||||
Box::pin(async move {
|
||||
@@ -57,7 +75,12 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
|
||||
.map(|flows| {
|
||||
flows
|
||||
.into_iter()
|
||||
.map(|(key, value)| (key.flownode_id(), value.peer))
|
||||
.map(|(key, value)| {
|
||||
(
|
||||
FlowIdent::new(key.flow_id(), key.partition_id()),
|
||||
value.peer,
|
||||
)
|
||||
})
|
||||
.collect::<HashMap<_, _>>()
|
||||
})
|
||||
// We must cache the `HashSet` even if it's empty,
|
||||
@@ -71,26 +94,33 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
|
||||
}
|
||||
|
||||
async fn handle_create_flow(
|
||||
cache: &Cache<TableId, FlownodeSet>,
|
||||
cache: &Cache<TableId, FlownodeFlowSet>,
|
||||
CreateFlow {
|
||||
flow_id,
|
||||
source_table_ids,
|
||||
flownodes: flownode_peers,
|
||||
partition_to_peer_mapping: flow_part2nodes,
|
||||
}: &CreateFlow,
|
||||
) {
|
||||
for table_id in source_table_ids {
|
||||
let entry = cache.entry(*table_id);
|
||||
entry
|
||||
.and_compute_with(
|
||||
async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
|
||||
async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
|
||||
Some(entry) => {
|
||||
let mut map = entry.into_value().as_ref().clone();
|
||||
map.extend(flownode_peers.iter().map(|peer| (peer.id, peer.clone())));
|
||||
map.extend(
|
||||
flow_part2nodes.iter().map(|(part, peer)| {
|
||||
(FlowIdent::new(*flow_id, *part), peer.clone())
|
||||
}),
|
||||
);
|
||||
|
||||
Op::Put(Arc::new(map))
|
||||
}
|
||||
None => Op::Put(Arc::new(HashMap::from_iter(
|
||||
flownode_peers.iter().map(|peer| (peer.id, peer.clone())),
|
||||
))),
|
||||
None => {
|
||||
Op::Put(Arc::new(HashMap::from_iter(flow_part2nodes.iter().map(
|
||||
|(part, peer)| (FlowIdent::new(*flow_id, *part), peer.clone()),
|
||||
))))
|
||||
}
|
||||
},
|
||||
)
|
||||
.await;
|
||||
@@ -98,21 +128,23 @@ async fn handle_create_flow(
|
||||
}
|
||||
|
||||
async fn handle_drop_flow(
|
||||
cache: &Cache<TableId, FlownodeSet>,
|
||||
cache: &Cache<TableId, FlownodeFlowSet>,
|
||||
DropFlow {
|
||||
flow_id,
|
||||
source_table_ids,
|
||||
flownode_ids,
|
||||
flow_part2node_id,
|
||||
}: &DropFlow,
|
||||
) {
|
||||
for table_id in source_table_ids {
|
||||
let entry = cache.entry(*table_id);
|
||||
entry
|
||||
.and_compute_with(
|
||||
async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
|
||||
async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
|
||||
Some(entry) => {
|
||||
let mut set = entry.into_value().as_ref().clone();
|
||||
for flownode_id in flownode_ids {
|
||||
set.remove(flownode_id);
|
||||
for (part, _node) in flow_part2node_id {
|
||||
let key = FlowIdent::new(*flow_id, *part);
|
||||
set.remove(&key);
|
||||
}
|
||||
|
||||
Op::Put(Arc::new(set))
|
||||
@@ -128,7 +160,7 @@ async fn handle_drop_flow(
|
||||
}
|
||||
|
||||
fn invalidator<'a>(
|
||||
cache: &'a Cache<TableId, FlownodeSet>,
|
||||
cache: &'a Cache<TableId, FlownodeFlowSet>,
|
||||
ident: &'a CacheIdent,
|
||||
) -> BoxFuture<'a, Result<()>> {
|
||||
Box::pin(async move {
|
||||
@@ -154,7 +186,7 @@ mod tests {
|
||||
use moka::future::CacheBuilder;
|
||||
use table::table_name::TableName;
|
||||
|
||||
use crate::cache::flow::table_flownode::new_table_flownode_set_cache;
|
||||
use crate::cache::flow::table_flownode::{new_table_flownode_set_cache, FlowIdent};
|
||||
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
|
||||
use crate::key::flow::flow_info::FlowInfoValue;
|
||||
use crate::key::flow::flow_route::FlowRouteValue;
|
||||
@@ -214,12 +246,16 @@ mod tests {
|
||||
let set = cache.get(1024).await.unwrap().unwrap();
|
||||
assert_eq!(
|
||||
set.as_ref().clone(),
|
||||
HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
|
||||
HashMap::from_iter(
|
||||
(1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
|
||||
)
|
||||
);
|
||||
let set = cache.get(1025).await.unwrap().unwrap();
|
||||
assert_eq!(
|
||||
set.as_ref().clone(),
|
||||
HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
|
||||
HashMap::from_iter(
|
||||
(1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
|
||||
)
|
||||
);
|
||||
let result = cache.get(1026).await.unwrap().unwrap();
|
||||
assert_eq!(result.len(), 0);
|
||||
@@ -231,8 +267,9 @@ mod tests {
|
||||
let cache = CacheBuilder::new(128).build();
|
||||
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
|
||||
let ident = vec![CacheIdent::CreateFlow(CreateFlow {
|
||||
flow_id: 2001,
|
||||
source_table_ids: vec![1024, 1025],
|
||||
flownodes: (1..=5).map(Peer::empty).collect(),
|
||||
partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
|
||||
})];
|
||||
cache.invalidate(&ident).await.unwrap();
|
||||
let set = cache.get(1024).await.unwrap().unwrap();
|
||||
@@ -241,6 +278,54 @@ mod tests {
|
||||
assert_eq!(set.len(), 5);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_replace_flow() {
|
||||
let mem_kv = Arc::new(MemoryKvBackend::default());
|
||||
let cache = CacheBuilder::new(128).build();
|
||||
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
|
||||
let ident = vec![CacheIdent::CreateFlow(CreateFlow {
|
||||
flow_id: 2001,
|
||||
source_table_ids: vec![1024, 1025],
|
||||
partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
|
||||
})];
|
||||
cache.invalidate(&ident).await.unwrap();
|
||||
let set = cache.get(1024).await.unwrap().unwrap();
|
||||
assert_eq!(set.len(), 5);
|
||||
let set = cache.get(1025).await.unwrap().unwrap();
|
||||
assert_eq!(set.len(), 5);
|
||||
|
||||
let drop_then_create_flow = vec![
|
||||
CacheIdent::DropFlow(DropFlow {
|
||||
flow_id: 2001,
|
||||
source_table_ids: vec![1024, 1025],
|
||||
flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
|
||||
}),
|
||||
CacheIdent::CreateFlow(CreateFlow {
|
||||
flow_id: 2001,
|
||||
source_table_ids: vec![1026, 1027],
|
||||
partition_to_peer_mapping: (11..=15)
|
||||
.map(|i| (i as u32, Peer::empty(i + 1)))
|
||||
.collect(),
|
||||
}),
|
||||
CacheIdent::FlowId(2001),
|
||||
];
|
||||
cache.invalidate(&drop_then_create_flow).await.unwrap();
|
||||
|
||||
let set = cache.get(1024).await.unwrap().unwrap();
|
||||
assert!(set.is_empty());
|
||||
|
||||
let expected = HashMap::from_iter(
|
||||
(11..=15).map(|i| (FlowIdent::new(2001, i as u32), Peer::empty(i + 1))),
|
||||
);
|
||||
let set = cache.get(1026).await.unwrap().unwrap();
|
||||
|
||||
assert_eq!(set.as_ref().clone(), expected);
|
||||
|
||||
let set = cache.get(1027).await.unwrap().unwrap();
|
||||
|
||||
assert_eq!(set.as_ref().clone(), expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_drop_flow() {
|
||||
let mem_kv = Arc::new(MemoryKvBackend::default());
|
||||
@@ -248,34 +333,57 @@ mod tests {
|
||||
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
|
||||
let ident = vec![
|
||||
CacheIdent::CreateFlow(CreateFlow {
|
||||
flow_id: 2001,
|
||||
source_table_ids: vec![1024, 1025],
|
||||
flownodes: (1..=5).map(Peer::empty).collect(),
|
||||
partition_to_peer_mapping: (1..=5)
|
||||
.map(|i| (i as u32, Peer::empty(i + 1)))
|
||||
.collect(),
|
||||
}),
|
||||
CacheIdent::CreateFlow(CreateFlow {
|
||||
flow_id: 2002,
|
||||
source_table_ids: vec![1024, 1025],
|
||||
flownodes: (11..=12).map(Peer::empty).collect(),
|
||||
partition_to_peer_mapping: (11..=12)
|
||||
.map(|i| (i as u32, Peer::empty(i + 1)))
|
||||
.collect(),
|
||||
}),
|
||||
// same flownode that hold multiple flows
|
||||
CacheIdent::CreateFlow(CreateFlow {
|
||||
flow_id: 2003,
|
||||
source_table_ids: vec![1024, 1025],
|
||||
partition_to_peer_mapping: (1..=5)
|
||||
.map(|i| (i as u32, Peer::empty(i + 1)))
|
||||
.collect(),
|
||||
}),
|
||||
];
|
||||
cache.invalidate(&ident).await.unwrap();
|
||||
let set = cache.get(1024).await.unwrap().unwrap();
|
||||
assert_eq!(set.len(), 7);
|
||||
assert_eq!(set.len(), 12);
|
||||
let set = cache.get(1025).await.unwrap().unwrap();
|
||||
assert_eq!(set.len(), 7);
|
||||
assert_eq!(set.len(), 12);
|
||||
|
||||
let ident = vec![CacheIdent::DropFlow(DropFlow {
|
||||
flow_id: 2001,
|
||||
source_table_ids: vec![1024, 1025],
|
||||
flownode_ids: vec![1, 2, 3, 4, 5],
|
||||
flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
|
||||
})];
|
||||
cache.invalidate(&ident).await.unwrap();
|
||||
let set = cache.get(1024).await.unwrap().unwrap();
|
||||
assert_eq!(
|
||||
set.as_ref().clone(),
|
||||
HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
|
||||
HashMap::from_iter(
|
||||
(11..=12)
|
||||
.map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
|
||||
.chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
|
||||
)
|
||||
);
|
||||
let set = cache.get(1025).await.unwrap().unwrap();
|
||||
assert_eq!(
|
||||
set.as_ref().clone(),
|
||||
HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
|
||||
HashMap::from_iter(
|
||||
(11..=12)
|
||||
.map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
|
||||
.chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::hash::{DefaultHasher, Hash, Hasher};
|
||||
use std::str::FromStr;
|
||||
|
||||
use api::v1::meta::HeartbeatRequest;
|
||||
use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest};
|
||||
use common_error::ext::ErrorExt;
|
||||
use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
@@ -161,6 +161,8 @@ pub struct DatanodeStatus {
|
||||
pub leader_regions: usize,
|
||||
/// How many follower regions on this node.
|
||||
pub follower_regions: usize,
|
||||
/// The workloads of the datanode.
|
||||
pub workloads: DatanodeWorkloads,
|
||||
}
|
||||
|
||||
/// The status of a frontend.
|
||||
@@ -281,6 +283,8 @@ impl TryFrom<i32> for Role {
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
use common_workload::DatanodeWorkloadType;
|
||||
|
||||
use super::*;
|
||||
use crate::cluster::Role::{Datanode, Frontend};
|
||||
use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
|
||||
@@ -313,6 +317,9 @@ mod tests {
|
||||
wcus: 2,
|
||||
leader_regions: 3,
|
||||
follower_regions: 4,
|
||||
workloads: DatanodeWorkloads {
|
||||
types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
|
||||
},
|
||||
}),
|
||||
version: "".to_string(),
|
||||
git_commit: "".to_string(),
|
||||
@@ -332,6 +339,7 @@ mod tests {
|
||||
wcus: 2,
|
||||
leader_regions: 3,
|
||||
follower_regions: 4,
|
||||
..
|
||||
}),
|
||||
start_time_ms: 1,
|
||||
..
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::str::FromStr;
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, RequestHeader};
|
||||
use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
|
||||
use common_time::util as time_util;
|
||||
use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
@@ -27,6 +27,7 @@ use table::metadata::TableId;
|
||||
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
use crate::heartbeat::utils::get_datanode_workloads;
|
||||
|
||||
pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease";
|
||||
const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";
|
||||
@@ -65,6 +66,8 @@ pub struct Stat {
|
||||
pub region_stats: Vec<RegionStat>,
|
||||
// The node epoch is used to check whether the node has restarted or redeployed.
|
||||
pub node_epoch: u64,
|
||||
/// The datanode workloads.
|
||||
pub datanode_workloads: DatanodeWorkloads,
|
||||
}
|
||||
|
||||
/// The statistics of a region.
|
||||
@@ -197,6 +200,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
|
||||
peer,
|
||||
region_stats,
|
||||
node_epoch,
|
||||
node_workloads,
|
||||
..
|
||||
} = value;
|
||||
|
||||
@@ -207,6 +211,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
|
||||
.map(RegionStat::from)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
|
||||
Ok(Self {
|
||||
timestamp_millis: time_util::current_time_millis(),
|
||||
// datanode id
|
||||
@@ -218,6 +223,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
|
||||
region_num: region_stats.len() as u64,
|
||||
region_stats,
|
||||
node_epoch: *node_epoch,
|
||||
datanode_workloads,
|
||||
})
|
||||
}
|
||||
(header, _) => Err(header.clone()),
|
||||
|
||||
@@ -39,7 +39,7 @@ use crate::cache_invalidator::Context;
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{self, Result, UnexpectedSnafu};
|
||||
use crate::instruction::{CacheIdent, CreateFlow};
|
||||
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
|
||||
use crate::key::flow::flow_info::FlowInfoValue;
|
||||
use crate::key::flow::flow_route::FlowRouteValue;
|
||||
use crate::key::table_name::TableNameKey;
|
||||
@@ -70,6 +70,7 @@ impl CreateFlowProcedure {
|
||||
query_context,
|
||||
state: CreateFlowState::Prepare,
|
||||
prev_flow_info_value: None,
|
||||
did_replace: false,
|
||||
flow_type: None,
|
||||
},
|
||||
}
|
||||
@@ -224,6 +225,7 @@ impl CreateFlowProcedure {
|
||||
.update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
|
||||
.await?;
|
||||
info!("Replaced flow metadata for flow {flow_id}");
|
||||
self.data.did_replace = true;
|
||||
} else {
|
||||
self.context
|
||||
.flow_metadata_manager
|
||||
@@ -240,22 +242,43 @@ impl CreateFlowProcedure {
|
||||
debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
|
||||
// Safety: The flow id must be allocated.
|
||||
let flow_id = self.data.flow_id.unwrap();
|
||||
let did_replace = self.data.did_replace;
|
||||
let ctx = Context {
|
||||
subject: Some("Invalidate flow cache by creating flow".to_string()),
|
||||
};
|
||||
|
||||
let mut caches = vec![];
|
||||
|
||||
// if did replaced, invalidate the flow cache with drop the old flow
|
||||
if did_replace {
|
||||
let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
|
||||
|
||||
// only drop flow is needed, since flow name haven't changed, and flow id already invalidated below
|
||||
caches.extend([CacheIdent::DropFlow(DropFlow {
|
||||
flow_id,
|
||||
source_table_ids: old_flow_info.source_table_ids.clone(),
|
||||
flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
|
||||
})]);
|
||||
}
|
||||
|
||||
let (_flow_info, flow_routes) = (&self.data).into();
|
||||
let flow_part2peers = flow_routes
|
||||
.into_iter()
|
||||
.map(|(part_id, route)| (part_id, route.peer))
|
||||
.collect();
|
||||
|
||||
caches.extend([
|
||||
CacheIdent::CreateFlow(CreateFlow {
|
||||
flow_id,
|
||||
source_table_ids: self.data.source_table_ids.clone(),
|
||||
partition_to_peer_mapping: flow_part2peers,
|
||||
}),
|
||||
CacheIdent::FlowId(flow_id),
|
||||
]);
|
||||
|
||||
self.context
|
||||
.cache_invalidator
|
||||
.invalidate(
|
||||
&ctx,
|
||||
&[
|
||||
CacheIdent::CreateFlow(CreateFlow {
|
||||
source_table_ids: self.data.source_table_ids.clone(),
|
||||
flownodes: self.data.peers.clone(),
|
||||
}),
|
||||
CacheIdent::FlowId(flow_id),
|
||||
],
|
||||
)
|
||||
.invalidate(&ctx, &caches)
|
||||
.await?;
|
||||
|
||||
Ok(Status::done_with_output(flow_id))
|
||||
@@ -377,6 +400,10 @@ pub struct CreateFlowData {
|
||||
/// For verify if prev value is consistent when need to update flow metadata.
|
||||
/// only set when `or_replace` is true.
|
||||
pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
|
||||
/// Only set to true when replace actually happened.
|
||||
/// This is used to determine whether to invalidate the cache.
|
||||
#[serde(default)]
|
||||
pub(crate) did_replace: bool,
|
||||
pub(crate) flow_type: Option<FlowType>,
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
mod metadata;
|
||||
|
||||
use api::v1::flow::{flow_request, DropRequest, FlowRequest};
|
||||
use async_trait::async_trait;
|
||||
use common_catalog::format_full_flow_name;
|
||||
@@ -153,6 +154,12 @@ impl DropFlowProcedure {
|
||||
};
|
||||
let flow_info_value = self.data.flow_info_value.as_ref().unwrap();
|
||||
|
||||
let flow_part2nodes = flow_info_value
|
||||
.flownode_ids()
|
||||
.clone()
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
self.context
|
||||
.cache_invalidator
|
||||
.invalidate(
|
||||
@@ -164,8 +171,9 @@ impl DropFlowProcedure {
|
||||
flow_name: flow_info_value.flow_name.to_string(),
|
||||
}),
|
||||
CacheIdent::DropFlow(DropFlow {
|
||||
flow_id,
|
||||
source_table_ids: flow_info_value.source_table_ids.clone(),
|
||||
flownode_ids: flow_info_value.flownode_ids.values().cloned().collect(),
|
||||
flow_part2node_id: flow_part2nodes,
|
||||
}),
|
||||
],
|
||||
)
|
||||
|
||||
@@ -514,11 +514,25 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to build a Kafka partition client, topic: {}, partition: {}",
|
||||
"Failed to get a Kafka partition client, topic: {}, partition: {}",
|
||||
topic,
|
||||
partition
|
||||
))]
|
||||
BuildKafkaPartitionClient {
|
||||
KafkaPartitionClient {
|
||||
topic: String,
|
||||
partition: i32,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: rskafka::client::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to get offset from Kafka, topic: {}, partition: {}",
|
||||
topic,
|
||||
partition
|
||||
))]
|
||||
KafkaGetOffset {
|
||||
topic: String,
|
||||
partition: i32,
|
||||
#[snafu(implicit)]
|
||||
@@ -843,7 +857,7 @@ impl ErrorExt for Error {
|
||||
| EncodeWalOptions { .. }
|
||||
| BuildKafkaClient { .. }
|
||||
| BuildKafkaCtrlClient { .. }
|
||||
| BuildKafkaPartitionClient { .. }
|
||||
| KafkaPartitionClient { .. }
|
||||
| ResolveKafkaEndpoint { .. }
|
||||
| ProduceRecord { .. }
|
||||
| CreateKafkaWalTopic { .. }
|
||||
@@ -852,7 +866,8 @@ impl ErrorExt for Error {
|
||||
| ProcedureOutput { .. }
|
||||
| FromUtf8 { .. }
|
||||
| MetadataCorruption { .. }
|
||||
| ParseWalOptions { .. } => StatusCode::Unexpected,
|
||||
| ParseWalOptions { .. }
|
||||
| KafkaGetOffset { .. } => StatusCode::Unexpected,
|
||||
|
||||
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
|
||||
|
||||
|
||||
@@ -12,9 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::meta::heartbeat_request::NodeWorkloads;
|
||||
use api::v1::meta::mailbox_message::Payload;
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use api::v1::meta::{DatanodeWorkloads, MailboxMessage};
|
||||
use common_telemetry::warn;
|
||||
use common_time::util::current_time_millis;
|
||||
use common_workload::DatanodeWorkloadType;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
@@ -56,3 +59,39 @@ pub fn outgoing_message_to_mailbox_message(
|
||||
)),
|
||||
})
|
||||
}
|
||||
|
||||
/// Extracts datanode workloads from the provided optional `NodeWorkloads`.
|
||||
///
|
||||
/// Returns default datanode workloads if the input is `None`.
|
||||
pub fn get_datanode_workloads(node_workloads: Option<&NodeWorkloads>) -> DatanodeWorkloads {
|
||||
match node_workloads {
|
||||
Some(NodeWorkloads::Datanode(datanode_workloads)) => {
|
||||
let mut datanode_workloads = datanode_workloads.clone();
|
||||
let unexpected_workloads = datanode_workloads
|
||||
.types
|
||||
.extract_if(.., |t| DatanodeWorkloadType::from_i32(*t).is_none())
|
||||
.collect::<Vec<_>>();
|
||||
if !unexpected_workloads.is_empty() {
|
||||
warn!("Unexpected datanode workloads: {:?}", unexpected_workloads);
|
||||
}
|
||||
datanode_workloads
|
||||
}
|
||||
_ => DatanodeWorkloads {
|
||||
types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_get_datanode_workloads() {
|
||||
let node_workloads = Some(NodeWorkloads::Datanode(DatanodeWorkloads {
|
||||
types: vec![DatanodeWorkloadType::Hybrid.to_i32(), 100],
|
||||
}));
|
||||
let workloads = get_datanode_workloads(node_workloads.as_ref());
|
||||
assert_eq!(workloads.types, vec![DatanodeWorkloadType::Hybrid.to_i32()]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ use table::table_name::TableName;
|
||||
|
||||
use crate::flow_name::FlowName;
|
||||
use crate::key::schema_name::SchemaName;
|
||||
use crate::key::FlowId;
|
||||
use crate::key::{FlowId, FlowPartitionId};
|
||||
use crate::peer::Peer;
|
||||
use crate::{DatanodeId, FlownodeId};
|
||||
|
||||
@@ -184,14 +184,19 @@ pub enum CacheIdent {
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct CreateFlow {
|
||||
/// The unique identifier for the flow.
|
||||
pub flow_id: FlowId,
|
||||
pub source_table_ids: Vec<TableId>,
|
||||
pub flownodes: Vec<Peer>,
|
||||
/// Mapping of flow partition to peer information
|
||||
pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct DropFlow {
|
||||
pub flow_id: FlowId,
|
||||
pub source_table_ids: Vec<TableId>,
|
||||
pub flownode_ids: Vec<FlownodeId>,
|
||||
/// Mapping of flow partition to flownode id
|
||||
pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
|
||||
}
|
||||
|
||||
/// Flushes a batch of regions.
|
||||
|
||||
@@ -246,27 +246,32 @@ impl FlowMetadataManager {
|
||||
new_flow_info: &FlowInfoValue,
|
||||
flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
|
||||
) -> Result<()> {
|
||||
let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) =
|
||||
let (update_flow_flow_name_txn, on_create_flow_flow_name_failure) =
|
||||
self.flow_name_manager.build_update_txn(
|
||||
&new_flow_info.catalog_name,
|
||||
&new_flow_info.flow_name,
|
||||
flow_id,
|
||||
)?;
|
||||
|
||||
let (create_flow_txn, on_create_flow_failure) =
|
||||
let (update_flow_txn, on_create_flow_failure) =
|
||||
self.flow_info_manager
|
||||
.build_update_txn(flow_id, current_flow_info, new_flow_info)?;
|
||||
|
||||
let create_flow_routes_txn = self
|
||||
.flow_route_manager
|
||||
.build_create_txn(flow_id, flow_routes.clone())?;
|
||||
|
||||
let create_flownode_flow_txn = self
|
||||
.flownode_flow_manager
|
||||
.build_create_txn(flow_id, new_flow_info.flownode_ids().clone());
|
||||
|
||||
let create_table_flow_txn = self.table_flow_manager.build_create_txn(
|
||||
let update_flow_routes_txn = self.flow_route_manager.build_update_txn(
|
||||
flow_id,
|
||||
current_flow_info,
|
||||
flow_routes.clone(),
|
||||
)?;
|
||||
|
||||
let update_flownode_flow_txn = self.flownode_flow_manager.build_update_txn(
|
||||
flow_id,
|
||||
current_flow_info,
|
||||
new_flow_info.flownode_ids().clone(),
|
||||
);
|
||||
|
||||
let update_table_flow_txn = self.table_flow_manager.build_update_txn(
|
||||
flow_id,
|
||||
current_flow_info,
|
||||
flow_routes
|
||||
.into_iter()
|
||||
.map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
|
||||
@@ -275,11 +280,11 @@ impl FlowMetadataManager {
|
||||
)?;
|
||||
|
||||
let txn = Txn::merge_all(vec![
|
||||
create_flow_flow_name_txn,
|
||||
create_flow_txn,
|
||||
create_flow_routes_txn,
|
||||
create_flownode_flow_txn,
|
||||
create_table_flow_txn,
|
||||
update_flow_flow_name_txn,
|
||||
update_flow_txn,
|
||||
update_flow_routes_txn,
|
||||
update_flownode_flow_txn,
|
||||
update_table_flow_txn,
|
||||
]);
|
||||
info!(
|
||||
"Creating flow {}.{}({}), with {} txn operations",
|
||||
@@ -783,6 +788,141 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_flow_metadata_diff_flownode() {
|
||||
let mem_kv = Arc::new(MemoryKvBackend::default());
|
||||
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
|
||||
let flow_id = 10;
|
||||
let flow_value = test_flow_info_value(
|
||||
"flow",
|
||||
[(0u32, 1u64), (1u32, 2u64)].into(),
|
||||
vec![1024, 1025, 1026],
|
||||
);
|
||||
let flow_routes = vec![
|
||||
(
|
||||
0u32,
|
||||
FlowRouteValue {
|
||||
peer: Peer::empty(1),
|
||||
},
|
||||
),
|
||||
(
|
||||
1,
|
||||
FlowRouteValue {
|
||||
peer: Peer::empty(2),
|
||||
},
|
||||
),
|
||||
];
|
||||
flow_metadata_manager
|
||||
.create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let new_flow_value = {
|
||||
let mut tmp = flow_value.clone();
|
||||
tmp.raw_sql = "new".to_string();
|
||||
// move to different flownodes
|
||||
tmp.flownode_ids = [(0, 3u64), (1, 4u64)].into();
|
||||
tmp
|
||||
};
|
||||
let new_flow_routes = vec![
|
||||
(
|
||||
0u32,
|
||||
FlowRouteValue {
|
||||
peer: Peer::empty(3),
|
||||
},
|
||||
),
|
||||
(
|
||||
1,
|
||||
FlowRouteValue {
|
||||
peer: Peer::empty(4),
|
||||
},
|
||||
),
|
||||
];
|
||||
|
||||
// Update flow instead
|
||||
flow_metadata_manager
|
||||
.update_flow_metadata(
|
||||
flow_id,
|
||||
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
|
||||
&new_flow_value,
|
||||
new_flow_routes.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let got = flow_metadata_manager
|
||||
.flow_info_manager()
|
||||
.get(flow_id)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let routes = flow_metadata_manager
|
||||
.flow_route_manager()
|
||||
.routes(flow_id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
routes,
|
||||
vec![
|
||||
(
|
||||
FlowRouteKey::new(flow_id, 0),
|
||||
FlowRouteValue {
|
||||
peer: Peer::empty(3),
|
||||
},
|
||||
),
|
||||
(
|
||||
FlowRouteKey::new(flow_id, 1),
|
||||
FlowRouteValue {
|
||||
peer: Peer::empty(4),
|
||||
},
|
||||
),
|
||||
]
|
||||
);
|
||||
assert_eq!(got, new_flow_value);
|
||||
|
||||
let flows = flow_metadata_manager
|
||||
.flownode_flow_manager()
|
||||
.flows(1)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.unwrap();
|
||||
// should moved to different flownode
|
||||
assert_eq!(flows, vec![]);
|
||||
|
||||
let flows = flow_metadata_manager
|
||||
.flownode_flow_manager()
|
||||
.flows(3)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(flows, vec![(flow_id, 0)]);
|
||||
|
||||
for table_id in [1024, 1025, 1026] {
|
||||
let nodes = flow_metadata_manager
|
||||
.table_flow_manager()
|
||||
.flows(table_id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
nodes,
|
||||
vec![
|
||||
(
|
||||
TableFlowKey::new(table_id, 3, flow_id, 0),
|
||||
TableFlowValue {
|
||||
peer: Peer::empty(3)
|
||||
}
|
||||
),
|
||||
(
|
||||
TableFlowKey::new(table_id, 4, flow_id, 1),
|
||||
TableFlowValue {
|
||||
peer: Peer::empty(4)
|
||||
}
|
||||
)
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_flow_metadata_flow_replace_diff_id_err() {
|
||||
let mem_kv = Arc::new(MemoryKvBackend::default());
|
||||
|
||||
@@ -272,10 +272,11 @@ impl FlowInfoManager {
|
||||
let raw_value = new_flow_value.try_as_raw_value()?;
|
||||
let prev_value = current_flow_value.get_raw_bytes();
|
||||
let txn = Txn::new()
|
||||
.when(vec![
|
||||
Compare::new(key.clone(), CompareOp::NotEqual, None),
|
||||
Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)),
|
||||
])
|
||||
.when(vec![Compare::new(
|
||||
key.clone(),
|
||||
CompareOp::Equal,
|
||||
Some(prev_value),
|
||||
)])
|
||||
.and_then(vec![TxnOp::Put(key.clone(), raw_value)])
|
||||
.or_else(vec![TxnOp::Get(key.clone())]);
|
||||
|
||||
|
||||
@@ -19,9 +19,12 @@ use serde::{Deserialize, Serialize};
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::flow::flow_info::FlowInfoValue;
|
||||
use crate::key::flow::{flownode_addr_helper, FlowScoped};
|
||||
use crate::key::node_address::NodeAddressKey;
|
||||
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
|
||||
use crate::key::{
|
||||
BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
|
||||
};
|
||||
use crate::kv_backend::txn::{Txn, TxnOp};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::peer::Peer;
|
||||
@@ -204,6 +207,30 @@ impl FlowRouteManager {
|
||||
Ok(Txn::new().and_then(txns))
|
||||
}
|
||||
|
||||
/// Builds a update flow routes transaction.
|
||||
///
|
||||
/// Puts `__flow/route/{flow_id}/{partition_id}` keys.
|
||||
/// Also removes `__flow/route/{flow_id}/{old_partition_id}` keys.
|
||||
pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
|
||||
flow_routes: I,
|
||||
) -> Result<Txn> {
|
||||
let del_txns = current_flow_info.flownode_ids().keys().map(|partition_id| {
|
||||
let key = FlowRouteKey::new(flow_id, *partition_id).to_bytes();
|
||||
Ok(TxnOp::Delete(key))
|
||||
});
|
||||
|
||||
let put_txns = flow_routes.into_iter().map(|(partition_id, route)| {
|
||||
let key = FlowRouteKey::new(flow_id, partition_id).to_bytes();
|
||||
|
||||
Ok(TxnOp::Put(key, route.try_as_raw_value()?))
|
||||
});
|
||||
let txns = del_txns.chain(put_txns).collect::<Result<Vec<_>>>()?;
|
||||
Ok(Txn::new().and_then(txns))
|
||||
}
|
||||
|
||||
async fn remap_flow_route_addresses(
|
||||
&self,
|
||||
flow_routes: &mut [(FlowRouteKey, FlowRouteValue)],
|
||||
|
||||
@@ -19,8 +19,9 @@ use regex::Regex;
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::flow::flow_info::FlowInfoValue;
|
||||
use crate::key::flow::FlowScoped;
|
||||
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey};
|
||||
use crate::key::{BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey};
|
||||
use crate::kv_backend::txn::{Txn, TxnOp};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
|
||||
@@ -202,6 +203,33 @@ impl FlownodeFlowManager {
|
||||
|
||||
Txn::new().and_then(txns)
|
||||
}
|
||||
|
||||
/// Builds a update flownode flow transaction.
|
||||
///
|
||||
/// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys.
|
||||
/// Remove the old `__flownode_flow/{old_flownode_id}/{flow_id}/{old_partition_id}` keys.
|
||||
pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
|
||||
flownode_ids: I,
|
||||
) -> Txn {
|
||||
let del_txns =
|
||||
current_flow_info
|
||||
.flownode_ids()
|
||||
.iter()
|
||||
.map(|(partition_id, flownode_id)| {
|
||||
let key = FlownodeFlowKey::new(*flownode_id, flow_id, *partition_id).to_bytes();
|
||||
TxnOp::Delete(key)
|
||||
});
|
||||
let put_txns = flownode_ids.into_iter().map(|(partition_id, flownode_id)| {
|
||||
let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
|
||||
TxnOp::Put(key, vec![])
|
||||
});
|
||||
let txns = del_txns.chain(put_txns).collect::<Vec<_>>();
|
||||
|
||||
Txn::new().and_then(txns)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -22,9 +22,12 @@ use snafu::OptionExt;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::flow::flow_info::FlowInfoValue;
|
||||
use crate::key::flow::{flownode_addr_helper, FlowScoped};
|
||||
use crate::key::node_address::NodeAddressKey;
|
||||
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
|
||||
use crate::key::{
|
||||
BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
|
||||
};
|
||||
use crate::kv_backend::txn::{Txn, TxnOp};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::peer::Peer;
|
||||
@@ -215,7 +218,7 @@ impl TableFlowManager {
|
||||
|
||||
/// Builds a create table flow transaction.
|
||||
///
|
||||
/// Puts `__flow/source_table/{table_id}/{node_id}/{partition_id}` keys.
|
||||
/// Puts `__flow/source_table/{table_id}/{node_id}/{flow_id}/{partition_id}` keys.
|
||||
pub fn build_create_txn(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
@@ -239,6 +242,44 @@ impl TableFlowManager {
|
||||
Ok(Txn::new().and_then(txns))
|
||||
}
|
||||
|
||||
/// Builds a update table flow transaction.
|
||||
///
|
||||
/// Puts `__flow/source_table/{table_id}/{node_id}/{flow_id}/{partition_id}` keys,
|
||||
/// Also remove previous
|
||||
/// `__flow/source_table/{table_id}/{old_node_id}/{flow_id}/{partition_id}` keys.
|
||||
pub fn build_update_txn(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
|
||||
table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>,
|
||||
source_table_ids: &[TableId],
|
||||
) -> Result<Txn> {
|
||||
let mut txns = Vec::with_capacity(2 * source_table_ids.len() * table_flow_values.len());
|
||||
|
||||
// first remove the old keys
|
||||
for (part_id, node_id) in current_flow_info.flownode_ids() {
|
||||
for source_table_id in current_flow_info.source_table_ids() {
|
||||
txns.push(TxnOp::Delete(
|
||||
TableFlowKey::new(*source_table_id, *node_id, flow_id, *part_id).to_bytes(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
for (partition_id, table_flow_value) in table_flow_values {
|
||||
let flownode_id = table_flow_value.peer.id;
|
||||
let value = table_flow_value.try_as_raw_value()?;
|
||||
for source_table_id in source_table_ids {
|
||||
txns.push(TxnOp::Put(
|
||||
TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id)
|
||||
.to_bytes(),
|
||||
value.clone(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Txn::new().and_then(txns))
|
||||
}
|
||||
|
||||
async fn remap_table_flow_addresses(
|
||||
&self,
|
||||
table_flows: &mut [(TableFlowKey, TableFlowValue)],
|
||||
|
||||
@@ -20,6 +20,8 @@ use api::v1::region::{InsertRequests, RegionRequest};
|
||||
pub use common_base::AffectedRows;
|
||||
use common_query::request::QueryRequest;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
|
||||
use common_wal::config::kafka::MetasrvKafkaConfig;
|
||||
|
||||
use crate::cache_invalidator::DummyCacheInvalidator;
|
||||
use crate::ddl::flow_meta::FlowMetadataAllocator;
|
||||
@@ -37,7 +39,8 @@ use crate::peer::{Peer, PeerLookupService};
|
||||
use crate::region_keeper::MemoryRegionKeeper;
|
||||
use crate::region_registry::LeaderRegionRegistry;
|
||||
use crate::sequence::SequenceBuilder;
|
||||
use crate::wal_options_allocator::WalOptionsAllocator;
|
||||
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
|
||||
use crate::wal_options_allocator::{build_kafka_topic_creator, WalOptionsAllocator};
|
||||
use crate::{DatanodeId, FlownodeId};
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -199,3 +202,34 @@ impl PeerLookupService for NoopPeerLookupService {
|
||||
Ok(Some(Peer::empty(id)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a kafka topic pool for testing.
|
||||
pub async fn test_kafka_topic_pool(
|
||||
broker_endpoints: Vec<String>,
|
||||
num_topics: usize,
|
||||
auto_create_topics: bool,
|
||||
topic_name_prefix: Option<&str>,
|
||||
) -> KafkaTopicPool {
|
||||
let mut config = MetasrvKafkaConfig {
|
||||
connection: KafkaConnectionConfig {
|
||||
broker_endpoints,
|
||||
..Default::default()
|
||||
},
|
||||
kafka_topic: KafkaTopicConfig {
|
||||
num_topics,
|
||||
|
||||
..Default::default()
|
||||
},
|
||||
auto_create_topics,
|
||||
..Default::default()
|
||||
};
|
||||
if let Some(prefix) = topic_name_prefix {
|
||||
config.kafka_topic.topic_name_prefix = prefix.to_string();
|
||||
}
|
||||
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
|
||||
let topic_creator = build_kafka_topic_creator(&config.connection, &config.kafka_topic)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
KafkaTopicPool::new(&config, kv_backend, topic_creator)
|
||||
}
|
||||
|
||||
@@ -112,7 +112,9 @@ pub async fn build_wal_options_allocator(
|
||||
NAME_PATTERN_REGEX.is_match(prefix),
|
||||
InvalidTopicNamePrefixSnafu { prefix }
|
||||
);
|
||||
let topic_creator = build_kafka_topic_creator(kafka_config).await?;
|
||||
let topic_creator =
|
||||
build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
|
||||
.await?;
|
||||
let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
|
||||
Ok(WalOptionsAllocator::Kafka(topic_pool))
|
||||
}
|
||||
@@ -151,13 +153,16 @@ pub fn prepare_wal_options(
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
|
||||
use common_wal::config::kafka::common::KafkaTopicConfig;
|
||||
use common_wal::config::kafka::MetasrvKafkaConfig;
|
||||
use common_wal::test_util::run_test_with_kafka_wal;
|
||||
use common_wal::maybe_skip_kafka_integration_test;
|
||||
use common_wal::test_util::get_kafka_endpoints;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
use crate::test_util::test_kafka_topic_pool;
|
||||
use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
|
||||
|
||||
// Tests that the wal options allocator could successfully allocate raft-engine wal options.
|
||||
#[tokio::test]
|
||||
@@ -197,55 +202,42 @@ mod tests {
|
||||
assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
|
||||
}
|
||||
|
||||
// Tests that the wal options allocator could successfully allocate Kafka wal options.
|
||||
#[tokio::test]
|
||||
async fn test_allocator_with_kafka() {
|
||||
run_test_with_kafka_wal(|broker_endpoints| {
|
||||
Box::pin(async {
|
||||
let topics = (0..256)
|
||||
.map(|i| format!("test_allocator_with_kafka_{}_{}", i, uuid::Uuid::new_v4()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Creates a topic manager.
|
||||
let kafka_topic = KafkaTopicConfig {
|
||||
replication_factor: broker_endpoints.len() as i16,
|
||||
..Default::default()
|
||||
};
|
||||
let config = MetasrvKafkaConfig {
|
||||
connection: KafkaConnectionConfig {
|
||||
broker_endpoints,
|
||||
..Default::default()
|
||||
},
|
||||
kafka_topic,
|
||||
..Default::default()
|
||||
};
|
||||
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
|
||||
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
|
||||
let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator);
|
||||
topic_pool.topics.clone_from(&topics);
|
||||
topic_pool.selector = Arc::new(selector::RoundRobinTopicSelector::default());
|
||||
|
||||
// Creates an options allocator.
|
||||
let allocator = WalOptionsAllocator::Kafka(topic_pool);
|
||||
allocator.start().await.unwrap();
|
||||
|
||||
let num_regions = 32;
|
||||
let regions = (0..num_regions).collect::<Vec<_>>();
|
||||
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
|
||||
|
||||
// Check the allocated wal options contain the expected topics.
|
||||
let expected = (0..num_regions)
|
||||
.map(|i| {
|
||||
let options = WalOptions::Kafka(KafkaWalOptions {
|
||||
topic: topics[i as usize].clone(),
|
||||
});
|
||||
(i, serde_json::to_string(&options).unwrap())
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
assert_eq!(got, expected);
|
||||
})
|
||||
})
|
||||
async fn test_allocator_with_kafka_allocate_wal_options() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_kafka_integration_test!();
|
||||
let num_topics = 5;
|
||||
let mut topic_pool = test_kafka_topic_pool(
|
||||
get_kafka_endpoints(),
|
||||
num_topics,
|
||||
true,
|
||||
Some("test_allocator_with_kafka"),
|
||||
)
|
||||
.await;
|
||||
topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
|
||||
let topics = topic_pool.topics.clone();
|
||||
// clean up the topics before test
|
||||
let topic_creator = topic_pool.topic_creator();
|
||||
topic_creator.delete_topics(&topics).await.unwrap();
|
||||
|
||||
// Creates an options allocator.
|
||||
let allocator = WalOptionsAllocator::Kafka(topic_pool);
|
||||
allocator.start().await.unwrap();
|
||||
|
||||
let num_regions = 3;
|
||||
let regions = (0..num_regions).collect::<Vec<_>>();
|
||||
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
|
||||
|
||||
// Check the allocated wal options contain the expected topics.
|
||||
let expected = (0..num_regions)
|
||||
.map(|i| {
|
||||
let options = WalOptions::Kafka(KafkaWalOptions {
|
||||
topic: topics[i as usize].clone(),
|
||||
});
|
||||
(i, serde_json::to_string(&options).unwrap())
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
assert_eq!(got, expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -12,20 +12,21 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_telemetry::{error, info};
|
||||
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
|
||||
use common_wal::config::kafka::MetasrvKafkaConfig;
|
||||
use common_telemetry::{debug, error, info};
|
||||
use common_wal::config::kafka::common::{
|
||||
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG,
|
||||
};
|
||||
use rskafka::client::error::Error as RsKafkaError;
|
||||
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
|
||||
use rskafka::client::partition::{Compression, UnknownTopicHandling};
|
||||
use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
|
||||
use rskafka::client::{Client, ClientBuilder};
|
||||
use rskafka::record::Record;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{
|
||||
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
|
||||
CreateKafkaWalTopicSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
|
||||
TlsConfigSnafu,
|
||||
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
|
||||
KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu,
|
||||
Result, TlsConfigSnafu,
|
||||
};
|
||||
|
||||
// Each topic only has one partition for now.
|
||||
@@ -70,21 +71,47 @@ impl KafkaTopicCreator {
|
||||
info!("The topic {} already exists", topic);
|
||||
Ok(())
|
||||
} else {
|
||||
error!("Failed to create a topic {}, error {:?}", topic, e);
|
||||
error!(e; "Failed to create a topic {}", topic);
|
||||
Err(e).context(CreateKafkaWalTopicSnafu)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn append_noop_record(&self, topic: &String, client: &Client) -> Result<()> {
|
||||
let partition_client = client
|
||||
async fn prepare_topic(&self, topic: &String) -> Result<()> {
|
||||
let partition_client = self.partition_client(topic).await?;
|
||||
self.append_noop_record(topic, &partition_client).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates a [PartitionClient] for the given topic.
|
||||
async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
|
||||
self.client
|
||||
.partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
|
||||
.await
|
||||
.context(BuildKafkaPartitionClientSnafu {
|
||||
.context(KafkaPartitionClientSnafu {
|
||||
topic,
|
||||
partition: DEFAULT_PARTITION,
|
||||
})
|
||||
}
|
||||
|
||||
/// Appends a noop record to the topic.
|
||||
/// It only appends a noop record if the topic is empty.
|
||||
async fn append_noop_record(
|
||||
&self,
|
||||
topic: &String,
|
||||
partition_client: &PartitionClient,
|
||||
) -> Result<()> {
|
||||
let end_offset = partition_client
|
||||
.get_offset(OffsetAt::Latest)
|
||||
.await
|
||||
.context(KafkaGetOffsetSnafu {
|
||||
topic: topic.to_string(),
|
||||
partition: DEFAULT_PARTITION,
|
||||
})?;
|
||||
if end_offset > 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
partition_client
|
||||
.produce(
|
||||
@@ -98,22 +125,28 @@ impl KafkaTopicCreator {
|
||||
)
|
||||
.await
|
||||
.context(ProduceRecordSnafu { topic })?;
|
||||
debug!("Appended a noop record to topic {}", topic);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates topics in Kafka.
|
||||
pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
|
||||
let tasks = topics
|
||||
.iter()
|
||||
.map(|topic| async { self.create_topic(topic, &self.client).await })
|
||||
.collect::<Vec<_>>();
|
||||
futures::future::try_join_all(tasks).await.map(|_| ())
|
||||
}
|
||||
|
||||
/// Prepares topics in Kafka.
|
||||
/// 1. Creates missing topics.
|
||||
/// 2. Appends a noop record to each topic.
|
||||
pub async fn prepare_topics(&self, topics: &[&String]) -> Result<()> {
|
||||
///
|
||||
/// It appends a noop record to each topic if the topic is empty.
|
||||
pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
|
||||
// Try to create missing topics.
|
||||
let tasks = topics
|
||||
.iter()
|
||||
.map(|topic| async {
|
||||
self.create_topic(topic, &self.client).await?;
|
||||
self.append_noop_record(topic, &self.client).await?;
|
||||
Ok(())
|
||||
})
|
||||
.map(|topic| async { self.prepare_topic(topic).await })
|
||||
.collect::<Vec<_>>();
|
||||
futures::future::try_join_all(tasks).await.map(|_| ())
|
||||
}
|
||||
@@ -129,34 +162,244 @@ impl KafkaTopicCreator {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl KafkaTopicCreator {
|
||||
pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
|
||||
let tasks = topics
|
||||
.iter()
|
||||
.map(|topic| async { self.delete_topic(topic, &self.client).await })
|
||||
.collect::<Vec<_>>();
|
||||
futures::future::try_join_all(tasks).await.map(|_| ())
|
||||
}
|
||||
|
||||
async fn delete_topic(&self, topic: &String, client: &Client) -> Result<()> {
|
||||
let controller = client
|
||||
.controller_client()
|
||||
.context(BuildKafkaCtrlClientSnafu)?;
|
||||
match controller.delete_topic(topic, 10).await {
|
||||
Ok(_) => {
|
||||
info!("Successfully deleted topic {}", topic);
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
if Self::is_unknown_topic_err(&e) {
|
||||
info!("The topic {} does not exist", topic);
|
||||
Ok(())
|
||||
} else {
|
||||
panic!("Failed to delete a topic {}, error: {}", topic, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
|
||||
matches!(
|
||||
e,
|
||||
&RsKafkaError::ServerError {
|
||||
protocol_error: rskafka::client::error::ProtocolError::UnknownTopicOrPartition,
|
||||
..
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
|
||||
self.partition_client(topic).await.unwrap()
|
||||
}
|
||||
}
|
||||
/// Builds a kafka [Client](rskafka::client::Client).
|
||||
pub async fn build_kafka_client(config: &MetasrvKafkaConfig) -> Result<Client> {
|
||||
pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
|
||||
// Builds an kafka controller client for creating topics.
|
||||
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
|
||||
let broker_endpoints = common_wal::resolve_to_ipv4(&connection.broker_endpoints)
|
||||
.await
|
||||
.context(ResolveKafkaEndpointSnafu)?;
|
||||
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
|
||||
if let Some(sasl) = &config.connection.sasl {
|
||||
if let Some(sasl) = &connection.sasl {
|
||||
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
|
||||
};
|
||||
if let Some(tls) = &config.connection.tls {
|
||||
if let Some(tls) = &connection.tls {
|
||||
builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
|
||||
};
|
||||
builder
|
||||
.build()
|
||||
.await
|
||||
.with_context(|_| BuildKafkaClientSnafu {
|
||||
broker_endpoints: config.connection.broker_endpoints.clone(),
|
||||
broker_endpoints: connection.broker_endpoints.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Builds a [KafkaTopicCreator].
|
||||
pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
|
||||
let client = build_kafka_client(config).await?;
|
||||
pub async fn build_kafka_topic_creator(
|
||||
connection: &KafkaConnectionConfig,
|
||||
kafka_topic: &KafkaTopicConfig,
|
||||
) -> Result<KafkaTopicCreator> {
|
||||
let client = build_kafka_client(connection).await?;
|
||||
Ok(KafkaTopicCreator {
|
||||
client,
|
||||
num_partitions: config.kafka_topic.num_partitions,
|
||||
replication_factor: config.kafka_topic.replication_factor,
|
||||
create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32,
|
||||
num_partitions: kafka_topic.num_partitions,
|
||||
replication_factor: kafka_topic.replication_factor,
|
||||
create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
|
||||
use common_wal::maybe_skip_kafka_integration_test;
|
||||
use common_wal::test_util::get_kafka_endpoints;
|
||||
|
||||
use super::*;
|
||||
|
||||
async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
|
||||
let connection = KafkaConnectionConfig {
|
||||
broker_endpoints,
|
||||
..Default::default()
|
||||
};
|
||||
let kafka_topic = KafkaTopicConfig::default();
|
||||
|
||||
build_kafka_topic_creator(&connection, &kafka_topic)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
|
||||
for i in 0..num_records {
|
||||
partition_client
|
||||
.produce(
|
||||
vec![Record {
|
||||
key: Some(b"test".to_vec()),
|
||||
value: Some(format!("test {}", i).as_bytes().to_vec()),
|
||||
timestamp: chrono::Utc::now(),
|
||||
headers: Default::default(),
|
||||
}],
|
||||
Compression::Lz4,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_append_noop_record_to_empty_topic() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_kafka_integration_test!();
|
||||
let prefix = "append_noop_record_to_empty_topic";
|
||||
let creator = test_topic_creator(get_kafka_endpoints()).await;
|
||||
|
||||
let topic = format!("{}{}", prefix, "0");
|
||||
// Clean up the topics before test
|
||||
creator.delete_topics(&[topic.to_string()]).await.unwrap();
|
||||
creator.create_topics(&[topic.to_string()]).await.unwrap();
|
||||
|
||||
let partition_client = creator.partition_client(&topic).await.unwrap();
|
||||
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
|
||||
assert_eq!(end_offset, 0);
|
||||
|
||||
// The topic is not empty, so no noop record is appended.
|
||||
creator
|
||||
.append_noop_record(&topic, &partition_client)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
|
||||
assert_eq!(end_offset, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_append_noop_record_to_non_empty_topic() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_kafka_integration_test!();
|
||||
let prefix = "append_noop_record_to_non_empty_topic";
|
||||
let creator = test_topic_creator(get_kafka_endpoints()).await;
|
||||
|
||||
let topic = format!("{}{}", prefix, "0");
|
||||
// Clean up the topics before test
|
||||
creator.delete_topics(&[topic.to_string()]).await.unwrap();
|
||||
|
||||
creator.create_topics(&[topic.to_string()]).await.unwrap();
|
||||
let partition_client = creator.partition_client(&topic).await.unwrap();
|
||||
append_records(&partition_client, 2).await.unwrap();
|
||||
|
||||
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
|
||||
assert_eq!(end_offset, 2);
|
||||
|
||||
// The topic is not empty, so no noop record is appended.
|
||||
creator
|
||||
.append_noop_record(&topic, &partition_client)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
|
||||
assert_eq!(end_offset, 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_topic() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_kafka_integration_test!();
|
||||
let prefix = "create_topic";
|
||||
let creator = test_topic_creator(get_kafka_endpoints()).await;
|
||||
|
||||
let topic = format!("{}{}", prefix, "0");
|
||||
// Clean up the topics before test
|
||||
creator.delete_topics(&[topic.to_string()]).await.unwrap();
|
||||
|
||||
creator.create_topics(&[topic.to_string()]).await.unwrap();
|
||||
// Should be ok
|
||||
creator.create_topics(&[topic.to_string()]).await.unwrap();
|
||||
|
||||
let partition_client = creator.partition_client(&topic).await.unwrap();
|
||||
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
|
||||
assert_eq!(end_offset, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_prepare_topic() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_kafka_integration_test!();
|
||||
let prefix = "prepare_topic";
|
||||
let creator = test_topic_creator(get_kafka_endpoints()).await;
|
||||
|
||||
let topic = format!("{}{}", prefix, "0");
|
||||
// Clean up the topics before test
|
||||
creator.delete_topics(&[topic.to_string()]).await.unwrap();
|
||||
|
||||
creator.create_topics(&[topic.to_string()]).await.unwrap();
|
||||
creator.prepare_topic(&topic).await.unwrap();
|
||||
|
||||
let partition_client = creator.partition_client(&topic).await.unwrap();
|
||||
let start_offset = partition_client
|
||||
.get_offset(OffsetAt::Earliest)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(start_offset, 0);
|
||||
|
||||
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
|
||||
assert_eq!(end_offset, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_prepare_topic_with_stale_records_without_pruning() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_kafka_integration_test!();
|
||||
|
||||
let prefix = "prepare_topic_with_stale_records_without_pruning";
|
||||
let creator = test_topic_creator(get_kafka_endpoints()).await;
|
||||
|
||||
let topic = format!("{}{}", prefix, "0");
|
||||
// Clean up the topics before test
|
||||
creator.delete_topics(&[topic.to_string()]).await.unwrap();
|
||||
|
||||
creator.create_topics(&[topic.to_string()]).await.unwrap();
|
||||
let partition_client = creator.partition_client(&topic).await.unwrap();
|
||||
append_records(&partition_client, 10).await.unwrap();
|
||||
|
||||
creator.prepare_topic(&topic).await.unwrap();
|
||||
|
||||
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
|
||||
assert_eq!(end_offset, 10);
|
||||
let start_offset = partition_client
|
||||
.get_offset(OffsetAt::Earliest)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(start_offset, 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,24 +40,21 @@ impl KafkaTopicManager {
|
||||
Ok(topics)
|
||||
}
|
||||
|
||||
/// Restores topics from the key-value backend. and returns the topics that are not stored in kvbackend.
|
||||
pub async fn get_topics_to_create<'a>(
|
||||
&self,
|
||||
all_topics: &'a [String],
|
||||
) -> Result<Vec<&'a String>> {
|
||||
/// Returns the topics that are not prepared.
|
||||
pub async fn unprepare_topics(&self, all_topics: &[String]) -> Result<Vec<String>> {
|
||||
let existing_topics = self.restore_topics().await?;
|
||||
let existing_topic_set = existing_topics.iter().collect::<HashSet<_>>();
|
||||
let mut topics_to_create = Vec::with_capacity(all_topics.len());
|
||||
for topic in all_topics {
|
||||
if !existing_topic_set.contains(topic) {
|
||||
topics_to_create.push(topic);
|
||||
topics_to_create.push(topic.to_string());
|
||||
}
|
||||
}
|
||||
Ok(topics_to_create)
|
||||
}
|
||||
|
||||
/// Persists topics into the key-value backend.
|
||||
pub async fn persist_topics(&self, topics: &[String]) -> Result<()> {
|
||||
/// Persists prepared topics into the key-value backend.
|
||||
pub async fn persist_prepared_topics(&self, topics: &[String]) -> Result<()> {
|
||||
self.topic_name_manager
|
||||
.batch_put(
|
||||
topics
|
||||
@@ -70,6 +67,14 @@ impl KafkaTopicManager {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl KafkaTopicManager {
|
||||
/// Lists all topics in the key-value backend.
|
||||
pub async fn list_topics(&self) -> Result<Vec<String>> {
|
||||
self.topic_name_manager.range().await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
@@ -90,11 +95,11 @@ mod tests {
|
||||
|
||||
// No legacy topics.
|
||||
let mut topics_to_be_created = topic_kvbackend_manager
|
||||
.get_topics_to_create(&all_topics)
|
||||
.unprepare_topics(&all_topics)
|
||||
.await
|
||||
.unwrap();
|
||||
topics_to_be_created.sort();
|
||||
let mut expected = all_topics.iter().collect::<Vec<_>>();
|
||||
let mut expected = all_topics.clone();
|
||||
expected.sort();
|
||||
assert_eq!(expected, topics_to_be_created);
|
||||
|
||||
@@ -109,7 +114,7 @@ mod tests {
|
||||
assert!(res.prev_kv.is_none());
|
||||
|
||||
let topics_to_be_created = topic_kvbackend_manager
|
||||
.get_topics_to_create(&all_topics)
|
||||
.unprepare_topics(&all_topics)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(topics_to_be_created.is_empty());
|
||||
@@ -144,21 +149,21 @@ mod tests {
|
||||
let topic_kvbackend_manager = KafkaTopicManager::new(kv_backend);
|
||||
|
||||
let mut topics_to_be_created = topic_kvbackend_manager
|
||||
.get_topics_to_create(&all_topics)
|
||||
.unprepare_topics(&all_topics)
|
||||
.await
|
||||
.unwrap();
|
||||
topics_to_be_created.sort();
|
||||
let mut expected = all_topics.iter().collect::<Vec<_>>();
|
||||
let mut expected = all_topics.clone();
|
||||
expected.sort();
|
||||
assert_eq!(expected, topics_to_be_created);
|
||||
|
||||
// Persists topics to kv backend.
|
||||
topic_kvbackend_manager
|
||||
.persist_topics(&all_topics)
|
||||
.persist_prepared_topics(&all_topics)
|
||||
.await
|
||||
.unwrap();
|
||||
let topics_to_be_created = topic_kvbackend_manager
|
||||
.get_topics_to_create(&all_topics)
|
||||
.unprepare_topics(&all_topics)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(topics_to_be_created.is_empty());
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::fmt::{self, Formatter};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::info;
|
||||
use common_wal::config::kafka::MetasrvKafkaConfig;
|
||||
use common_wal::TopicSelectorType;
|
||||
use snafu::ensure;
|
||||
@@ -77,27 +78,35 @@ impl KafkaTopicPool {
|
||||
}
|
||||
|
||||
/// Tries to activate the topic manager when metasrv becomes the leader.
|
||||
///
|
||||
/// First tries to restore persisted topics from the kv backend.
|
||||
/// If not enough topics retrieved, it will try to contact the Kafka cluster and request creating more topics.
|
||||
/// If there are unprepared topics (topics that exist in the configuration but not in the kv backend),
|
||||
/// it will create these topics in Kafka if `auto_create_topics` is enabled.
|
||||
///
|
||||
/// Then it prepares all unprepared topics by appending a noop record if the topic is empty,
|
||||
/// and persists them in the kv backend for future use.
|
||||
pub async fn activate(&self) -> Result<()> {
|
||||
if !self.auto_create_topics {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let num_topics = self.topics.len();
|
||||
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
|
||||
|
||||
let topics_to_be_created = self
|
||||
.topic_manager
|
||||
.get_topics_to_create(&self.topics)
|
||||
.await?;
|
||||
let unprepared_topics = self.topic_manager.unprepare_topics(&self.topics).await?;
|
||||
|
||||
if !topics_to_be_created.is_empty() {
|
||||
if !unprepared_topics.is_empty() {
|
||||
if self.auto_create_topics {
|
||||
info!("Creating {} topics", unprepared_topics.len());
|
||||
self.topic_creator.create_topics(&unprepared_topics).await?;
|
||||
} else {
|
||||
info!("Auto create topics is disabled, skipping topic creation.");
|
||||
}
|
||||
self.topic_creator
|
||||
.prepare_topics(&topics_to_be_created)
|
||||
.prepare_topics(&unprepared_topics)
|
||||
.await?;
|
||||
self.topic_manager
|
||||
.persist_prepared_topics(&unprepared_topics)
|
||||
.await?;
|
||||
self.topic_manager.persist_topics(&self.topics).await?;
|
||||
}
|
||||
info!("Activated topic pool with {} topics", self.topics.len());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -114,77 +123,147 @@ impl KafkaTopicPool {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl KafkaTopicPool {
|
||||
pub(crate) fn topic_manager(&self) -> &KafkaTopicManager {
|
||||
&self.topic_manager
|
||||
}
|
||||
|
||||
pub(crate) fn topic_creator(&self) -> &KafkaTopicCreator {
|
||||
&self.topic_creator
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
|
||||
use common_wal::test_util::run_test_with_kafka_wal;
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
use common_wal::maybe_skip_kafka_integration_test;
|
||||
use common_wal::test_util::get_kafka_endpoints;
|
||||
|
||||
use super::*;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
|
||||
use crate::error::Error;
|
||||
use crate::test_util::test_kafka_topic_pool;
|
||||
use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_pool_invalid_number_topics_err() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_kafka_integration_test!();
|
||||
let endpoints = get_kafka_endpoints();
|
||||
|
||||
let pool = test_kafka_topic_pool(endpoints.clone(), 0, false, None).await;
|
||||
let err = pool.activate().await.unwrap_err();
|
||||
assert_matches!(err, Error::InvalidNumTopics { .. });
|
||||
|
||||
let pool = test_kafka_topic_pool(endpoints, 0, true, None).await;
|
||||
let err = pool.activate().await.unwrap_err();
|
||||
assert_matches!(err, Error::InvalidNumTopics { .. });
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_pool_activate_unknown_topics_err() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_kafka_integration_test!();
|
||||
let pool =
|
||||
test_kafka_topic_pool(get_kafka_endpoints(), 1, false, Some("unknown_topic")).await;
|
||||
let err = pool.activate().await.unwrap_err();
|
||||
assert_matches!(err, Error::KafkaPartitionClient { .. });
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_pool_activate() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_kafka_integration_test!();
|
||||
let pool =
|
||||
test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some("pool_activate")).await;
|
||||
// clean up the topics before test
|
||||
let topic_creator = pool.topic_creator();
|
||||
topic_creator.delete_topics(&pool.topics).await.unwrap();
|
||||
|
||||
let topic_manager = pool.topic_manager();
|
||||
pool.activate().await.unwrap();
|
||||
let topics = topic_manager.list_topics().await.unwrap();
|
||||
assert_eq!(topics.len(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_pool_activate_with_existing_topics() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_kafka_integration_test!();
|
||||
let prefix = "pool_activate_with_existing_topics";
|
||||
let pool = test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some(prefix)).await;
|
||||
let topic_creator = pool.topic_creator();
|
||||
topic_creator.delete_topics(&pool.topics).await.unwrap();
|
||||
|
||||
let topic_manager = pool.topic_manager();
|
||||
// persists one topic info, then pool.activate() will create new topics that not persisted.
|
||||
topic_manager
|
||||
.persist_prepared_topics(&pool.topics[0..1])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
pool.activate().await.unwrap();
|
||||
let topics = topic_manager.list_topics().await.unwrap();
|
||||
assert_eq!(topics.len(), 2);
|
||||
|
||||
let client = pool.topic_creator().client();
|
||||
let topics = client
|
||||
.list_topics()
|
||||
.await
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.filter(|t| t.name.starts_with(prefix))
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(topics.len(), 1);
|
||||
}
|
||||
|
||||
/// Tests that the topic manager could allocate topics correctly.
|
||||
#[tokio::test]
|
||||
async fn test_alloc_topics() {
|
||||
run_test_with_kafka_wal(|broker_endpoints| {
|
||||
Box::pin(async {
|
||||
// Constructs topics that should be created.
|
||||
let topics = (0..256)
|
||||
.map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Creates a topic manager.
|
||||
let kafka_topic = KafkaTopicConfig {
|
||||
replication_factor: broker_endpoints.len() as i16,
|
||||
..Default::default()
|
||||
};
|
||||
let config = MetasrvKafkaConfig {
|
||||
connection: KafkaConnectionConfig {
|
||||
broker_endpoints,
|
||||
..Default::default()
|
||||
},
|
||||
kafka_topic,
|
||||
..Default::default()
|
||||
};
|
||||
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
|
||||
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
|
||||
let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator);
|
||||
// Replaces the default topic pool with the constructed topics.
|
||||
topic_pool.topics.clone_from(&topics);
|
||||
// Replaces the default selector with a round-robin selector without shuffled.
|
||||
topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
|
||||
topic_pool.activate().await.unwrap();
|
||||
|
||||
// Selects exactly the number of `num_topics` topics one by one.
|
||||
let got = (0..topics.len())
|
||||
.map(|_| topic_pool.select().unwrap())
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(got, topics);
|
||||
|
||||
// Selects exactly the number of `num_topics` topics in a batching manner.
|
||||
let got = topic_pool
|
||||
.select_batch(topics.len())
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(ToString::to_string)
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(got, topics);
|
||||
|
||||
// Selects more than the number of `num_topics` topics.
|
||||
let got = topic_pool
|
||||
.select_batch(2 * topics.len())
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(ToString::to_string)
|
||||
.collect::<Vec<_>>();
|
||||
let expected = vec![topics.clone(); 2]
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(got, expected);
|
||||
})
|
||||
})
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_kafka_integration_test!();
|
||||
let num_topics = 5;
|
||||
let mut topic_pool = test_kafka_topic_pool(
|
||||
get_kafka_endpoints(),
|
||||
num_topics,
|
||||
true,
|
||||
Some("test_allocator_with_kafka"),
|
||||
)
|
||||
.await;
|
||||
topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
|
||||
let topics = topic_pool.topics.clone();
|
||||
// clean up the topics before test
|
||||
let topic_creator = topic_pool.topic_creator();
|
||||
topic_creator.delete_topics(&topics).await.unwrap();
|
||||
|
||||
// Selects exactly the number of `num_topics` topics one by one.
|
||||
let got = (0..topics.len())
|
||||
.map(|_| topic_pool.select().unwrap())
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(got, topics);
|
||||
|
||||
// Selects exactly the number of `num_topics` topics in a batching manner.
|
||||
let got = topic_pool
|
||||
.select_batch(topics.len())
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(ToString::to_string)
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(got, topics);
|
||||
|
||||
// Selects more than the number of `num_topics` topics.
|
||||
let got = topic_pool
|
||||
.select_batch(2 * topics.len())
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(ToString::to_string)
|
||||
.collect::<Vec<_>>();
|
||||
let expected = vec![topics.clone(); 2]
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(got, expected);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,24 +68,46 @@ pub struct LoggingOptions {
|
||||
|
||||
/// The tracing sample ratio.
|
||||
pub tracing_sample_ratio: Option<TracingSampleOptions>,
|
||||
|
||||
/// The logging options of slow query.
|
||||
pub slow_query: SlowQueryOptions,
|
||||
}
|
||||
|
||||
/// The options of slow query.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
|
||||
#[serde(default)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct SlowQueryOptions {
|
||||
/// Whether to enable slow query log.
|
||||
pub enable: bool,
|
||||
|
||||
/// The record type of slow queries.
|
||||
pub record_type: SlowQueriesRecordType,
|
||||
|
||||
/// The threshold of slow queries.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub threshold: Option<Duration>,
|
||||
|
||||
/// The sample ratio of slow queries.
|
||||
pub sample_ratio: Option<f64>,
|
||||
|
||||
/// The table TTL of `slow_queries` system table. Default is "30d".
|
||||
/// It's used when `record_type` is `SystemTable`.
|
||||
pub ttl: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for SlowQueryOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enable: true,
|
||||
record_type: SlowQueriesRecordType::SystemTable,
|
||||
threshold: Some(Duration::from_secs(30)),
|
||||
sample_ratio: Some(1.0),
|
||||
ttl: Some("30d".to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum SlowQueriesRecordType {
|
||||
SystemTable,
|
||||
Log,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -118,7 +140,6 @@ impl Default for LoggingOptions {
|
||||
otlp_endpoint: None,
|
||||
tracing_sample_ratio: None,
|
||||
append_stdout: true,
|
||||
slow_query: SlowQueryOptions::default(),
|
||||
// Rotation hourly, 24 files per day, keeps info log files of 30 days
|
||||
max_log_files: 720,
|
||||
}
|
||||
@@ -158,7 +179,8 @@ pub fn init_default_ut_logging() {
|
||||
"unittest",
|
||||
&opts,
|
||||
&TracingOptions::default(),
|
||||
None
|
||||
None,
|
||||
None,
|
||||
));
|
||||
|
||||
crate::info!("logs dir = {}", dir);
|
||||
@@ -176,6 +198,7 @@ pub fn init_global_logging(
|
||||
opts: &LoggingOptions,
|
||||
tracing_opts: &TracingOptions,
|
||||
node_id: Option<String>,
|
||||
slow_query_opts: Option<&SlowQueryOptions>,
|
||||
) -> Vec<WorkerGuard> {
|
||||
static START: Once = Once::new();
|
||||
let mut guards = vec![];
|
||||
@@ -278,50 +301,7 @@ pub fn init_global_logging(
|
||||
None
|
||||
};
|
||||
|
||||
let slow_query_logging_layer = if !opts.dir.is_empty() && opts.slow_query.enable {
|
||||
let rolling_appender = RollingFileAppender::builder()
|
||||
.rotation(Rotation::HOURLY)
|
||||
.filename_prefix("greptimedb-slow-queries")
|
||||
.max_log_files(opts.max_log_files)
|
||||
.build(&opts.dir)
|
||||
.unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"initializing rolling file appender at {} failed: {}",
|
||||
&opts.dir, e
|
||||
)
|
||||
});
|
||||
let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
|
||||
guards.push(guard);
|
||||
|
||||
// Only logs if the field contains "slow".
|
||||
let slow_query_filter = FilterFn::new(|metadata| {
|
||||
metadata
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name().contains("slow"))
|
||||
});
|
||||
|
||||
if opts.log_format == LogFormat::Json {
|
||||
Some(
|
||||
Layer::new()
|
||||
.json()
|
||||
.with_writer(writer)
|
||||
.with_ansi(false)
|
||||
.with_filter(slow_query_filter)
|
||||
.boxed(),
|
||||
)
|
||||
} else {
|
||||
Some(
|
||||
Layer::new()
|
||||
.with_writer(writer)
|
||||
.with_ansi(false)
|
||||
.with_filter(slow_query_filter)
|
||||
.boxed(),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let slow_query_logging_layer = build_slow_query_logger(opts, slow_query_opts, &mut guards);
|
||||
|
||||
// resolve log level settings from:
|
||||
// - options from command line or config files
|
||||
@@ -435,3 +415,67 @@ pub fn init_global_logging(
|
||||
|
||||
guards
|
||||
}
|
||||
|
||||
fn build_slow_query_logger<S>(
|
||||
opts: &LoggingOptions,
|
||||
slow_query_opts: Option<&SlowQueryOptions>,
|
||||
guards: &mut Vec<WorkerGuard>,
|
||||
) -> Option<Box<dyn tracing_subscriber::Layer<S> + Send + Sync + 'static>>
|
||||
where
|
||||
S: tracing::Subscriber
|
||||
+ Send
|
||||
+ 'static
|
||||
+ for<'span> tracing_subscriber::registry::LookupSpan<'span>,
|
||||
{
|
||||
if let Some(slow_query_opts) = slow_query_opts {
|
||||
if !opts.dir.is_empty()
|
||||
&& slow_query_opts.enable
|
||||
&& slow_query_opts.record_type == SlowQueriesRecordType::Log
|
||||
{
|
||||
let rolling_appender = RollingFileAppender::builder()
|
||||
.rotation(Rotation::HOURLY)
|
||||
.filename_prefix("greptimedb-slow-queries")
|
||||
.max_log_files(opts.max_log_files)
|
||||
.build(&opts.dir)
|
||||
.unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"initializing rolling file appender at {} failed: {}",
|
||||
&opts.dir, e
|
||||
)
|
||||
});
|
||||
let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
|
||||
guards.push(guard);
|
||||
|
||||
// Only logs if the field contains "slow".
|
||||
let slow_query_filter = FilterFn::new(|metadata| {
|
||||
metadata
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name().contains("slow"))
|
||||
});
|
||||
|
||||
if opts.log_format == LogFormat::Json {
|
||||
Some(
|
||||
Layer::new()
|
||||
.json()
|
||||
.with_writer(writer)
|
||||
.with_ansi(false)
|
||||
.with_filter(slow_query_filter)
|
||||
.boxed(),
|
||||
)
|
||||
} else {
|
||||
Some(
|
||||
Layer::new()
|
||||
.with_writer(writer)
|
||||
.with_ansi(false)
|
||||
.with_filter(slow_query_filter)
|
||||
.boxed(),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,11 +23,16 @@ use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
/// The default backoff config for kafka client.
|
||||
///
|
||||
/// If the operation fails, the client will retry 3 times.
|
||||
/// The backoff time is 100ms, 300ms, 900ms.
|
||||
pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
|
||||
init_backoff: Duration::from_millis(100),
|
||||
max_backoff: Duration::from_secs(10),
|
||||
base: 2.0,
|
||||
deadline: Some(Duration::from_secs(120)),
|
||||
max_backoff: Duration::from_secs(1),
|
||||
base: 3.0,
|
||||
// The deadline shouldn't be too long,
|
||||
// otherwise the client will block the worker loop for a long time.
|
||||
deadline: Some(Duration::from_secs(3)),
|
||||
};
|
||||
|
||||
/// Default interval for auto WAL pruning.
|
||||
|
||||
@@ -31,3 +31,33 @@ where
|
||||
|
||||
test(endpoints).await
|
||||
}
|
||||
|
||||
/// Get the kafka endpoints from the environment variable `GT_KAFKA_ENDPOINTS`.
|
||||
///
|
||||
/// The format of the environment variable is:
|
||||
/// ```
|
||||
/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
|
||||
/// ```
|
||||
pub fn get_kafka_endpoints() -> Vec<String> {
|
||||
let endpoints = std::env::var("GT_KAFKA_ENDPOINTS").unwrap();
|
||||
endpoints
|
||||
.split(',')
|
||||
.map(|s| s.trim().to_string())
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
/// Skip the test if the environment variable `GT_KAFKA_ENDPOINTS` is not set.
|
||||
///
|
||||
/// The format of the environment variable is:
|
||||
/// ```
|
||||
/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
|
||||
/// ```
|
||||
macro_rules! maybe_skip_kafka_integration_test {
|
||||
() => {
|
||||
if std::env::var("GT_KAFKA_ENDPOINTS").is_err() {
|
||||
common_telemetry::warn!("The endpoints is empty, skipping the test");
|
||||
return;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
13
src/common/workload/Cargo.toml
Normal file
13
src/common/workload/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "common-workload"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
serde.workspace = true
|
||||
68
src/common/workload/src/lib.rs
Normal file
68
src/common/workload/src/lib.rs
Normal file
@@ -0,0 +1,68 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_telemetry::info;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// The workload type of the datanode.
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum DatanodeWorkloadType {
|
||||
/// The datanode can handle all workloads.
|
||||
Hybrid = 0,
|
||||
}
|
||||
|
||||
impl DatanodeWorkloadType {
|
||||
/// Convert from `i32` to `DatanodeWorkloadType`.
|
||||
pub fn from_i32(value: i32) -> Option<Self> {
|
||||
match value {
|
||||
v if v == Self::Hybrid as i32 => Some(Self::Hybrid),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert from `DatanodeWorkloadType` to `i32`.
|
||||
pub fn to_i32(self) -> i32 {
|
||||
self as i32
|
||||
}
|
||||
|
||||
pub fn accept_ingest(&self) -> bool {
|
||||
matches!(self, Self::Hybrid)
|
||||
}
|
||||
}
|
||||
|
||||
/// Sanitize the workload types.
|
||||
pub fn sanitize_workload_types(workload_types: &mut Vec<DatanodeWorkloadType>) {
|
||||
if workload_types.is_empty() {
|
||||
info!("The workload types is empty, use Hybrid workload type");
|
||||
workload_types.push(DatanodeWorkloadType::Hybrid);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_sanitize_workload_types() {
|
||||
let hybrid = DatanodeWorkloadType::Hybrid;
|
||||
assert_eq!(hybrid as i32, 0);
|
||||
let hybrid_i32 = hybrid.to_i32();
|
||||
assert_eq!(hybrid_i32, 0);
|
||||
assert_eq!(DatanodeWorkloadType::from_i32(hybrid_i32), Some(hybrid));
|
||||
|
||||
let unexpected_i32 = 100;
|
||||
assert_eq!(DatanodeWorkloadType::from_i32(unexpected_i32), None);
|
||||
}
|
||||
}
|
||||
@@ -33,6 +33,7 @@ common-telemetry.workspace = true
|
||||
common-time.workspace = true
|
||||
common-version.workspace = true
|
||||
common-wal.workspace = true
|
||||
common-workload.workspace = true
|
||||
dashmap.workspace = true
|
||||
datafusion.workspace = true
|
||||
datafusion-common.workspace = true
|
||||
|
||||
@@ -22,6 +22,7 @@ use common_config::Configurable;
|
||||
pub use common_procedure::options::ProcedureConfig;
|
||||
use common_telemetry::logging::{LoggingOptions, TracingOptions};
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
use common_workload::{sanitize_workload_types, DatanodeWorkloadType};
|
||||
use file_engine::config::EngineConfig as FileEngineConfig;
|
||||
use meta_client::MetaClientOptions;
|
||||
use metric_engine::config::EngineConfig as MetricEngineConfig;
|
||||
@@ -360,6 +361,7 @@ impl Default for ObjectStoreConfig {
|
||||
#[serde(default)]
|
||||
pub struct DatanodeOptions {
|
||||
pub node_id: Option<u64>,
|
||||
pub workload_types: Vec<DatanodeWorkloadType>,
|
||||
pub require_lease_before_startup: bool,
|
||||
pub init_regions_in_background: bool,
|
||||
pub init_regions_parallelism: usize,
|
||||
@@ -391,11 +393,19 @@ pub struct DatanodeOptions {
|
||||
pub rpc_max_send_message_size: Option<ReadableSize>,
|
||||
}
|
||||
|
||||
impl DatanodeOptions {
|
||||
/// Sanitize the `DatanodeOptions` to ensure the config is valid.
|
||||
pub fn sanitize(&mut self) {
|
||||
sanitize_workload_types(&mut self.workload_types);
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for DatanodeOptions {
|
||||
#[allow(deprecated)]
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
node_id: None,
|
||||
workload_types: vec![DatanodeWorkloadType::Hybrid],
|
||||
require_lease_before_startup: false,
|
||||
init_regions_in_background: false,
|
||||
init_regions_parallelism: 16,
|
||||
|
||||
@@ -17,7 +17,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
|
||||
use api::v1::meta::heartbeat_request::NodeWorkloads;
|
||||
use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
|
||||
use common_base::Plugins;
|
||||
use common_meta::cache_invalidator::CacheInvalidatorRef;
|
||||
use common_meta::datanode::REGION_STATISTIC_KEY;
|
||||
@@ -30,6 +31,7 @@ use common_meta::heartbeat::handler::{
|
||||
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
|
||||
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
|
||||
use common_telemetry::{debug, error, info, trace, warn};
|
||||
use common_workload::DatanodeWorkloadType;
|
||||
use meta_client::client::{HeartbeatSender, MetaClient};
|
||||
use meta_client::MetaClientRef;
|
||||
use servers::addrs;
|
||||
@@ -51,6 +53,7 @@ pub(crate) mod task_tracker;
|
||||
/// The datanode heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
|
||||
pub struct HeartbeatTask {
|
||||
node_id: u64,
|
||||
workload_types: Vec<DatanodeWorkloadType>,
|
||||
node_epoch: u64,
|
||||
peer_addr: String,
|
||||
running: Arc<AtomicBool>,
|
||||
@@ -91,6 +94,7 @@ impl HeartbeatTask {
|
||||
|
||||
Ok(Self {
|
||||
node_id: opts.node_id.unwrap_or(0),
|
||||
workload_types: opts.workload_types.clone(),
|
||||
// We use datanode's start time millis as the node's epoch.
|
||||
node_epoch: common_time::util::current_time_millis() as u64,
|
||||
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
|
||||
@@ -221,6 +225,7 @@ impl HeartbeatTask {
|
||||
addr: addr.clone(),
|
||||
});
|
||||
let epoch = self.region_alive_keeper.epoch();
|
||||
let workload_types = self.workload_types.clone();
|
||||
|
||||
self.region_alive_keeper.start(Some(event_receiver)).await?;
|
||||
let mut last_sent = Instant::now();
|
||||
@@ -239,6 +244,9 @@ impl HeartbeatTask {
|
||||
start_time_ms: node_epoch,
|
||||
cpus: num_cpus::get() as u32,
|
||||
}),
|
||||
node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
|
||||
types: workload_types.iter().map(|w| w.to_i32()).collect(),
|
||||
})),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_telemetry::{info, warn};
|
||||
use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
|
||||
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
|
||||
use object_store::services::Fs;
|
||||
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
|
||||
@@ -168,9 +169,13 @@ async fn build_cache_layer(
|
||||
if let Some(path) = cache_path.as_ref()
|
||||
&& !path.trim().is_empty()
|
||||
{
|
||||
let atomic_temp_dir = join_dir(path, ".tmp/");
|
||||
let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
|
||||
clean_temp_dir(&atomic_temp_dir)?;
|
||||
|
||||
// Compatible code. Remove this after a major release.
|
||||
let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
|
||||
clean_temp_dir(&old_atomic_temp_dir)?;
|
||||
|
||||
let cache_store = Fs::default()
|
||||
.root(path)
|
||||
.atomic_write_dir(&atomic_temp_dir)
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::{fs, path};
|
||||
|
||||
use common_telemetry::info;
|
||||
use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
|
||||
use object_store::services::Fs;
|
||||
use object_store::util::join_dir;
|
||||
use object_store::ObjectStore;
|
||||
@@ -33,9 +34,13 @@ pub async fn new_fs_object_store(
|
||||
.context(error::CreateDirSnafu { dir: data_home })?;
|
||||
info!("The file storage home is: {}", data_home);
|
||||
|
||||
let atomic_write_dir = join_dir(data_home, ".tmp/");
|
||||
let atomic_write_dir = join_dir(data_home, ATOMIC_WRITE_DIR);
|
||||
store::clean_temp_dir(&atomic_write_dir)?;
|
||||
|
||||
// Compatible code. Remove this after a major release.
|
||||
let old_atomic_temp_dir = join_dir(data_home, OLD_ATOMIC_WRITE_DIR);
|
||||
store::clean_temp_dir(&old_atomic_temp_dir)?;
|
||||
|
||||
let builder = Fs::default()
|
||||
.root(data_home)
|
||||
.atomic_write_dir(&atomic_write_dir);
|
||||
|
||||
@@ -16,8 +16,8 @@ use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::Array;
|
||||
use arrow::datatypes::Int32Type;
|
||||
use arrow_array::{ArrayRef, DictionaryArray, Int32Array};
|
||||
use arrow::datatypes::Int64Type;
|
||||
use arrow_array::{ArrayRef, DictionaryArray, Int64Array};
|
||||
use serde_json::Value as JsonValue;
|
||||
use snafu::ResultExt;
|
||||
|
||||
@@ -32,7 +32,7 @@ use crate::vectors::{self, Helper, Validity, Vector, VectorRef};
|
||||
/// Vector of dictionaries, basically backed by Arrow's `DictionaryArray`.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct DictionaryVector {
|
||||
array: DictionaryArray<Int32Type>,
|
||||
array: DictionaryArray<Int64Type>,
|
||||
/// The datatype of the items in the dictionary.
|
||||
item_type: ConcreteDataType,
|
||||
/// The vector of items in the dictionary.
|
||||
@@ -41,7 +41,7 @@ pub struct DictionaryVector {
|
||||
|
||||
impl DictionaryVector {
|
||||
/// Create a new instance of `DictionaryVector` from a dictionary array and item type
|
||||
pub fn new(array: DictionaryArray<Int32Type>, item_type: ConcreteDataType) -> Result<Self> {
|
||||
pub fn new(array: DictionaryArray<Int64Type>, item_type: ConcreteDataType) -> Result<Self> {
|
||||
let item_vector = Helper::try_into_vector(array.values())?;
|
||||
|
||||
Ok(Self {
|
||||
@@ -52,12 +52,12 @@ impl DictionaryVector {
|
||||
}
|
||||
|
||||
/// Returns the underlying Arrow dictionary array
|
||||
pub fn array(&self) -> &DictionaryArray<Int32Type> {
|
||||
pub fn array(&self) -> &DictionaryArray<Int64Type> {
|
||||
&self.array
|
||||
}
|
||||
|
||||
/// Returns the keys array of this dictionary
|
||||
pub fn keys(&self) -> &arrow_array::PrimitiveArray<Int32Type> {
|
||||
pub fn keys(&self) -> &arrow_array::PrimitiveArray<Int64Type> {
|
||||
self.array.keys()
|
||||
}
|
||||
|
||||
@@ -74,7 +74,7 @@ impl DictionaryVector {
|
||||
impl Vector for DictionaryVector {
|
||||
fn data_type(&self) -> ConcreteDataType {
|
||||
ConcreteDataType::Dictionary(DictionaryType::new(
|
||||
ConcreteDataType::int32_datatype(),
|
||||
ConcreteDataType::int64_datatype(),
|
||||
self.item_type.clone(),
|
||||
))
|
||||
}
|
||||
@@ -163,10 +163,10 @@ impl Serializable for DictionaryVector {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<DictionaryArray<Int32Type>> for DictionaryVector {
|
||||
impl TryFrom<DictionaryArray<Int64Type>> for DictionaryVector {
|
||||
type Error = crate::error::Error;
|
||||
|
||||
fn try_from(array: DictionaryArray<Int32Type>) -> Result<Self> {
|
||||
fn try_from(array: DictionaryArray<Int64Type>) -> Result<Self> {
|
||||
let item_type = ConcreteDataType::from_arrow_type(array.values().data_type());
|
||||
let item_vector = Helper::try_into_vector(array.values())?;
|
||||
|
||||
@@ -243,7 +243,7 @@ impl VectorOp for DictionaryVector {
|
||||
previous_offset = offset;
|
||||
}
|
||||
|
||||
let new_keys = Int32Array::from(replicated_keys);
|
||||
let new_keys = Int64Array::from(replicated_keys);
|
||||
let new_array = DictionaryArray::try_new(new_keys, self.values().clone())
|
||||
.expect("Failed to create replicated dictionary array");
|
||||
|
||||
@@ -261,7 +261,7 @@ impl VectorOp for DictionaryVector {
|
||||
let filtered_key_array = filtered_key_vector.to_arrow_array();
|
||||
let filtered_key_array = filtered_key_array
|
||||
.as_any()
|
||||
.downcast_ref::<Int32Array>()
|
||||
.downcast_ref::<Int64Array>()
|
||||
.unwrap();
|
||||
|
||||
let new_array = DictionaryArray::try_new(filtered_key_array.clone(), self.values().clone())
|
||||
@@ -291,7 +291,7 @@ impl VectorOp for DictionaryVector {
|
||||
let key_vector = Helper::try_into_vector(&key_array)?;
|
||||
let new_key_vector = key_vector.take(indices)?;
|
||||
let new_key_array = new_key_vector.to_arrow_array();
|
||||
let new_key_array = new_key_array.as_any().downcast_ref::<Int32Array>().unwrap();
|
||||
let new_key_array = new_key_array.as_any().downcast_ref::<Int64Array>().unwrap();
|
||||
|
||||
let new_array = DictionaryArray::try_new(new_key_array.clone(), self.values().clone())
|
||||
.expect("Failed to create filtered dictionary array");
|
||||
@@ -318,7 +318,7 @@ mod tests {
|
||||
// Keys: [0, 1, 2, null, 1, 3]
|
||||
// Resulting in: ["a", "b", "c", null, "b", "d"]
|
||||
let values = StringArray::from(vec!["a", "b", "c", "d"]);
|
||||
let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), None, Some(1), Some(3)]);
|
||||
let keys = Int64Array::from(vec![Some(0), Some(1), Some(2), None, Some(1), Some(3)]);
|
||||
let dict_array = DictionaryArray::new(keys, Arc::new(values));
|
||||
DictionaryVector::try_from(dict_array).unwrap()
|
||||
}
|
||||
@@ -404,7 +404,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
casted.data_type(),
|
||||
ConcreteDataType::Dictionary(DictionaryType::new(
|
||||
ConcreteDataType::int32_datatype(),
|
||||
ConcreteDataType::int64_datatype(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
))
|
||||
);
|
||||
|
||||
@@ -20,7 +20,7 @@ use std::sync::Arc;
|
||||
use arrow::array::{Array, ArrayRef, StringArray};
|
||||
use arrow::compute;
|
||||
use arrow::compute::kernels::comparison;
|
||||
use arrow::datatypes::{DataType as ArrowDataType, Int32Type, TimeUnit};
|
||||
use arrow::datatypes::{DataType as ArrowDataType, Int64Type, TimeUnit};
|
||||
use arrow_array::DictionaryArray;
|
||||
use arrow_schema::IntervalUnit;
|
||||
use datafusion_common::ScalarValue;
|
||||
@@ -348,11 +348,11 @@ impl Helper {
|
||||
ArrowDataType::Decimal128(_, _) => {
|
||||
Arc::new(Decimal128Vector::try_from_arrow_array(array)?)
|
||||
}
|
||||
ArrowDataType::Dictionary(key, value) if matches!(&**key, ArrowDataType::Int32) => {
|
||||
ArrowDataType::Dictionary(key, value) if matches!(&**key, ArrowDataType::Int64) => {
|
||||
let array = array
|
||||
.as_ref()
|
||||
.as_any()
|
||||
.downcast_ref::<DictionaryArray<Int32Type>>()
|
||||
.downcast_ref::<DictionaryArray<Int64Type>>()
|
||||
.unwrap(); // Safety: the type is guarded by match arm condition
|
||||
Arc::new(DictionaryVector::new(
|
||||
array.clone(),
|
||||
|
||||
@@ -59,6 +59,7 @@ partition.workspace = true
|
||||
prometheus.workspace = true
|
||||
prost.workspace = true
|
||||
query.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
servers.workspace = true
|
||||
session.workspace = true
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! impl `FlowNode` trait for FlowNodeManager so standalone can call them
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::{
|
||||
@@ -41,9 +42,9 @@ use crate::batching_mode::engine::BatchingEngine;
|
||||
use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
|
||||
use crate::engine::FlowEngine;
|
||||
use crate::error::{
|
||||
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu,
|
||||
InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu,
|
||||
SyncCheckTaskSnafu, UnexpectedSnafu,
|
||||
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
|
||||
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
|
||||
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::METRIC_FLOW_TASK_COUNT;
|
||||
use crate::repr::{self, DiffRow};
|
||||
@@ -65,6 +66,7 @@ pub struct FlowDualEngine {
|
||||
catalog_manager: Arc<dyn CatalogManager>,
|
||||
check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
|
||||
plugins: Plugins,
|
||||
done_recovering: AtomicBool,
|
||||
}
|
||||
|
||||
impl FlowDualEngine {
|
||||
@@ -83,9 +85,55 @@ impl FlowDualEngine {
|
||||
catalog_manager,
|
||||
check_task: Mutex::new(None),
|
||||
plugins,
|
||||
done_recovering: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
/// Set `done_recovering` to true
|
||||
/// indicate that we are ready to handle requests
|
||||
pub fn set_done_recovering(&self) {
|
||||
info!("FlowDualEngine done recovering");
|
||||
self.done_recovering
|
||||
.store(true, std::sync::atomic::Ordering::Release);
|
||||
}
|
||||
|
||||
/// Check if `done_recovering` is true
|
||||
pub fn is_recover_done(&self) -> bool {
|
||||
self.done_recovering
|
||||
.load(std::sync::atomic::Ordering::Acquire)
|
||||
}
|
||||
|
||||
/// wait for recovering to be done, this will only happen when flownode just started
|
||||
async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
|
||||
if self.is_recover_done() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
warn!(
|
||||
"FlowDualEngine is not done recovering, {} insert request waiting for recovery",
|
||||
waiting_req_cnt
|
||||
);
|
||||
// wait 3 seconds, check every 1 second
|
||||
// TODO(discord9): make this configurable
|
||||
let mut retry = 0;
|
||||
let max_retry = 3;
|
||||
while retry < max_retry && !self.is_recover_done() {
|
||||
warn!(
|
||||
"FlowDualEngine is not done recovering, retry {} in 1s",
|
||||
retry
|
||||
);
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
retry += 1;
|
||||
}
|
||||
if retry == max_retry {
|
||||
return FlowNotRecoveredSnafu.fail();
|
||||
} else {
|
||||
info!("FlowDualEngine is done recovering");
|
||||
}
|
||||
// TODO(discord9): also put to centralized logging for flow once it implemented
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn plugins(&self) -> &Plugins {
|
||||
&self.plugins
|
||||
}
|
||||
@@ -243,7 +291,7 @@ impl FlowDualEngine {
|
||||
to_be_created
|
||||
);
|
||||
let mut errors = vec![];
|
||||
for flow_id in to_be_created {
|
||||
for flow_id in to_be_created.clone() {
|
||||
let flow_id = *flow_id;
|
||||
let info = self
|
||||
.flow_metadata_manager
|
||||
@@ -302,6 +350,10 @@ impl FlowDualEngine {
|
||||
errors.push((flow_id, err));
|
||||
}
|
||||
}
|
||||
if errors.is_empty() {
|
||||
info!("Recover flows successfully, flows: {:?}", to_be_created);
|
||||
}
|
||||
|
||||
for (flow_id, err) in errors {
|
||||
warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
|
||||
}
|
||||
@@ -410,6 +462,8 @@ impl ConsistentCheckTask {
|
||||
tokio::time::sleep(MIN_REFRESH_DURATION).await;
|
||||
}
|
||||
|
||||
engine.set_done_recovering();
|
||||
|
||||
// then do check flows, with configurable allow_create and allow_drop
|
||||
let (mut allow_create, mut allow_drop) = (false, false);
|
||||
let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
|
||||
@@ -629,11 +683,14 @@ impl FlowEngine for FlowDualEngine {
|
||||
&self,
|
||||
request: api::v1::region::InsertRequests,
|
||||
) -> Result<(), Error> {
|
||||
self.wait_for_all_flow_recover(request.requests.len())
|
||||
.await?;
|
||||
// TODO(discord9): make as little clone as possible
|
||||
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
|
||||
let mut to_batch_engine = request.requests;
|
||||
|
||||
{
|
||||
// not locking this, or recover flows will be starved when also handling flow inserts
|
||||
let src_table2flow = self.src_table2flow.read().await;
|
||||
to_batch_engine.retain(|req| {
|
||||
let region_id = RegionId::from(req.region_id);
|
||||
|
||||
@@ -330,7 +330,7 @@ impl BatchingEngine {
|
||||
let frontend = self.frontend_client.clone();
|
||||
|
||||
// check execute once first to detect any error early
|
||||
task.check_execute(&engine, &frontend).await?;
|
||||
task.check_or_create_sink_table(&engine, &frontend).await?;
|
||||
|
||||
// TODO(discord9): use time wheel or what for better
|
||||
let handle = common_runtime::spawn_global(async move {
|
||||
|
||||
@@ -27,8 +27,9 @@ use common_meta::peer::Peer;
|
||||
use common_meta::rpc::store::RangeRequest;
|
||||
use common_query::Output;
|
||||
use common_telemetry::warn;
|
||||
use itertools::Itertools;
|
||||
use meta_client::client::MetaClient;
|
||||
use rand::rng;
|
||||
use rand::seq::SliceRandom;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -180,8 +181,9 @@ impl FrontendClient {
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
/// Get the database with maximum `last_activity_ts`& is able to process query
|
||||
async fn get_latest_active_frontend(
|
||||
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
|
||||
/// and is able to process query
|
||||
async fn get_random_active_frontend(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
@@ -201,17 +203,17 @@ impl FrontendClient {
|
||||
let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
|
||||
interval.tick().await;
|
||||
for retry in 0..GRPC_MAX_RETRIES {
|
||||
let frontends = self.scan_for_frontend().await?;
|
||||
let mut frontends = self.scan_for_frontend().await?;
|
||||
let now_in_ms = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis() as i64;
|
||||
// shuffle the frontends to avoid always pick the same one
|
||||
frontends.shuffle(&mut rng());
|
||||
|
||||
// found node with maximum last_activity_ts
|
||||
for (_, node_info) in frontends
|
||||
.iter()
|
||||
.sorted_by_key(|(_, node_info)| node_info.last_activity_ts)
|
||||
.rev()
|
||||
// filter out frontend that have been down for more than 1 min
|
||||
.filter(|(_, node_info)| {
|
||||
node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
|
||||
@@ -277,7 +279,7 @@ impl FrontendClient {
|
||||
) -> Result<u32, Error> {
|
||||
match self {
|
||||
FrontendClient::Distributed { .. } => {
|
||||
let db = self.get_latest_active_frontend(catalog, schema).await?;
|
||||
let db = self.get_random_active_frontend(catalog, schema).await?;
|
||||
|
||||
*peer_desc = Some(PeerDesc::Dist {
|
||||
peer: db.peer.clone(),
|
||||
|
||||
@@ -142,26 +142,12 @@ impl BatchingTask {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test execute, for check syntax or such
|
||||
pub async fn check_execute(
|
||||
/// Create sink table if not exists
|
||||
pub async fn check_or_create_sink_table(
|
||||
&self,
|
||||
engine: &QueryEngineRef,
|
||||
frontend_client: &Arc<FrontendClient>,
|
||||
) -> Result<Option<(u32, Duration)>, Error> {
|
||||
// use current time to test get a dirty time window, which should be safe
|
||||
let start = SystemTime::now();
|
||||
let ts = Timestamp::new_second(
|
||||
start
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs() as _,
|
||||
);
|
||||
self.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.dirty_time_windows
|
||||
.add_lower_bounds(vec![ts].into_iter());
|
||||
|
||||
if !self.is_table_exist(&self.config.sink_table_name).await? {
|
||||
let create_table = self.gen_create_table_expr(engine.clone()).await?;
|
||||
info!(
|
||||
@@ -174,7 +160,8 @@ impl BatchingTask {
|
||||
self.config.sink_table_name.join(".")
|
||||
);
|
||||
}
|
||||
self.gen_exec_once(engine, frontend_client).await
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
|
||||
|
||||
@@ -65,6 +65,7 @@ impl DataflowState {
|
||||
/// schedule all subgraph that need to run with time <= `as_of` and run_available()
|
||||
///
|
||||
/// return true if any subgraph actually executed
|
||||
#[allow(clippy::swap_with_temporary)]
|
||||
pub fn run_available_with_schedule(&mut self, df: &mut Dfir) -> bool {
|
||||
// first split keys <= as_of into another map
|
||||
let mut before = self
|
||||
|
||||
@@ -46,6 +46,12 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Flow engine is still recovering"))]
|
||||
FlowNotRecovered {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Error encountered while creating flow: {sql}"))]
|
||||
CreateFlow {
|
||||
sql: String,
|
||||
@@ -310,7 +316,8 @@ impl ErrorExt for Error {
|
||||
| Self::JoinTask { .. }
|
||||
| Self::Datafusion { .. }
|
||||
| Self::InsertIntoFlow { .. }
|
||||
| Self::NoAvailableFrontend { .. } => StatusCode::Internal,
|
||||
| Self::NoAvailableFrontend { .. }
|
||||
| Self::FlowNotRecovered { .. } => StatusCode::Internal,
|
||||
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
|
||||
Self::TableNotFound { .. }
|
||||
| Self::TableNotFoundMeta { .. }
|
||||
|
||||
@@ -43,7 +43,7 @@ use servers::error::{StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
|
||||
use servers::http::HttpServerBuilder;
|
||||
use servers::metrics_handler::MetricsHandler;
|
||||
use servers::server::{ServerHandler, ServerHandlers};
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::{broadcast, oneshot, Mutex};
|
||||
@@ -54,19 +54,15 @@ use tonic::{Request, Response, Status};
|
||||
use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
|
||||
use crate::adapter::{create_worker, FlowStreamingEngineRef};
|
||||
use crate::batching_mode::engine::BatchingEngine;
|
||||
use crate::engine::FlowEngine;
|
||||
use crate::error::{
|
||||
to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
|
||||
IllegalAuthConfigSnafu, ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu,
|
||||
UnexpectedSnafu,
|
||||
to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, IllegalAuthConfigSnafu,
|
||||
ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::heartbeat::HeartbeatTask;
|
||||
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
|
||||
use crate::transform::register_function_to_query_engine;
|
||||
use crate::utils::{SizeReportSender, StateReportHandler};
|
||||
use crate::{
|
||||
CreateFlowArgs, Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine,
|
||||
};
|
||||
use crate::{Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine};
|
||||
|
||||
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
|
||||
/// wrapping flow node manager to avoid orphan rule with Arc<...>
|
||||
@@ -416,109 +412,6 @@ impl FlownodeBuilder {
|
||||
Ok(instance)
|
||||
}
|
||||
|
||||
/// recover all flow tasks in this flownode in distributed mode(nodeid is Some(<num>))
|
||||
///
|
||||
/// or recover all existing flow tasks if in standalone mode(nodeid is None)
|
||||
///
|
||||
/// TODO(discord9): persistent flow tasks with internal state
|
||||
async fn recover_flows(&self, manager: &FlowDualEngine) -> Result<usize, Error> {
|
||||
let nodeid = self.opts.node_id;
|
||||
let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid {
|
||||
let to_be_recover = self
|
||||
.flow_metadata_manager
|
||||
.flownode_flow_manager()
|
||||
.flows(nodeid)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.context(ListFlowsSnafu { id: Some(nodeid) })?;
|
||||
to_be_recover.into_iter().map(|(id, _)| id).collect()
|
||||
} else {
|
||||
let all_catalogs = self
|
||||
.catalog_manager
|
||||
.catalog_names()
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let mut all_flow_ids = vec![];
|
||||
for catalog in all_catalogs {
|
||||
let flows = self
|
||||
.flow_metadata_manager
|
||||
.flow_name_manager()
|
||||
.flow_names(&catalog)
|
||||
.await
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
|
||||
}
|
||||
all_flow_ids
|
||||
};
|
||||
let cnt = to_be_recovered.len();
|
||||
|
||||
// TODO(discord9): recover in parallel
|
||||
info!("Recovering {} flows: {:?}", cnt, to_be_recovered);
|
||||
for flow_id in to_be_recovered {
|
||||
let info = self
|
||||
.flow_metadata_manager
|
||||
.flow_info_manager()
|
||||
.get(flow_id)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
.context(FlowNotFoundSnafu { id: flow_id })?;
|
||||
|
||||
let sink_table_name = [
|
||||
info.sink_table_name().catalog_name.clone(),
|
||||
info.sink_table_name().schema_name.clone(),
|
||||
info.sink_table_name().table_name.clone(),
|
||||
];
|
||||
|
||||
let args = CreateFlowArgs {
|
||||
flow_id: flow_id as _,
|
||||
sink_table_name,
|
||||
source_table_ids: info.source_table_ids().to_vec(),
|
||||
// because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist)
|
||||
// but for the sake of consistency and to make sure recover of flow actually happen, we set both to true
|
||||
// (which is also fine since checks for not allow both to be true is on metasrv and we already pass that)
|
||||
create_if_not_exists: true,
|
||||
or_replace: true,
|
||||
expire_after: info.expire_after(),
|
||||
comment: Some(info.comment().clone()),
|
||||
sql: info.raw_sql().clone(),
|
||||
flow_options: info.options().clone(),
|
||||
query_ctx: info
|
||||
.query_context()
|
||||
.clone()
|
||||
.map(|ctx| {
|
||||
ctx.try_into()
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)
|
||||
})
|
||||
.transpose()?
|
||||
// or use default QueryContext with catalog_name from info
|
||||
// to keep compatibility with old version
|
||||
.or_else(|| {
|
||||
Some(
|
||||
QueryContextBuilder::default()
|
||||
.current_catalog(info.catalog_name().to_string())
|
||||
.build(),
|
||||
)
|
||||
}),
|
||||
};
|
||||
manager
|
||||
.create_flow(args)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| CreateFlowSnafu {
|
||||
sql: info.raw_sql().clone(),
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(cnt)
|
||||
}
|
||||
|
||||
/// build [`FlowWorkerManager`], note this doesn't take ownership of `self`,
|
||||
/// nor does it actually start running the worker.
|
||||
async fn build_manager(
|
||||
|
||||
@@ -56,6 +56,7 @@ prometheus.workspace = true
|
||||
promql-parser.workspace = true
|
||||
prost.workspace = true
|
||||
query.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
servers.workspace = true
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::sync::Arc;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_config::config::Configurable;
|
||||
use common_options::datanode::DatanodeClientOptions;
|
||||
use common_telemetry::logging::{LoggingOptions, TracingOptions};
|
||||
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
|
||||
use meta_client::MetaClientOptions;
|
||||
use query::options::QueryOptions;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -38,7 +38,7 @@ use crate::service_config::{
|
||||
PromStoreOptions,
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(default)]
|
||||
pub struct FrontendOptions {
|
||||
pub node_id: Option<String>,
|
||||
@@ -61,6 +61,7 @@ pub struct FrontendOptions {
|
||||
pub tracing: TracingOptions,
|
||||
pub query: QueryOptions,
|
||||
pub max_in_flight_write_bytes: Option<ReadableSize>,
|
||||
pub slow_query: Option<SlowQueryOptions>,
|
||||
}
|
||||
|
||||
impl Default for FrontendOptions {
|
||||
@@ -86,6 +87,7 @@ impl Default for FrontendOptions {
|
||||
tracing: TracingOptions::default(),
|
||||
query: QueryOptions::default(),
|
||||
max_in_flight_write_bytes: None,
|
||||
slow_query: Some(SlowQueryOptions::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,7 +55,6 @@ use query::metrics::OnDone;
|
||||
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
|
||||
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
|
||||
use query::query_engine::DescribeResult;
|
||||
use query::stats::StatementStatistics;
|
||||
use query::QueryEngineRef;
|
||||
use servers::error as server_error;
|
||||
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
|
||||
@@ -80,6 +79,7 @@ use crate::error::{
|
||||
TableOperationSnafu,
|
||||
};
|
||||
use crate::limiter::LimiterRef;
|
||||
use crate::slow_query_recorder::SlowQueryRecorder;
|
||||
|
||||
/// The frontend instance contains necessary components, and implements many
|
||||
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
|
||||
@@ -94,7 +94,7 @@ pub struct Instance {
|
||||
inserter: InserterRef,
|
||||
deleter: DeleterRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
stats: StatementStatistics,
|
||||
slow_query_recorder: Option<SlowQueryRecorder>,
|
||||
limiter: Option<LimiterRef>,
|
||||
}
|
||||
|
||||
@@ -166,9 +166,11 @@ impl Instance {
|
||||
let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
|
||||
let query_interceptor = query_interceptor.as_ref();
|
||||
|
||||
let _slow_query_timer = self
|
||||
.stats
|
||||
.start_slow_query_timer(QueryStatement::Sql(stmt.clone()));
|
||||
let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
|
||||
recorder.start(QueryStatement::Sql(stmt.clone()), query_ctx.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let output = match stmt {
|
||||
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
|
||||
@@ -212,6 +214,7 @@ impl Instance {
|
||||
self.statement_executor.execute_sql(stmt, query_ctx).await
|
||||
}
|
||||
};
|
||||
|
||||
output.context(TableOperationSnafu)
|
||||
}
|
||||
}
|
||||
@@ -374,7 +377,11 @@ impl PrometheusHandler for Instance {
|
||||
}
|
||||
})?;
|
||||
|
||||
let _slow_query_timer = self.stats.start_slow_query_timer(stmt.clone());
|
||||
let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
|
||||
recorder.start(stmt.clone(), query_ctx.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let plan = self
|
||||
.statement_executor
|
||||
|
||||
@@ -34,7 +34,6 @@ use operator::table::TableMutationOperator;
|
||||
use partition::manager::PartitionRuleManager;
|
||||
use pipeline::pipeline_operator::PipelineOperator;
|
||||
use query::region_query::RegionQueryHandlerFactoryRef;
|
||||
use query::stats::StatementStatistics;
|
||||
use query::QueryEngineFactory;
|
||||
use snafu::OptionExt;
|
||||
|
||||
@@ -43,6 +42,7 @@ use crate::frontend::FrontendOptions;
|
||||
use crate::instance::region_query::FrontendRegionQueryHandler;
|
||||
use crate::instance::Instance;
|
||||
use crate::limiter::Limiter;
|
||||
use crate::slow_query_recorder::SlowQueryRecorder;
|
||||
|
||||
/// The frontend [`Instance`] builder.
|
||||
pub struct FrontendBuilder {
|
||||
@@ -54,7 +54,6 @@ pub struct FrontendBuilder {
|
||||
node_manager: NodeManagerRef,
|
||||
plugins: Option<Plugins>,
|
||||
procedure_executor: ProcedureExecutorRef,
|
||||
stats: StatementStatistics,
|
||||
}
|
||||
|
||||
impl FrontendBuilder {
|
||||
@@ -65,7 +64,6 @@ impl FrontendBuilder {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
node_manager: NodeManagerRef,
|
||||
procedure_executor: ProcedureExecutorRef,
|
||||
stats: StatementStatistics,
|
||||
) -> Self {
|
||||
Self {
|
||||
options,
|
||||
@@ -76,7 +74,6 @@ impl FrontendBuilder {
|
||||
node_manager,
|
||||
plugins: None,
|
||||
procedure_executor,
|
||||
stats,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,6 +186,17 @@ impl FrontendBuilder {
|
||||
|
||||
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
|
||||
|
||||
let slow_query_recorder = self.options.slow_query.and_then(|opts| {
|
||||
opts.enable.then(|| {
|
||||
SlowQueryRecorder::new(
|
||||
opts.clone(),
|
||||
inserter.clone(),
|
||||
statement_executor.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
)
|
||||
})
|
||||
});
|
||||
|
||||
// Create the limiter if the max_in_flight_write_bytes is set.
|
||||
let limiter = self
|
||||
.options
|
||||
@@ -206,7 +214,7 @@ impl FrontendBuilder {
|
||||
inserter,
|
||||
deleter,
|
||||
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
|
||||
stats: self.stats,
|
||||
slow_query_recorder,
|
||||
limiter,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -22,3 +22,4 @@ pub(crate) mod limiter;
|
||||
pub(crate) mod metrics;
|
||||
pub mod server;
|
||||
pub mod service_config;
|
||||
pub(crate) mod slow_query_recorder;
|
||||
|
||||
@@ -102,6 +102,7 @@ where
|
||||
builder = builder
|
||||
.with_prom_handler(
|
||||
self.instance.clone(),
|
||||
Some(self.instance.clone()),
|
||||
opts.prom_store.with_metric_engine,
|
||||
opts.http.is_strict_mode,
|
||||
)
|
||||
|
||||
531
src/frontend/src/slow_query_recorder.rs
Normal file
531
src/frontend/src/slow_query_recorder.rs
Normal file
@@ -0,0 +1,531 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, UNIX_EPOCH};
|
||||
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{
|
||||
ColumnDataType, ColumnDef, ColumnSchema, CreateTableExpr, Row, RowInsertRequest,
|
||||
RowInsertRequests, Rows, SemanticType,
|
||||
};
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
|
||||
use common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions};
|
||||
use common_telemetry::{debug, error, info, slow};
|
||||
use common_time::timestamp::{TimeUnit, Timestamp};
|
||||
use operator::insert::InserterRef;
|
||||
use operator::statement::StatementExecutorRef;
|
||||
use query::parser::QueryStatement;
|
||||
use rand::random;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::ResultExt;
|
||||
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
|
||||
use table::TableRef;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::error::{CatalogSnafu, Result, TableOperationSnafu};
|
||||
|
||||
const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";
|
||||
const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
|
||||
const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
|
||||
const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
|
||||
const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
|
||||
const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
|
||||
const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
|
||||
const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
|
||||
const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
|
||||
const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";
|
||||
|
||||
const DEFAULT_SLOW_QUERY_TABLE_TTL: &str = "30d";
|
||||
const DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE: usize = 1024;
|
||||
|
||||
/// SlowQueryRecorder is responsible for recording slow queries.
|
||||
#[derive(Clone)]
|
||||
pub struct SlowQueryRecorder {
|
||||
tx: Sender<SlowQueryEvent>,
|
||||
slow_query_opts: SlowQueryOptions,
|
||||
_handle: Arc<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SlowQueryEvent {
|
||||
cost: u64,
|
||||
threshold: u64,
|
||||
query: String,
|
||||
is_promql: bool,
|
||||
query_ctx: QueryContextRef,
|
||||
promql_range: Option<u64>,
|
||||
promql_step: Option<u64>,
|
||||
promql_start: Option<i64>,
|
||||
promql_end: Option<i64>,
|
||||
}
|
||||
|
||||
impl SlowQueryRecorder {
|
||||
/// Create a new SlowQueryRecorder.
|
||||
pub fn new(
|
||||
slow_query_opts: SlowQueryOptions,
|
||||
inserter: InserterRef,
|
||||
statement_executor: StatementExecutorRef,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
) -> Self {
|
||||
let (tx, rx) = channel(DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE);
|
||||
|
||||
let ttl = slow_query_opts
|
||||
.ttl
|
||||
.clone()
|
||||
.unwrap_or(DEFAULT_SLOW_QUERY_TABLE_TTL.to_string());
|
||||
|
||||
// Start a new task to process the slow query events.
|
||||
let event_handler = SlowQueryEventHandler {
|
||||
inserter,
|
||||
statement_executor,
|
||||
catalog_manager,
|
||||
rx,
|
||||
record_type: slow_query_opts.record_type,
|
||||
ttl,
|
||||
};
|
||||
|
||||
// Start a new background task to process the slow query events.
|
||||
let handle = tokio::spawn(async move {
|
||||
event_handler.process_slow_query().await;
|
||||
});
|
||||
|
||||
Self {
|
||||
tx,
|
||||
slow_query_opts,
|
||||
_handle: Arc::new(handle),
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts a new SlowQueryTimer. Returns `None` if `slow_query.enable` is false.
|
||||
/// The timer sets the start time when created and calculates the elapsed duration when dropped.
|
||||
pub fn start(
|
||||
&self,
|
||||
stmt: QueryStatement,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Option<SlowQueryTimer> {
|
||||
if self.slow_query_opts.enable {
|
||||
Some(SlowQueryTimer {
|
||||
stmt,
|
||||
query_ctx,
|
||||
start: Instant::now(), // Set the initial start time.
|
||||
threshold: self.slow_query_opts.threshold,
|
||||
sample_ratio: self.slow_query_opts.sample_ratio,
|
||||
tx: self.tx.clone(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct SlowQueryEventHandler {
|
||||
inserter: InserterRef,
|
||||
statement_executor: StatementExecutorRef,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
rx: Receiver<SlowQueryEvent>,
|
||||
record_type: SlowQueriesRecordType,
|
||||
ttl: String,
|
||||
}
|
||||
|
||||
impl SlowQueryEventHandler {
|
||||
async fn process_slow_query(mut self) {
|
||||
info!(
|
||||
"Start the background handler to process slow query events and record them in {:?}.",
|
||||
self.record_type
|
||||
);
|
||||
while let Some(event) = self.rx.recv().await {
|
||||
self.record_slow_query(event).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn record_slow_query(&self, event: SlowQueryEvent) {
|
||||
match self.record_type {
|
||||
SlowQueriesRecordType::Log => {
|
||||
// Record the slow query in a specific logs file.
|
||||
slow!(
|
||||
cost = event.cost,
|
||||
threshold = event.threshold,
|
||||
query = event.query,
|
||||
is_promql = event.is_promql,
|
||||
promql_range = event.promql_range,
|
||||
promql_step = event.promql_step,
|
||||
promql_start = event.promql_start,
|
||||
promql_end = event.promql_end,
|
||||
);
|
||||
}
|
||||
SlowQueriesRecordType::SystemTable => {
|
||||
// Record the slow query in a system table that is stored in greptimedb itself.
|
||||
if let Err(e) = self.insert_slow_query(&event).await {
|
||||
error!(e; "Failed to insert slow query, query: {:?}", event);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn insert_slow_query(&self, event: &SlowQueryEvent) -> Result<()> {
|
||||
debug!("Handle the slow query event: {:?}", event);
|
||||
|
||||
let table = if let Some(table) = self
|
||||
.catalog_manager
|
||||
.table(
|
||||
event.query_ctx.current_catalog(),
|
||||
DEFAULT_PRIVATE_SCHEMA_NAME,
|
||||
SLOW_QUERY_TABLE_NAME,
|
||||
Some(&event.query_ctx),
|
||||
)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
{
|
||||
table
|
||||
} else {
|
||||
// Create the system table if it doesn't exist.
|
||||
self.create_system_table(event.query_ctx.clone()).await?
|
||||
};
|
||||
|
||||
let insert = RowInsertRequest {
|
||||
table_name: SLOW_QUERY_TABLE_NAME.to_string(),
|
||||
rows: Some(Rows {
|
||||
schema: self.build_insert_column_schema(),
|
||||
rows: vec![Row {
|
||||
values: vec![
|
||||
ValueData::U64Value(event.cost).into(),
|
||||
ValueData::U64Value(event.threshold).into(),
|
||||
ValueData::StringValue(event.query.to_string()).into(),
|
||||
ValueData::BoolValue(event.is_promql).into(),
|
||||
ValueData::TimestampNanosecondValue(
|
||||
Timestamp::current_time(TimeUnit::Nanosecond).value(),
|
||||
)
|
||||
.into(),
|
||||
ValueData::U64Value(event.promql_range.unwrap_or(0)).into(),
|
||||
ValueData::U64Value(event.promql_step.unwrap_or(0)).into(),
|
||||
ValueData::TimestampMillisecondValue(event.promql_start.unwrap_or(0))
|
||||
.into(),
|
||||
ValueData::TimestampMillisecondValue(event.promql_end.unwrap_or(0)).into(),
|
||||
],
|
||||
}],
|
||||
}),
|
||||
};
|
||||
|
||||
let requests = RowInsertRequests {
|
||||
inserts: vec![insert],
|
||||
};
|
||||
|
||||
let table_info = table.table_info();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.current_catalog(table_info.catalog_name.to_string())
|
||||
.current_schema(table_info.schema_name.to_string())
|
||||
.build()
|
||||
.into();
|
||||
|
||||
self.inserter
|
||||
.handle_row_inserts(requests, query_ctx, &self.statement_executor)
|
||||
.await
|
||||
.context(TableOperationSnafu)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_system_table(&self, query_ctx: QueryContextRef) -> Result<TableRef> {
|
||||
let mut create_table_expr = self.build_create_table_expr(query_ctx.current_catalog());
|
||||
if let Some(table) = self
|
||||
.catalog_manager
|
||||
.table(
|
||||
&create_table_expr.catalog_name,
|
||||
&create_table_expr.schema_name,
|
||||
&create_table_expr.table_name,
|
||||
Some(&query_ctx),
|
||||
)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
{
|
||||
// The table is already created, so we don't need to create it again.
|
||||
return Ok(table);
|
||||
}
|
||||
|
||||
// Create the `slow_queries` system table.
|
||||
let table = self
|
||||
.statement_executor
|
||||
.create_table_inner(&mut create_table_expr, None, query_ctx.clone())
|
||||
.await
|
||||
.context(TableOperationSnafu)?;
|
||||
|
||||
info!(
|
||||
"Create the {} system table in {:?} successfully.",
|
||||
SLOW_QUERY_TABLE_NAME, DEFAULT_PRIVATE_SCHEMA_NAME
|
||||
);
|
||||
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
fn build_create_table_expr(&self, catalog: &str) -> CreateTableExpr {
|
||||
let column_defs = vec![
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::Uint64 as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "The cost of the slow query in milliseconds".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::Uint64 as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment:
|
||||
"When the query cost exceeds this value, it will be recorded as a slow query"
|
||||
.to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::String as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "The original query statement".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::Boolean as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "Whether the query is a PromQL query".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::TimestampNanosecond as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Timestamp as i32,
|
||||
comment: "The timestamp of the slow query".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::Uint64 as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "The time range of the PromQL query in milliseconds".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::Uint64 as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "The step of the PromQL query in milliseconds".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::TimestampMillisecond as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "The start timestamp of the PromQL query in milliseconds".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::TimestampMillisecond as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "The end timestamp of the PromQL query in milliseconds".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
];
|
||||
|
||||
let table_options = HashMap::from([
|
||||
(APPEND_MODE_KEY.to_string(), "true".to_string()),
|
||||
(TTL_KEY.to_string(), self.ttl.to_string()),
|
||||
]);
|
||||
|
||||
CreateTableExpr {
|
||||
catalog_name: catalog.to_string(),
|
||||
schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), // Always to store in the `greptime_private` schema.
|
||||
table_name: SLOW_QUERY_TABLE_NAME.to_string(),
|
||||
desc: "GreptimeDB system table for storing slow queries".to_string(),
|
||||
column_defs,
|
||||
time_index: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
|
||||
primary_keys: vec![],
|
||||
create_if_not_exists: true,
|
||||
table_options,
|
||||
table_id: None,
|
||||
engine: default_engine().to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_insert_column_schema(&self) -> Vec<ColumnSchema> {
|
||||
vec![
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Uint64.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Uint64.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::String.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Boolean.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::TimestampNanosecond.into(),
|
||||
semantic_type: SemanticType::Timestamp.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Uint64.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Uint64.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::TimestampMillisecond.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::TimestampMillisecond.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
/// SlowQueryTimer is used to log slow query when it's dropped.
|
||||
/// In drop(), it will check if the query is slow and send the slow query event to the handler.
|
||||
pub struct SlowQueryTimer {
|
||||
start: Instant,
|
||||
stmt: QueryStatement,
|
||||
query_ctx: QueryContextRef,
|
||||
threshold: Option<Duration>,
|
||||
sample_ratio: Option<f64>,
|
||||
tx: Sender<SlowQueryEvent>,
|
||||
}
|
||||
|
||||
impl SlowQueryTimer {
|
||||
fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
|
||||
let mut slow_query_event = SlowQueryEvent {
|
||||
cost: elapsed.as_millis() as u64,
|
||||
threshold: threshold.as_millis() as u64,
|
||||
query: "".to_string(),
|
||||
query_ctx: self.query_ctx.clone(),
|
||||
|
||||
// The following fields are only used for PromQL queries.
|
||||
is_promql: false,
|
||||
promql_range: None,
|
||||
promql_step: None,
|
||||
promql_start: None,
|
||||
promql_end: None,
|
||||
};
|
||||
|
||||
match &self.stmt {
|
||||
QueryStatement::Promql(stmt) => {
|
||||
slow_query_event.is_promql = true;
|
||||
slow_query_event.query = stmt.expr.to_string();
|
||||
slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64);
|
||||
|
||||
let start = stmt
|
||||
.start
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as i64;
|
||||
|
||||
let end = stmt
|
||||
.end
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as i64;
|
||||
|
||||
slow_query_event.promql_range = Some((end - start) as u64);
|
||||
slow_query_event.promql_start = Some(start);
|
||||
slow_query_event.promql_end = Some(end);
|
||||
}
|
||||
QueryStatement::Sql(stmt) => {
|
||||
slow_query_event.query = stmt.to_string();
|
||||
}
|
||||
}
|
||||
|
||||
// Send SlowQueryEvent to the handler.
|
||||
if let Err(e) = self.tx.try_send(slow_query_event) {
|
||||
error!(e; "Failed to send slow query event");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SlowQueryTimer {
|
||||
fn drop(&mut self) {
|
||||
if let Some(threshold) = self.threshold {
|
||||
// Calculate the elaspsed duration since the timer is created.
|
||||
let elapsed = self.start.elapsed();
|
||||
if elapsed > threshold {
|
||||
if let Some(ratio) = self.sample_ratio {
|
||||
// Only capture a portion of slow queries based on sample_ratio.
|
||||
// Generate a random number in [0, 1) and compare it with sample_ratio.
|
||||
if ratio >= 1.0 || random::<f64>() <= ratio {
|
||||
self.send_slow_query_event(elapsed, threshold);
|
||||
}
|
||||
} else {
|
||||
// Captures all slow queries if sample_ratio is not set.
|
||||
self.send_slow_query_event(elapsed, threshold);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::BTreeSet;
|
||||
use std::ops::Range;
|
||||
|
||||
use fastbloom::BloomFilter;
|
||||
@@ -25,10 +25,10 @@ use crate::Bytes;
|
||||
|
||||
/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
|
||||
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct InListPredicate {
|
||||
/// List of acceptable values.
|
||||
pub list: HashSet<Bytes>,
|
||||
pub list: BTreeSet<Bytes>,
|
||||
}
|
||||
|
||||
pub struct BloomFilterApplier {
|
||||
@@ -277,21 +277,21 @@ mod tests {
|
||||
// Single value predicates
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"row00".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row00".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![0..4],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"row05".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row05".to_vec()]),
|
||||
}],
|
||||
4..8,
|
||||
vec![4..8],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"row03".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row03".to_vec()]),
|
||||
}],
|
||||
4..8,
|
||||
vec![],
|
||||
@@ -299,14 +299,14 @@ mod tests {
|
||||
// Multiple values in a single predicate (OR logic)
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"overl".to_vec(), b"row06".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"overl".to_vec(), b"row06".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![0..8],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"seg01".to_vec(), b"overp".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"seg01".to_vec(), b"overp".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![4..12],
|
||||
@@ -314,7 +314,7 @@ mod tests {
|
||||
// Non-existent values
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"row99".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row99".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![],
|
||||
@@ -322,7 +322,7 @@ mod tests {
|
||||
// Empty range
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"row00".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row00".to_vec()]),
|
||||
}],
|
||||
12..12,
|
||||
vec![],
|
||||
@@ -330,21 +330,21 @@ mod tests {
|
||||
// Multiple values in a single predicate within specific ranges
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"row04".to_vec(), b"row05".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row04".to_vec(), b"row05".to_vec()]),
|
||||
}],
|
||||
0..12,
|
||||
vec![4..8],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"seg01".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"seg01".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![4..8],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"seg01".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"seg01".to_vec()]),
|
||||
}],
|
||||
6..28,
|
||||
vec![6..8],
|
||||
@@ -352,21 +352,21 @@ mod tests {
|
||||
// Values spanning multiple segments
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"overl".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"overl".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![0..8],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"overl".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"overl".to_vec()]),
|
||||
}],
|
||||
2..28,
|
||||
vec![2..8],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"overp".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"overp".to_vec()]),
|
||||
}],
|
||||
0..10,
|
||||
vec![4..10],
|
||||
@@ -374,21 +374,21 @@ mod tests {
|
||||
// Duplicate values
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"dup".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"dup".to_vec()]),
|
||||
}],
|
||||
0..12,
|
||||
vec![],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"dup".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"dup".to_vec()]),
|
||||
}],
|
||||
0..16,
|
||||
vec![12..16],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"dup".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"dup".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![12..28],
|
||||
@@ -397,10 +397,10 @@ mod tests {
|
||||
(
|
||||
vec![
|
||||
InListPredicate {
|
||||
list: HashSet::from_iter([b"row00".to_vec(), b"row01".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row00".to_vec(), b"row01".to_vec()]),
|
||||
},
|
||||
InListPredicate {
|
||||
list: HashSet::from_iter([b"seg00".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"seg00".to_vec()]),
|
||||
},
|
||||
],
|
||||
0..28,
|
||||
@@ -409,10 +409,10 @@ mod tests {
|
||||
(
|
||||
vec![
|
||||
InListPredicate {
|
||||
list: HashSet::from_iter([b"overl".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"overl".to_vec()]),
|
||||
},
|
||||
InListPredicate {
|
||||
list: HashSet::from_iter([b"seg01".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"seg01".to_vec()]),
|
||||
},
|
||||
],
|
||||
0..28,
|
||||
|
||||
@@ -183,7 +183,7 @@ impl TryFrom<Vec<Predicate>> for IntersectionFstApplier {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashSet;
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use super::*;
|
||||
use crate::inverted_index::error::Error;
|
||||
@@ -405,7 +405,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_intersection_fst_applier_with_in_list_predicate() {
|
||||
let result = IntersectionFstApplier::try_from(vec![Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter([b"one".to_vec(), b"two".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"one".to_vec(), b"two".to_vec()]),
|
||||
})]);
|
||||
assert!(matches!(
|
||||
result,
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::BTreeSet;
|
||||
use std::mem::size_of;
|
||||
|
||||
use snafu::{ensure, ResultExt};
|
||||
@@ -93,7 +93,7 @@ impl KeysFstApplier {
|
||||
|
||||
fn intersect_with_lists(in_lists: &mut [Predicate]) -> Vec<Bytes> {
|
||||
#[inline]
|
||||
fn get_list(p: &Predicate) -> &HashSet<Bytes> {
|
||||
fn get_list(p: &Predicate) -> &BTreeSet<Bytes> {
|
||||
match p {
|
||||
Predicate::InList(i) => &i.list,
|
||||
_ => unreachable!(), // `in_lists` is filtered by `split_at_in_lists`
|
||||
@@ -229,7 +229,7 @@ mod tests {
|
||||
fn test_keys_fst_applier_try_from() {
|
||||
let predicates = vec![
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter(vec![b("foo"), b("bar")]),
|
||||
list: BTreeSet::from_iter(vec![b("foo"), b("bar")]),
|
||||
}),
|
||||
Predicate::Range(RangePredicate {
|
||||
range: Range {
|
||||
@@ -252,7 +252,7 @@ mod tests {
|
||||
fn test_keys_fst_applier_try_from_filter_out_unmatched_keys() {
|
||||
let predicates = vec![
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter(vec![b("foo"), b("bar")]),
|
||||
list: BTreeSet::from_iter(vec![b("foo"), b("bar")]),
|
||||
}),
|
||||
Predicate::Range(RangePredicate {
|
||||
range: Range {
|
||||
@@ -300,7 +300,7 @@ mod tests {
|
||||
fn test_keys_fst_applier_try_from_with_invalid_regex() {
|
||||
let predicates = vec![
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter(vec![b("foo"), b("bar")]),
|
||||
list: BTreeSet::from_iter(vec![b("foo"), b("bar")]),
|
||||
}),
|
||||
Predicate::RegexMatch(RegexMatchPredicate {
|
||||
pattern: "*invalid regex".to_string(),
|
||||
|
||||
@@ -12,12 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use crate::Bytes;
|
||||
|
||||
/// Enumerates types of predicates for value filtering.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub enum Predicate {
|
||||
/// Predicate for matching values in a list.
|
||||
InList(InListPredicate),
|
||||
@@ -31,14 +31,14 @@ pub enum Predicate {
|
||||
|
||||
/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
|
||||
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct InListPredicate {
|
||||
/// List of acceptable values.
|
||||
pub list: HashSet<Bytes>,
|
||||
pub list: BTreeSet<Bytes>,
|
||||
}
|
||||
|
||||
/// `Bound` is a sub-component of a range, representing a single-sided limit that could be inclusive or exclusive.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct Bound {
|
||||
/// Whether the bound is inclusive or exclusive.
|
||||
pub inclusive: bool,
|
||||
@@ -48,7 +48,7 @@ pub struct Bound {
|
||||
|
||||
/// `Range` defines a single continuous range which can optionally have a lower and/or upper limit.
|
||||
/// Both the lower and upper bounds must be satisfied for the range condition to be true.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct Range {
|
||||
/// The lower bound of the range.
|
||||
pub lower: Option<Bound>,
|
||||
@@ -58,7 +58,7 @@ pub struct Range {
|
||||
|
||||
/// `RangePredicate` encapsulates a range condition that must be satisfied
|
||||
/// for the predicate to hold true (logical AND semantic between the bounds).
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct RangePredicate {
|
||||
/// The range condition.
|
||||
pub range: Range,
|
||||
@@ -66,7 +66,7 @@ pub struct RangePredicate {
|
||||
|
||||
/// `RegexMatchPredicate` encapsulates a single regex pattern. A value must match
|
||||
/// the pattern for the predicate to be satisfied.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct RegexMatchPredicate {
|
||||
/// The regex pattern.
|
||||
pub pattern: String,
|
||||
|
||||
@@ -182,6 +182,14 @@ impl ClientManager {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl ClientManager {
|
||||
/// Returns the controller client.
|
||||
pub(crate) fn controller_client(&self) -> rskafka::client::controller::ControllerClient {
|
||||
self.client.controller_client().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_wal::test_util::run_test_with_kafka_wal;
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user