Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-26 16:10:02 +00:00)
Compare commits: test/skope ... feat/geo-f
18 Commits
- 0bff41e038
- e56709b545
- a64eb0a5bc
- 2e4ab6dd91
- f761798f93
- 420446f19f
- 841e66c810
- d1c635085c
- 47657ebbc8
- 64ae32def0
- 744946957e
- d5455db2d5
- 28bf549907
- 4ea412249a
- eacc7bc471
- b72d3bc71d
- 0b102ef846
- e404e9dafc
@@ -4,9 +4,6 @@ inputs:
arch:
description: Architecture to build
required: true
rust-toolchain:
description: Rust toolchain to use
required: true
cargo-profile:
description: Cargo profile to build
required: true
@@ -43,9 +40,8 @@ runs:
brew install protobuf

- name: Install rust toolchain
uses: dtolnay/rust-toolchain@master
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: ${{ inputs.rust-toolchain }}
targets: ${{ inputs.arch }}

- name: Start etcd # For integration tests.
@@ -4,9 +4,6 @@ inputs:
arch:
description: Architecture to build
required: true
rust-toolchain:
description: Rust toolchain to use
required: true
cargo-profile:
description: Cargo profile to build
required: true
@@ -28,9 +25,8 @@ runs:
- uses: arduino/setup-protoc@v3

- name: Install rust toolchain
uses: dtolnay/rust-toolchain@master
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: ${{ inputs.rust-toolchain }}
targets: ${{ inputs.arch }}
components: llvm-tools-preview
7
.github/workflows/apidoc.yml
vendored
@@ -12,9 +12,6 @@ on:

name: Build API docs

env:
RUST_TOOLCHAIN: nightly-2024-06-06

jobs:
apidoc:
runs-on: ubuntu-20.04
@@ -23,9 +20,7 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- run: cargo doc --workspace --no-deps --document-private-items
- run: |
cat <<EOF > target/doc/index.html
70
.github/workflows/develop.yml
vendored
@@ -29,9 +29,6 @@ concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
RUST_TOOLCHAIN: nightly-2024-06-06
|
||||
|
||||
jobs:
|
||||
check-typos-and-docs:
|
||||
name: Check typos and docs
|
||||
@@ -64,9 +61,7 @@ jobs:
|
||||
- uses: arduino/setup-protoc@v3
|
||||
with:
|
||||
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- uses: dtolnay/rust-toolchain@master
|
||||
with:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
- uses: actions-rust-lang/setup-rust-toolchain@v1
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
@@ -82,9 +77,7 @@ jobs:
|
||||
timeout-minutes: 60
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: dtolnay/rust-toolchain@master
|
||||
with:
|
||||
toolchain: stable
|
||||
- uses: actions-rust-lang/setup-rust-toolchain@v1
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
@@ -107,9 +100,7 @@ jobs:
|
||||
- uses: arduino/setup-protoc@v3
|
||||
with:
|
||||
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- uses: dtolnay/rust-toolchain@master
|
||||
with:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
- uses: actions-rust-lang/setup-rust-toolchain@v1
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
# Shares across multiple jobs
|
||||
@@ -161,9 +152,7 @@ jobs:
|
||||
- uses: arduino/setup-protoc@v3
|
||||
with:
|
||||
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- uses: dtolnay/rust-toolchain@master
|
||||
with:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
- uses: actions-rust-lang/setup-rust-toolchain@v1
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
@@ -181,7 +170,7 @@ jobs:
|
||||
name: bins
|
||||
path: .
|
||||
- name: Unzip binaries
|
||||
run: |
|
||||
run: |
|
||||
tar -xvf ./bins.tar.gz
|
||||
rm ./bins.tar.gz
|
||||
- name: Run GreptimeDB
|
||||
@@ -221,9 +210,7 @@ jobs:
|
||||
- uses: arduino/setup-protoc@v3
|
||||
with:
|
||||
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- uses: dtolnay/rust-toolchain@master
|
||||
with:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
- uses: actions-rust-lang/setup-rust-toolchain@v1
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
@@ -274,9 +261,7 @@ jobs:
|
||||
- uses: arduino/setup-protoc@v3
|
||||
with:
|
||||
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- uses: dtolnay/rust-toolchain@master
|
||||
with:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
- uses: actions-rust-lang/setup-rust-toolchain@v1
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
# Shares across multiple jobs
|
||||
@@ -287,7 +272,7 @@ jobs:
|
||||
- name: Build greptime binary
|
||||
shell: bash
|
||||
# `cargo gc` will invoke `cargo build` with specified args
|
||||
run: cargo gc --profile ci -- --bin greptime
|
||||
run: cargo gc --profile ci -- --bin greptime
|
||||
- name: Pack greptime binary
|
||||
shell: bash
|
||||
run: |
|
||||
@@ -301,7 +286,7 @@ jobs:
|
||||
artifacts-dir: bin
|
||||
version: current
|
||||
|
||||
distributed-fuzztest:
|
||||
distributed-fuzztest:
|
||||
name: Fuzz Test (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
|
||||
runs-on: ubuntu-latest
|
||||
needs: build-greptime-ci
|
||||
@@ -344,9 +329,7 @@ jobs:
|
||||
- uses: arduino/setup-protoc@v3
|
||||
with:
|
||||
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- uses: dtolnay/rust-toolchain@master
|
||||
with:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
- uses: actions-rust-lang/setup-rust-toolchain@v1
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
@@ -416,12 +399,12 @@ jobs:
|
||||
- name: Describe Nodes
|
||||
if: failure()
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe nodes
|
||||
run: |
|
||||
kubectl describe nodes
|
||||
- name: Export kind logs
|
||||
if: failure()
|
||||
shell: bash
|
||||
run: |
|
||||
run: |
|
||||
kind export logs /tmp/kind
|
||||
- name: Upload logs
|
||||
if: failure()
|
||||
@@ -433,13 +416,13 @@ jobs:
|
||||
- name: Delete cluster
|
||||
if: success()
|
||||
shell: bash
|
||||
run: |
|
||||
run: |
|
||||
kind delete cluster
|
||||
docker stop $(docker ps -a -q)
|
||||
docker rm $(docker ps -a -q)
|
||||
docker system prune -f
|
||||
|
||||
distributed-fuzztest-with-chaos:
|
||||
distributed-fuzztest-with-chaos:
|
||||
name: Fuzz Test with Chaos (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
|
||||
runs-on: ubuntu-latest
|
||||
needs: build-greptime-ci
|
||||
@@ -447,7 +430,7 @@ jobs:
|
||||
strategy:
|
||||
matrix:
|
||||
target: ["fuzz_migrate_mito_regions", "fuzz_failover_mito_regions", "fuzz_failover_metric_regions"]
|
||||
mode:
|
||||
mode:
|
||||
- name: "Remote WAL"
|
||||
minio: true
|
||||
kafka: true
|
||||
@@ -484,9 +467,7 @@ jobs:
|
||||
- uses: arduino/setup-protoc@v3
|
||||
with:
|
||||
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- uses: dtolnay/rust-toolchain@master
|
||||
with:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
- uses: actions-rust-lang/setup-rust-toolchain@v1
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
@@ -557,12 +538,12 @@ jobs:
|
||||
- name: Describe Nodes
|
||||
if: failure()
|
||||
shell: bash
|
||||
run: |
|
||||
kubectl describe nodes
|
||||
run: |
|
||||
kubectl describe nodes
|
||||
- name: Export kind logs
|
||||
if: failure()
|
||||
shell: bash
|
||||
run: |
|
||||
run: |
|
||||
kind export logs /tmp/kind
|
||||
- name: Upload logs
|
||||
if: failure()
|
||||
@@ -574,7 +555,7 @@ jobs:
|
||||
- name: Delete cluster
|
||||
if: success()
|
||||
shell: bash
|
||||
run: |
|
||||
run: |
|
||||
kind delete cluster
|
||||
docker stop $(docker ps -a -q)
|
||||
docker rm $(docker ps -a -q)
|
||||
@@ -627,9 +608,8 @@ jobs:
|
||||
- uses: arduino/setup-protoc@v3
|
||||
with:
|
||||
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- uses: dtolnay/rust-toolchain@master
|
||||
- uses: actions-rust-lang/setup-rust-toolchain@v1
|
||||
with:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
components: rustfmt
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
@@ -648,9 +628,8 @@ jobs:
|
||||
- uses: arduino/setup-protoc@v3
|
||||
with:
|
||||
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- uses: dtolnay/rust-toolchain@master
|
||||
- uses: actions-rust-lang/setup-rust-toolchain@v1
|
||||
with:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
components: clippy
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
@@ -674,9 +653,8 @@ jobs:
|
||||
with:
|
||||
version: "14.0"
|
||||
- name: Install toolchain
|
||||
uses: dtolnay/rust-toolchain@master
|
||||
uses: actions-rust-lang/setup-rust-toolchain@v1
|
||||
with:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
components: llvm-tools-preview
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
|
||||
10
.github/workflows/nightly-ci.yml
vendored
@@ -9,9 +9,6 @@ concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
RUST_TOOLCHAIN: nightly-2024-06-06
|
||||
|
||||
permissions:
|
||||
issues: write
|
||||
|
||||
@@ -52,9 +49,7 @@ jobs:
|
||||
- uses: arduino/setup-protoc@v3
|
||||
with:
|
||||
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- uses: dtolnay/rust-toolchain@master
|
||||
with:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
- uses: actions-rust-lang/setup-rust-toolchain@v1
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
- name: Run sqlness
|
||||
@@ -85,9 +80,8 @@ jobs:
|
||||
with:
|
||||
version: "14.0"
|
||||
- name: Install Rust toolchain
|
||||
uses: dtolnay/rust-toolchain@master
|
||||
uses: actions-rust-lang/setup-rust-toolchain@v1
|
||||
with:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
components: llvm-tools-preview
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
|
||||
@@ -1,6 +1,12 @@
|
||||
name: Release dev-builder images
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
paths:
|
||||
- rust-toolchain.toml
|
||||
- 'docker/dev-builder/**'
|
||||
workflow_dispatch: # Allows you to run this workflow manually.
|
||||
inputs:
|
||||
release_dev_builder_ubuntu_image:
|
||||
@@ -80,28 +86,46 @@ jobs:
|
||||
shell: bash
|
||||
if: ${{ inputs.release_dev_builder_ubuntu_image }}
|
||||
run: |
|
||||
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:${{ needs.release-dev-builder-images.outputs.version }} \
|
||||
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
|
||||
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
|
||||
quay.io/skopeo/stable:latest \
|
||||
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:${{ needs.release-dev-builder-images.outputs.version }} \
|
||||
docker://${{ vars.ECR_IMAGE_REGISTRY }}/${{ vars.ECR_IMAGE_NAMESPACE }}/dev-builder-ubuntu:${{ needs.release-dev-builder-images.outputs.version }}
|
||||
|
||||
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:latest \
|
||||
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
|
||||
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
|
||||
quay.io/skopeo/stable:latest \
|
||||
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:latest \
|
||||
docker://${{ vars.ECR_IMAGE_REGISTRY }}/${{ vars.ECR_IMAGE_NAMESPACE }}/dev-builder-ubuntu:latest
|
||||
- name: Push dev-builder-centos image
|
||||
shell: bash
|
||||
if: ${{ inputs.release_dev_builder_centos_image }}
|
||||
run: |
|
||||
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:${{ needs.release-dev-builder-images.outputs.version }} \
|
||||
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
|
||||
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
|
||||
quay.io/skopeo/stable:latest \
|
||||
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:${{ needs.release-dev-builder-images.outputs.version }} \
|
||||
docker://${{ vars.ECR_IMAGE_REGISTRY }}/${{ vars.ECR_IMAGE_NAMESPACE }}/dev-builder-centos:${{ needs.release-dev-builder-images.outputs.version }}
|
||||
|
||||
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:latest \
|
||||
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
|
||||
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
|
||||
quay.io/skopeo/stable:latest \
|
||||
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:latest \
|
||||
docker://${{ vars.ECR_IMAGE_REGISTRY }}/${{ vars.ECR_IMAGE_NAMESPACE }}/dev-builder-centos:latest
|
||||
- name: Push dev-builder-android image
|
||||
shell: bash
|
||||
if: ${{ inputs.release_dev_builder_android_image }}
|
||||
run: |
|
||||
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:${{ needs.release-dev-builder-images.outputs.version }} \
|
||||
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
|
||||
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
|
||||
quay.io/skopeo/stable:latest \
|
||||
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:${{ needs.release-dev-builder-images.outputs.version }} \
|
||||
docker://${{ vars.ECR_IMAGE_REGISTRY }}/${{ vars.ECR_IMAGE_NAMESPACE }}/dev-builder-android:${{ needs.release-dev-builder-images.outputs.version }}
|
||||
|
||||
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:latest \
|
||||
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
|
||||
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
|
||||
quay.io/skopeo/stable:latest \
|
||||
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:latest \
|
||||
docker://${{ vars.ECR_IMAGE_REGISTRY }}/${{ vars.ECR_IMAGE_NAMESPACE }}/dev-builder-android:latest
|
||||
release-dev-builder-images-cn: # Note: Be careful issue: https://github.com/containers/skopeo/issues/1874 and we decide to use the latest stable skopeo container.
|
||||
name: Release dev builder images to CN region
|
||||
@@ -121,19 +145,28 @@ jobs:
|
||||
shell: bash
|
||||
if: ${{ inputs.release_dev_builder_ubuntu_image }}
|
||||
run: |
|
||||
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:${{ needs.release-dev-builder-images.outputs.version }} \
|
||||
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
|
||||
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
|
||||
quay.io/skopeo/stable:latest \
|
||||
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:${{ needs.release-dev-builder-images.outputs.version }} \
|
||||
docker://${{ vars.ACR_IMAGE_REGISTRY }}/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:${{ needs.release-dev-builder-images.outputs.version }}
|
||||
|
||||
- name: Push dev-builder-centos image
|
||||
shell: bash
|
||||
if: ${{ inputs.release_dev_builder_centos_image }}
|
||||
run: |
|
||||
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:${{ needs.release-dev-builder-images.outputs.version }} \
|
||||
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
|
||||
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
|
||||
quay.io/skopeo/stable:latest \
|
||||
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:${{ needs.release-dev-builder-images.outputs.version }} \
|
||||
docker://${{ vars.ACR_IMAGE_REGISTRY }}/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:${{ needs.release-dev-builder-images.outputs.version }}
|
||||
|
||||
- name: Push dev-builder-android image
|
||||
shell: bash
|
||||
if: ${{ inputs.release_dev_builder_android_image }}
|
||||
run: |
|
||||
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:${{ needs.release-dev-builder-images.outputs.version }} \
|
||||
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
|
||||
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
|
||||
quay.io/skopeo/stable:latest \
|
||||
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:${{ needs.release-dev-builder-images.outputs.version }} \
|
||||
docker://${{ vars.ACR_IMAGE_REGISTRY }}/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:${{ needs.release-dev-builder-images.outputs.version }}
|
||||
|
||||
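The updated push steps above mount the runner's Docker config into the skopeo container and point REGISTRY_AUTH_FILE at it, so skopeo can reuse credentials already present in the runner's Docker config rather than running unauthenticated. A minimal sketch of that pattern; the SRC and DST image references below are placeholders, not the workflow's real variables:

# Assumes ~/.docker/config.json already holds valid registry credentials.
SRC=docker.io/example/dev-builder-ubuntu:latest        # placeholder source image
DST=registry.example.com/example/dev-builder-ubuntu:latest   # placeholder destination

docker run \
  -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
  -e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
  quay.io/skopeo/stable:latest \
  copy -a "docker://$SRC" "docker://$DST"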
13
.github/workflows/release.yml
vendored
@@ -82,7 +82,6 @@ on:
|
||||
# Use env variables to control all the release process.
|
||||
env:
|
||||
# The arguments of building greptime.
|
||||
RUST_TOOLCHAIN: nightly-2024-06-06
|
||||
CARGO_PROFILE: nightly
|
||||
|
||||
# Controls whether to run tests, include unit-test, integration-test and sqlness.
|
||||
@@ -99,6 +98,16 @@ permissions:
|
||||
contents: write # Allows the action to create a release.
|
||||
|
||||
jobs:
|
||||
check-builder-rust-version:
|
||||
name: Check rust version in builder
|
||||
runs-on: ubuntu-20.04
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Check Rust toolchain version
|
||||
shell: bash
|
||||
run: |
|
||||
./scripts/check-builder-rust-version.sh
|
||||
|
||||
allocate-runners:
|
||||
name: Allocate runners
|
||||
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
|
||||
@@ -244,7 +253,6 @@ jobs:
|
||||
- uses: ./.github/actions/build-macos-artifacts
|
||||
with:
|
||||
arch: ${{ matrix.arch }}
|
||||
rust-toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
cargo-profile: ${{ env.CARGO_PROFILE }}
|
||||
features: ${{ matrix.features }}
|
||||
version: ${{ needs.allocate-runners.outputs.version }}
|
||||
@@ -287,7 +295,6 @@ jobs:
|
||||
- uses: ./.github/actions/build-windows-artifacts
|
||||
with:
|
||||
arch: ${{ matrix.arch }}
|
||||
rust-toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
cargo-profile: ${{ env.CARGO_PROFILE }}
|
||||
features: ${{ matrix.features }}
|
||||
version: ${{ needs.allocate-runners.outputs.version }}
|
||||
|
||||
66
Cargo.lock
generated
@@ -897,6 +897,15 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "backon"
|
||||
version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2caf634d05fe0642d0fb1ab43497fa627088ecd93f84b2d0f2a5d7b91f7730db"
|
||||
dependencies = [
|
||||
"fastrand",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.73"
|
||||
@@ -1941,6 +1950,8 @@ dependencies = [
|
||||
"common-version",
|
||||
"datafusion",
|
||||
"datatypes",
|
||||
"geohash",
|
||||
"h3o",
|
||||
"num",
|
||||
"num-traits",
|
||||
"once_cell",
|
||||
@@ -2111,7 +2122,7 @@ version = "0.9.2"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"backon",
|
||||
"backon 1.0.2",
|
||||
"common-base",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
@@ -3804,6 +3815,12 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "float_eq"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "28a80e3145d8ad11ba0995949bbcf48b9df2be62772b3d351ef017dff6ecb853"
|
||||
|
||||
[[package]]
|
||||
name = "flow"
|
||||
version = "0.9.2"
|
||||
@@ -4202,6 +4219,27 @@ dependencies = [
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "geo-types"
|
||||
version = "0.7.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9ff16065e5720f376fbced200a5ae0f47ace85fd70b7e54269790281353b6d61"
|
||||
dependencies = [
|
||||
"approx",
|
||||
"num-traits",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "geohash"
|
||||
version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0fb94b1a65401d6cbf22958a9040aa364812c26674f841bee538b12c135db1e6"
|
||||
dependencies = [
|
||||
"geo-types",
|
||||
"libm",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gethostname"
|
||||
version = "0.2.3"
|
||||
@@ -4292,6 +4330,25 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "h3o"
|
||||
version = "0.6.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0de3592e1f699692aa0525c42ff7879ec3ee7e36329af20967bc910a1cdc39c7"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"either",
|
||||
"float_eq",
|
||||
"h3o-bit",
|
||||
"libm",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "h3o-bit"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6fb45e8060378c0353781abf67e1917b545a6b710d0342d85b70c125af7ef320"
|
||||
|
||||
[[package]]
|
||||
name = "half"
|
||||
version = "1.8.3"
|
||||
@@ -5817,6 +5874,7 @@ dependencies = [
|
||||
"common-time",
|
||||
"common-wal",
|
||||
"delta-encoding",
|
||||
"derive_builder 0.12.0",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"itertools 0.10.5",
|
||||
@@ -7011,13 +7069,13 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
|
||||
|
||||
[[package]]
|
||||
name = "opendal"
|
||||
version = "0.49.0"
|
||||
version = "0.49.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "39d516adf7db912c38af382c3e92c27cd62fbbc240e630920555d784c2ab1494"
|
||||
checksum = "ba615070686c8781ce97376fdafca29d7c42f47b31d2230d7c8c1642ec823950"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"backon",
|
||||
"backon 0.4.4",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"chrono",
|
||||
|
||||
11
Makefile
@@ -8,6 +8,7 @@ CARGO_BUILD_OPTS := --locked
|
||||
IMAGE_REGISTRY ?= docker.io
|
||||
IMAGE_NAMESPACE ?= greptime
|
||||
IMAGE_TAG ?= latest
|
||||
DEV_BUILDER_IMAGE_TAG ?= 2024-06-06-b4b105ad-20240827021230
|
||||
BUILDX_MULTI_PLATFORM_BUILD ?= false
|
||||
BUILDX_BUILDER_NAME ?= gtbuilder
|
||||
BASE_IMAGE ?= ubuntu
|
||||
@@ -77,7 +78,7 @@ build: ## Build debug version greptime.
|
||||
build-by-dev-builder: ## Build greptime by dev-builder.
|
||||
docker run --network=host \
|
||||
-v ${PWD}:/greptimedb -v ${CARGO_REGISTRY_CACHE}:/root/.cargo/registry \
|
||||
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:latest \
|
||||
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:${DEV_BUILDER_IMAGE_TAG} \
|
||||
make build \
|
||||
CARGO_EXTENSION="${CARGO_EXTENSION}" \
|
||||
CARGO_PROFILE=${CARGO_PROFILE} \
|
||||
@@ -91,7 +92,7 @@ build-by-dev-builder: ## Build greptime by dev-builder.
|
||||
build-android-bin: ## Build greptime binary for android.
|
||||
docker run --network=host \
|
||||
-v ${PWD}:/greptimedb -v ${CARGO_REGISTRY_CACHE}:/root/.cargo/registry \
|
||||
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-android:latest \
|
||||
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-android:${DEV_BUILDER_IMAGE_TAG} \
|
||||
make build \
|
||||
CARGO_EXTENSION="ndk --platform 23 -t aarch64-linux-android" \
|
||||
CARGO_PROFILE=release \
|
||||
@@ -105,7 +106,7 @@ build-android-bin: ## Build greptime binary for android.
|
||||
strip-android-bin: build-android-bin ## Strip greptime binary for android.
|
||||
docker run --network=host \
|
||||
-v ${PWD}:/greptimedb \
|
||||
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-android:latest \
|
||||
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-android:${DEV_BUILDER_IMAGE_TAG} \
|
||||
bash -c '$${NDK_ROOT}/toolchains/llvm/prebuilt/linux-x86_64/bin/llvm-strip --strip-debug /greptimedb/target/aarch64-linux-android/release/greptime'
|
||||
|
||||
.PHONY: clean
|
||||
@@ -145,7 +146,7 @@ dev-builder: multi-platform-buildx ## Build dev-builder image.
|
||||
docker buildx build --builder ${BUILDX_BUILDER_NAME} \
|
||||
--build-arg="RUST_TOOLCHAIN=${RUST_TOOLCHAIN}" \
|
||||
-f docker/dev-builder/${BASE_IMAGE}/Dockerfile \
|
||||
-t ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:${IMAGE_TAG} ${BUILDX_MULTI_PLATFORM_BUILD_OPTS} .
|
||||
-t ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:${DEV_BUILDER_IMAGE_TAG} ${BUILDX_MULTI_PLATFORM_BUILD_OPTS} .
|
||||
|
||||
.PHONY: multi-platform-buildx
|
||||
multi-platform-buildx: ## Create buildx multi-platform builder.
|
||||
@@ -203,7 +204,7 @@ stop-etcd: ## Stop single node etcd for testing purpose.
|
||||
run-it-in-container: start-etcd ## Run integration tests in dev-builder.
|
||||
docker run --network=host \
|
||||
-v ${PWD}:/greptimedb -v ${CARGO_REGISTRY_CACHE}:/root/.cargo/registry -v /tmp:/tmp \
|
||||
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:latest \
|
||||
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:${DEV_BUILDER_IMAGE_TAG} \
|
||||
make test sqlness-test BUILD_JOBS=${BUILD_JOBS}
|
||||
|
||||
.PHONY: start-cluster
|
||||
|
||||
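The Makefile changes above replace the floating `latest` dev-builder tag with the pinned `DEV_BUILDER_IMAGE_TAG`, so containerized builds default to a known toolchain image. A hedged sketch of how a build might be invoked with that variable; the first command uses the default from this diff, and the override value is illustrative:

# Build with the pinned dev-builder image (default comes from the Makefile).
make build-by-dev-builder

# Or override the tag on the command line, e.g. to try a newer builder image.
make build-by-dev-builder DEV_BUILDER_IMAGE_TAG=2024-06-06-b4b105ad-20240827021230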
@@ -160,10 +160,10 @@
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | `None` | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
|
||||
@@ -245,10 +245,10 @@
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | `None` | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
|
||||
@@ -309,10 +309,10 @@
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | `None` | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
|
||||
@@ -333,6 +333,10 @@
|
||||
| `rpc_runtime_size` | Integer | `None` | Deprecated, use `grpc.runtime_size` instead. |
|
||||
| `rpc_max_recv_message_size` | String | `None` | Deprecated, use `grpc.rpc_max_recv_message_size` instead. |
|
||||
| `rpc_max_send_message_size` | String | `None` | Deprecated, use `grpc.rpc_max_send_message_size` instead. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `grpc` | -- | -- | The gRPC server options. |
|
||||
| `grpc.addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. |
|
||||
| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,<br/>and used for connections from outside the host |
|
||||
@@ -453,10 +457,10 @@
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | `None` | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
|
||||
|
||||
@@ -39,6 +39,18 @@ rpc_max_recv_message_size = "512MB"
|
||||
## +toml2docs:none-default
|
||||
rpc_max_send_message_size = "512MB"
|
||||
|
||||
|
||||
## The HTTP server options.
|
||||
[http]
|
||||
## The address to bind the HTTP server.
|
||||
addr = "127.0.0.1:4000"
|
||||
## HTTP request timeout. Set to 0 to disable timeout.
|
||||
timeout = "30s"
|
||||
## HTTP request body limit.
|
||||
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
|
||||
## Set to 0 to disable limit.
|
||||
body_limit = "64MB"
|
||||
|
||||
## The gRPC server options.
|
||||
[grpc]
|
||||
## The address to bind the gRPC server.
|
||||
@@ -552,12 +564,13 @@ enable = false
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## +toml2docs:none-default
|
||||
db = "information_schema"
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`.
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -199,12 +199,13 @@ enable = false
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## +toml2docs:none-default
|
||||
db = "information_schema"
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`.
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -186,12 +186,13 @@ enable = false
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## +toml2docs:none-default
|
||||
db = "information_schema"
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`.
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -601,12 +601,13 @@ enable = false
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## +toml2docs:none-default
|
||||
db = "information_schema"
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`.
|
||||
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
|
||||
@@ -157,26 +157,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -326,26 +306,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -497,26 +457,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -668,26 +608,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -852,26 +772,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -1023,26 +923,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -1194,26 +1074,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -1365,26 +1225,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -1536,26 +1376,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -1707,26 +1527,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -1878,26 +1678,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -2062,26 +1842,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -2233,26 +1993,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -2417,26 +2157,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -2569,26 +2289,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -2751,26 +2451,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -3091,28 +2771,7 @@
|
||||
},
|
||||
"unit": "s"
|
||||
},
|
||||
"overrides": [
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
"overrides": []
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
@@ -3242,26 +2901,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -3429,26 +3068,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -3598,26 +3217,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -3740,28 +3339,7 @@
|
||||
},
|
||||
"unit": "s"
|
||||
},
|
||||
"overrides": [
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
"overrides": []
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
@@ -4101,26 +3679,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -4270,26 +3828,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -4439,26 +3977,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -4608,26 +4126,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -4777,26 +4275,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -4946,26 +4424,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -5115,26 +4573,6 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byValue",
|
||||
"options": {
|
||||
"op": "gte",
|
||||
"reducer": "allIsZero",
|
||||
"value": 0
|
||||
}
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.hideFrom",
|
||||
"value": {
|
||||
"legend": true,
|
||||
"tooltip": true,
|
||||
"viz": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -5241,4 +4679,4 @@
|
||||
"uid": "ea35efe5-918e-44fa-9743-e9aa1a340a3f",
|
||||
"version": 11,
|
||||
"weekStart": ""
|
||||
}
|
||||
}
|
||||
42
scripts/check-builder-rust-version.sh
Executable file
@@ -0,0 +1,42 @@
#!/usr/bin/env bash

set -e

RUST_TOOLCHAIN_VERSION_FILE="rust-toolchain.toml"
DEV_BUILDER_UBUNTU_REGISTRY="docker.io"
DEV_BUILDER_UBUNTU_NAMESPACE="greptime"
DEV_BUILDER_UBUNTU_NAME="dev-builder-ubuntu"

function check_rust_toolchain_version() {
DEV_BUILDER_IMAGE_TAG=$(grep "DEV_BUILDER_IMAGE_TAG ?= " Makefile | cut -d= -f2 | sed 's/^[ \t]*//')
if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
echo "Error: No DEV_BUILDER_IMAGE_TAG found in Makefile"
exit 1
fi

DEV_BUILDER_UBUNTU_IMAGE="$DEV_BUILDER_UBUNTU_REGISTRY/$DEV_BUILDER_UBUNTU_NAMESPACE/$DEV_BUILDER_UBUNTU_NAME:$DEV_BUILDER_IMAGE_TAG"

CURRENT_VERSION=$(grep -Eo '[0-9]{4}-[0-9]{2}-[0-9]{2}' "$RUST_TOOLCHAIN_VERSION_FILE")
if [ -z "$CURRENT_VERSION" ]; then
echo "Error: No rust toolchain version found in $RUST_TOOLCHAIN_VERSION_FILE"
exit 1
fi

RUST_TOOLCHAIN_VERSION_IN_BUILDER=$(docker run "$DEV_BUILDER_UBUNTU_IMAGE" rustc --version | grep -Eo '[0-9]{4}-[0-9]{2}-[0-9]{2}')
if [ -z "$RUST_TOOLCHAIN_VERSION_IN_BUILDER" ]; then
echo "Error: No rustc version found in $DEV_BUILDER_UBUNTU_IMAGE"
exit 1
fi

# Compare the version and the difference should be less than 1 day.
current_rust_toolchain_seconds=$(date -d "$CURRENT_VERSION" +%s)
rust_toolchain_in_dev_builder_ubuntu_seconds=$(date -d "$RUST_TOOLCHAIN_VERSION_IN_BUILDER" +%s)
date_diff=$(( (current_rust_toolchain_seconds - rust_toolchain_in_dev_builder_ubuntu_seconds) / 86400 ))

if [ $date_diff -gt 1 ]; then
echo "Error: The rust toolchain '$RUST_TOOLCHAIN_VERSION_IN_BUILDER' in builder '$DEV_BUILDER_UBUNTU_IMAGE' maybe outdated, please update it to '$CURRENT_VERSION'"
exit 1
fi
}

check_rust_toolchain_version
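The script above pulls one nightly date from rust-toolchain.toml and one from the builder image's `rustc --version`, converts both to epoch seconds, and fails if they differ by more than a day. A small illustration of that arithmetic with made-up dates (GNU `date` is assumed, as in the script itself):

# Hypothetical dates, only to show the day-difference calculation.
current=$(date -d "2024-06-06" +%s)
in_builder=$(date -d "2024-06-05" +%s)
echo $(( (current - in_builder) / 86400 ))   # prints 1, so the check would still pass

# Run the real check from the repository root (requires docker and GNU date):
./scripts/check-builder-rust-version.sh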
@@ -267,7 +267,7 @@ impl StartCommand {
|
||||
&opts.component.tracing,
|
||||
opts.component.node_id.map(|x| x.to_string()),
|
||||
);
|
||||
log_versions(version(), short_version());
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
info!("Datanode start command: {:#?}", self);
|
||||
info!("Datanode options: {:#?}", opts);
|
||||
|
||||
@@ -215,7 +215,7 @@ impl StartCommand {
|
||||
&opts.component.tracing,
|
||||
opts.component.node_id.map(|x| x.to_string()),
|
||||
);
|
||||
log_versions(version(), short_version());
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
info!("Flownode start command: {:#?}", self);
|
||||
info!("Flownode options: {:#?}", opts);
|
||||
|
||||
@@ -261,7 +261,7 @@ impl StartCommand {
|
||||
&opts.component.tracing,
|
||||
opts.component.node_id.clone(),
|
||||
);
|
||||
log_versions(version(), short_version());
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
info!("Frontend start command: {:#?}", self);
|
||||
info!("Frontend options: {:#?}", opts);
|
||||
|
||||
@@ -30,7 +30,7 @@ pub mod standalone;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref APP_VERSION: prometheus::IntGaugeVec =
|
||||
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version"]).unwrap();
|
||||
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap();
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -76,10 +76,10 @@ pub trait App: Send {
|
||||
/// Log the versions of the application, and the arguments passed to the cli.
|
||||
/// `version` should be the same as the output of cli "--version";
|
||||
/// and the `short_version` is the short version of the codes, often consist of git branch and commit.
|
||||
pub fn log_versions(version: &str, short_version: &str) {
|
||||
pub fn log_versions(version: &str, short_version: &str, app: &str) {
|
||||
// Report app version as gauge.
|
||||
APP_VERSION
|
||||
.with_label_values(&[env!("CARGO_PKG_VERSION"), short_version])
|
||||
.with_label_values(&[env!("CARGO_PKG_VERSION"), short_version, app])
|
||||
.inc();
|
||||
|
||||
// Log version and argument flags.
|
||||
|
||||
@@ -244,7 +244,7 @@ impl StartCommand {
|
||||
&opts.component.tracing,
|
||||
None,
|
||||
);
|
||||
log_versions(version(), short_version());
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
info!("Metasrv start command: {:#?}", self);
|
||||
info!("Metasrv options: {:#?}", opts);
|
||||
|
||||
@@ -178,6 +178,16 @@ impl Configurable for StandaloneOptions {
|
||||
}
|
||||
}
|
||||
|
||||
/// The [`StandaloneOptions`] is only defined in cmd crate,
|
||||
/// we don't want to make `frontend` depends on it, so impl [`Into`]
|
||||
/// rather than [`From`].
|
||||
#[allow(clippy::from_over_into)]
|
||||
impl Into<FrontendOptions> for StandaloneOptions {
|
||||
fn into(self) -> FrontendOptions {
|
||||
self.frontend_options()
|
||||
}
|
||||
}
|
||||
|
||||
impl StandaloneOptions {
|
||||
pub fn frontend_options(&self) -> FrontendOptions {
|
||||
let cloned_opts = self.clone();
|
||||
@@ -415,7 +425,7 @@ impl StartCommand {
|
||||
&opts.component.tracing,
|
||||
None,
|
||||
);
|
||||
log_versions(version(), short_version());
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
info!("Standalone start command: {:#?}", self);
|
||||
info!("Standalone options: {opts:#?}");
|
||||
@@ -510,7 +520,7 @@ impl StartCommand {
|
||||
.build(),
|
||||
);
|
||||
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
|
||||
opts.wal.into(),
|
||||
opts.wal.clone().into(),
|
||||
kv_backend.clone(),
|
||||
));
|
||||
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
|
||||
@@ -533,7 +543,7 @@ impl StartCommand {
|
||||
.await?;
|
||||
|
||||
let mut frontend = FrontendBuilder::new(
|
||||
fe_opts.clone(),
|
||||
fe_opts,
|
||||
kv_backend.clone(),
|
||||
layered_cache_registry.clone(),
|
||||
catalog_manager.clone(),
|
||||
@@ -561,7 +571,7 @@ impl StartCommand {
|
||||
|
||||
let (tx, _rx) = broadcast::channel(1);
|
||||
|
||||
let servers = Services::new(fe_opts, Arc::new(frontend.clone()), plugins)
|
||||
let servers = Services::new(opts, Arc::new(frontend.clone()), plugins)
|
||||
.build()
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
|
||||
@@ -218,6 +218,7 @@ fn test_load_standalone_example_config() {
|
||||
sst_meta_cache_size: ReadableSize::mb(128),
|
||||
vector_cache_size: ReadableSize::mb(512),
|
||||
page_cache_size: ReadableSize::mb(512),
|
||||
selector_result_cache_size: ReadableSize::mb(512),
|
||||
max_background_jobs: 4,
|
||||
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
|
||||
..Default::default()
|
||||
|
||||
@@ -7,6 +7,10 @@ license.workspace = true
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[features]
|
||||
default = ["geo"]
|
||||
geo = ["geohash", "h3o"]
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
arc-swap = "1.0"
|
||||
@@ -23,6 +27,8 @@ common-time.workspace = true
|
||||
common-version.workspace = true
|
||||
datafusion.workspace = true
|
||||
datatypes.workspace = true
|
||||
geohash = { version = "0.13", optional = true }
|
||||
h3o = { version = "0.6", optional = true }
|
||||
num = "0.4"
|
||||
num-traits = "0.2"
|
||||
once_cell.workspace = true
|
||||
|
||||
@@ -116,6 +116,10 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
|
||||
SystemFunction::register(&function_registry);
|
||||
TableFunction::register(&function_registry);
|
||||
|
||||
// Geo functions
|
||||
#[cfg(feature = "geo")]
|
||||
crate::scalars::geo::GeoFunctions::register(&function_registry);
|
||||
|
||||
Arc::new(function_registry)
|
||||
});
|
||||
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
pub mod aggregate;
|
||||
pub(crate) mod date;
|
||||
pub mod expression;
|
||||
#[cfg(feature = "geo")]
|
||||
pub mod geo;
|
||||
pub mod matches;
|
||||
pub mod math;
|
||||
pub mod numpy;
|
||||
|
||||
src/common/function/src/scalars/geo.rs (new file, 31 lines)
@@ -0,0 +1,31 @@
|
||||
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
mod geohash;
mod h3;

use geohash::GeohashFunction;
use h3::H3Function;

use crate::function_registry::FunctionRegistry;

pub(crate) struct GeoFunctions;

impl GeoFunctions {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(GeohashFunction));
registry.register(Arc::new(H3Function));
}
}
src/common/function/src/scalars/geo/geohash.rs (new file, 135 lines)
@@ -0,0 +1,135 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
|
||||
use common_error::ext::{BoxedError, PlainError};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_query::error::{self, InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::{Signature, TypeSignature};
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
|
||||
use geohash::Coord;
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
|
||||
/// Function that returns a geohash string for a given geospatial coordinate.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct GeohashFunction;
|
||||
|
||||
const NAME: &str = "geohash";
|
||||
|
||||
impl Function for GeohashFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::string_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
let mut signatures = Vec::new();
|
||||
for coord_type in &[
|
||||
ConcreteDataType::float32_datatype(),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
] {
|
||||
for resolution_type in &[
|
||||
ConcreteDataType::int8_datatype(),
|
||||
ConcreteDataType::int16_datatype(),
|
||||
ConcreteDataType::int32_datatype(),
|
||||
ConcreteDataType::int64_datatype(),
|
||||
ConcreteDataType::uint8_datatype(),
|
||||
ConcreteDataType::uint16_datatype(),
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
] {
|
||||
signatures.push(TypeSignature::Exact(vec![
|
||||
// latitude
|
||||
coord_type.clone(),
|
||||
// longitude
|
||||
coord_type.clone(),
|
||||
// resolution
|
||||
resolution_type.clone(),
|
||||
]));
|
||||
}
|
||||
}
|
||||
Signature::one_of(signatures, Volatility::Stable)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 3,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
"The length of the args is not correct, expect 3, provided: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let lat_vec = &columns[0];
|
||||
let lon_vec = &columns[1];
|
||||
let resolution_vec = &columns[2];
|
||||
|
||||
let size = lat_vec.len();
|
||||
let mut results = StringVectorBuilder::with_capacity(size);
|
||||
|
||||
for i in 0..size {
|
||||
let lat = lat_vec.get(i).as_f64_lossy();
|
||||
let lon = lon_vec.get(i).as_f64_lossy();
|
||||
let r = match resolution_vec.get(i) {
|
||||
Value::Int8(v) => v as usize,
|
||||
Value::Int16(v) => v as usize,
|
||||
Value::Int32(v) => v as usize,
|
||||
Value::Int64(v) => v as usize,
|
||||
Value::UInt8(v) => v as usize,
|
||||
Value::UInt16(v) => v as usize,
|
||||
Value::UInt32(v) => v as usize,
|
||||
Value::UInt64(v) => v as usize,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let result = match (lat, lon) {
|
||||
(Some(lat), Some(lon)) => {
|
||||
let coord = Coord { x: lon, y: lat };
|
||||
let encoded = geohash::encode(coord, r)
|
||||
.map_err(|e| {
|
||||
BoxedError::new(PlainError::new(
|
||||
format!("Geohash error: {}", e),
|
||||
StatusCode::EngineExecuteQuery,
|
||||
))
|
||||
})
|
||||
.context(error::ExecuteSnafu)?;
|
||||
Some(encoded)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
results.push(result.as_deref());
|
||||
}
|
||||
|
||||
Ok(results.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for GeohashFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", NAME)
|
||||
}
|
||||
}
|
||||
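For context, here is a minimal standalone sketch of the geohash::encode call that GeohashFunction wraps above. It is not part of the diff; the coordinate and precision are made-up example values.

use geohash::Coord;

fn main() {
    // Hypothetical point encoded at 6-character precision,
    // mirroring the eval() loop above: x = longitude, y = latitude.
    let (lat, lon, resolution) = (37.7749_f64, -122.4194_f64, 6usize);
    let hash = geohash::encode(Coord { x: lon, y: lat }, resolution).expect("valid coordinate");
    println!("geohash = {hash}");
}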
src/common/function/src/scalars/geo/h3.rs (new file, 143 lines)
@@ -0,0 +1,143 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
|
||||
use common_error::ext::{BoxedError, PlainError};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_query::error::{self, InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::{Signature, TypeSignature};
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
|
||||
use h3o::{LatLng, Resolution};
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
|
||||
/// Function that returns the H3 encoding string for a given geospatial coordinate.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct H3Function;
|
||||
|
||||
const NAME: &str = "h3";
|
||||
|
||||
impl Function for H3Function {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::string_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
let mut signatures = Vec::new();
|
||||
for coord_type in &[
|
||||
ConcreteDataType::float32_datatype(),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
] {
|
||||
for resolution_type in &[
|
||||
ConcreteDataType::int8_datatype(),
|
||||
ConcreteDataType::int16_datatype(),
|
||||
ConcreteDataType::int32_datatype(),
|
||||
ConcreteDataType::int64_datatype(),
|
||||
ConcreteDataType::uint8_datatype(),
|
||||
ConcreteDataType::uint16_datatype(),
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
] {
|
||||
signatures.push(TypeSignature::Exact(vec![
|
||||
// latitude
|
||||
coord_type.clone(),
|
||||
// longitude
|
||||
coord_type.clone(),
|
||||
// resolution
|
||||
resolution_type.clone(),
|
||||
]));
|
||||
}
|
||||
}
|
||||
Signature::one_of(signatures, Volatility::Stable)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 3,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
"The length of the args is not correct, expect 3, provided: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let lat_vec = &columns[0];
|
||||
let lon_vec = &columns[1];
|
||||
let resolution_vec = &columns[2];
|
||||
|
||||
let size = lat_vec.len();
|
||||
let mut results = StringVectorBuilder::with_capacity(size);
|
||||
|
||||
for i in 0..size {
|
||||
let lat = lat_vec.get(i).as_f64_lossy();
|
||||
let lon = lon_vec.get(i).as_f64_lossy();
|
||||
let r = match resolution_vec.get(i) {
|
||||
Value::Int8(v) => v as u8,
|
||||
Value::Int16(v) => v as u8,
|
||||
Value::Int32(v) => v as u8,
|
||||
Value::Int64(v) => v as u8,
|
||||
Value::UInt8(v) => v,
|
||||
Value::UInt16(v) => v as u8,
|
||||
Value::UInt32(v) => v as u8,
|
||||
Value::UInt64(v) => v as u8,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let result = match (lat, lon) {
|
||||
(Some(lat), Some(lon)) => {
|
||||
let coord = LatLng::new(lat, lon)
|
||||
.map_err(|e| {
|
||||
BoxedError::new(PlainError::new(
|
||||
format!("H3 error: {}", e),
|
||||
StatusCode::EngineExecuteQuery,
|
||||
))
|
||||
})
|
||||
.context(error::ExecuteSnafu)?;
|
||||
let r = Resolution::try_from(r as u8)
|
||||
.map_err(|e| {
|
||||
BoxedError::new(PlainError::new(
|
||||
format!("H3 error: {}", e),
|
||||
StatusCode::EngineExecuteQuery,
|
||||
))
|
||||
})
|
||||
.context(error::ExecuteSnafu)?;
|
||||
let encoded = coord.to_cell(r).to_string();
|
||||
Some(encoded)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
results.push(result.as_deref());
|
||||
}
|
||||
|
||||
Ok(results.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for H3Function {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", NAME)
|
||||
}
|
||||
}
|
||||
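Similarly, a minimal sketch of the h3o calls that H3Function wraps above. Not part of the diff; inputs are made-up example values.

use h3o::{LatLng, Resolution};

fn main() {
    // Hypothetical point and resolution, mirroring the eval() path above.
    let (lat, lon, resolution) = (37.7749_f64, -122.4194_f64, 8u8);
    let coord = LatLng::new(lat, lon).expect("valid coordinate");
    let cell = coord.to_cell(Resolution::try_from(resolution).expect("valid resolution"));
    println!("h3 cell = {cell}");
}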
@@ -153,6 +153,9 @@ pub struct UpgradeRegion {
|
||||
/// it's helpful to verify whether the leader region is ready.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub wait_for_replay_timeout: Option<Duration>,
|
||||
/// The hint for replaying memtable.
|
||||
#[serde(default)]
|
||||
pub location_id: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
|
||||
@@ -13,7 +13,7 @@ workspace = true
|
||||
[dependencies]
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
backon = "0.4"
|
||||
backon = "1"
|
||||
common-base.workspace = true
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
|
||||
@@ -373,7 +373,7 @@ impl Runner {
|
||||
procedure,
|
||||
manager_ctx: self.manager_ctx.clone(),
|
||||
step,
|
||||
exponential_builder: self.exponential_builder.clone(),
|
||||
exponential_builder: self.exponential_builder,
|
||||
store: self.store.clone(),
|
||||
rolling_back: false,
|
||||
};
|
||||
|
||||
@@ -172,12 +172,13 @@ impl ErrorExt for Error {
|
||||
|
||||
Error::DataTypes { .. }
|
||||
| Error::CreateRecordBatches { .. }
|
||||
| Error::PollStream { .. }
|
||||
| Error::Format { .. }
|
||||
| Error::ToArrowScalar { .. }
|
||||
| Error::ProjectArrowRecordBatch { .. }
|
||||
| Error::PhysicalExpr { .. } => StatusCode::Internal,
|
||||
|
||||
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
|
||||
|
||||
Error::ArrowCompute { .. } => StatusCode::IllegalState,
|
||||
|
||||
Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,
|
||||
|
||||
@@ -206,6 +206,7 @@ mod tests {
|
||||
region_id,
|
||||
last_entry_id: None,
|
||||
wait_for_replay_timeout: None,
|
||||
location_id: None,
|
||||
});
|
||||
assert!(
|
||||
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))
|
||||
|
||||
@@ -27,6 +27,7 @@ impl HandlerContext {
|
||||
region_id,
|
||||
last_entry_id,
|
||||
wait_for_replay_timeout,
|
||||
location_id,
|
||||
}: UpgradeRegion,
|
||||
) -> BoxFuture<'static, InstructionReply> {
|
||||
Box::pin(async move {
|
||||
@@ -62,6 +63,7 @@ impl HandlerContext {
|
||||
RegionRequest::Catchup(RegionCatchupRequest {
|
||||
set_writable: true,
|
||||
entry_id: last_entry_id,
|
||||
location_id,
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
@@ -151,6 +153,7 @@ mod tests {
|
||||
region_id,
|
||||
last_entry_id: None,
|
||||
wait_for_replay_timeout,
|
||||
location_id: None,
|
||||
})
|
||||
.await;
|
||||
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
|
||||
@@ -191,6 +194,7 @@ mod tests {
|
||||
region_id,
|
||||
last_entry_id: None,
|
||||
wait_for_replay_timeout,
|
||||
location_id: None,
|
||||
})
|
||||
.await;
|
||||
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
|
||||
@@ -232,6 +236,7 @@ mod tests {
|
||||
region_id,
|
||||
last_entry_id: None,
|
||||
wait_for_replay_timeout,
|
||||
location_id: None,
|
||||
})
|
||||
.await;
|
||||
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
|
||||
@@ -274,8 +279,9 @@ mod tests {
|
||||
.clone()
|
||||
.handle_upgrade_region_instruction(UpgradeRegion {
|
||||
region_id,
|
||||
last_entry_id: None,
|
||||
wait_for_replay_timeout,
|
||||
last_entry_id: None,
|
||||
location_id: None,
|
||||
})
|
||||
.await;
|
||||
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
|
||||
@@ -293,6 +299,7 @@ mod tests {
|
||||
region_id,
|
||||
last_entry_id: None,
|
||||
wait_for_replay_timeout: Some(Duration::from_millis(500)),
|
||||
location_id: None,
|
||||
})
|
||||
.await;
|
||||
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
|
||||
@@ -337,6 +344,7 @@ mod tests {
|
||||
region_id,
|
||||
last_entry_id: None,
|
||||
wait_for_replay_timeout: None,
|
||||
location_id: None,
|
||||
})
|
||||
.await;
|
||||
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
|
||||
@@ -354,6 +362,7 @@ mod tests {
|
||||
region_id,
|
||||
last_entry_id: None,
|
||||
wait_for_replay_timeout: Some(Duration::from_millis(200)),
|
||||
location_id: None,
|
||||
})
|
||||
.await;
|
||||
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
|
||||
|
||||
@@ -268,6 +268,23 @@ impl Value {
|
||||
}
|
||||
}
|
||||
|
||||
/// Casts the value to f64, losing precision if necessary. Returns None if it's not castable.
|
||||
pub fn as_f64_lossy(&self) -> Option<f64> {
|
||||
match self {
|
||||
Value::Float32(v) => Some(v.0 as _),
|
||||
Value::Float64(v) => Some(v.0),
|
||||
Value::Int8(v) => Some(*v as _),
|
||||
Value::Int16(v) => Some(*v as _),
|
||||
Value::Int32(v) => Some(*v as _),
|
||||
Value::Int64(v) => Some(*v as _),
|
||||
Value::UInt8(v) => Some(*v as _),
|
||||
Value::UInt16(v) => Some(*v as _),
|
||||
Value::UInt32(v) => Some(*v as _),
|
||||
Value::UInt64(v) => Some(*v as _),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
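A small test-style sketch of how the new helper behaves, based on the match arms above. It assumes the crate's Value type is in scope and is not part of the diff.

use datatypes::value::Value;

fn demo() {
    // Integer variants are widened to f64; unsupported variants return None.
    assert_eq!(Value::Int32(42).as_f64_lossy(), Some(42.0));
    assert_eq!(Value::UInt64(7).as_f64_lossy(), Some(7.0));
    assert_eq!(Value::Null.as_f64_lossy(), None);
}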
|
||||
/// Returns the logical type of the value.
|
||||
pub fn logical_type_id(&self) -> LogicalTypeId {
|
||||
match self {
|
||||
|
||||
@@ -26,6 +26,7 @@ common-telemetry.workspace = true
|
||||
common-time.workspace = true
|
||||
common-wal.workspace = true
|
||||
delta-encoding = "0.4"
|
||||
derive_builder.workspace = true
|
||||
futures.workspace = true
|
||||
futures-util.workspace = true
|
||||
itertools.workspace = true
|
||||
|
||||
@@ -304,6 +304,15 @@ pub enum Error {
|
||||
error: object_store::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to read index, path: {path}"))]
|
||||
ReadIndex {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: object_store::Error,
|
||||
path: String,
|
||||
},
|
||||
|
||||
#[snafu(display(
"The length of meta exceeded the limit: {}, actual: {}",
limit,
|
||||
|
||||
@@ -14,15 +14,10 @@
|
||||
|
||||
pub(crate) mod client_manager;
|
||||
pub(crate) mod consumer;
|
||||
/// TODO(weny): remove it.
|
||||
#[allow(dead_code)]
|
||||
#[allow(unused_imports)]
|
||||
pub(crate) mod index;
|
||||
pub mod log_store;
|
||||
pub(crate) mod producer;
|
||||
pub(crate) mod util;
|
||||
/// TODO(weny): remove it.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) mod worker;
|
||||
|
||||
pub use index::{default_index_file, GlobalIndexCollector};
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use common_telemetry::debug;
|
||||
use derive_builder::Builder;
|
||||
use futures::future::{BoxFuture, Fuse, FusedFuture};
|
||||
use futures::{FutureExt, Stream};
|
||||
use pin_project::pin_project;
|
||||
@@ -60,40 +61,61 @@ struct FetchResult {
|
||||
used_offset: i64,
|
||||
}
|
||||
|
||||
const MAX_BATCH_SIZE: usize = 52428800;
|
||||
const AVG_RECORD_SIZE: usize = 256 * 1024;
|
||||
|
||||
/// The [`Consumer`] struct represents a Kafka consumer that fetches messages from
/// a Kafka cluster, yielding records according to the [`RegionWalIndexIterator`].
|
||||
#[pin_project]
|
||||
#[derive(Builder)]
|
||||
#[builder(pattern = "owned")]
|
||||
pub struct Consumer {
|
||||
#[builder(default = "-1")]
|
||||
last_high_watermark: i64,
|
||||
|
||||
/// The client is used to fetch records from kafka topic.
|
||||
client: Arc<dyn FetchClient>,
|
||||
|
||||
/// The max batch size in a single fetch request.
|
||||
#[builder(default = "MAX_BATCH_SIZE")]
|
||||
max_batch_size: usize,
|
||||
|
||||
/// The max wait milliseconds.
|
||||
#[builder(default = "500")]
|
||||
max_wait_ms: u32,
|
||||
|
||||
/// The avg record size
|
||||
#[builder(default = "AVG_RECORD_SIZE")]
|
||||
avg_record_size: usize,
|
||||
|
||||
/// Termination flag
|
||||
#[builder(default = "false")]
|
||||
terminated: bool,
|
||||
|
||||
/// The buffer of records.
|
||||
buffer: RecordsBuffer,
|
||||
|
||||
/// The fetch future.
|
||||
#[builder(default = "Fuse::terminated()")]
|
||||
fetch_fut: Fuse<BoxFuture<'static, rskafka::client::error::Result<FetchResult>>>,
|
||||
}
|
||||
|
||||
struct RecordsBuffer {
|
||||
pub(crate) struct RecordsBuffer {
|
||||
buffer: VecDeque<RecordAndOffset>,
|
||||
|
||||
index: Box<dyn RegionWalIndexIterator>,
|
||||
}
|
||||
|
||||
impl RecordsBuffer {
|
||||
/// Creates an empty [`RecordsBuffer`].
|
||||
pub fn new(index: Box<dyn RegionWalIndexIterator>) -> Self {
|
||||
RecordsBuffer {
|
||||
buffer: VecDeque::new(),
|
||||
index,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordsBuffer {
|
||||
fn pop_front(&mut self) -> Option<RecordAndOffset> {
|
||||
while let Some(index) = self.index.peek() {
|
||||
|
||||
@@ -20,10 +20,11 @@ pub use collector::GlobalIndexCollector;
|
||||
pub(crate) use collector::{IndexCollector, NoopCollector};
|
||||
pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder};
|
||||
pub(crate) use iterator::{
|
||||
MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange,
|
||||
RegionWalVecIndex,
|
||||
build_region_wal_index_iterator, NextBatchHint, RegionWalIndexIterator, MIN_BATCH_WINDOW_SIZE,
|
||||
};
|
||||
#[cfg(test)]
|
||||
pub(crate) use iterator::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex};
|
||||
|
||||
pub fn default_index_file(datanode_id: u64) -> String {
|
||||
format!("__datanode/{datanode_id}/index.json")
|
||||
pub fn default_index_file(location_id: u64) -> String {
|
||||
format!("__wal/{location_id}/index.json")
|
||||
}
|
||||
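A quick sketch of the renamed helper's output after this change; the path format is taken directly from the diff, and the id 42 is a made-up example.

fn demo() {
    // `default_index_file` is the function defined above.
    assert_eq!(default_index_file(42), "__wal/42/index.json");
}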
|
||||
@@ -19,6 +19,7 @@ use std::time::Duration;
|
||||
|
||||
use common_telemetry::{error, info};
|
||||
use futures::future::try_join_all;
|
||||
use object_store::ErrorKind;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::provider::KafkaProvider;
|
||||
@@ -28,8 +29,9 @@ use tokio::select;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::Mutex as TokioMutex;
|
||||
|
||||
use super::default_index_file;
|
||||
use crate::error::{self, Result};
|
||||
use crate::kafka::index::encoder::IndexEncoder;
|
||||
use crate::kafka::index::encoder::{DatanodeWalIndexes, IndexEncoder};
|
||||
use crate::kafka::index::JsonIndexEncoder;
|
||||
use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest};
|
||||
|
||||
@@ -52,10 +54,11 @@ pub trait IndexCollector: Send + Sync {
|
||||
|
||||
/// The [`GlobalIndexCollector`] struct is responsible for managing index entries
|
||||
/// across multiple providers.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug)]
|
||||
pub struct GlobalIndexCollector {
|
||||
providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
|
||||
task: CollectionTask,
|
||||
operator: object_store::ObjectStore,
|
||||
_handle: CollectionTaskHandle,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -103,7 +106,7 @@ impl CollectionTask {
|
||||
/// The background task performs two main operations:
|
||||
/// - Persists the WAL index to the specified `path` at every `dump_index_interval`.
|
||||
/// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`.
|
||||
fn run(&self) {
|
||||
fn run(self) -> CollectionTaskHandle {
|
||||
let mut dump_index_interval = tokio::time::interval(self.dump_index_interval);
|
||||
let running = self.running.clone();
|
||||
let moved_self = self.clone();
|
||||
@@ -122,15 +125,23 @@ impl CollectionTask {
|
||||
}
|
||||
}
|
||||
});
|
||||
CollectionTaskHandle {
|
||||
running: self.running.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for CollectionTask {
|
||||
impl Drop for CollectionTaskHandle {
|
||||
fn drop(&mut self) {
|
||||
self.running.store(false, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct CollectionTaskHandle {
|
||||
running: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl GlobalIndexCollector {
|
||||
/// Constructs a [`GlobalIndexCollector`].
|
||||
///
|
||||
@@ -148,16 +159,65 @@ impl GlobalIndexCollector {
|
||||
let task = CollectionTask {
|
||||
providers: providers.clone(),
|
||||
dump_index_interval,
|
||||
operator,
|
||||
operator: operator.clone(),
|
||||
path,
|
||||
running: Arc::new(AtomicBool::new(true)),
|
||||
};
|
||||
task.run();
|
||||
Self { providers, task }
|
||||
let handle = task.run();
|
||||
Self {
|
||||
providers,
|
||||
operator,
|
||||
_handle: handle,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn new_for_test(operator: object_store::ObjectStore) -> Self {
|
||||
Self {
|
||||
providers: Default::default(),
|
||||
operator,
|
||||
_handle: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GlobalIndexCollector {
|
||||
/// Retrieves [`EntryId`]s for a specified `region_id` under the given `location_id`
/// that are greater than or equal to a given `entry_id`.
|
||||
pub(crate) async fn read_remote_region_index(
|
||||
&self,
|
||||
location_id: u64,
|
||||
provider: &KafkaProvider,
|
||||
region_id: RegionId,
|
||||
entry_id: EntryId,
|
||||
) -> Result<Option<(BTreeSet<EntryId>, EntryId)>> {
|
||||
let path = default_index_file(location_id);
|
||||
|
||||
let bytes = match self.operator.read(&path).await {
|
||||
Ok(bytes) => bytes.to_vec(),
|
||||
Err(err) => {
|
||||
if err.kind() == ErrorKind::NotFound {
|
||||
return Ok(None);
|
||||
} else {
|
||||
return Err(err).context(error::ReadIndexSnafu { path });
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match DatanodeWalIndexes::decode(&bytes)?.provider(provider) {
|
||||
Some(indexes) => {
|
||||
let last_index = indexes.last_index();
|
||||
let indexes = indexes
|
||||
.region(region_id)
|
||||
.unwrap_or_default()
|
||||
.split_off(&entry_id);
|
||||
|
||||
Ok(Some((indexes, last_index)))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new [`ProviderLevelIndexCollector`] for a specified provider.
|
||||
pub(crate) async fn provider_level_index_collector(
|
||||
&self,
|
||||
@@ -266,3 +326,92 @@ impl IndexCollector for NoopCollector {
|
||||
|
||||
fn dump(&mut self, _encoder: &dyn IndexEncoder) {}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
|
||||
use store_api::logstore::provider::KafkaProvider;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::kafka::index::collector::RegionIndexes;
|
||||
use crate::kafka::index::encoder::IndexEncoder;
|
||||
use crate::kafka::index::JsonIndexEncoder;
|
||||
use crate::kafka::{default_index_file, GlobalIndexCollector};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_remote_region_index() {
|
||||
let operator = object_store::ObjectStore::new(object_store::services::Memory::default())
|
||||
.unwrap()
|
||||
.finish();
|
||||
|
||||
let path = default_index_file(0);
|
||||
let encoder = JsonIndexEncoder::default();
|
||||
encoder.encode(
|
||||
&KafkaProvider::new("my_topic_0".to_string()),
|
||||
&RegionIndexes {
|
||||
regions: HashMap::from([(RegionId::new(1, 1), BTreeSet::from([1, 5, 15]))]),
|
||||
latest_entry_id: 20,
|
||||
},
|
||||
);
|
||||
let bytes = encoder.finish().unwrap();
|
||||
let mut writer = operator.writer(&path).await.unwrap();
|
||||
writer.write(bytes).await.unwrap();
|
||||
writer.close().await.unwrap();
|
||||
|
||||
let collector = GlobalIndexCollector::new_for_test(operator.clone());
|
||||
// Index file doesn't exist
|
||||
let result = collector
|
||||
.read_remote_region_index(
|
||||
1,
|
||||
&KafkaProvider::new("my_topic_0".to_string()),
|
||||
RegionId::new(1, 1),
|
||||
1,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.is_none());
|
||||
|
||||
// RegionId doesn't exist
|
||||
let (indexes, last_index) = collector
|
||||
.read_remote_region_index(
|
||||
0,
|
||||
&KafkaProvider::new("my_topic_0".to_string()),
|
||||
RegionId::new(1, 2),
|
||||
5,
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(indexes, BTreeSet::new());
|
||||
assert_eq!(last_index, 20);
|
||||
|
||||
// RegionId(1, 1), Start EntryId: 5
|
||||
let (indexes, last_index) = collector
|
||||
.read_remote_region_index(
|
||||
0,
|
||||
&KafkaProvider::new("my_topic_0".to_string()),
|
||||
RegionId::new(1, 1),
|
||||
5,
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(indexes, BTreeSet::from([5, 15]));
|
||||
assert_eq!(last_index, 20);
|
||||
|
||||
// RegionId(1, 1), Start EntryId: 20
|
||||
let (indexes, last_index) = collector
|
||||
.read_remote_region_index(
|
||||
0,
|
||||
&KafkaProvider::new("my_topic_0".to_string()),
|
||||
RegionId::new(1, 1),
|
||||
20,
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(indexes, BTreeSet::new());
|
||||
assert_eq!(last_index, 20);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,7 +50,7 @@ pub struct DeltaEncodedRegionIndexes {
|
||||
|
||||
impl DeltaEncodedRegionIndexes {
|
||||
/// Retrieves the original (decoded) index values for a given region.
|
||||
fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
|
||||
pub(crate) fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
|
||||
let decoded = self
|
||||
.regions
|
||||
.get(®ion_id)
|
||||
@@ -60,7 +60,7 @@ impl DeltaEncodedRegionIndexes {
|
||||
}
|
||||
|
||||
/// Retrieves the last index.
|
||||
fn last_index(&self) -> u64 {
|
||||
pub(crate) fn last_index(&self) -> u64 {
|
||||
self.last_index
|
||||
}
|
||||
}
|
||||
@@ -86,7 +86,7 @@ impl DatanodeWalIndexes {
|
||||
value
|
||||
}
|
||||
|
||||
fn decode(byte: &[u8]) -> Result<Self> {
|
||||
pub(crate) fn decode(byte: &[u8]) -> Result<Self> {
|
||||
serde_json::from_slice(byte).context(error::DecodeJsonSnafu)
|
||||
}
|
||||
|
||||
@@ -118,7 +118,7 @@ impl IndexEncoder for JsonIndexEncoder {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::{BTreeSet, HashMap, HashSet};
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
|
||||
use store_api::logstore::provider::KafkaProvider;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
@@ -12,8 +12,9 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::cmp::min;
|
||||
use std::collections::VecDeque;
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::{BTreeSet, VecDeque};
|
||||
use std::fmt::Debug;
|
||||
use std::ops::Range;
|
||||
|
||||
use store_api::logstore::EntryId;
|
||||
@@ -27,7 +28,7 @@ pub(crate) struct NextBatchHint {
|
||||
}
|
||||
|
||||
/// An iterator over WAL (Write-Ahead Log) entries index for a region.
|
||||
pub trait RegionWalIndexIterator: Send + Sync {
|
||||
pub trait RegionWalIndexIterator: Send + Sync + Debug {
|
||||
/// Returns next batch hint.
|
||||
fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint>;
|
||||
|
||||
@@ -36,9 +37,13 @@ pub trait RegionWalIndexIterator: Send + Sync {
|
||||
|
||||
// Advances the iterator and returns the next EntryId.
|
||||
fn next(&mut self) -> Option<EntryId>;
|
||||
|
||||
#[cfg(test)]
|
||||
fn as_any(&self) -> &dyn std::any::Any;
|
||||
}
|
||||
|
||||
/// Represents a range [next_entry_id, end_entry_id) of WAL entries for a region.
|
||||
#[derive(Debug)]
|
||||
pub struct RegionWalRange {
|
||||
current_entry_id: EntryId,
|
||||
end_entry_id: EntryId,
|
||||
@@ -96,10 +101,18 @@ impl RegionWalIndexIterator for RegionWalRange {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
pub const MIN_BATCH_WINDOW_SIZE: usize = 4 * 1024 * 1024;
|
||||
|
||||
/// Represents an index of Write-Ahead Log entries for a region,
|
||||
/// stored as a vector of [EntryId]s.
|
||||
#[derive(Debug)]
|
||||
pub struct RegionWalVecIndex {
|
||||
index: VecDeque<EntryId>,
|
||||
min_batch_window_size: usize,
|
||||
@@ -134,11 +147,17 @@ impl RegionWalIndexIterator for RegionWalVecIndex {
|
||||
fn next(&mut self) -> Option<EntryId> {
|
||||
self.index.pop_front()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents an iterator over multiple region WAL indexes.
|
||||
///
|
||||
/// Allowing iteration through multiple WAL indexes.
|
||||
#[derive(Debug)]
|
||||
pub struct MultipleRegionWalIndexIterator {
|
||||
iterator: VecDeque<Box<dyn RegionWalIndexIterator>>,
|
||||
}
|
||||
@@ -185,6 +204,53 @@ impl RegionWalIndexIterator for MultipleRegionWalIndexIterator {
|
||||
|
||||
self.iterator.front_mut().and_then(|iter| iter.next())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds [`RegionWalIndexIterator`].
|
||||
///
|
||||
/// Returning `None` means there are no entries to replay.
|
||||
pub fn build_region_wal_index_iterator(
|
||||
start_entry_id: EntryId,
|
||||
end_entry_id: EntryId,
|
||||
region_indexes: Option<(BTreeSet<EntryId>, EntryId)>,
|
||||
max_batch_bytes: usize,
|
||||
min_window_size: usize,
|
||||
) -> Option<Box<dyn RegionWalIndexIterator>> {
|
||||
if (start_entry_id..end_entry_id).is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
match region_indexes {
|
||||
Some((region_indexes, last_index)) => {
|
||||
if region_indexes.is_empty() && last_index >= end_entry_id {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut iterator: Vec<Box<dyn RegionWalIndexIterator>> = Vec::with_capacity(2);
|
||||
if !region_indexes.is_empty() {
|
||||
let index = RegionWalVecIndex::new(region_indexes, min_window_size);
|
||||
iterator.push(Box::new(index));
|
||||
}
|
||||
let known_last_index = max(last_index, start_entry_id);
|
||||
if known_last_index < end_entry_id {
|
||||
let range = known_last_index..end_entry_id;
|
||||
let index = RegionWalRange::new(range, max_batch_bytes);
|
||||
iterator.push(Box::new(index));
|
||||
}
|
||||
|
||||
Some(Box::new(MultipleRegionWalIndexIterator::new(iterator)))
|
||||
}
|
||||
None => {
|
||||
let range = start_entry_id..end_entry_id;
|
||||
|
||||
Some(Box::new(RegionWalRange::new(range, max_batch_bytes)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -353,4 +419,69 @@ mod tests {
|
||||
assert_eq!(iter.peek(), None);
|
||||
assert_eq!(iter.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_region_wal_index_iterator() {
|
||||
let iterator = build_region_wal_index_iterator(1024, 1024, None, 5, 5);
|
||||
assert!(iterator.is_none());
|
||||
|
||||
let iterator = build_region_wal_index_iterator(1024, 1023, None, 5, 5);
|
||||
assert!(iterator.is_none());
|
||||
|
||||
let iterator =
|
||||
build_region_wal_index_iterator(1024, 1024, Some((BTreeSet::new(), 1024)), 5, 5);
|
||||
assert!(iterator.is_none());
|
||||
|
||||
let iterator =
|
||||
build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1024)), 5, 5);
|
||||
assert!(iterator.is_none());
|
||||
|
||||
let iterator =
|
||||
build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1025)), 5, 5);
|
||||
assert!(iterator.is_none());
|
||||
|
||||
let iterator = build_region_wal_index_iterator(
|
||||
1,
|
||||
1024,
|
||||
Some((BTreeSet::from([512, 756]), 1024)),
|
||||
5,
|
||||
5,
|
||||
)
|
||||
.unwrap();
|
||||
let iter = iterator
|
||||
.as_any()
|
||||
.downcast_ref::<MultipleRegionWalIndexIterator>()
|
||||
.unwrap();
|
||||
assert_eq!(iter.iterator.len(), 1);
|
||||
let vec_index = iter.iterator[0]
|
||||
.as_any()
|
||||
.downcast_ref::<RegionWalVecIndex>()
|
||||
.unwrap();
|
||||
assert_eq!(vec_index.index, VecDeque::from([512, 756]));
|
||||
|
||||
let iterator = build_region_wal_index_iterator(
|
||||
1,
|
||||
1024,
|
||||
Some((BTreeSet::from([512, 756]), 1023)),
|
||||
5,
|
||||
5,
|
||||
)
|
||||
.unwrap();
|
||||
let iter = iterator
|
||||
.as_any()
|
||||
.downcast_ref::<MultipleRegionWalIndexIterator>()
|
||||
.unwrap();
|
||||
assert_eq!(iter.iterator.len(), 2);
|
||||
let vec_index = iter.iterator[0]
|
||||
.as_any()
|
||||
.downcast_ref::<RegionWalVecIndex>()
|
||||
.unwrap();
|
||||
assert_eq!(vec_index.index, VecDeque::from([512, 756]));
|
||||
let wal_range = iter.iterator[1]
|
||||
.as_any()
|
||||
.downcast_ref::<RegionWalRange>()
|
||||
.unwrap();
|
||||
assert_eq!(wal_range.current_entry_id, 1023);
|
||||
assert_eq!(wal_range.end_entry_id, 1024);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,19 +20,20 @@ use common_telemetry::{debug, warn};
|
||||
use common_wal::config::kafka::DatanodeKafkaConfig;
|
||||
use futures::future::try_join_all;
|
||||
use futures_util::StreamExt;
|
||||
use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
|
||||
use rskafka::client::partition::OffsetAt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::logstore::entry::{
|
||||
Entry, Id as EntryId, MultiplePartEntry, MultiplePartHeader, NaiveEntry,
|
||||
};
|
||||
use store_api::logstore::provider::{KafkaProvider, Provider};
|
||||
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream};
|
||||
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::index::build_region_wal_index_iterator;
|
||||
use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result};
|
||||
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
|
||||
use crate::kafka::index::GlobalIndexCollector;
|
||||
use crate::kafka::consumer::{ConsumerBuilder, RecordsBuffer};
|
||||
use crate::kafka::index::{GlobalIndexCollector, MIN_BATCH_WINDOW_SIZE};
|
||||
use crate::kafka::producer::OrderedBatchProducerRef;
|
||||
use crate::kafka::util::record::{
|
||||
convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE,
|
||||
@@ -205,6 +206,7 @@ impl LogStore for KafkaLogStore {
|
||||
&self,
|
||||
provider: &Provider,
|
||||
entry_id: EntryId,
|
||||
index: Option<WalIndex>,
|
||||
) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
|
||||
let provider = provider
|
||||
.as_kafka_provider()
|
||||
@@ -232,35 +234,41 @@ impl LogStore for KafkaLogStore {
|
||||
.await
|
||||
.context(GetOffsetSnafu {
|
||||
topic: &provider.topic,
|
||||
})?
|
||||
- 1;
|
||||
// Reads entries with offsets in the range [start_offset, end_offset].
|
||||
let start_offset = entry_id as i64;
|
||||
})?;
|
||||
|
||||
debug!(
|
||||
"Start reading entries in range [{}, {}] for ns {}",
|
||||
start_offset, end_offset, provider
|
||||
);
|
||||
let region_indexes = if let (Some(index), Some(collector)) =
|
||||
(index, self.client_manager.global_index_collector())
|
||||
{
|
||||
collector
|
||||
.read_remote_region_index(index.location_id, provider, index.region_id, entry_id)
|
||||
.await?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Abort if there're no new entries.
|
||||
// FIXME(niebayes): how come this case happens?
|
||||
if start_offset > end_offset {
|
||||
warn!(
|
||||
"No new entries for ns {} in range [{}, {}]",
|
||||
provider, start_offset, end_offset
|
||||
);
|
||||
let Some(iterator) = build_region_wal_index_iterator(
|
||||
entry_id,
|
||||
end_offset as u64,
|
||||
region_indexes,
|
||||
self.max_batch_bytes,
|
||||
MIN_BATCH_WINDOW_SIZE,
|
||||
) else {
|
||||
let range = entry_id..end_offset as u64;
|
||||
warn!("No new entries in range {:?} of ns {}", range, provider);
|
||||
return Ok(futures_util::stream::empty().boxed());
|
||||
}
|
||||
};
|
||||
|
||||
let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(start_offset))
|
||||
.with_max_batch_size(self.max_batch_bytes as i32)
|
||||
.with_max_wait_ms(self.consumer_wait_timeout.as_millis() as i32)
|
||||
.build();
|
||||
debug!("Reading entries with {:?} of ns {}", iterator, provider);
|
||||
|
||||
debug!(
|
||||
"Built a stream consumer for ns {} to consume entries in range [{}, {}]",
|
||||
provider, start_offset, end_offset
|
||||
);
|
||||
// Safety: Must be ok.
|
||||
let mut stream_consumer = ConsumerBuilder::default()
|
||||
.client(client)
|
||||
// Safety: checked before.
|
||||
.buffer(RecordsBuffer::new(iterator))
|
||||
.max_batch_size(self.max_batch_bytes)
|
||||
.max_wait_ms(self.consumer_wait_timeout.as_millis() as u32)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
// A buffer is used to collect records to construct a complete entry.
|
||||
let mut entry_records: HashMap<RegionId, Vec<Record>> = HashMap::new();
|
||||
@@ -511,7 +519,7 @@ mod tests {
|
||||
// 5 region
|
||||
assert_eq!(response.last_entry_ids.len(), 5);
|
||||
let got_entries = logstore
|
||||
.read(&provider, 0)
|
||||
.read(&provider, 0, None)
|
||||
.await
|
||||
.unwrap()
|
||||
.try_collect::<Vec<_>>()
|
||||
@@ -584,7 +592,7 @@ mod tests {
|
||||
// 5 region
|
||||
assert_eq!(response.last_entry_ids.len(), 5);
|
||||
let got_entries = logstore
|
||||
.read(&provider, 0)
|
||||
.read(&provider, 0, None)
|
||||
.await
|
||||
.unwrap()
|
||||
.try_collect::<Vec<_>>()
|
||||
|
||||
@@ -25,7 +25,7 @@ use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMo
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::logstore::entry::{Entry, Id as EntryId, NaiveEntry};
|
||||
use store_api::logstore::provider::{Provider, RaftEngineProvider};
|
||||
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream};
|
||||
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::{
|
||||
@@ -252,6 +252,7 @@ impl LogStore for RaftEngineLogStore {
|
||||
&self,
|
||||
provider: &Provider,
|
||||
entry_id: EntryId,
|
||||
_index: Option<WalIndex>,
|
||||
) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
|
||||
let ns = provider
|
||||
.as_raft_engine_provider()
|
||||
@@ -545,7 +546,7 @@ mod tests {
|
||||
}
|
||||
let mut entries = HashSet::with_capacity(1024);
|
||||
let mut s = logstore
|
||||
.read(&Provider::raft_engine_provider(1), 0)
|
||||
.read(&Provider::raft_engine_provider(1), 0, None)
|
||||
.await
|
||||
.unwrap();
|
||||
while let Some(r) = s.next().await {
|
||||
@@ -578,7 +579,7 @@ mod tests {
|
||||
.await
|
||||
.is_ok());
|
||||
let entries = logstore
|
||||
.read(&Provider::raft_engine_provider(1), 1)
|
||||
.read(&Provider::raft_engine_provider(1), 1, None)
|
||||
.await
|
||||
.unwrap()
|
||||
.collect::<Vec<_>>()
|
||||
@@ -596,7 +597,7 @@ mod tests {
|
||||
|
||||
let entries = collect_entries(
|
||||
logstore
|
||||
.read(&Provider::raft_engine_provider(1), 1)
|
||||
.read(&Provider::raft_engine_provider(1), 1, None)
|
||||
.await
|
||||
.unwrap(),
|
||||
)
|
||||
@@ -682,7 +683,7 @@ mod tests {
|
||||
logstore.obsolete(&namespace, region_id, 100).await.unwrap();
|
||||
assert_eq!(101, logstore.engine.first_index(namespace_id).unwrap());
|
||||
|
||||
let res = logstore.read(&namespace, 100).await.unwrap();
|
||||
let res = logstore.read(&namespace, 100, None).await.unwrap();
|
||||
let mut vec = collect_entries(res).await;
|
||||
vec.sort_by(|a, b| a.entry_id().partial_cmp(&b.entry_id()).unwrap());
|
||||
assert_eq!(101, vec.first().unwrap().entry_id());
|
||||
|
||||
@@ -90,6 +90,7 @@ impl UpgradeCandidateRegion {
|
||||
region_id,
|
||||
last_entry_id,
|
||||
wait_for_replay_timeout: Some(self.replay_timeout),
|
||||
location_id: Some(ctx.persistent_ctx.from_peer.id),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -41,6 +41,7 @@ impl MetricEngineInner {
|
||||
RegionRequest::Catchup(RegionCatchupRequest {
|
||||
set_writable: req.set_writable,
|
||||
entry_id: None,
|
||||
location_id: req.location_id,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
@@ -52,6 +53,7 @@ impl MetricEngineInner {
|
||||
RegionRequest::Catchup(RegionCatchupRequest {
|
||||
set_writable: req.set_writable,
|
||||
entry_id: req.entry_id,
|
||||
location_id: req.location_id,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -94,6 +94,7 @@ async fn test_catchup_with_last_entry_id() {
|
||||
RegionRequest::Catchup(RegionCatchupRequest {
|
||||
set_writable: false,
|
||||
entry_id: last_entry_id,
|
||||
location_id: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
@@ -125,6 +126,7 @@ async fn test_catchup_with_last_entry_id() {
|
||||
RegionRequest::Catchup(RegionCatchupRequest {
|
||||
set_writable: true,
|
||||
entry_id: last_entry_id,
|
||||
location_id: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
@@ -191,6 +193,7 @@ async fn test_catchup_with_incorrect_last_entry_id() {
|
||||
RegionRequest::Catchup(RegionCatchupRequest {
|
||||
set_writable: false,
|
||||
entry_id: incorrect_last_entry_id,
|
||||
location_id: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
@@ -207,6 +210,7 @@ async fn test_catchup_with_incorrect_last_entry_id() {
|
||||
RegionRequest::Catchup(RegionCatchupRequest {
|
||||
set_writable: false,
|
||||
entry_id: incorrect_last_entry_id,
|
||||
location_id: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
@@ -255,6 +259,7 @@ async fn test_catchup_without_last_entry_id() {
|
||||
RegionRequest::Catchup(RegionCatchupRequest {
|
||||
set_writable: false,
|
||||
entry_id: None,
|
||||
location_id: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
@@ -285,6 +290,7 @@ async fn test_catchup_without_last_entry_id() {
|
||||
RegionRequest::Catchup(RegionCatchupRequest {
|
||||
set_writable: true,
|
||||
entry_id: None,
|
||||
location_id: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
@@ -354,6 +360,7 @@ async fn test_catchup_with_manifest_update() {
|
||||
RegionRequest::Catchup(RegionCatchupRequest {
|
||||
set_writable: false,
|
||||
entry_id: None,
|
||||
location_id: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
@@ -390,6 +397,7 @@ async fn test_catchup_with_manifest_update() {
|
||||
RegionRequest::Catchup(RegionCatchupRequest {
|
||||
set_writable: true,
|
||||
entry_id: None,
|
||||
location_id: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
@@ -411,6 +419,7 @@ async fn test_catchup_not_exist() {
|
||||
RegionRequest::Catchup(RegionCatchupRequest {
|
||||
set_writable: true,
|
||||
entry_id: None,
|
||||
location_id: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -202,8 +202,8 @@ impl RegionOpener {
|
||||
options.need_dedup(),
|
||||
options.merge_mode(),
|
||||
);
|
||||
// Initial memtable id is 0.
|
||||
let part_duration = options.compaction.time_window();
|
||||
// Initial memtable id is 0.
|
||||
let mutable = Arc::new(TimePartitions::new(
|
||||
metadata.clone(),
|
||||
memtable_builder.clone(),
|
||||
@@ -313,7 +313,7 @@ impl RegionOpener {
|
||||
let wal_entry_reader = self
|
||||
.wal_entry_reader
|
||||
.take()
|
||||
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id));
|
||||
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
|
||||
let on_region_opened = wal.on_region_opened();
|
||||
let object_store = self.object_store(®ion_options.storage)?.clone();
|
||||
|
||||
@@ -335,8 +335,13 @@ impl RegionOpener {
|
||||
region_options.need_dedup(),
|
||||
region_options.merge_mode(),
|
||||
);
|
||||
// Use compaction time window in the manifest if region doesn't provide
|
||||
// the time window option.
|
||||
let part_duration = region_options
|
||||
.compaction
|
||||
.time_window()
|
||||
.or(manifest.compaction_time_window);
|
||||
// Initial memtable id is 0.
|
||||
let part_duration = region_options.compaction.time_window();
|
||||
let mutable = Arc::new(TimePartitions::new(
|
||||
metadata.clone(),
|
||||
memtable_builder.clone(),
|
||||
|
||||
@@ -30,7 +30,7 @@ use prost::Message;
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::entry::Entry;
|
||||
use store_api::logstore::provider::Provider;
|
||||
use store_api::logstore::{AppendBatchResponse, LogStore};
|
||||
use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::{BuildEntrySnafu, DeleteWalSnafu, EncodeWalSnafu, Result, WriteWalSnafu};
|
||||
@@ -102,15 +102,24 @@ impl<S: LogStore> Wal<S> {
|
||||
&self,
|
||||
provider: &Provider,
|
||||
region_id: RegionId,
|
||||
location_id: Option<u64>,
|
||||
) -> Box<dyn WalEntryReader> {
|
||||
match provider {
|
||||
Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new(
|
||||
LogStoreRawEntryReader::new(self.store.clone()),
|
||||
)),
|
||||
Provider::Kafka(_) => Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
|
||||
LogStoreRawEntryReader::new(self.store.clone()),
|
||||
region_id,
|
||||
))),
|
||||
Provider::Kafka(_) => {
|
||||
let reader = if let Some(location_id) = location_id {
|
||||
LogStoreRawEntryReader::new(self.store.clone())
|
||||
.with_wal_index(WalIndex::new(region_id, location_id))
|
||||
} else {
|
||||
LogStoreRawEntryReader::new(self.store.clone())
|
||||
};
|
||||
|
||||
Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
|
||||
reader, region_id,
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ use futures::stream::BoxStream;
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::entry::Entry;
|
||||
use store_api::logstore::provider::Provider;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::logstore::{LogStore, WalIndex};
|
||||
use store_api::storage::RegionId;
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
@@ -38,11 +38,20 @@ pub(crate) trait RawEntryReader: Send + Sync {
|
||||
/// Implement the [RawEntryReader] for the [LogStore].
|
||||
pub struct LogStoreRawEntryReader<S> {
|
||||
store: Arc<S>,
|
||||
wal_index: Option<WalIndex>,
|
||||
}
|
||||
|
||||
impl<S> LogStoreRawEntryReader<S> {
|
||||
pub fn new(store: Arc<S>) -> Self {
|
||||
Self { store }
|
||||
Self {
|
||||
store,
|
||||
wal_index: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_wal_index(mut self, wal_index: WalIndex) -> Self {
|
||||
self.wal_index = Some(wal_index);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,9 +59,10 @@ impl<S: LogStore> RawEntryReader for LogStoreRawEntryReader<S> {
|
||||
fn read(&self, provider: &Provider, start_id: EntryId) -> Result<EntryStream<'static>> {
|
||||
let store = self.store.clone();
|
||||
let provider = provider.clone();
|
||||
let wal_index = self.wal_index;
|
||||
let stream = try_stream!({
|
||||
let mut stream = store
|
||||
.read(&provider, start_id)
|
||||
.read(&provider, start_id, wal_index)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| error::ReadWalSnafu {
|
||||
@@ -119,7 +129,9 @@ mod tests {
|
||||
|
||||
use futures::{stream, TryStreamExt};
|
||||
use store_api::logstore::entry::{Entry, NaiveEntry};
|
||||
use store_api::logstore::{AppendBatchResponse, EntryId, LogStore, SendableEntryStream};
|
||||
use store_api::logstore::{
|
||||
AppendBatchResponse, EntryId, LogStore, SendableEntryStream, WalIndex,
|
||||
};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::*;
|
||||
@@ -149,6 +161,7 @@ mod tests {
|
||||
&self,
|
||||
_provider: &Provider,
|
||||
_id: EntryId,
|
||||
_index: Option<WalIndex>,
|
||||
) -> Result<SendableEntryStream<'static, Entry, Self::Error>, Self::Error> {
|
||||
Ok(Box::pin(stream::iter(vec![Ok(self.entries.clone())])))
|
||||
}
|
||||
|
||||
@@ -74,7 +74,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
let flushed_entry_id = region.version_control.current().last_entry_id;
|
||||
info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}");
|
||||
let timer = Instant::now();
|
||||
let wal_entry_reader = self.wal.wal_entry_reader(®ion.provider, region_id);
|
||||
let wal_entry_reader =
|
||||
self.wal
|
||||
.wal_entry_reader(®ion.provider, region_id, request.location_id);
|
||||
let on_region_opened = self.wal.on_region_opened();
|
||||
let last_entry_id = replay_memtable(
|
||||
®ion.provider,
|
||||
@@ -93,7 +95,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
);
|
||||
if let Some(expected_last_entry_id) = request.entry_id {
|
||||
ensure!(
|
||||
expected_last_entry_id == last_entry_id,
|
||||
// The replayed last entry id may be greater than the `expected_last_entry_id`.
|
||||
last_entry_id >= expected_last_entry_id,
|
||||
error::UnexpectedReplaySnafu {
|
||||
region_id,
|
||||
expected_last_entry_id,
|
||||
|
||||
@@ -41,7 +41,6 @@ use datafusion_physical_expr::EquivalenceProperties;
|
||||
use datatypes::schema::{Schema, SchemaRef};
|
||||
use futures_util::StreamExt;
|
||||
use greptime_proto::v1::region::RegionRequestHeader;
|
||||
use greptime_proto::v1::QueryContext;
|
||||
use meter_core::data::ReadItem;
|
||||
use meter_macros::read_meter;
|
||||
use session::context::QueryContextRef;
|
||||
@@ -185,24 +184,25 @@ impl MergeScanExec {
|
||||
context: Arc<TaskContext>,
|
||||
partition: usize,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
// prepare states to move
|
||||
let regions = self.regions.clone();
|
||||
let region_query_handler = self.region_query_handler.clone();
|
||||
let metric = MergeScanMetric::new(&self.metric);
|
||||
let schema = Self::arrow_schema_to_schema(self.schema())?;
|
||||
|
||||
let dbname = context.task_id().unwrap_or_default();
|
||||
let tracing_context = TracingContext::from_json(context.session_id().as_str());
|
||||
let current_catalog = self.query_ctx.current_catalog().to_string();
|
||||
let current_schema = self.query_ctx.current_schema().to_string();
|
||||
let current_channel = self.query_ctx.channel();
|
||||
let timezone = self.query_ctx.timezone().to_string();
|
||||
let extensions = self.query_ctx.extensions();
|
||||
let target_partition = self.target_partition;
|
||||
|
||||
let schema = self.schema.clone();
|
||||
let query_ctx = self.query_ctx.clone();
|
||||
let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
|
||||
let plan = self.plan.clone();
|
||||
let target_partition = self.target_partition;
|
||||
let dbname = context.task_id().unwrap_or_default();
|
||||
let tracing_context = TracingContext::from_json(context.session_id().as_str());
|
||||
let current_channel = self.query_ctx.channel();
|
||||
|
||||
let stream = Box::pin(stream!({
|
||||
MERGE_SCAN_REGIONS.observe(regions.len() as f64);
|
||||
// only report metrics once for each MergeScan
|
||||
if partition == 0 {
|
||||
MERGE_SCAN_REGIONS.observe(regions.len() as f64);
|
||||
}
|
||||
|
||||
let _finish_timer = metric.finish_time().timer();
|
||||
let mut ready_timer = metric.ready_time().timer();
|
||||
let mut first_consume_timer = Some(metric.first_consume_time().timer());
|
||||
@@ -217,13 +217,7 @@ impl MergeScanExec {
|
||||
header: Some(RegionRequestHeader {
|
||||
tracing_context: tracing_context.to_w3c(),
|
||||
dbname: dbname.clone(),
|
||||
query_context: Some(QueryContext {
|
||||
current_catalog: current_catalog.clone(),
|
||||
current_schema: current_schema.clone(),
|
||||
timezone: timezone.clone(),
|
||||
extensions: extensions.clone(),
|
||||
channel: current_channel as u32,
|
||||
}),
|
||||
query_context: Some(query_ctx.as_ref().into()),
|
||||
}),
|
||||
region_id,
|
||||
plan: plan.clone(),
|
||||
|
||||
@@ -63,9 +63,10 @@ impl ParallelizeScan {
|
||||
);
|
||||
|
||||
// update the partition ranges
|
||||
region_scan_exec
|
||||
.set_partitions(partition_ranges)
|
||||
let new_exec = region_scan_exec
|
||||
.with_new_partitions(partition_ranges)
|
||||
.map_err(|e| DataFusionError::External(e.into_inner()))?;
|
||||
return Ok(Transformed::yes(Arc::new(new_exec)));
|
||||
}
|
||||
|
||||
// The plan might be modified, but it's modified in-place so we always return
|
||||
@@ -80,11 +81,15 @@ impl ParallelizeScan {
|
||||
/// Distribute [`PartitionRange`]s to each partition.
|
||||
///
|
||||
/// Currently we use a simple round-robin strategy to assign ranges to partitions.
/// This method may return fewer partitions than `expected_partition_num` when the
/// number of ranges is smaller than `expected_partition_num`, but it always returns
/// at least one partition.
|
||||
fn assign_partition_range(
|
||||
ranges: Vec<PartitionRange>,
|
||||
expected_partition_num: usize,
|
||||
) -> Vec<Vec<PartitionRange>> {
|
||||
let mut partition_ranges = vec![vec![]; expected_partition_num];
|
||||
let actual_partition_num = expected_partition_num.min(ranges.len()).max(1);
|
||||
let mut partition_ranges = vec![vec![]; actual_partition_num];
|
||||
|
||||
// round-robin assignment
|
||||
for (i, range) in ranges.into_iter().enumerate() {
|
||||
@@ -95,3 +100,112 @@ impl ParallelizeScan {
|
||||
partition_ranges
|
||||
}
|
||||
}

#[cfg(test)]
mod test {
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;

use super::*;

#[test]
fn test_assign_partition_range() {
let ranges = vec![
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
estimated_size: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
estimated_size: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
estimated_size: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
estimated_size: 250,
identifier: 4,
},
];

// assign to 2 partitions
let expected_partition_num = 2;
let result =
ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num);
let expected = vec![
vec![
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
estimated_size: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
estimated_size: 150,
identifier: 3,
},
],
vec![
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
estimated_size: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
estimated_size: 250,
identifier: 4,
},
],
];
assert_eq!(result, expected);

// assign 4 ranges to 5 partitions. Only 4 partitions are returned.
let expected_partition_num = 5;
let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num);
let expected = vec![
vec![PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
estimated_size: 100,
identifier: 1,
}],
vec![PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
estimated_size: 200,
identifier: 2,
}],
vec![PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
estimated_size: 150,
identifier: 3,
}],
vec![PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
estimated_size: 250,
identifier: 4,
}],
];
assert_eq!(result, expected);

// assign 0 ranges to 5 partitions. Only 1 partition is returned.
let result = ParallelizeScan::assign_partition_range(vec![], 5);
assert_eq!(result.len(), 1);
}
}

@@ -59,7 +59,7 @@ pub struct SelfImportOption {
impl Default for SelfImportOption {
fn default() -> Self {
Self {
db: "information_schema".to_string(),
db: "greptime_metrics".to_string(),
}
}
}

@@ -137,6 +137,12 @@ impl From<QueryContext> for api::v1::QueryContext {
}
}

impl From<&QueryContext> for api::v1::QueryContext {
fn from(ctx: &QueryContext) -> Self {
ctx.clone().into()
}
}
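
The borrowing conversion added above is what lets the MergeScanExec hunk earlier in this diff write query_ctx.as_ref().into() instead of spelling out every api::v1::QueryContext field. A minimal, self-contained sketch of the pattern follows; the struct names here are stand-ins, not the repository's types.

// Stand-ins; in the diff these are session's QueryContext and api::v1::QueryContext.
#[derive(Clone)]
struct SessionCtx {
    catalog: String,
}

struct ProtoCtx {
    catalog: String,
}

impl From<SessionCtx> for ProtoCtx {
    fn from(ctx: SessionCtx) -> Self {
        ProtoCtx { catalog: ctx.catalog }
    }
}

// The added impl: borrow, clone once, and reuse the owned conversion.
impl From<&SessionCtx> for ProtoCtx {
    fn from(ctx: &SessionCtx) -> Self {
        ctx.clone().into()
    }
}

fn main() {
    let ctx = std::sync::Arc::new(SessionCtx { catalog: "greptime".to_string() });
    // Mirrors `query_context: Some(query_ctx.as_ref().into())` from the MergeScan hunk.
    let proto: ProtoCtx = ctx.as_ref().into();
    println!("{}", proto.catalog);
}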

impl QueryContext {
pub fn arc() -> QueryContextRef {
Arc::new(QueryContextBuilder::default().build())

@@ -30,6 +30,22 @@ pub use crate::logstore::entry::Id as EntryId;
use crate::logstore::provider::Provider;
use crate::storage::RegionId;

// The information used to locate WAL index for the specified region.
#[derive(Debug, Clone, Copy)]
pub struct WalIndex {
pub region_id: RegionId,
pub location_id: u64,
}

impl WalIndex {
pub fn new(region_id: RegionId, location_id: u64) -> Self {
Self {
region_id,
location_id,
}
}
}

/// `LogStore` serves as a Write-Ahead-Log for storage engine.
#[async_trait::async_trait]
pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
@@ -48,6 +64,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
&self,
provider: &Provider,
id: EntryId,
index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>, Self::Error>;

/// Creates a new `Namespace` from the given ref.
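
The new WalIndex is a (region, location) pair that callers can optionally thread into the read path above. Below is a hedged, self-contained sketch of how a caller might build the hint from an optional location id; the helper function and the stand-in RegionId type are assumptions for illustration, not APIs taken from the diff.

// Stand-in for `store_api::storage::RegionId`, only so the sketch compiles on its own.
#[derive(Debug, Clone, Copy)]
struct RegionId(u64);

/// Mirrors the `WalIndex` added above: identifies where the WAL index for a region lives.
#[derive(Debug, Clone, Copy)]
struct WalIndex {
    region_id: RegionId,
    location_id: u64,
}

impl WalIndex {
    fn new(region_id: RegionId, location_id: u64) -> Self {
        Self { region_id, location_id }
    }
}

/// Hypothetical helper: turn an optional location hint (e.g. the `location_id` field on
/// `RegionCatchupRequest` later in this diff) into the `Option<WalIndex>` accepted by the
/// read signature above. `None` means "read without an index hint".
fn wal_index_hint(region_id: RegionId, location_id: Option<u64>) -> Option<WalIndex> {
    location_id.map(|loc| WalIndex::new(region_id, loc))
}

fn main() {
    assert!(wal_index_hint(RegionId(1), None).is_none());
    let hint = wal_index_hint(RegionId(1), Some(7)).unwrap();
    println!("{hint:?}");
}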

@@ -143,7 +143,7 @@ impl ScannerPartitioning {
}

/// Represents one data range within a partition
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionRange {
/// Start time of time index column. Inclusive.
pub start: Timestamp,

@@ -673,6 +673,8 @@ pub struct RegionCatchupRequest {
/// The `entry_id` that replaying is expected to catch up to.
/// `None` stands for replaying to the latest.
pub entry_id: Option<entry::Id>,
/// The hint for replaying memtable.
pub location_id: Option<u64>,
}

impl fmt::Display for RegionRequest {

@@ -41,7 +41,7 @@ use crate::table::metrics::StreamMetrics;
/// A plan to read multiple partitions from a region of a table.
#[derive(Debug)]
pub struct RegionScanExec {
scanner: Mutex<RegionScannerRef>,
scanner: Arc<Mutex<RegionScannerRef>>,
arrow_schema: ArrowSchemaRef,
/// The expected output ordering for the plan.
output_ordering: Option<Vec<PhysicalSortExpr>>,
@@ -70,7 +70,7 @@ impl RegionScanExec {
let append_mode = scanner_props.append_mode();
let total_rows = scanner_props.total_rows();
Self {
scanner: Mutex::new(scanner),
scanner: Arc::new(Mutex::new(scanner)),
arrow_schema,
output_ordering: None,
metric: ExecutionPlanMetricsSet::new(),
@@ -102,9 +102,28 @@ impl RegionScanExec {
}

/// Update the partition ranges of underlying scanner.
pub fn set_partitions(&self, partitions: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
let mut scanner = self.scanner.lock().unwrap();
scanner.prepare(partitions)
pub fn with_new_partitions(
&self,
partitions: Vec<Vec<PartitionRange>>,
) -> Result<Self, BoxedError> {
let num_partitions = partitions.len();
let mut properties = self.properties.clone();
properties.partitioning = Partitioning::UnknownPartitioning(num_partitions);

{
let mut scanner = self.scanner.lock().unwrap();
scanner.prepare(partitions)?;
}

Ok(Self {
scanner: self.scanner.clone(),
arrow_schema: self.arrow_schema.clone(),
output_ordering: self.output_ordering.clone(),
metric: self.metric.clone(),
properties,
append_mode: self.append_mode,
total_rows: self.total_rows,
})
}
}
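
The earlier ParallelizeScan hunk is the intended caller of this method: instead of mutating the exec in place via set_partitions, the rule builds a new node and returns Transformed::yes. The following is a hedged, self-contained sketch of that builder-style pattern with stand-in types; the names NodeProps and MiniExec are hypothetical and error handling is omitted.

use std::sync::{Arc, Mutex};

// Stand-ins for the scanner and plan properties in the diff.
#[derive(Clone, Debug, PartialEq)]
struct NodeProps {
    partitions: usize,
}

#[derive(Debug)]
struct MiniExec {
    // Shared scanner, so a rebuilt node still drives the same underlying scanner.
    scanner: Arc<Mutex<Vec<Vec<u64>>>>,
    properties: NodeProps,
}

impl MiniExec {
    /// Mirrors `with_new_partitions`: update the shared scanner state, then return a
    /// new node whose reported partition count matches the new ranges.
    fn with_new_partitions(&self, partitions: Vec<Vec<u64>>) -> MiniExec {
        let num_partitions = partitions.len();
        *self.scanner.lock().unwrap() = partitions;
        MiniExec {
            scanner: Arc::clone(&self.scanner),
            properties: NodeProps { partitions: num_partitions },
        }
    }
}

fn main() {
    let exec = MiniExec {
        scanner: Arc::new(Mutex::new(vec![])),
        properties: NodeProps { partitions: 1 },
    };
    // An optimizer-rule-style caller replaces the old node with the new one.
    let new_exec = exec.with_new_partitions(vec![vec![1, 3], vec![2, 4]]);
    assert_eq!(new_exec.properties, NodeProps { partitions: 2 });
}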

48
tests/cases/standalone/common/function/geo.result
Normal file
@@ -0,0 +1,48 @@
SELECT h3(37.76938, -122.3889, 0);

+---------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),Int64(0)) |
+---------------------------------------------------+
| 8029fffffffffff |
+---------------------------------------------------+

SELECT h3(37.76938, -122.3889, 1);

+---------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),Int64(1)) |
+---------------------------------------------------+
| 81283ffffffffff |
+---------------------------------------------------+

SELECT h3(37.76938, -122.3889, 8);

+---------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),Int64(8)) |
+---------------------------------------------------+
| 88283082e7fffff |
+---------------------------------------------------+

SELECT geohash(37.76938, -122.3889, 9);

+--------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),Int64(9)) |
+--------------------------------------------------------+
| 9q8yygxne |
+--------------------------------------------------------+

SELECT geohash(37.76938, -122.3889, 10);

+---------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),Int64(10)) |
+---------------------------------------------------------+
| 9q8yygxnef |
+---------------------------------------------------------+

SELECT geohash(37.76938, -122.3889, 11);

+---------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),Int64(11)) |
+---------------------------------------------------------+
| 9q8yygxneft |
+---------------------------------------------------------+

11
tests/cases/standalone/common/function/geo.sql
Normal file
@@ -0,0 +1,11 @@
SELECT h3(37.76938, -122.3889, 0);

SELECT h3(37.76938, -122.3889, 1);

SELECT h3(37.76938, -122.3889, 8);

SELECT geohash(37.76938, -122.3889, 9);

SELECT geohash(37.76938, -122.3889, 10);

SELECT geohash(37.76938, -122.3889, 11);