Compare commits

..

3 Commits

Author   SHA1         Message                       Date
liyang   b4b105ad35   test                          2024-08-27 10:11:32 +08:00
liyang   e1d0bb3749   test                          2024-08-27 02:07:07 +08:00
liyang   867d6ab600   test: skopeo authentication   2024-08-27 01:19:54 +08:00
63 changed files with 779 additions and 1243 deletions

View File

@@ -4,6 +4,9 @@ inputs:
arch:
description: Architecture to build
required: true
rust-toolchain:
description: Rust toolchain to use
required: true
cargo-profile:
description: Cargo profile to build
required: true
@@ -40,8 +43,9 @@ runs:
brew install protobuf
- name: Install rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ inputs.rust-toolchain }}
targets: ${{ inputs.arch }}
- name: Start etcd # For integration tests.
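
Because the +/- markers were stripped from this compare view, the paired `uses:` lines above are ambiguous on their own; read together with the `rust-toolchain` input added at the top of the action, the step after the swap plausibly looks like the sketch below. This is an illustrative reconstruction, not the verbatim file: `toolchain` and `targets` are documented inputs of `dtolnay/rust-toolchain`, and the toolchain value is whatever the caller passes in.

# Sketch of the "Install rust toolchain" step after replacing
# actions-rust-lang/setup-rust-toolchain@v1 with dtolnay/rust-toolchain@master.
# The dtolnay action takes the toolchain explicitly instead of reading
# rust-toolchain.toml, so the composite action now receives it as an input.
- name: Install rust toolchain
  uses: dtolnay/rust-toolchain@master
  with:
    toolchain: ${{ inputs.rust-toolchain }}   # e.g. nightly-2024-06-06, supplied by the calling workflow
    targets: ${{ inputs.arch }}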

View File

@@ -4,6 +4,9 @@ inputs:
arch:
description: Architecture to build
required: true
rust-toolchain:
description: Rust toolchain to use
required: true
cargo-profile:
description: Cargo profile to build
required: true
@@ -25,8 +28,9 @@ runs:
- uses: arduino/setup-protoc@v3
- name: Install rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ inputs.rust-toolchain }}
targets: ${{ inputs.arch }}
components: llvm-tools-preview

View File

@@ -12,6 +12,9 @@ on:
name: Build API docs
env:
RUST_TOOLCHAIN: nightly-2024-06-06
jobs:
apidoc:
runs-on: ubuntu-20.04
@@ -20,7 +23,9 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- run: cargo doc --workspace --no-deps --document-private-items
- run: |
cat <<EOF > target/doc/index.html

View File

@@ -29,6 +29,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
env:
RUST_TOOLCHAIN: nightly-2024-06-06
jobs:
check-typos-and-docs:
name: Check typos and docs
@@ -61,7 +64,9 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -77,7 +82,9 @@ jobs:
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: stable
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -100,7 +107,9 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
@@ -152,7 +161,9 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -170,7 +181,7 @@ jobs:
name: bins
path: .
- name: Unzip binaries
run: |
run: |
tar -xvf ./bins.tar.gz
rm ./bins.tar.gz
- name: Run GreptimeDB
@@ -210,7 +221,9 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -261,7 +274,9 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
@@ -272,7 +287,7 @@ jobs:
- name: Build greptime bianry
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
run: cargo gc --profile ci -- --bin greptime
run: cargo gc --profile ci -- --bin greptime
- name: Pack greptime binary
shell: bash
run: |
@@ -286,7 +301,7 @@ jobs:
artifacts-dir: bin
version: current
distributed-fuzztest:
distributed-fuzztest:
name: Fuzz Test (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
runs-on: ubuntu-latest
needs: build-greptime-ci
@@ -329,7 +344,9 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -399,12 +416,12 @@ jobs:
- name: Describe Nodes
if: failure()
shell: bash
run: |
kubectl describe nodes
run: |
kubectl describe nodes
- name: Export kind logs
if: failure()
shell: bash
run: |
run: |
kind export logs /tmp/kind
- name: Upload logs
if: failure()
@@ -416,13 +433,13 @@ jobs:
- name: Delete cluster
if: success()
shell: bash
run: |
run: |
kind delete cluster
docker stop $(docker ps -a -q)
docker rm $(docker ps -a -q)
docker system prune -f
distributed-fuzztest-with-chaos:
distributed-fuzztest-with-chaos:
name: Fuzz Test with Chaos (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
runs-on: ubuntu-latest
needs: build-greptime-ci
@@ -430,7 +447,7 @@ jobs:
strategy:
matrix:
target: ["fuzz_migrate_mito_regions", "fuzz_failover_mito_regions", "fuzz_failover_metric_regions"]
mode:
mode:
- name: "Remote WAL"
minio: true
kafka: true
@@ -467,7 +484,9 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -538,12 +557,12 @@ jobs:
- name: Describe Nodes
if: failure()
shell: bash
run: |
kubectl describe nodes
run: |
kubectl describe nodes
- name: Export kind logs
if: failure()
shell: bash
run: |
run: |
kind export logs /tmp/kind
- name: Upload logs
if: failure()
@@ -555,7 +574,7 @@ jobs:
- name: Delete cluster
if: success()
shell: bash
run: |
run: |
kind delete cluster
docker stop $(docker ps -a -q)
docker rm $(docker ps -a -q)
@@ -608,8 +627,9 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
components: rustfmt
- name: Rust Cache
uses: Swatinem/rust-cache@v2
@@ -628,8 +648,9 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
components: clippy
- name: Rust Cache
uses: Swatinem/rust-cache@v2
@@ -653,8 +674,9 @@ jobs:
with:
version: "14.0"
- name: Install toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
components: llvm-tools-preview
- name: Rust Cache
uses: Swatinem/rust-cache@v2

View File

@@ -9,6 +9,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
env:
RUST_TOOLCHAIN: nightly-2024-06-06
permissions:
issues: write
@@ -49,7 +52,9 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- name: Rust Cache
uses: Swatinem/rust-cache@v2
- name: Run sqlness
@@ -80,8 +85,9 @@ jobs:
with:
version: "14.0"
- name: Install Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
components: llvm-tools-preview
- name: Rust Cache
uses: Swatinem/rust-cache@v2

View File

@@ -1,12 +1,6 @@
name: Release dev-builder images
on:
push:
branches:
- main
paths:
- rust-toolchain.toml
- 'docker/dev-builder/**'
workflow_dispatch: # Allows you to run this workflow manually.
inputs:
release_dev_builder_ubuntu_image:
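
The hunk header (-1,12 +1,6) indicates the `push` trigger lines here are removals, so after this change the workflow can only be started manually. A minimal sketch of the resulting trigger block follows; the input name is kept from the surrounding context, while its type and default are not shown in this compare and are omitted.

name: Release dev-builder images
on:
  workflow_dispatch:   # manual runs only; the push trigger on rust-toolchain.toml
                       # and docker/dev-builder/** is removed by this change
    inputs:
      release_dev_builder_ubuntu_image:
        # ... (definition unchanged, omitted in this compare)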

View File

@@ -82,6 +82,7 @@ on:
# Use env variables to control all the release process.
env:
# The arguments of building greptime.
RUST_TOOLCHAIN: nightly-2024-06-06
CARGO_PROFILE: nightly
# Controls whether to run tests, include unit-test, integration-test and sqlness.
@@ -98,16 +99,6 @@ permissions:
contents: write # Allows the action to create a release.
jobs:
check-builder-rust-version:
name: Check rust version in builder
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
- name: Check Rust toolchain version
shell: bash
run: |
./scripts/check-builder-rust-version.sh
allocate-runners:
name: Allocate runners
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
@@ -253,6 +244,7 @@ jobs:
- uses: ./.github/actions/build-macos-artifacts
with:
arch: ${{ matrix.arch }}
rust-toolchain: ${{ env.RUST_TOOLCHAIN }}
cargo-profile: ${{ env.CARGO_PROFILE }}
features: ${{ matrix.features }}
version: ${{ needs.allocate-runners.outputs.version }}
@@ -295,6 +287,7 @@ jobs:
- uses: ./.github/actions/build-windows-artifacts
with:
arch: ${{ matrix.arch }}
rust-toolchain: ${{ env.RUST_TOOLCHAIN }}
cargo-profile: ${{ env.CARGO_PROFILE }}
features: ${{ matrix.features }}
version: ${{ needs.allocate-runners.outputs.version }}
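
Taken together with the new `rust-toolchain` input added to the build-macos/windows composite actions earlier in this compare, the release workflow now threads the toolchain pinned in its env block into those actions. A hedged sketch of the call site, with values other than the toolchain taken from the surrounding context:

env:
  RUST_TOOLCHAIN: nightly-2024-06-06   # pinned once at the workflow level
  CARGO_PROFILE: nightly

# ... later, inside the macOS build job ...
- uses: ./.github/actions/build-macos-artifacts
  with:
    arch: ${{ matrix.arch }}
    rust-toolchain: ${{ env.RUST_TOOLCHAIN }}   # new input, forwarded to dtolnay/rust-toolchain@master inside the action
    cargo-profile: ${{ env.CARGO_PROFILE }}
    features: ${{ matrix.features }}
    version: ${{ needs.allocate-runners.outputs.version }}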

Cargo.lock (generated)
View File

@@ -897,15 +897,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "backon"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2caf634d05fe0642d0fb1ab43497fa627088ecd93f84b2d0f2a5d7b91f7730db"
dependencies = [
"fastrand",
]
[[package]]
name = "backtrace"
version = "0.3.73"
@@ -1950,8 +1941,6 @@ dependencies = [
"common-version",
"datafusion",
"datatypes",
"geohash",
"h3o",
"num",
"num-traits",
"once_cell",
@@ -2122,7 +2111,7 @@ version = "0.9.2"
dependencies = [
"async-stream",
"async-trait",
"backon 1.0.2",
"backon",
"common-base",
"common-error",
"common-macro",
@@ -3815,12 +3804,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "float_eq"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28a80e3145d8ad11ba0995949bbcf48b9df2be62772b3d351ef017dff6ecb853"
[[package]]
name = "flow"
version = "0.9.2"
@@ -4219,27 +4202,6 @@ dependencies = [
"version_check",
]
[[package]]
name = "geo-types"
version = "0.7.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff16065e5720f376fbced200a5ae0f47ace85fd70b7e54269790281353b6d61"
dependencies = [
"approx",
"num-traits",
"serde",
]
[[package]]
name = "geohash"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fb94b1a65401d6cbf22958a9040aa364812c26674f841bee538b12c135db1e6"
dependencies = [
"geo-types",
"libm",
]
[[package]]
name = "gethostname"
version = "0.2.3"
@@ -4330,25 +4292,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "h3o"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de3592e1f699692aa0525c42ff7879ec3ee7e36329af20967bc910a1cdc39c7"
dependencies = [
"ahash 0.8.11",
"either",
"float_eq",
"h3o-bit",
"libm",
]
[[package]]
name = "h3o-bit"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb45e8060378c0353781abf67e1917b545a6b710d0342d85b70c125af7ef320"
[[package]]
name = "half"
version = "1.8.3"
@@ -5874,7 +5817,6 @@ dependencies = [
"common-time",
"common-wal",
"delta-encoding",
"derive_builder 0.12.0",
"futures",
"futures-util",
"itertools 0.10.5",
@@ -7069,13 +7011,13 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "opendal"
version = "0.49.1"
version = "0.49.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba615070686c8781ce97376fdafca29d7c42f47b31d2230d7c8c1642ec823950"
checksum = "39d516adf7db912c38af382c3e92c27cd62fbbc240e630920555d784c2ab1494"
dependencies = [
"anyhow",
"async-trait",
"backon 0.4.4",
"backon",
"base64 0.22.1",
"bytes",
"chrono",

View File

@@ -8,7 +8,6 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2024-06-06-b4b105ad-20240827021230
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu
@@ -78,7 +77,7 @@ build: ## Build debug version greptime.
build-by-dev-builder: ## Build greptime by dev-builder.
docker run --network=host \
-v ${PWD}:/greptimedb -v ${CARGO_REGISTRY_CACHE}:/root/.cargo/registry \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:${DEV_BUILDER_IMAGE_TAG} \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:latest \
make build \
CARGO_EXTENSION="${CARGO_EXTENSION}" \
CARGO_PROFILE=${CARGO_PROFILE} \
@@ -92,7 +91,7 @@ build-by-dev-builder: ## Build greptime by dev-builder.
build-android-bin: ## Build greptime binary for android.
docker run --network=host \
-v ${PWD}:/greptimedb -v ${CARGO_REGISTRY_CACHE}:/root/.cargo/registry \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-android:${DEV_BUILDER_IMAGE_TAG} \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-android:latest \
make build \
CARGO_EXTENSION="ndk --platform 23 -t aarch64-linux-android" \
CARGO_PROFILE=release \
@@ -106,7 +105,7 @@ build-android-bin: ## Build greptime binary for android.
strip-android-bin: build-android-bin ## Strip greptime binary for android.
docker run --network=host \
-v ${PWD}:/greptimedb \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-android:${DEV_BUILDER_IMAGE_TAG} \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-android:latest \
bash -c '$${NDK_ROOT}/toolchains/llvm/prebuilt/linux-x86_64/bin/llvm-strip --strip-debug /greptimedb/target/aarch64-linux-android/release/greptime'
.PHONY: clean
@@ -146,7 +145,7 @@ dev-builder: multi-platform-buildx ## Build dev-builder image.
docker buildx build --builder ${BUILDX_BUILDER_NAME} \
--build-arg="RUST_TOOLCHAIN=${RUST_TOOLCHAIN}" \
-f docker/dev-builder/${BASE_IMAGE}/Dockerfile \
-t ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:${DEV_BUILDER_IMAGE_TAG} ${BUILDX_MULTI_PLATFORM_BUILD_OPTS} .
-t ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:${IMAGE_TAG} ${BUILDX_MULTI_PLATFORM_BUILD_OPTS} .
.PHONY: multi-platform-buildx
multi-platform-buildx: ## Create buildx multi-platform builder.
@@ -204,7 +203,7 @@ stop-etcd: ## Stop single node etcd for testing purpose.
run-it-in-container: start-etcd ## Run integration tests in dev-builder.
docker run --network=host \
-v ${PWD}:/greptimedb -v ${CARGO_REGISTRY_CACHE}:/root/.cargo/registry -v /tmp:/tmp \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:${DEV_BUILDER_IMAGE_TAG} \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:latest \
make test sqlness-test BUILD_JOBS=${BUILD_JOBS}
.PHONY: start-cluster

View File

@@ -160,10 +160,10 @@
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself |
| `export_metrics.self_import.db` | String | `None` | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
@@ -245,10 +245,10 @@
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself |
| `export_metrics.self_import.db` | String | `None` | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
@@ -309,10 +309,10 @@
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself |
| `export_metrics.self_import.db` | String | `None` | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
@@ -333,10 +333,6 @@
| `rpc_runtime_size` | Integer | `None` | Deprecated, use `grpc.runtime_size` instead. |
| `rpc_max_recv_message_size` | String | `None` | Deprecated, use `grpc.rpc_max_recv_message_size` instead. |
| `rpc_max_send_message_size` | String | `None` | Deprecated, use `grpc.rpc_max_send_message_size` instead. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. |
| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,<br/>and used for connections from outside the host |
@@ -457,10 +453,10 @@
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself |
| `export_metrics.self_import.db` | String | `None` | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |

View File

@@ -39,18 +39,6 @@ rpc_max_recv_message_size = "512MB"
## +toml2docs:none-default
rpc_max_send_message_size = "512MB"
## The HTTP server options.
[http]
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
@@ -564,13 +552,12 @@ enable = false
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## +toml2docs:none-default
db = "greptime_metrics"
db = "information_schema"
[export_metrics.remote_write]
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -199,13 +199,12 @@ enable = false
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## +toml2docs:none-default
db = "greptime_metrics"
db = "information_schema"
[export_metrics.remote_write]
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -186,13 +186,12 @@ enable = false
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## +toml2docs:none-default
db = "greptime_metrics"
db = "information_schema"
[export_metrics.remote_write]
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -601,13 +601,12 @@ enable = false
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## +toml2docs:none-default
db = "greptime_metrics"
db = "information_schema"
[export_metrics.remote_write]
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -157,6 +157,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -306,6 +326,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -457,6 +497,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -608,6 +668,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -772,6 +852,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -923,6 +1023,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -1074,6 +1194,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -1225,6 +1365,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -1376,6 +1536,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -1527,6 +1707,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -1678,6 +1878,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -1842,6 +2062,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -1993,6 +2233,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -2157,6 +2417,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -2289,6 +2569,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -2451,6 +2751,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -2771,7 +3091,28 @@
},
"unit": "s"
},
"overrides": []
"overrides": [
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
"gridPos": {
"h": 8,
@@ -2901,6 +3242,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -3068,6 +3429,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -3217,6 +3598,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -3339,7 +3740,28 @@
},
"unit": "s"
},
"overrides": []
"overrides": [
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
"gridPos": {
"h": 8,
@@ -3679,6 +4101,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -3828,6 +4270,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -3977,6 +4439,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -4126,6 +4608,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -4275,6 +4777,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -4424,6 +4946,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -4573,6 +5115,26 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -4679,4 +5241,4 @@
"uid": "ea35efe5-918e-44fa-9743-e9aa1a340a3f",
"version": 11,
"weekStart": ""
}
}

View File

@@ -1,42 +0,0 @@
#!/usr/bin/env bash
set -e
RUST_TOOLCHAIN_VERSION_FILE="rust-toolchain.toml"
DEV_BUILDER_UBUNTU_REGISTRY="docker.io"
DEV_BUILDER_UBUNTU_NAMESPACE="greptime"
DEV_BUILDER_UBUNTU_NAME="dev-builder-ubuntu"
function check_rust_toolchain_version() {
DEV_BUILDER_IMAGE_TAG=$(grep "DEV_BUILDER_IMAGE_TAG ?= " Makefile | cut -d= -f2 | sed 's/^[ \t]*//')
if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
echo "Error: No DEV_BUILDER_IMAGE_TAG found in Makefile"
exit 1
fi
DEV_BUILDER_UBUNTU_IMAGE="$DEV_BUILDER_UBUNTU_REGISTRY/$DEV_BUILDER_UBUNTU_NAMESPACE/$DEV_BUILDER_UBUNTU_NAME:$DEV_BUILDER_IMAGE_TAG"
CURRENT_VERSION=$(grep -Eo '[0-9]{4}-[0-9]{2}-[0-9]{2}' "$RUST_TOOLCHAIN_VERSION_FILE")
if [ -z "$CURRENT_VERSION" ]; then
echo "Error: No rust toolchain version found in $RUST_TOOLCHAIN_VERSION_FILE"
exit 1
fi
RUST_TOOLCHAIN_VERSION_IN_BUILDER=$(docker run "$DEV_BUILDER_UBUNTU_IMAGE" rustc --version | grep -Eo '[0-9]{4}-[0-9]{2}-[0-9]{2}')
if [ -z "$RUST_TOOLCHAIN_VERSION_IN_BUILDER" ]; then
echo "Error: No rustc version found in $DEV_BUILDER_UBUNTU_IMAGE"
exit 1
fi
# Compare the version and the difference should be less than 1 day.
current_rust_toolchain_seconds=$(date -d "$CURRENT_VERSION" +%s)
rust_toolchain_in_dev_builder_ubuntu_seconds=$(date -d "$RUST_TOOLCHAIN_VERSION_IN_BUILDER" +%s)
date_diff=$(( (current_rust_toolchain_seconds - rust_toolchain_in_dev_builder_ubuntu_seconds) / 86400 ))
if [ $date_diff -gt 1 ]; then
echo "Error: The rust toolchain '$RUST_TOOLCHAIN_VERSION_IN_BUILDER' in builder '$DEV_BUILDER_UBUNTU_IMAGE' maybe outdated, please update it to '$CURRENT_VERSION'"
exit 1
fi
}
check_rust_toolchain_version

View File

@@ -267,7 +267,7 @@ impl StartCommand {
&opts.component.tracing,
opts.component.node_id.map(|x| x.to_string()),
);
log_versions(version(), short_version(), APP_NAME);
log_versions(version(), short_version());
info!("Datanode start command: {:#?}", self);
info!("Datanode options: {:#?}", opts);

View File

@@ -215,7 +215,7 @@ impl StartCommand {
&opts.component.tracing,
opts.component.node_id.map(|x| x.to_string()),
);
log_versions(version(), short_version(), APP_NAME);
log_versions(version(), short_version());
info!("Flownode start command: {:#?}", self);
info!("Flownode options: {:#?}", opts);

View File

@@ -261,7 +261,7 @@ impl StartCommand {
&opts.component.tracing,
opts.component.node_id.clone(),
);
log_versions(version(), short_version(), APP_NAME);
log_versions(version(), short_version());
info!("Frontend start command: {:#?}", self);
info!("Frontend options: {:#?}", opts);

View File

@@ -30,7 +30,7 @@ pub mod standalone;
lazy_static::lazy_static! {
static ref APP_VERSION: prometheus::IntGaugeVec =
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap();
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version"]).unwrap();
}
#[async_trait]
@@ -76,10 +76,10 @@ pub trait App: Send {
/// Log the versions of the application, and the arguments passed to the cli.
/// `version` should be the same as the output of cli "--version";
/// and the `short_version` is the short version of the codes, often consist of git branch and commit.
pub fn log_versions(version: &str, short_version: &str, app: &str) {
pub fn log_versions(version: &str, short_version: &str) {
// Report app version as gauge.
APP_VERSION
.with_label_values(&[env!("CARGO_PKG_VERSION"), short_version, app])
.with_label_values(&[env!("CARGO_PKG_VERSION"), short_version])
.inc();
// Log version and argument flags.

View File

@@ -244,7 +244,7 @@ impl StartCommand {
&opts.component.tracing,
None,
);
log_versions(version(), short_version(), APP_NAME);
log_versions(version(), short_version());
info!("Metasrv start command: {:#?}", self);
info!("Metasrv options: {:#?}", opts);

View File

@@ -178,16 +178,6 @@ impl Configurable for StandaloneOptions {
}
}
/// The [`StandaloneOptions`] is only defined in cmd crate,
/// we don't want to make `frontend` depends on it, so impl [`Into`]
/// rather than [`From`].
#[allow(clippy::from_over_into)]
impl Into<FrontendOptions> for StandaloneOptions {
fn into(self) -> FrontendOptions {
self.frontend_options()
}
}
impl StandaloneOptions {
pub fn frontend_options(&self) -> FrontendOptions {
let cloned_opts = self.clone();
@@ -425,7 +415,7 @@ impl StartCommand {
&opts.component.tracing,
None,
);
log_versions(version(), short_version(), APP_NAME);
log_versions(version(), short_version());
info!("Standalone start command: {:#?}", self);
info!("Standalone options: {opts:#?}");
@@ -520,7 +510,7 @@ impl StartCommand {
.build(),
);
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
opts.wal.clone().into(),
opts.wal.into(),
kv_backend.clone(),
));
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
@@ -543,7 +533,7 @@ impl StartCommand {
.await?;
let mut frontend = FrontendBuilder::new(
fe_opts,
fe_opts.clone(),
kv_backend.clone(),
layered_cache_registry.clone(),
catalog_manager.clone(),
@@ -571,7 +561,7 @@ impl StartCommand {
let (tx, _rx) = broadcast::channel(1);
let servers = Services::new(opts, Arc::new(frontend.clone()), plugins)
let servers = Services::new(fe_opts, Arc::new(frontend.clone()), plugins)
.build()
.await
.context(StartFrontendSnafu)?;

View File

@@ -218,7 +218,6 @@ fn test_load_standalone_example_config() {
sst_meta_cache_size: ReadableSize::mb(128),
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
selector_result_cache_size: ReadableSize::mb(512),
max_background_jobs: 4,
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
..Default::default()

View File

@@ -7,10 +7,6 @@ license.workspace = true
[lints]
workspace = true
[features]
default = ["geo"]
geo = ["geohash", "h3o"]
[dependencies]
api.workspace = true
arc-swap = "1.0"
@@ -27,8 +23,6 @@ common-time.workspace = true
common-version.workspace = true
datafusion.workspace = true
datatypes.workspace = true
geohash = { version = "0.13", optional = true }
h3o = { version = "0.6", optional = true }
num = "0.4"
num-traits = "0.2"
once_cell.workspace = true

View File

@@ -116,10 +116,6 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
SystemFunction::register(&function_registry);
TableFunction::register(&function_registry);
// Geo functions
#[cfg(feature = "geo")]
crate::scalars::geo::GeoFunctions::register(&function_registry);
Arc::new(function_registry)
});

View File

@@ -15,8 +15,6 @@
pub mod aggregate;
pub(crate) mod date;
pub mod expression;
#[cfg(feature = "geo")]
pub mod geo;
pub mod matches;
pub mod math;
pub mod numpy;

View File

@@ -1,31 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
mod geohash;
mod h3;
use geohash::GeohashFunction;
use h3::H3Function;
use crate::function_registry::FunctionRegistry;
pub(crate) struct GeoFunctions;
impl GeoFunctions {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(GeohashFunction));
registry.register(Arc::new(H3Function));
}
}

View File

@@ -1,135 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_query::error::{self, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
use geohash::Coord;
use snafu::{ensure, ResultExt};
use crate::function::{Function, FunctionContext};
/// Function that return geohash string for a given geospatial coordinate.
#[derive(Clone, Debug, Default)]
pub struct GeohashFunction;
const NAME: &str = "geohash";
impl Function for GeohashFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
let mut signatures = Vec::new();
for coord_type in &[
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
] {
for resolution_type in &[
ConcreteDataType::int8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::uint64_datatype(),
] {
signatures.push(TypeSignature::Exact(vec![
// latitude
coord_type.clone(),
// longitude
coord_type.clone(),
// resolution
resolution_type.clone(),
]));
}
}
Signature::one_of(signatures, Volatility::Stable)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 3, provided : {}",
columns.len()
),
}
);
let lat_vec = &columns[0];
let lon_vec = &columns[1];
let resolution_vec = &columns[2];
let size = lat_vec.len();
let mut results = StringVectorBuilder::with_capacity(size);
for i in 0..size {
let lat = lat_vec.get(i).as_f64_lossy();
let lon = lon_vec.get(i).as_f64_lossy();
let r = match resolution_vec.get(i) {
Value::Int8(v) => v as usize,
Value::Int16(v) => v as usize,
Value::Int32(v) => v as usize,
Value::Int64(v) => v as usize,
Value::UInt8(v) => v as usize,
Value::UInt16(v) => v as usize,
Value::UInt32(v) => v as usize,
Value::UInt64(v) => v as usize,
_ => unreachable!(),
};
let result = match (lat, lon) {
(Some(lat), Some(lon)) => {
let coord = Coord { x: lon, y: lat };
let encoded = geohash::encode(coord, r)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("Geohash error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;
Some(encoded)
}
_ => None,
};
results.push(result.as_deref());
}
Ok(results.to_vector())
}
}
impl fmt::Display for GeohashFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", NAME)
}
}

View File

@@ -1,143 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_query::error::{self, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
use h3o::{LatLng, Resolution};
use snafu::{ensure, ResultExt};
use crate::function::{Function, FunctionContext};
/// Function that return h3 encoding string for a given geospatial coordinate.
#[derive(Clone, Debug, Default)]
pub struct H3Function;
const NAME: &str = "h3";
impl Function for H3Function {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
let mut signatures = Vec::new();
for coord_type in &[
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
] {
for resolution_type in &[
ConcreteDataType::int8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::uint64_datatype(),
] {
signatures.push(TypeSignature::Exact(vec![
// latitude
coord_type.clone(),
// longitude
coord_type.clone(),
// resolution
resolution_type.clone(),
]));
}
}
Signature::one_of(signatures, Volatility::Stable)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 3, provided : {}",
columns.len()
),
}
);
let lat_vec = &columns[0];
let lon_vec = &columns[1];
let resolution_vec = &columns[2];
let size = lat_vec.len();
let mut results = StringVectorBuilder::with_capacity(size);
for i in 0..size {
let lat = lat_vec.get(i).as_f64_lossy();
let lon = lon_vec.get(i).as_f64_lossy();
let r = match resolution_vec.get(i) {
Value::Int8(v) => v as u8,
Value::Int16(v) => v as u8,
Value::Int32(v) => v as u8,
Value::Int64(v) => v as u8,
Value::UInt8(v) => v,
Value::UInt16(v) => v as u8,
Value::UInt32(v) => v as u8,
Value::UInt64(v) => v as u8,
_ => unreachable!(),
};
let result = match (lat, lon) {
(Some(lat), Some(lon)) => {
let coord = LatLng::new(lat, lon)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("H3 error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;
let r = Resolution::try_from(r as u8)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("H3 error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;
let encoded = coord.to_cell(r).to_string();
Some(encoded)
}
_ => None,
};
results.push(result.as_deref());
}
Ok(results.to_vector())
}
}
impl fmt::Display for H3Function {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", NAME)
}
}

View File

@@ -153,9 +153,6 @@ pub struct UpgradeRegion {
/// it's helpful to verify whether the leader region is ready.
#[serde(with = "humantime_serde")]
pub wait_for_replay_timeout: Option<Duration>,
/// The hint for replaying memtable.
#[serde(default)]
pub location_id: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]

View File

@@ -13,7 +13,7 @@ workspace = true
[dependencies]
async-stream.workspace = true
async-trait.workspace = true
backon = "1"
backon = "0.4"
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true

View File

@@ -373,7 +373,7 @@ impl Runner {
procedure,
manager_ctx: self.manager_ctx.clone(),
step,
exponential_builder: self.exponential_builder,
exponential_builder: self.exponential_builder.clone(),
store: self.store.clone(),
rolling_back: false,
};

View File

@@ -172,13 +172,12 @@ impl ErrorExt for Error {
Error::DataTypes { .. }
| Error::CreateRecordBatches { .. }
| Error::PollStream { .. }
| Error::Format { .. }
| Error::ToArrowScalar { .. }
| Error::ProjectArrowRecordBatch { .. }
| Error::PhysicalExpr { .. } => StatusCode::Internal,
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
Error::ArrowCompute { .. } => StatusCode::IllegalState,
Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,

View File

@@ -206,7 +206,6 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout: None,
location_id: None,
});
assert!(
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))

View File

@@ -27,7 +27,6 @@ impl HandlerContext {
region_id,
last_entry_id,
wait_for_replay_timeout,
location_id,
}: UpgradeRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
@@ -63,7 +62,6 @@ impl HandlerContext {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
location_id,
}),
)
.await?;
@@ -153,7 +151,6 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -194,7 +191,6 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -236,7 +232,6 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -279,9 +274,8 @@ mod tests {
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
wait_for_replay_timeout,
last_entry_id: None,
location_id: None,
wait_for_replay_timeout,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -299,7 +293,6 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout: Some(Duration::from_millis(500)),
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -344,7 +337,6 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout: None,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -362,7 +354,6 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout: Some(Duration::from_millis(200)),
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));

View File

@@ -268,23 +268,6 @@ impl Value {
}
}
/// Cast Value to f32. Return None if it's not castable;
pub fn as_f64_lossy(&self) -> Option<f64> {
match self {
Value::Float32(v) => Some(v.0 as _),
Value::Float64(v) => Some(v.0),
Value::Int8(v) => Some(*v as _),
Value::Int16(v) => Some(*v as _),
Value::Int32(v) => Some(*v as _),
Value::Int64(v) => Some(*v as _),
Value::UInt8(v) => Some(*v as _),
Value::UInt16(v) => Some(*v as _),
Value::UInt32(v) => Some(*v as _),
Value::UInt64(v) => Some(*v as _),
_ => None,
}
}
/// Returns the logical type of the value.
pub fn logical_type_id(&self) -> LogicalTypeId {
match self {

View File

@@ -26,7 +26,6 @@ common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
delta-encoding = "0.4"
derive_builder.workspace = true
futures.workspace = true
futures-util.workspace = true
itertools.workspace = true

View File

@@ -304,15 +304,6 @@ pub enum Error {
error: object_store::Error,
},
#[snafu(display("Failed to read index, path: {path}"))]
ReadIndex {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: object_store::Error,
path: String,
},
#[snafu(display(
"The length of meta if exceeded the limit: {}, actual: {}",
limit,

View File

@@ -14,10 +14,15 @@
pub(crate) mod client_manager;
pub(crate) mod consumer;
/// TODO(weny): remove it.
#[allow(dead_code)]
#[allow(unused_imports)]
pub(crate) mod index;
pub mod log_store;
pub(crate) mod producer;
pub(crate) mod util;
/// TODO(weny): remove it.
#[allow(dead_code)]
pub(crate) mod worker;
pub use index::{default_index_file, GlobalIndexCollector};

View File

@@ -19,7 +19,6 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use common_telemetry::debug;
use derive_builder::Builder;
use futures::future::{BoxFuture, Fuse, FusedFuture};
use futures::{FutureExt, Stream};
use pin_project::pin_project;
@@ -61,61 +60,40 @@ struct FetchResult {
used_offset: i64,
}
const MAX_BATCH_SIZE: usize = 52428800;
const AVG_RECORD_SIZE: usize = 256 * 1024;
/// The [`Consumer`] struct represents a Kafka consumer that fetches messages from
/// a Kafka cluster. Yielding records respecting the [`RegionWalIndexIterator`].
#[pin_project]
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct Consumer {
#[builder(default = "-1")]
last_high_watermark: i64,
/// The client is used to fetch records from kafka topic.
client: Arc<dyn FetchClient>,
/// The max batch size in a single fetch request.
#[builder(default = "MAX_BATCH_SIZE")]
max_batch_size: usize,
/// The max wait milliseconds.
#[builder(default = "500")]
max_wait_ms: u32,
/// The avg record size
#[builder(default = "AVG_RECORD_SIZE")]
avg_record_size: usize,
/// Termination flag
#[builder(default = "false")]
terminated: bool,
/// The buffer of records.
buffer: RecordsBuffer,
/// The fetch future.
#[builder(default = "Fuse::terminated()")]
fetch_fut: Fuse<BoxFuture<'static, rskafka::client::error::Result<FetchResult>>>,
}
pub(crate) struct RecordsBuffer {
struct RecordsBuffer {
buffer: VecDeque<RecordAndOffset>,
index: Box<dyn RegionWalIndexIterator>,
}
impl RecordsBuffer {
/// Creates an empty [`RecordsBuffer`]
pub fn new(index: Box<dyn RegionWalIndexIterator>) -> Self {
RecordsBuffer {
buffer: VecDeque::new(),
index,
}
}
}
impl RecordsBuffer {
fn pop_front(&mut self) -> Option<RecordAndOffset> {
while let Some(index) = self.index.peek() {

View File

@@ -20,11 +20,10 @@ pub use collector::GlobalIndexCollector;
pub(crate) use collector::{IndexCollector, NoopCollector};
pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder};
pub(crate) use iterator::{
build_region_wal_index_iterator, NextBatchHint, RegionWalIndexIterator, MIN_BATCH_WINDOW_SIZE,
MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange,
RegionWalVecIndex,
};
#[cfg(test)]
pub(crate) use iterator::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex};
pub fn default_index_file(location_id: u64) -> String {
format!("__wal/{location_id}/index.json")
pub fn default_index_file(datanode_id: u64) -> String {
format!("__datanode/{datanode_id}/index.json")
}

View File

@@ -19,7 +19,6 @@ use std::time::Duration;
use common_telemetry::{error, info};
use futures::future::try_join_all;
use object_store::ErrorKind;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
@@ -29,9 +28,8 @@ use tokio::select;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex as TokioMutex;
use super::default_index_file;
use crate::error::{self, Result};
use crate::kafka::index::encoder::{DatanodeWalIndexes, IndexEncoder};
use crate::kafka::index::encoder::IndexEncoder;
use crate::kafka::index::JsonIndexEncoder;
use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest};
@@ -54,11 +52,10 @@ pub trait IndexCollector: Send + Sync {
/// The [`GlobalIndexCollector`] struct is responsible for managing index entries
/// across multiple providers.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct GlobalIndexCollector {
providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
operator: object_store::ObjectStore,
_handle: CollectionTaskHandle,
task: CollectionTask,
}
#[derive(Debug, Clone)]
@@ -106,7 +103,7 @@ impl CollectionTask {
/// The background task performs two main operations:
/// - Persists the WAL index to the specified `path` at every `dump_index_interval`.
/// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`.
fn run(self) -> CollectionTaskHandle {
fn run(&self) {
let mut dump_index_interval = tokio::time::interval(self.dump_index_interval);
let running = self.running.clone();
let moved_self = self.clone();
@@ -125,23 +122,15 @@ impl CollectionTask {
}
}
});
CollectionTaskHandle {
running: self.running.clone(),
}
}
}
impl Drop for CollectionTaskHandle {
impl Drop for CollectionTask {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
}
}
#[derive(Debug, Default)]
struct CollectionTaskHandle {
running: Arc<AtomicBool>,
}
impl GlobalIndexCollector {
/// Constructs a [`GlobalIndexCollector`].
///
@@ -159,65 +148,16 @@ impl GlobalIndexCollector {
let task = CollectionTask {
providers: providers.clone(),
dump_index_interval,
operator: operator.clone(),
operator,
path,
running: Arc::new(AtomicBool::new(true)),
};
let handle = task.run();
Self {
providers,
operator,
_handle: handle,
}
}
#[cfg(test)]
pub fn new_for_test(operator: object_store::ObjectStore) -> Self {
Self {
providers: Default::default(),
operator,
_handle: Default::default(),
}
task.run();
Self { providers, task }
}
}
impl GlobalIndexCollector {
/// Retrieves [`EntryId`]s for a specified `region_id` from the index file identified by `location_id`
/// that are greater than or equal to a given `entry_id`.
pub(crate) async fn read_remote_region_index(
&self,
location_id: u64,
provider: &KafkaProvider,
region_id: RegionId,
entry_id: EntryId,
) -> Result<Option<(BTreeSet<EntryId>, EntryId)>> {
let path = default_index_file(location_id);
let bytes = match self.operator.read(&path).await {
Ok(bytes) => bytes.to_vec(),
Err(err) => {
if err.kind() == ErrorKind::NotFound {
return Ok(None);
} else {
return Err(err).context(error::ReadIndexSnafu { path });
}
}
};
match DatanodeWalIndexes::decode(&bytes)?.provider(provider) {
Some(indexes) => {
let last_index = indexes.last_index();
let indexes = indexes
.region(region_id)
.unwrap_or_default()
.split_off(&entry_id);
Ok(Some((indexes, last_index)))
}
None => Ok(None),
}
}
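One detail worth noting in `read_remote_region_index` is the `split_off(&entry_id)` call: on a `BTreeSet` it returns every element greater than or equal to the key, which is exactly the "entries at or after `entry_id`" set the doc comment promises. A standalone illustration:

```rust
use std::collections::BTreeSet;

fn main() {
    let mut indexes: BTreeSet<u64> = BTreeSet::from([1, 5, 15]);
    // `split_off(&5)` moves everything >= 5 into the returned set...
    let at_or_after = indexes.split_off(&5);
    assert_eq!(at_or_after, BTreeSet::from([5, 15]));
    // ...and leaves the strictly smaller entries behind.
    assert_eq!(indexes, BTreeSet::from([1]));
}
```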
/// Creates a new [`ProviderLevelIndexCollector`] for a specified provider.
pub(crate) async fn provider_level_index_collector(
&self,
@@ -326,92 +266,3 @@ impl IndexCollector for NoopCollector {
fn dump(&mut self, _encoder: &dyn IndexEncoder) {}
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeSet, HashMap};
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;
use crate::kafka::index::collector::RegionIndexes;
use crate::kafka::index::encoder::IndexEncoder;
use crate::kafka::index::JsonIndexEncoder;
use crate::kafka::{default_index_file, GlobalIndexCollector};
#[tokio::test]
async fn test_read_remote_region_index() {
let operator = object_store::ObjectStore::new(object_store::services::Memory::default())
.unwrap()
.finish();
let path = default_index_file(0);
let encoder = JsonIndexEncoder::default();
encoder.encode(
&KafkaProvider::new("my_topic_0".to_string()),
&RegionIndexes {
regions: HashMap::from([(RegionId::new(1, 1), BTreeSet::from([1, 5, 15]))]),
latest_entry_id: 20,
},
);
let bytes = encoder.finish().unwrap();
let mut writer = operator.writer(&path).await.unwrap();
writer.write(bytes).await.unwrap();
writer.close().await.unwrap();
let collector = GlobalIndexCollector::new_for_test(operator.clone());
// Index file doesn't exist
let result = collector
.read_remote_region_index(
1,
&KafkaProvider::new("my_topic_0".to_string()),
RegionId::new(1, 1),
1,
)
.await
.unwrap();
assert!(result.is_none());
// RegionId doesn't exist
let (indexes, last_index) = collector
.read_remote_region_index(
0,
&KafkaProvider::new("my_topic_0".to_string()),
RegionId::new(1, 2),
5,
)
.await
.unwrap()
.unwrap();
assert_eq!(indexes, BTreeSet::new());
assert_eq!(last_index, 20);
// RegionId(1, 1), Start EntryId: 5
let (indexes, last_index) = collector
.read_remote_region_index(
0,
&KafkaProvider::new("my_topic_0".to_string()),
RegionId::new(1, 1),
5,
)
.await
.unwrap()
.unwrap();
assert_eq!(indexes, BTreeSet::from([5, 15]));
assert_eq!(last_index, 20);
// RegionId(1, 1), Start EntryId: 20
let (indexes, last_index) = collector
.read_remote_region_index(
0,
&KafkaProvider::new("my_topic_0".to_string()),
RegionId::new(1, 1),
20,
)
.await
.unwrap()
.unwrap();
assert_eq!(indexes, BTreeSet::new());
assert_eq!(last_index, 20);
}
}

View File

@@ -50,7 +50,7 @@ pub struct DeltaEncodedRegionIndexes {
impl DeltaEncodedRegionIndexes {
/// Retrieves the original (decoded) index values for a given region.
pub(crate) fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
let decoded = self
.regions
.get(&region_id)
@@ -60,7 +60,7 @@ impl DeltaEncodedRegionIndexes {
}
/// Retrieves the last index.
pub(crate) fn last_index(&self) -> u64 {
fn last_index(&self) -> u64 {
self.last_index
}
}
@@ -86,7 +86,7 @@ impl DatanodeWalIndexes {
value
}
pub(crate) fn decode(byte: &[u8]) -> Result<Self> {
fn decode(byte: &[u8]) -> Result<Self> {
serde_json::from_slice(byte).context(error::DecodeJsonSnafu)
}
@@ -118,7 +118,7 @@ impl IndexEncoder for JsonIndexEncoder {
#[cfg(test)]
mod tests {
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap, HashSet};
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;

View File

@@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::{max, min};
use std::collections::{BTreeSet, VecDeque};
use std::fmt::Debug;
use std::cmp::min;
use std::collections::VecDeque;
use std::ops::Range;
use store_api::logstore::EntryId;
@@ -28,7 +27,7 @@ pub(crate) struct NextBatchHint {
}
/// An iterator over the WAL (Write-Ahead Log) entry index for a region.
pub trait RegionWalIndexIterator: Send + Sync + Debug {
pub trait RegionWalIndexIterator: Send + Sync {
/// Returns next batch hint.
fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint>;
@@ -37,13 +36,9 @@ pub trait RegionWalIndexIterator: Send + Sync + Debug {
// Advances the iterator and returns the next EntryId.
fn next(&mut self) -> Option<EntryId>;
#[cfg(test)]
fn as_any(&self) -> &dyn std::any::Any;
}
/// Represents a range [next_entry_id, end_entry_id) of WAL entries for a region.
#[derive(Debug)]
pub struct RegionWalRange {
current_entry_id: EntryId,
end_entry_id: EntryId,
@@ -101,18 +96,10 @@ impl RegionWalIndexIterator for RegionWalRange {
None
}
}
#[cfg(test)]
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
pub const MIN_BATCH_WINDOW_SIZE: usize = 4 * 1024 * 1024;
/// Represents an index of Write-Ahead Log entries for a region,
/// stored as a vector of [EntryId]s.
#[derive(Debug)]
pub struct RegionWalVecIndex {
index: VecDeque<EntryId>,
min_batch_window_size: usize,
@@ -147,17 +134,11 @@ impl RegionWalIndexIterator for RegionWalVecIndex {
fn next(&mut self) -> Option<EntryId> {
self.index.pop_front()
}
#[cfg(test)]
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
/// Represents an iterator over multiple region WAL indexes.
///
/// Allows iterating through multiple WAL indexes in sequence.
#[derive(Debug)]
pub struct MultipleRegionWalIndexIterator {
iterator: VecDeque<Box<dyn RegionWalIndexIterator>>,
}
@@ -204,53 +185,6 @@ impl RegionWalIndexIterator for MultipleRegionWalIndexIterator {
self.iterator.front_mut().and_then(|iter| iter.next())
}
#[cfg(test)]
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
/// Builds [`RegionWalIndexIterator`].
///
/// Returning `None` means there are no entries to replay.
pub fn build_region_wal_index_iterator(
start_entry_id: EntryId,
end_entry_id: EntryId,
region_indexes: Option<(BTreeSet<EntryId>, EntryId)>,
max_batch_bytes: usize,
min_window_size: usize,
) -> Option<Box<dyn RegionWalIndexIterator>> {
if (start_entry_id..end_entry_id).is_empty() {
return None;
}
match region_indexes {
Some((region_indexes, last_index)) => {
if region_indexes.is_empty() && last_index >= end_entry_id {
return None;
}
let mut iterator: Vec<Box<dyn RegionWalIndexIterator>> = Vec::with_capacity(2);
if !region_indexes.is_empty() {
let index = RegionWalVecIndex::new(region_indexes, min_window_size);
iterator.push(Box::new(index));
}
let known_last_index = max(last_index, start_entry_id);
if known_last_index < end_entry_id {
let range = known_last_index..end_entry_id;
let index = RegionWalRange::new(range, max_batch_bytes);
iterator.push(Box::new(index));
}
Some(Box::new(MultipleRegionWalIndexIterator::new(iterator)))
}
None => {
let range = start_entry_id..end_entry_id;
Some(Box::new(RegionWalRange::new(range, max_batch_bytes)))
}
}
}
#[cfg(test)]
@@ -419,69 +353,4 @@ mod tests {
assert_eq!(iter.peek(), None);
assert_eq!(iter.next(), None);
}
#[test]
fn test_build_region_wal_index_iterator() {
let iterator = build_region_wal_index_iterator(1024, 1024, None, 5, 5);
assert!(iterator.is_none());
let iterator = build_region_wal_index_iterator(1024, 1023, None, 5, 5);
assert!(iterator.is_none());
let iterator =
build_region_wal_index_iterator(1024, 1024, Some((BTreeSet::new(), 1024)), 5, 5);
assert!(iterator.is_none());
let iterator =
build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1024)), 5, 5);
assert!(iterator.is_none());
let iterator =
build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1025)), 5, 5);
assert!(iterator.is_none());
let iterator = build_region_wal_index_iterator(
1,
1024,
Some((BTreeSet::from([512, 756]), 1024)),
5,
5,
)
.unwrap();
let iter = iterator
.as_any()
.downcast_ref::<MultipleRegionWalIndexIterator>()
.unwrap();
assert_eq!(iter.iterator.len(), 1);
let vec_index = iter.iterator[0]
.as_any()
.downcast_ref::<RegionWalVecIndex>()
.unwrap();
assert_eq!(vec_index.index, VecDeque::from([512, 756]));
let iterator = build_region_wal_index_iterator(
1,
1024,
Some((BTreeSet::from([512, 756]), 1023)),
5,
5,
)
.unwrap();
let iter = iterator
.as_any()
.downcast_ref::<MultipleRegionWalIndexIterator>()
.unwrap();
assert_eq!(iter.iterator.len(), 2);
let vec_index = iter.iterator[0]
.as_any()
.downcast_ref::<RegionWalVecIndex>()
.unwrap();
assert_eq!(vec_index.index, VecDeque::from([512, 756]));
let wal_range = iter.iterator[1]
.as_any()
.downcast_ref::<RegionWalRange>()
.unwrap();
assert_eq!(wal_range.current_entry_id, 1023);
assert_eq!(wal_range.end_entry_id, 1024);
}
}

View File

@@ -20,20 +20,19 @@ use common_telemetry::{debug, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use futures::future::try_join_all;
use futures_util::StreamExt;
use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
use rskafka::client::partition::OffsetAt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::entry::{
Entry, Id as EntryId, MultiplePartEntry, MultiplePartHeader, NaiveEntry,
};
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream};
use store_api::storage::RegionId;
use super::index::build_region_wal_index_iterator;
use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::consumer::{ConsumerBuilder, RecordsBuffer};
use crate::kafka::index::{GlobalIndexCollector, MIN_BATCH_WINDOW_SIZE};
use crate::kafka::index::GlobalIndexCollector;
use crate::kafka::producer::OrderedBatchProducerRef;
use crate::kafka::util::record::{
convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE,
@@ -206,7 +205,6 @@ impl LogStore for KafkaLogStore {
&self,
provider: &Provider,
entry_id: EntryId,
index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
let provider = provider
.as_kafka_provider()
@@ -234,41 +232,35 @@ impl LogStore for KafkaLogStore {
.await
.context(GetOffsetSnafu {
topic: &provider.topic,
})?;
})?
- 1;
// Reads entries with offsets in the range [start_offset, end_offset].
let start_offset = entry_id as i64;
let region_indexes = if let (Some(index), Some(collector)) =
(index, self.client_manager.global_index_collector())
{
collector
.read_remote_region_index(index.location_id, provider, index.region_id, entry_id)
.await?
} else {
None
};
debug!(
"Start reading entries in range [{}, {}] for ns {}",
start_offset, end_offset, provider
);
let Some(iterator) = build_region_wal_index_iterator(
entry_id,
end_offset as u64,
region_indexes,
self.max_batch_bytes,
MIN_BATCH_WINDOW_SIZE,
) else {
let range = entry_id..end_offset as u64;
warn!("No new entries in range {:?} of ns {}", range, provider);
// Abort if there are no new entries.
// FIXME(niebayes): how can this case happen?
if start_offset > end_offset {
warn!(
"No new entries for ns {} in range [{}, {}]",
provider, start_offset, end_offset
);
return Ok(futures_util::stream::empty().boxed());
};
}
debug!("Reading entries with {:?} of ns {}", iterator, provider);
let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(start_offset))
.with_max_batch_size(self.max_batch_bytes as i32)
.with_max_wait_ms(self.consumer_wait_timeout.as_millis() as i32)
.build();
// Safety: Must be ok.
let mut stream_consumer = ConsumerBuilder::default()
.client(client)
// Safety: checked before.
.buffer(RecordsBuffer::new(iterator))
.max_batch_size(self.max_batch_bytes)
.max_wait_ms(self.consumer_wait_timeout.as_millis() as u32)
.build()
.unwrap();
debug!(
"Built a stream consumer for ns {} to consume entries in range [{}, {}]",
provider, start_offset, end_offset
);
// A buffer is used to collect records to construct a complete entry.
let mut entry_records: HashMap<RegionId, Vec<Record>> = HashMap::new();
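The `- 1` adjustment above follows the usual Kafka offset convention (an assumption stated here, not spelled out in the diff): the latest offset reported by the client is the offset the *next* record would receive, so the last readable offset is one less. A tiny illustration of the resulting range check:

```rust
// Hypothetical numbers: a topic partition currently holding records at offsets 0..=9.
fn main() {
    let latest_offset: i64 = 10; // "next offset to be written", as reported by the client
    let end_offset = latest_offset - 1; // last offset that can actually be read
    let start_offset: i64 = 4; // entry id the caller wants to resume from

    // Mirrors the early return in the hunk above: nothing new to read once the
    // requested start moves past the last readable offset.
    if start_offset > end_offset {
        println!("no new entries in [{start_offset}, {end_offset}]");
    } else {
        println!("reading offsets [{start_offset}, {end_offset}]");
    }
}
```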
@@ -519,7 +511,7 @@ mod tests {
// 5 region
assert_eq!(response.last_entry_ids.len(), 5);
let got_entries = logstore
.read(&provider, 0, None)
.read(&provider, 0)
.await
.unwrap()
.try_collect::<Vec<_>>()
@@ -592,7 +584,7 @@ mod tests {
// 5 region
assert_eq!(response.last_entry_ids.len(), 5);
let got_entries = logstore
.read(&provider, 0, None)
.read(&provider, 0)
.await
.unwrap()
.try_collect::<Vec<_>>()

View File

@@ -25,7 +25,7 @@ use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMo
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::entry::{Entry, Id as EntryId, NaiveEntry};
use store_api::logstore::provider::{Provider, RaftEngineProvider};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream};
use store_api::storage::RegionId;
use crate::error::{
@@ -252,7 +252,6 @@ impl LogStore for RaftEngineLogStore {
&self,
provider: &Provider,
entry_id: EntryId,
_index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
let ns = provider
.as_raft_engine_provider()
@@ -546,7 +545,7 @@ mod tests {
}
let mut entries = HashSet::with_capacity(1024);
let mut s = logstore
.read(&Provider::raft_engine_provider(1), 0, None)
.read(&Provider::raft_engine_provider(1), 0)
.await
.unwrap();
while let Some(r) = s.next().await {
@@ -579,7 +578,7 @@ mod tests {
.await
.is_ok());
let entries = logstore
.read(&Provider::raft_engine_provider(1), 1, None)
.read(&Provider::raft_engine_provider(1), 1)
.await
.unwrap()
.collect::<Vec<_>>()
@@ -597,7 +596,7 @@ mod tests {
let entries = collect_entries(
logstore
.read(&Provider::raft_engine_provider(1), 1, None)
.read(&Provider::raft_engine_provider(1), 1)
.await
.unwrap(),
)
@@ -683,7 +682,7 @@ mod tests {
logstore.obsolete(&namespace, region_id, 100).await.unwrap();
assert_eq!(101, logstore.engine.first_index(namespace_id).unwrap());
let res = logstore.read(&namespace, 100, None).await.unwrap();
let res = logstore.read(&namespace, 100).await.unwrap();
let mut vec = collect_entries(res).await;
vec.sort_by(|a, b| a.entry_id().partial_cmp(&b.entry_id()).unwrap());
assert_eq!(101, vec.first().unwrap().entry_id());

View File

@@ -90,7 +90,6 @@ impl UpgradeCandidateRegion {
region_id,
last_entry_id,
wait_for_replay_timeout: Some(self.replay_timeout),
location_id: Some(ctx.persistent_ctx.from_peer.id),
})
}

View File

@@ -41,7 +41,6 @@ impl MetricEngineInner {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: None,
location_id: req.location_id,
}),
)
.await
@@ -53,7 +52,6 @@ impl MetricEngineInner {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: req.entry_id,
location_id: req.location_id,
}),
)
.await

View File

@@ -94,7 +94,6 @@ async fn test_catchup_with_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: last_entry_id,
location_id: None,
}),
)
.await;
@@ -126,7 +125,6 @@ async fn test_catchup_with_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
location_id: None,
}),
)
.await;
@@ -193,7 +191,6 @@ async fn test_catchup_with_incorrect_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: incorrect_last_entry_id,
location_id: None,
}),
)
.await
@@ -210,7 +207,6 @@ async fn test_catchup_with_incorrect_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: incorrect_last_entry_id,
location_id: None,
}),
)
.await;
@@ -259,7 +255,6 @@ async fn test_catchup_without_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: None,
location_id: None,
}),
)
.await;
@@ -290,7 +285,6 @@ async fn test_catchup_without_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
location_id: None,
}),
)
.await;
@@ -360,7 +354,6 @@ async fn test_catchup_with_manifest_update() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: None,
location_id: None,
}),
)
.await;
@@ -397,7 +390,6 @@ async fn test_catchup_with_manifest_update() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
location_id: None,
}),
)
.await;
@@ -419,7 +411,6 @@ async fn test_catchup_not_exist() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
location_id: None,
}),
)
.await

View File

@@ -202,8 +202,8 @@ impl RegionOpener {
options.need_dedup(),
options.merge_mode(),
);
let part_duration = options.compaction.time_window();
// Initial memtable id is 0.
let part_duration = options.compaction.time_window();
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
memtable_builder.clone(),
@@ -313,7 +313,7 @@ impl RegionOpener {
let wal_entry_reader = self
.wal_entry_reader
.take()
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id));
let on_region_opened = wal.on_region_opened();
let object_store = self.object_store(&region_options.storage)?.clone();
@@ -335,13 +335,8 @@ impl RegionOpener {
region_options.need_dedup(),
region_options.merge_mode(),
);
// Use the compaction time window in the manifest if the region doesn't provide
// the time window option.
let part_duration = region_options
.compaction
.time_window()
.or(manifest.compaction_time_window);
// Initial memtable id is 0.
let part_duration = region_options.compaction.time_window();
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
memtable_builder.clone(),

View File

@@ -30,7 +30,7 @@ use prost::Message;
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex};
use store_api::logstore::{AppendBatchResponse, LogStore};
use store_api::storage::RegionId;
use crate::error::{BuildEntrySnafu, DeleteWalSnafu, EncodeWalSnafu, Result, WriteWalSnafu};
@@ -102,24 +102,15 @@ impl<S: LogStore> Wal<S> {
&self,
provider: &Provider,
region_id: RegionId,
location_id: Option<u64>,
) -> Box<dyn WalEntryReader> {
match provider {
Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new(
LogStoreRawEntryReader::new(self.store.clone()),
)),
Provider::Kafka(_) => {
let reader = if let Some(location_id) = location_id {
LogStoreRawEntryReader::new(self.store.clone())
.with_wal_index(WalIndex::new(region_id, location_id))
} else {
LogStoreRawEntryReader::new(self.store.clone())
};
Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
reader, region_id,
)))
}
Provider::Kafka(_) => Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
LogStoreRawEntryReader::new(self.store.clone()),
region_id,
))),
}
}

View File

@@ -20,7 +20,7 @@ use futures::stream::BoxStream;
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::logstore::{LogStore, WalIndex};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use tokio_stream::StreamExt;
@@ -38,20 +38,11 @@ pub(crate) trait RawEntryReader: Send + Sync {
/// Implement the [RawEntryReader] for the [LogStore].
pub struct LogStoreRawEntryReader<S> {
store: Arc<S>,
wal_index: Option<WalIndex>,
}
impl<S> LogStoreRawEntryReader<S> {
pub fn new(store: Arc<S>) -> Self {
Self {
store,
wal_index: None,
}
}
pub fn with_wal_index(mut self, wal_index: WalIndex) -> Self {
self.wal_index = Some(wal_index);
self
Self { store }
}
}
@@ -59,10 +50,9 @@ impl<S: LogStore> RawEntryReader for LogStoreRawEntryReader<S> {
fn read(&self, provider: &Provider, start_id: EntryId) -> Result<EntryStream<'static>> {
let store = self.store.clone();
let provider = provider.clone();
let wal_index = self.wal_index;
let stream = try_stream!({
let mut stream = store
.read(&provider, start_id, wal_index)
.read(&provider, start_id)
.await
.map_err(BoxedError::new)
.with_context(|_| error::ReadWalSnafu {
@@ -129,9 +119,7 @@ mod tests {
use futures::{stream, TryStreamExt};
use store_api::logstore::entry::{Entry, NaiveEntry};
use store_api::logstore::{
AppendBatchResponse, EntryId, LogStore, SendableEntryStream, WalIndex,
};
use store_api::logstore::{AppendBatchResponse, EntryId, LogStore, SendableEntryStream};
use store_api::storage::RegionId;
use super::*;
@@ -161,7 +149,6 @@ mod tests {
&self,
_provider: &Provider,
_id: EntryId,
_index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>, Self::Error> {
Ok(Box::pin(stream::iter(vec![Ok(self.entries.clone())])))
}

View File

@@ -74,9 +74,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let flushed_entry_id = region.version_control.current().last_entry_id;
info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}");
let timer = Instant::now();
let wal_entry_reader =
self.wal
.wal_entry_reader(&region.provider, region_id, request.location_id);
let wal_entry_reader = self.wal.wal_entry_reader(&region.provider, region_id);
let on_region_opened = self.wal.on_region_opened();
let last_entry_id = replay_memtable(
&region.provider,
@@ -95,8 +93,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
);
if let Some(expected_last_entry_id) = request.entry_id {
ensure!(
// The replayed last entry id may be greater than the `expected_last_entry_id`.
last_entry_id >= expected_last_entry_id,
expected_last_entry_id == last_entry_id,
error::UnexpectedReplaySnafu {
region_id,
expected_last_entry_id,

View File

@@ -41,6 +41,7 @@ use datafusion_physical_expr::EquivalenceProperties;
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
use greptime_proto::v1::QueryContext;
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use session::context::QueryContextRef;
@@ -184,25 +185,24 @@ impl MergeScanExec {
context: Arc<TaskContext>,
partition: usize,
) -> Result<SendableRecordBatchStream> {
// prepare states to move
let regions = self.regions.clone();
let region_query_handler = self.region_query_handler.clone();
let metric = MergeScanMetric::new(&self.metric);
let schema = self.schema.clone();
let query_ctx = self.query_ctx.clone();
let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
let plan = self.plan.clone();
let target_partition = self.target_partition;
let schema = Self::arrow_schema_to_schema(self.schema())?;
let dbname = context.task_id().unwrap_or_default();
let tracing_context = TracingContext::from_json(context.session_id().as_str());
let current_catalog = self.query_ctx.current_catalog().to_string();
let current_schema = self.query_ctx.current_schema().to_string();
let current_channel = self.query_ctx.channel();
let timezone = self.query_ctx.timezone().to_string();
let extensions = self.query_ctx.extensions();
let target_partition = self.target_partition;
let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
let plan = self.plan.clone();
let stream = Box::pin(stream!({
// only report metrics once for each MergeScan
if partition == 0 {
MERGE_SCAN_REGIONS.observe(regions.len() as f64);
}
MERGE_SCAN_REGIONS.observe(regions.len() as f64);
let _finish_timer = metric.finish_time().timer();
let mut ready_timer = metric.ready_time().timer();
let mut first_consume_timer = Some(metric.first_consume_time().timer());
@@ -217,7 +217,13 @@ impl MergeScanExec {
header: Some(RegionRequestHeader {
tracing_context: tracing_context.to_w3c(),
dbname: dbname.clone(),
query_context: Some(query_ctx.as_ref().into()),
query_context: Some(QueryContext {
current_catalog: current_catalog.clone(),
current_schema: current_schema.clone(),
timezone: timezone.clone(),
extensions: extensions.clone(),
channel: current_channel as u32,
}),
}),
region_id,
plan: plan.clone(),

View File

@@ -63,10 +63,9 @@ impl ParallelizeScan {
);
// update the partition ranges
let new_exec = region_scan_exec
.with_new_partitions(partition_ranges)
region_scan_exec
.set_partitions(partition_ranges)
.map_err(|e| DataFusionError::External(e.into_inner()))?;
return Ok(Transformed::yes(Arc::new(new_exec)));
}
// The plan might be modified, but it's modified in-place so we always return
@@ -81,15 +80,11 @@ impl ParallelizeScan {
/// Distribute [`PartitionRange`]s to each partition.
///
/// Currently we use a simple round-robin strategy to assign ranges to partitions.
/// This method may return fewer partitions than `expected_partition_num` if the
/// number of ranges is smaller than `expected_partition_num`, but it always
/// returns at least one partition.
fn assign_partition_range(
ranges: Vec<PartitionRange>,
expected_partition_num: usize,
) -> Vec<Vec<PartitionRange>> {
let actual_partition_num = expected_partition_num.min(ranges.len()).max(1);
let mut partition_ranges = vec![vec![]; actual_partition_num];
let mut partition_ranges = vec![vec![]; expected_partition_num];
// round-robin assignment
for (i, range) in ranges.into_iter().enumerate() {
@@ -100,112 +95,3 @@ impl ParallelizeScan {
partition_ranges
}
}
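As a reference for the round-robin strategy described in the doc comment, here is a small self-contained sketch. It implements the clamped variant (never more partitions than ranges, at least one partition); the generic `T` stands in for `PartitionRange`.

```rust
/// Minimal sketch of round-robin range assignment, clamped so the result never
/// has more partitions than items and always has at least one partition.
fn assign_round_robin<T>(items: Vec<T>, expected_partition_num: usize) -> Vec<Vec<T>> {
    let actual = expected_partition_num.min(items.len()).max(1);
    let mut partitions: Vec<Vec<T>> = (0..actual).map(|_| Vec::new()).collect();
    for (i, item) in items.into_iter().enumerate() {
        partitions[i % actual].push(item);
    }
    partitions
}

fn main() {
    // Four ranges over two partitions -> [1, 3] and [2, 4], matching the
    // round-robin order exercised by the removed test.
    assert_eq!(assign_round_robin(vec![1, 2, 3, 4], 2), vec![vec![1, 3], vec![2, 4]]);
    // Zero ranges still yield a single (empty) partition.
    assert_eq!(assign_round_robin::<i32>(vec![], 5).len(), 1);
}
```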
#[cfg(test)]
mod test {
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use super::*;
#[test]
fn test_assign_partition_range() {
let ranges = vec![
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
estimated_size: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
estimated_size: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
estimated_size: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
estimated_size: 250,
identifier: 4,
},
];
// assign to 2 partitions
let expected_partition_num = 2;
let result =
ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num);
let expected = vec![
vec![
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
estimated_size: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
estimated_size: 150,
identifier: 3,
},
],
vec![
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
estimated_size: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
estimated_size: 250,
identifier: 4,
},
],
];
assert_eq!(result, expected);
// assign 4 ranges to 5 partitions. Only 4 partitions are returned.
let expected_partition_num = 5;
let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num);
let expected = vec![
vec![PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
estimated_size: 100,
identifier: 1,
}],
vec![PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
estimated_size: 200,
identifier: 2,
}],
vec![PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
estimated_size: 150,
identifier: 3,
}],
vec![PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
estimated_size: 250,
identifier: 4,
}],
];
assert_eq!(result, expected);
// assign 0 ranges to 5 partitions. Only 1 partition is returned.
let result = ParallelizeScan::assign_partition_range(vec![], 5);
assert_eq!(result.len(), 1);
}
}

View File

@@ -59,7 +59,7 @@ pub struct SelfImportOption {
impl Default for SelfImportOption {
fn default() -> Self {
Self {
db: "greptime_metrics".to_string(),
db: "information_schema".to_string(),
}
}
}

View File

@@ -137,12 +137,6 @@ impl From<QueryContext> for api::v1::QueryContext {
}
}
impl From<&QueryContext> for api::v1::QueryContext {
fn from(ctx: &QueryContext) -> Self {
ctx.clone().into()
}
}
impl QueryContext {
pub fn arc() -> QueryContextRef {
Arc::new(QueryContextBuilder::default().build())

View File

@@ -30,22 +30,6 @@ pub use crate::logstore::entry::Id as EntryId;
use crate::logstore::provider::Provider;
use crate::storage::RegionId;
// The information used to locate WAL index for the specified region.
#[derive(Debug, Clone, Copy)]
pub struct WalIndex {
pub region_id: RegionId,
pub location_id: u64,
}
impl WalIndex {
pub fn new(region_id: RegionId, location_id: u64) -> Self {
Self {
region_id,
location_id,
}
}
}
/// `LogStore` serves as a Write-Ahead-Log for storage engine.
#[async_trait::async_trait]
pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
@@ -64,7 +48,6 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
&self,
provider: &Provider,
id: EntryId,
index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>, Self::Error>;
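For orientation, a hedged sketch of how a caller might consume the stream returned by `read` after this change. `read_all` is a hypothetical helper (not the crate's `collect_entries`); each stream item is a batch (`Vec<Entry>`), so the batches are flattened into one list.

```rust
use futures_util::TryStreamExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::logstore::{EntryId, LogStore};

/// Hypothetical helper: drain every entry at or after `entry_id` for a provider.
async fn read_all<S: LogStore>(
    store: &S,
    provider: &Provider,
    entry_id: EntryId,
) -> Result<Vec<Entry>, S::Error> {
    let stream = store.read(provider, entry_id).await?;
    // Collect the per-poll batches, then flatten them into a single list.
    let batches: Vec<Vec<Entry>> = stream.try_collect().await?;
    Ok(batches.into_iter().flatten().collect())
}
```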
/// Creates a new `Namespace` from the given ref.

View File

@@ -143,7 +143,7 @@ impl ScannerPartitioning {
}
/// Represents one data range within a partition
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy)]
pub struct PartitionRange {
/// Start time of time index column. Inclusive.
pub start: Timestamp,

View File

@@ -673,8 +673,6 @@ pub struct RegionCatchupRequest {
/// The `entry_id` up to which the region was expected to replay.
/// `None` stands for replaying to the latest entry.
pub entry_id: Option<entry::Id>,
/// The hint for replaying memtable.
pub location_id: Option<u64>,
}
impl fmt::Display for RegionRequest {

View File

@@ -41,7 +41,7 @@ use crate::table::metrics::StreamMetrics;
/// A plan to read multiple partitions from a region of a table.
#[derive(Debug)]
pub struct RegionScanExec {
scanner: Arc<Mutex<RegionScannerRef>>,
scanner: Mutex<RegionScannerRef>,
arrow_schema: ArrowSchemaRef,
/// The expected output ordering for the plan.
output_ordering: Option<Vec<PhysicalSortExpr>>,
@@ -70,7 +70,7 @@ impl RegionScanExec {
let append_mode = scanner_props.append_mode();
let total_rows = scanner_props.total_rows();
Self {
scanner: Arc::new(Mutex::new(scanner)),
scanner: Mutex::new(scanner),
arrow_schema,
output_ordering: None,
metric: ExecutionPlanMetricsSet::new(),
@@ -102,28 +102,9 @@ impl RegionScanExec {
}
/// Updates the partition ranges of the underlying scanner.
pub fn with_new_partitions(
&self,
partitions: Vec<Vec<PartitionRange>>,
) -> Result<Self, BoxedError> {
let num_partitions = partitions.len();
let mut properties = self.properties.clone();
properties.partitioning = Partitioning::UnknownPartitioning(num_partitions);
{
let mut scanner = self.scanner.lock().unwrap();
scanner.prepare(partitions)?;
}
Ok(Self {
scanner: self.scanner.clone(),
arrow_schema: self.arrow_schema.clone(),
output_ordering: self.output_ordering.clone(),
metric: self.metric.clone(),
properties,
append_mode: self.append_mode,
total_rows: self.total_rows,
})
pub fn set_partitions(&self, partitions: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
let mut scanner = self.scanner.lock().unwrap();
scanner.prepare(partitions)
}
}

View File

@@ -1,48 +0,0 @@
SELECT h3(37.76938, -122.3889, 0);
+---------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),Int64(0)) |
+---------------------------------------------------+
| 8029fffffffffff |
+---------------------------------------------------+
SELECT h3(37.76938, -122.3889, 1);
+---------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),Int64(1)) |
+---------------------------------------------------+
| 81283ffffffffff |
+---------------------------------------------------+
SELECT h3(37.76938, -122.3889, 8);
+---------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),Int64(8)) |
+---------------------------------------------------+
| 88283082e7fffff |
+---------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 9);
+--------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),Int64(9)) |
+--------------------------------------------------------+
| 9q8yygxne |
+--------------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 10);
+---------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),Int64(10)) |
+---------------------------------------------------------+
| 9q8yygxnef |
+---------------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 11);
+---------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),Int64(11)) |
+---------------------------------------------------------+
| 9q8yygxneft |
+---------------------------------------------------------+

View File

@@ -1,11 +0,0 @@
SELECT h3(37.76938, -122.3889, 0);
SELECT h3(37.76938, -122.3889, 1);
SELECT h3(37.76938, -122.3889, 8);
SELECT geohash(37.76938, -122.3889, 9);
SELECT geohash(37.76938, -122.3889, 10);
SELECT geohash(37.76938, -122.3889, 11);