Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-25 15:40:02 +00:00.
Compare commits: feat/merge ... v0.7.0-nig (74 commits).
Commits in this range (SHA1 only):
2a675e0794, 0edf1bbacc, 8609977b52, 2d975e4f22, 00cbbc97ae, 7d30c2484b, 376409b857, d4a54a085b, c1a370649e, 3cad9d989d,
a50025269f, a3533c4ea0, 3413fc0781, dc205a2c5d, a0a8e8c587, c3c80b92c8, a8cbec824c, 33d894c1f0, 7942b8fae9, b97f957489,
f3d69e9563, 4b36c285f1, dbb1ce1a9b, 3544c9334c, 492a00969d, 206666bff6, 7453d9779d, 8e3e0fd528, b1e290f959, d8dc93fccc,
3887d207b6, e859f0e67d, ce397ebcc6, 26011ed0b6, 8087822ab2, e481f073f5, 606309f49a, 8059b95e37, afe4633320, abbfd23d4b,
1df64f294b, a6564e72b4, 1f1d1b4f57, b144836935, 93d9f48dd7, 90e9b69035, 2035e7bf4c, 7341f23019, 41ee0cdd5a, 578dd8f87a,
1dc4fec662, f26505b625, 8289b0dec2, 53105b99e7, 564fe3beca, e9a2b0a9ee, 860b1e9d9e, 7c88d721c2, 90169c868d, 4c07606da6,
a7bf458a37, fa08085119, 86a98c80f5, 085a380019, d9a96344ee, 41656c8635, cf08a3de6b, f087a843bb, 450dfe324d, 3dfe4a2e5a,
eded08897d, b1f54d8a03, bf5e1905cd, 6628c41c36
@@ -3,13 +3,3 @@ linker = "aarch64-linux-gnu-gcc"

 [alias]
 sqlness = "run --bin sqlness-runner --"
-
-[build]
-rustflags = [
-    # lints
-    # TODO: use lint configuration in cargo https://github.com/rust-lang/cargo/issues/5034
-    "-Wclippy::print_stdout",
-    "-Wclippy::print_stderr",
-    "-Wclippy::implicit_clone",
-]
@@ -34,7 +34,7 @@ runs:

   - name: Upload sqlness logs
     if: ${{ failure() && inputs.disable-run-tests == 'false' }} # Only upload logs when the integration tests failed.
-    uses: actions/upload-artifact@v3
+    uses: actions/upload-artifact@v4
     with:
       name: sqlness-logs
       path: /tmp/greptime-*.log

@@ -67,7 +67,7 @@ runs:

   - name: Upload sqlness logs
     if: ${{ failure() }} # Only upload logs when the integration tests failed.
-    uses: actions/upload-artifact@v3
+    uses: actions/upload-artifact@v4
     with:
       name: sqlness-logs
       path: /tmp/greptime-*.log

@@ -62,10 +62,10 @@ runs:

   - name: Upload sqlness logs
     if: ${{ failure() }} # Only upload logs when the integration tests failed.
-    uses: actions/upload-artifact@v3
+    uses: actions/upload-artifact@v4
     with:
       name: sqlness-logs
-      path: ${{ runner.temp }}/greptime-*.log
+      path: /tmp/greptime-*.log
       retention-days: 3

   - name: Build greptime binary
.github/pull_request_template.md (8 changes, vendored)

@@ -1,8 +1,10 @@
-I hereby agree to the terms of the [GreptimeDB CLA](https://gist.github.com/xtang/6378857777706e568c1949c7578592cc)
+I hereby agree to the terms of the [GreptimeDB CLA](https://github.com/GreptimeTeam/.github/blob/main/CLA.md).
+
+## Refer to a related PR or issue link (optional)

 ## What's changed and what's your intention?

-_PLEASE DO NOT LEAVE THIS EMPTY !!!_
+__!!! DO NOT LEAVE THIS BLOCK EMPTY !!!__

 Please explain IN DETAIL what the changes are in this PR and why they are needed:

@@ -16,5 +18,3 @@ Please explain IN DETAIL what the changes are in this PR and why they are needed
 - [ ] I have written the necessary rustdoc comments.
 - [ ] I have added the necessary unit tests and integration tests.
 - [x] This PR does not require documentation updates.
-
-## Refer to a related PR or issue link (optional)

.github/workflows/develop.yml (42 changes, vendored)
@@ -1,7 +1,7 @@
 on:
   merge_group:
   pull_request:
-    types: [opened, synchronize, reopened, ready_for_review]
+    types: [ opened, synchronize, reopened, ready_for_review ]
     paths-ignore:
       - 'docs/**'
       - 'config/**'
@@ -57,7 +57,7 @@ jobs:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
with:
|
||||
# Shares across multiple jobs
|
||||
# Shares with `Clippy` job
|
||||
shared-key: "check-lint"
|
||||
@@ -75,7 +75,7 @@ jobs:
|
||||
toolchain: stable
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
with:
|
||||
# Shares across multiple jobs
|
||||
shared-key: "check-toml"
|
||||
- name: Install taplo
|
||||
@@ -136,13 +136,12 @@ jobs:
        run: tar -xvf ./bins.tar.gz
      - name: Run sqlness
        run: RUST_BACKTRACE=1 ./bins/sqlness-runner -c ./tests/cases --bins-dir ./bins
-     # FIXME: Logs cannot found be on failure (or even success). Need to figure out the cause.
      - name: Upload sqlness logs
        if: always()
-       uses: actions/upload-artifact@v3
+       uses: actions/upload-artifact@v4
        with:
          name: sqlness-logs
-         path: ${{ runner.temp }}/greptime-*.log
+         path: /tmp/greptime-*.log
          retention-days: 3

  sqlness-kafka-wal:

@@ -167,13 +166,12 @@ jobs:
        run: docker compose -f docker-compose-standalone.yml up -d --wait
      - name: Run sqlness
        run: RUST_BACKTRACE=1 ./bins/sqlness-runner -w kafka -k 127.0.0.1:9092 -c ./tests/cases --bins-dir ./bins
-     # FIXME: Logs cannot be found on failure (or even success). Need to figure out the cause.
      - name: Upload sqlness logs
        if: always()
-       uses: actions/upload-artifact@v3
+       uses: actions/upload-artifact@v4
        with:
-         name: sqlness-logs
-         path: ${{ runner.temp }}/greptime-*.log
+         name: sqlness-logs-with-kafka-wal
+         path: /tmp/greptime-*.log
          retention-days: 3

  fmt:
@@ -191,7 +189,7 @@ jobs:
|
||||
components: rustfmt
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
with:
|
||||
# Shares across multiple jobs
|
||||
shared-key: "check-rust-fmt"
|
||||
- name: Run cargo fmt
|
||||
@@ -212,7 +210,7 @@ jobs:
|
||||
components: clippy
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
with:
|
||||
# Shares across multiple jobs
|
||||
# Shares with `Check` job
|
||||
shared-key: "check-lint"
|
||||
@@ -271,10 +269,28 @@ jobs:
|
||||
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
|
||||
UNITTEST_LOG_DIR: "__unittest_logs"
|
||||
- name: Codecov upload
|
||||
uses: codecov/codecov-action@v2
|
||||
uses: codecov/codecov-action@v4
|
||||
with:
|
||||
token: ${{ secrets.CODECOV_TOKEN }}
|
||||
files: ./lcov.info
|
||||
flags: rust
|
||||
fail_ci_if_error: false
|
||||
verbose: true
|
||||
|
||||
compat:
|
||||
name: Compatibility Test
|
||||
needs: build
|
||||
runs-on: ubuntu-20.04
|
||||
timeout-minutes: 60
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Download pre-built binaries
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
name: bins
|
||||
path: .
|
||||
- name: Unzip binaries
|
||||
run: |
|
||||
mkdir -p ./bins/current
|
||||
tar -xvf ./bins.tar.gz --strip-components=1 -C ./bins/current
|
||||
- run: ./tests/compat/test-compat.sh 0.6.0
|
||||
|
||||
.github/workflows/docs.yml (14 changes, vendored)

@@ -61,6 +61,18 @@ jobs:

  sqlness:
    name: Sqlness Test
-   runs-on: ubuntu-20.04
+   runs-on: ${{ matrix.os }}
+   strategy:
+     matrix:
+       os: [ ubuntu-20.04 ]
    steps:
      - run: 'echo "No action required"'
+
+ sqlness-kafka-wal:
+   name: Sqlness Test with Kafka Wal
+   runs-on: ${{ matrix.os }}
+   strategy:
+     matrix:
+       os: [ ubuntu-20.04 ]
+   steps:
+     - run: 'echo "No action required"'
.github/workflows/nightly-ci.yml (4 changes, vendored)

@@ -45,10 +45,10 @@ jobs:
            {"text": "Nightly CI failed for sqlness tests"}
      - name: Upload sqlness logs
        if: always()
-       uses: actions/upload-artifact@v3
+       uses: actions/upload-artifact@v4
        with:
          name: sqlness-logs
-         path: ${{ runner.temp }}/greptime-*.log
+         path: /tmp/greptime-*.log
          retention-days: 3

  test-on-windows:
8
.github/workflows/release.yml
vendored
8
.github/workflows/release.yml
vendored
@@ -221,6 +221,8 @@ jobs:
|
||||
arch: x86_64-apple-darwin
|
||||
artifacts-dir-prefix: greptime-darwin-amd64-pyo3
|
||||
runs-on: ${{ matrix.os }}
|
||||
outputs:
|
||||
build-macos-result: ${{ steps.set-build-macos-result.outputs.build-macos-result }}
|
||||
needs: [
|
||||
allocate-runners,
|
||||
]
|
||||
@@ -260,6 +262,8 @@ jobs:
|
||||
features: pyo3_backend,servers/dashboard
|
||||
artifacts-dir-prefix: greptime-windows-amd64-pyo3
|
||||
runs-on: ${{ matrix.os }}
|
||||
outputs:
|
||||
build-windows-result: ${{ steps.set-build-windows-result.outputs.build-windows-result }}
|
||||
needs: [
|
||||
allocate-runners,
|
||||
]
|
||||
@@ -295,6 +299,8 @@ jobs:
|
||||
build-linux-arm64-artifacts,
|
||||
]
|
||||
runs-on: ubuntu-2004-16-cores
|
||||
outputs:
|
||||
build-image-result: ${{ steps.set-build-image-result.outputs.build-image-result }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
@@ -310,7 +316,7 @@ jobs:
|
||||
version: ${{ needs.allocate-runners.outputs.version }}
|
||||
|
||||
- name: Set build image result
|
||||
id: set-image-build-result
|
||||
id: set-build-image-result
|
||||
run: |
|
||||
echo "build-image-result=success" >> $GITHUB_OUTPUT
|
||||
|
||||
|
||||
Cargo.lock (207 changes, generated)
@@ -412,6 +412,7 @@ dependencies = [
|
||||
"arrow-data",
|
||||
"arrow-schema",
|
||||
"flatbuffers",
|
||||
"lz4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -785,23 +786,6 @@ dependencies = [
|
||||
"syn 2.0.43",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum-test-helper"
|
||||
version = "0.1.1"
|
||||
source = "git+https://github.com/sunng87/axum-test-helper.git?branch=patch-1#5aa7843ce2250144ea1b7f589f274c00cf1af4ab"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"bytes",
|
||||
"http",
|
||||
"http-body",
|
||||
"hyper",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum-test-helper"
|
||||
version = "0.3.0"
|
||||
@@ -1557,6 +1541,8 @@ dependencies = [
|
||||
"prometheus",
|
||||
"prost 0.12.3",
|
||||
"rand",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"session",
|
||||
"snafu",
|
||||
"substrait 0.17.1",
|
||||
@@ -1783,8 +1769,11 @@ dependencies = [
|
||||
"arc-swap",
|
||||
"async-trait",
|
||||
"chrono-tz 0.6.3",
|
||||
"common-base",
|
||||
"common-catalog",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"common-meta",
|
||||
"common-query",
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
@@ -1799,9 +1788,11 @@ dependencies = [
|
||||
"paste",
|
||||
"ron",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"session",
|
||||
"snafu",
|
||||
"statrs",
|
||||
"store-api",
|
||||
"table",
|
||||
]
|
||||
|
||||
@@ -1912,11 +1903,13 @@ dependencies = [
|
||||
"base64 0.21.5",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"common-base",
|
||||
"common-catalog",
|
||||
"common-error",
|
||||
"common-grpc-expr",
|
||||
"common-macro",
|
||||
"common-procedure",
|
||||
"common-procedure-test",
|
||||
"common-recordbatch",
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
@@ -1949,6 +1942,10 @@ dependencies = [
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "common-plugins"
|
||||
version = "0.6.0"
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure"
|
||||
version = "0.6.0"
|
||||
@@ -2048,6 +2045,7 @@ dependencies = [
|
||||
name = "common-telemetry"
|
||||
version = "0.6.0"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"backtrace",
|
||||
"common-error",
|
||||
"console-subscriber",
|
||||
@@ -2111,6 +2109,8 @@ name = "common-wal"
|
||||
version = "0.6.0"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"common-telemetry",
|
||||
"futures-util",
|
||||
"humantime-serde",
|
||||
@@ -2118,6 +2118,8 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"snafu",
|
||||
"tokio",
|
||||
"toml 0.8.8",
|
||||
]
|
||||
|
||||
@@ -3415,6 +3417,7 @@ dependencies = [
|
||||
"datatypes",
|
||||
"hydroflow",
|
||||
"itertools 0.10.5",
|
||||
"num-traits",
|
||||
"serde",
|
||||
"servers",
|
||||
"session",
|
||||
@@ -3590,6 +3593,15 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fsevent-sys"
|
||||
version = "4.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fst"
|
||||
version = "0.4.7"
|
||||
@@ -4374,6 +4386,26 @@ dependencies = [
|
||||
"snafu",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify"
|
||||
version = "0.9.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"inotify-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify-sys"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "instant"
|
||||
version = "0.1.12"
|
||||
@@ -4571,6 +4603,26 @@ dependencies = [
|
||||
"indexmap 2.1.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kqueue"
|
||||
version = "1.0.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
|
||||
dependencies = [
|
||||
"kqueue-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kqueue-sys"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lalrpop"
|
||||
version = "0.19.12"
|
||||
@@ -5178,7 +5230,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "meter-core"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=abbd357c1e193cd270ea65ee7652334a150b628f#abbd357c1e193cd270ea65ee7652334a150b628f"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80b72716dcde47ec4161478416a5c6c21343364d#80b72716dcde47ec4161478416a5c6c21343364d"
|
||||
dependencies = [
|
||||
"anymap",
|
||||
"once_cell",
|
||||
@@ -5188,7 +5240,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "meter-macros"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=abbd357c1e193cd270ea65ee7652334a150b628f#abbd357c1e193cd270ea65ee7652334a150b628f"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80b72716dcde47ec4161478416a5c6c21343364d#80b72716dcde47ec4161478416a5c6c21343364d"
|
||||
dependencies = [
|
||||
"meter-core",
|
||||
]
|
||||
@@ -5210,6 +5262,7 @@ dependencies = [
|
||||
"common-time",
|
||||
"datafusion",
|
||||
"datatypes",
|
||||
"itertools 0.10.5",
|
||||
"lazy_static",
|
||||
"mito2",
|
||||
"mur3",
|
||||
@@ -5293,6 +5346,7 @@ dependencies = [
|
||||
"common-test-util",
|
||||
"common-time",
|
||||
"common-wal",
|
||||
"criterion",
|
||||
"dashmap",
|
||||
"datafusion",
|
||||
"datafusion-common",
|
||||
@@ -5313,6 +5367,7 @@ dependencies = [
|
||||
"prometheus",
|
||||
"prost 0.12.3",
|
||||
"puffin",
|
||||
"rand",
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -5418,6 +5473,24 @@ dependencies = [
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mysql-common-derive"
|
||||
version = "0.31.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c60492b5eb751e55b42d716b6b26dceb66767996cd7a5560a842fbf613ca2e92"
|
||||
dependencies = [
|
||||
"darling 0.20.3",
|
||||
"heck",
|
||||
"num-bigint",
|
||||
"proc-macro-crate 3.1.0",
|
||||
"proc-macro-error",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.43",
|
||||
"termcolor",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mysql_async"
|
||||
version = "0.33.0"
|
||||
@@ -5434,7 +5507,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"lru",
|
||||
"mio",
|
||||
"mysql_common",
|
||||
"mysql_common 0.31.0",
|
||||
"once_cell",
|
||||
"pem",
|
||||
"percent-encoding",
|
||||
@@ -5470,13 +5543,12 @@ dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"cc",
|
||||
"chrono",
|
||||
"cmake",
|
||||
"crc32fast",
|
||||
"flate2",
|
||||
"frunk",
|
||||
"lazy_static",
|
||||
"mysql-common-derive",
|
||||
"mysql-common-derive 0.30.2",
|
||||
"num-bigint",
|
||||
"num-traits",
|
||||
"rand",
|
||||
@@ -5495,6 +5567,46 @@ dependencies = [
|
||||
"zstd 0.12.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mysql_common"
|
||||
version = "0.32.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b73aacd01475af6d2efbdf489efd60fc519515ffe94edfd74236f954d521e31b"
|
||||
dependencies = [
|
||||
"base64 0.21.5",
|
||||
"bigdecimal",
|
||||
"bindgen",
|
||||
"bitflags 2.4.1",
|
||||
"bitvec",
|
||||
"btoi",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"cc",
|
||||
"chrono",
|
||||
"cmake",
|
||||
"crc32fast",
|
||||
"flate2",
|
||||
"frunk",
|
||||
"lazy_static",
|
||||
"mysql-common-derive 0.31.0",
|
||||
"num-bigint",
|
||||
"num-traits",
|
||||
"rand",
|
||||
"regex",
|
||||
"rust_decimal",
|
||||
"saturating",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha1",
|
||||
"sha2",
|
||||
"smallvec",
|
||||
"subprocess",
|
||||
"thiserror",
|
||||
"time",
|
||||
"uuid",
|
||||
"zstd 0.13.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nalgebra"
|
||||
version = "0.29.0"
|
||||
@@ -5601,6 +5713,25 @@ version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
|
||||
|
||||
[[package]]
|
||||
name = "notify"
|
||||
version = "6.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
|
||||
dependencies = [
|
||||
"bitflags 2.4.1",
|
||||
"crossbeam-channel",
|
||||
"filetime",
|
||||
"fsevent-sys",
|
||||
"inotify",
|
||||
"kqueue",
|
||||
"libc",
|
||||
"log",
|
||||
"mio",
|
||||
"walkdir",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ntapi"
|
||||
version = "0.4.1"
|
||||
@@ -5890,13 +6021,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "opensrv-mysql"
|
||||
version = "0.6.0"
|
||||
source = "git+https://github.com/MichaelScofield/opensrv.git?rev=1676c1d#1676c1d166cf33e7745e1b5db54b3fe2b7defec9"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4148ab944991b0a33be74d2636a815268974578812a9e4cf7dc785325e858154"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
"chrono",
|
||||
"mysql_common",
|
||||
"mysql_common 0.32.0",
|
||||
"nom",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
@@ -6069,6 +6201,7 @@ dependencies = [
|
||||
"meter-macros",
|
||||
"object-store",
|
||||
"partition",
|
||||
"path-slash",
|
||||
"prometheus",
|
||||
"query",
|
||||
"regex",
|
||||
@@ -6343,6 +6476,12 @@ version = "1.0.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c"
|
||||
|
||||
[[package]]
|
||||
name = "path-slash"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e91099d4268b0e11973f036e885d652fb0b21fedcf69738c627f94db6a44f42"
|
||||
|
||||
[[package]]
|
||||
name = "pathdiff"
|
||||
version = "0.2.1"
|
||||
@@ -6811,6 +6950,15 @@ dependencies = [
|
||||
"toml_edit 0.20.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-crate"
|
||||
version = "3.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284"
|
||||
dependencies = [
|
||||
"toml_edit 0.21.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error"
|
||||
version = "1.0.4"
|
||||
@@ -7241,6 +7389,7 @@ dependencies = [
|
||||
"common-function",
|
||||
"common-macro",
|
||||
"common-meta",
|
||||
"common-plugins",
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
"common-telemetry",
|
||||
@@ -8883,7 +9032,7 @@ dependencies = [
|
||||
"auth",
|
||||
"axum",
|
||||
"axum-macros",
|
||||
"axum-test-helper 0.3.0",
|
||||
"axum-test-helper",
|
||||
"base64 0.21.5",
|
||||
"bytes",
|
||||
"catalog",
|
||||
@@ -8897,6 +9046,7 @@ dependencies = [
|
||||
"common-macro",
|
||||
"common-mem-prof",
|
||||
"common-meta",
|
||||
"common-plugins",
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
"common-runtime",
|
||||
@@ -8922,6 +9072,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"mime_guess",
|
||||
"mysql_async",
|
||||
"notify",
|
||||
"once_cell",
|
||||
"openmetrics-parser",
|
||||
"opensrv-mysql",
|
||||
@@ -8954,6 +9105,7 @@ dependencies = [
|
||||
"sql",
|
||||
"strum 0.25.0",
|
||||
"table",
|
||||
"tempfile",
|
||||
"tikv-jemalloc-ctl",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
@@ -8979,6 +9131,7 @@ dependencies = [
|
||||
"common-telemetry",
|
||||
"common-time",
|
||||
"derive_builder 0.12.0",
|
||||
"snafu",
|
||||
"sql",
|
||||
]
|
||||
|
||||
@@ -9955,7 +10108,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"auth",
|
||||
"axum",
|
||||
"axum-test-helper 0.1.1",
|
||||
"axum-test-helper",
|
||||
"catalog",
|
||||
"chrono",
|
||||
"client",
|
||||
|
||||
Cargo.toml (14 changes)
@@ -18,6 +18,7 @@ members = [
     "src/common/grpc-expr",
     "src/common/mem-prof",
     "src/common/meta",
+    "src/common/plugins",
     "src/common/procedure",
     "src/common/procedure-test",
     "src/common/query",
@@ -65,13 +66,19 @@ version = "0.6.0"
 edition = "2021"
 license = "Apache-2.0"

+[workspace.lints]
+clippy.print_stdout = "warn"
+clippy.print_stderr = "warn"
+clippy.implicit_clone = "warn"
+rust.unknown_lints = "deny"
+
 [workspace.dependencies]
 ahash = { version = "0.8", features = ["compile-time-rng"] }
 aquamarine = "0.3"
 arrow = { version = "47.0" }
 arrow-array = "47.0"
 arrow-flight = "47.0"
-arrow-ipc = "47.0"
+arrow-ipc = { version = "47.0", features = ["lz4"] }
 arrow-schema = { version = "47.0", features = ["serde"] }
 async-stream = "0.3"
 async-trait = "0.1"
@@ -100,7 +107,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r
 humantime-serde = "1.1"
 itertools = "0.10"
 lazy_static = "1.4"
-meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" }
+meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "80b72716dcde47ec4161478416a5c6c21343364d" }
 mockall = "0.11.4"
 moka = "0.12"
 num_cpus = "1.16"

@@ -164,6 +171,7 @@ common-grpc-expr = { path = "src/common/grpc-expr" }
 common-macro = { path = "src/common/macro" }
 common-mem-prof = { path = "src/common/mem-prof" }
 common-meta = { path = "src/common/meta" }
+common-plugins = { path = "src/common/plugins" }
 common-procedure = { path = "src/common/procedure" }
 common-procedure-test = { path = "src/common/procedure-test" }
 common-query = { path = "src/common/query" }

@@ -201,7 +209,7 @@ table = { path = "src/table" }

 [workspace.dependencies.meter-macros]
 git = "https://github.com/GreptimeTeam/greptime-meter.git"
-rev = "abbd357c1e193cd270ea65ee7652334a150b628f"
+rev = "80b72716dcde47ec4161478416a5c6c21343364d"

 [profile.release]
 debug = 1
@@ -4,6 +4,9 @@ version.workspace = true
 edition.workspace = true
 license.workspace = true

+[lints]
+workspace = true
+
 [dependencies]
 arrow.workspace = true
 chrono.workspace = true
@@ -504,7 +504,7 @@ async fn do_query(num_iter: usize, db: &Database, table_name: &str) {
         let res = db.sql(&query).await.unwrap();
         match res {
             Output::AffectedRows(_) | Output::RecordBatches(_) => (),
-            Output::Stream(stream) => {
+            Output::Stream(stream, _) => {
                 stream.try_collect::<Vec<_>>().await.unwrap();
             }
         }
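The hunk above reflects an API change that recurs throughout this diff: the `Output::Stream` variant gained a second field, and call sites either ignore it with `_` or construct the variant through the new `Output::new_stream` helper. A minimal, self-contained sketch of that call-site pattern (the `Output` enum below is a stand-in with the same shape, not the real GreptimeDB type, and the second field's type is assumed):

```rust
// Stand-in enum: mirrors the shape of the changed variant, not the real definitions.
enum Output {
    AffectedRows(usize),
    // The stream variant now carries an extra field; `Option<String>` is a placeholder for it.
    Stream(Vec<i32>, Option<String>),
}

impl Output {
    // Mirrors the `Output::new_stream` constructor introduced by this change:
    // callers no longer fill the extra field by hand.
    fn new_stream(stream: Vec<i32>) -> Self {
        Output::Stream(stream, None)
    }
}

fn consume(output: Output) {
    match output {
        Output::AffectedRows(n) => println!("{n} rows affected"),
        // Call sites that used to write `Output::Stream(stream)` now ignore the new field with `_`.
        Output::Stream(stream, _) => println!("{} items streamed", stream.len()),
    }
}

fn main() {
    consume(Output::new_stream(vec![1, 2, 3]));
    consume(Output::AffectedRows(42));
}
```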
@@ -8,5 +8,6 @@ coverage:
 ignore:
   - "**/error*.rs" # ignore all error.rs files
   - "tests/runner/*.rs" # ignore integration test runner
+  - "tests-integration/**/*.rs" # ignore integration tests
 comment: # this is a top-level key
   layout: "diff"
@@ -134,7 +134,7 @@ create_on_compaction = "auto"
|
||||
apply_on_query = "auto"
|
||||
# Memory threshold for performing an external sort during index creation.
|
||||
# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
|
||||
mem_threshold_on_create = "64MB"
|
||||
mem_threshold_on_create = "64M"
|
||||
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
|
||||
intermediate_path = ""
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ runtime_size = 2
 mode = "disable"
 cert_path = ""
 key_path = ""
+watch = false

 # PostgresSQL server options, see `standalone.example.toml`.
 [postgres]

@@ -43,6 +44,7 @@ runtime_size = 2
 mode = "disable"
 cert_path = ""
 key_path = ""
+watch = false

 # OpenTSDB protocol options, see `standalone.example.toml`.
 [opentsdb]
@@ -44,6 +44,8 @@ mode = "disable"
|
||||
cert_path = ""
|
||||
# Private key file path.
|
||||
key_path = ""
|
||||
# Watch for Certificate and key file change and auto reload
|
||||
watch = false
|
||||
|
||||
# PostgresSQL server options.
|
||||
[postgres]
|
||||
@@ -62,6 +64,8 @@ mode = "disable"
|
||||
cert_path = ""
|
||||
# private key file path.
|
||||
key_path = ""
|
||||
# Watch for Certificate and key file change and auto reload
|
||||
watch = false
|
||||
|
||||
# OpenTSDB protocol options.
|
||||
[opentsdb]
|
||||
@@ -118,7 +122,7 @@ sync_period = "1000ms"
|
||||
# Number of topics to be created upon start.
|
||||
# num_topics = 64
|
||||
# Topic selector type.
|
||||
# Available selector types:
|
||||
# Available selector types:
|
||||
# - "round_robin" (default)
|
||||
# selector_type = "round_robin"
|
||||
# The prefix of topic name.
|
||||
|
||||
@@ -66,7 +66,7 @@
|
||||
},
|
||||
"editable": true,
|
||||
"fiscalYearStartMonth": 0,
|
||||
"graphTooltip": 0,
|
||||
"graphTooltip": 1,
|
||||
"id": null,
|
||||
"links": [],
|
||||
"liveNow": false,
|
||||
@@ -2116,7 +2116,7 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"unit": "bytes"
|
||||
"unit": "none"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
@@ -2126,7 +2126,7 @@
|
||||
"x": 0,
|
||||
"y": 61
|
||||
},
|
||||
"id": 12,
|
||||
"id": 17,
|
||||
"interval": "1s",
|
||||
"options": {
|
||||
"legend": {
|
||||
@@ -2147,8 +2147,8 @@
|
||||
"uid": "${DS_PROMETHEUS-1}"
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "builder",
|
||||
"expr": "histogram_quantile(0.95, sum by(le) (rate(raft_engine_write_size_bucket[$__rate_interval])))",
|
||||
"editorMode": "code",
|
||||
"expr": "rate(raft_engine_sync_log_duration_seconds_count[2s])",
|
||||
"fullMetaSearch": false,
|
||||
"includeNullMetadata": false,
|
||||
"instant": false,
|
||||
@@ -2158,7 +2158,7 @@
|
||||
"useBackend": false
|
||||
}
|
||||
],
|
||||
"title": "wal write size",
|
||||
"title": "raft engine sync count",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
@@ -2378,6 +2378,120 @@
|
||||
],
|
||||
"title": "raft engine write duration seconds",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
"datasource": {
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS-1}"
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
"mode": "palette-classic"
|
||||
},
|
||||
"custom": {
|
||||
"axisBorderShow": false,
|
||||
"axisCenteredZero": false,
|
||||
"axisColorMode": "text",
|
||||
"axisLabel": "",
|
||||
"axisPlacement": "auto",
|
||||
"barAlignment": 0,
|
||||
"drawStyle": "line",
|
||||
"fillOpacity": 0,
|
||||
"gradientMode": "none",
|
||||
"hideFrom": {
|
||||
"legend": false,
|
||||
"tooltip": false,
|
||||
"viz": false
|
||||
},
|
||||
"insertNulls": false,
|
||||
"lineInterpolation": "linear",
|
||||
"lineWidth": 1,
|
||||
"pointSize": 5,
|
||||
"scaleDistribution": {
|
||||
"type": "linear"
|
||||
},
|
||||
"showPoints": "auto",
|
||||
"spanNulls": false,
|
||||
"stacking": {
|
||||
"group": "A",
|
||||
"mode": "none"
|
||||
},
|
||||
"thresholdsStyle": {
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"mappings": [],
|
||||
"thresholds": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "green",
|
||||
"value": null
|
||||
},
|
||||
{
|
||||
"color": "red",
|
||||
"value": 80
|
||||
}
|
||||
]
|
||||
},
|
||||
"unit": "bytes"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 7,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 68
|
||||
},
|
||||
"id": 12,
|
||||
"interval": "1s",
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [],
|
||||
"displayMode": "list",
|
||||
"placement": "bottom",
|
||||
"showLegend": true
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "single",
|
||||
"sort": "none"
|
||||
}
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS-1}"
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.95, sum by(le) (rate(raft_engine_write_size_bucket[$__rate_interval])))",
|
||||
"fullMetaSearch": false,
|
||||
"includeNullMetadata": false,
|
||||
"instant": false,
|
||||
"legendFormat": "req-size-p95",
|
||||
"range": true,
|
||||
"refId": "A",
|
||||
"useBackend": false
|
||||
},
|
||||
{
|
||||
"datasource": {
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS-1}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "rate(raft_engine_write_size_sum[$__rate_interval])",
|
||||
"hide": false,
|
||||
"instant": false,
|
||||
"legendFormat": "throughput",
|
||||
"range": true,
|
||||
"refId": "B"
|
||||
}
|
||||
],
|
||||
"title": "wal write size",
|
||||
"type": "timeseries"
|
||||
}
|
||||
],
|
||||
"refresh": "10s",
|
||||
@@ -2387,13 +2501,13 @@
|
||||
"list": []
|
||||
},
|
||||
"time": {
|
||||
"from": "now-3h",
|
||||
"from": "now-30m",
|
||||
"to": "now"
|
||||
},
|
||||
"timepicker": {},
|
||||
"timezone": "",
|
||||
"title": "GreptimeDB",
|
||||
"uid": "e7097237-669b-4f8d-b751-13067afbfb68",
|
||||
"version": 9,
|
||||
"version": 12,
|
||||
"weekStart": ""
|
||||
}
|
||||
@@ -1,8 +1,7 @@
 #!/usr/bin/env bash

 # This script is used to download built dashboard assets from the "GreptimeTeam/dashboard" repository.
-
-set -e -x
+set -ex

 declare -r SCRIPT_DIR=$(cd $(dirname ${0}) >/dev/null 2>&1 && pwd)
 declare -r ROOT_DIR=$(dirname ${SCRIPT_DIR})

@@ -13,13 +12,34 @@ RELEASE_VERSION="$(cat $STATIC_DIR/VERSION | tr -d '\t\r\n ')"

 echo "Downloading assets to dir: $OUT_DIR"
 cd $OUT_DIR

+if [[ -z "$GITHUB_PROXY_URL" ]]; then
+  GITHUB_URL="https://github.com"
+else
+  GITHUB_URL="${GITHUB_PROXY_URL%/}"
+fi
+
+function retry_fetch() {
+  local url=$1
+  local filename=$2
+
+  curl --connect-timeout 10 --retry 3 -fsSL $url --output $filename || {
+    echo "Failed to download $url"
+    echo "You may try to set http_proxy and https_proxy environment variables."
+    if [[ -z "$GITHUB_PROXY_URL" ]]; then
+      echo "You may try to set GITHUB_PROXY_URL=http://mirror.ghproxy.com/"
+    fi
+    exit 1
+  }
+}
+
 # Download the SHA256 checksum attached to the release. To verify the integrity
 # of the download, this checksum will be used to check the download tar file
 # containing the built dashboard assets.
-curl -Ls https://github.com/GreptimeTeam/dashboard/releases/download/$RELEASE_VERSION/sha256.txt --output sha256.txt
+retry_fetch "${GITHUB_URL}/GreptimeTeam/dashboard/releases/download/${RELEASE_VERSION}/sha256.txt" sha256.txt

 # Download the tar file containing the built dashboard assets.
-curl -L https://github.com/GreptimeTeam/dashboard/releases/download/$RELEASE_VERSION/build.tar.gz --output build.tar.gz
+retry_fetch "${GITHUB_URL}/GreptimeTeam/dashboard/releases/download/$RELEASE_VERSION/build.tar.gz" build.tar.gz

 # Verify the checksums match; exit if they don't.
 case "$(uname -s)" in
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
common-base.workspace = true
|
||||
common-decimal.workspace = true
|
||||
|
||||
@@ -8,6 +8,9 @@ license.workspace = true
|
||||
default = []
|
||||
testing = []
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -7,6 +7,9 @@ license.workspace = true
|
||||
[features]
|
||||
testing = []
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
arc-swap = "1.0"
|
||||
|
||||
@@ -164,11 +164,8 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to find table partitions: #{table}"))]
|
||||
FindPartitions {
|
||||
source: partition::error::Error,
|
||||
table: String,
|
||||
},
|
||||
#[snafu(display("Failed to find table partitions"))]
|
||||
FindPartitions { source: partition::error::Error },
|
||||
|
||||
#[snafu(display("Failed to find region routes"))]
|
||||
FindRegionRoutes { source: partition::error::Error },
|
||||
@@ -254,6 +251,12 @@ pub enum Error {
|
||||
source: common_meta::error::Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Get null from table cache, key: {}", key))]
|
||||
TableCacheNotGet { key: String, location: Location },
|
||||
|
||||
#[snafu(display("Failed to get table cache, err: {}", err_msg))]
|
||||
GetTableCache { err_msg: String },
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -314,6 +317,7 @@ impl ErrorExt for Error {
|
||||
Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
|
||||
Error::Datafusion { .. } => StatusCode::EngineExecuteQuery,
|
||||
Error::TableMetadataManager { source, .. } => source.status_code(),
|
||||
Error::TableCacheNotGet { .. } | Error::GetTableCache { .. } => StatusCode::Internal,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use core::pin::pin;
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use arrow_schema::SchemaRef as ArrowSchemaRef;
|
||||
@@ -31,7 +32,7 @@ use datatypes::vectors::{
|
||||
ConstantVector, DateTimeVector, DateTimeVectorBuilder, Int64Vector, Int64VectorBuilder,
|
||||
MutableVector, StringVector, StringVectorBuilder, UInt64VectorBuilder,
|
||||
};
|
||||
use futures::TryStreamExt;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use partition::manager::PartitionInfo;
|
||||
use partition::partition::PartitionDef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -240,40 +241,64 @@ impl InformationSchemaPartitionsBuilder {
|
||||
let predicates = Predicates::from_scan_request(&request);
|
||||
|
||||
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
|
||||
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
|
||||
let table_info_stream = catalog_manager
|
||||
.tables(&catalog_name, &schema_name)
|
||||
.await
|
||||
.try_filter_map(|t| async move {
|
||||
let table_info = t.table_info();
|
||||
if table_info.table_type == TableType::Temporary {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(Some(table_info))
|
||||
}
|
||||
});
|
||||
|
||||
while let Some(table) = stream.try_next().await? {
|
||||
let table_info = table.table_info();
|
||||
const BATCH_SIZE: usize = 128;
|
||||
|
||||
if table_info.table_type == TableType::Temporary {
|
||||
continue;
|
||||
}
|
||||
// Split table infos into chunks
|
||||
let mut table_info_chunks = pin!(table_info_stream.ready_chunks(BATCH_SIZE));
|
||||
|
||||
let table_id = table_info.ident.table_id;
|
||||
let partitions = if let Some(partition_manager) = &partition_manager {
|
||||
while let Some(table_infos) = table_info_chunks.next().await {
|
||||
let table_infos = table_infos.into_iter().collect::<Result<Vec<_>>>()?;
|
||||
let table_ids: Vec<TableId> =
|
||||
table_infos.iter().map(|info| info.ident.table_id).collect();
|
||||
|
||||
let mut table_partitions = if let Some(partition_manager) = &partition_manager {
|
||||
partition_manager
|
||||
.find_table_partitions(table_id)
|
||||
.batch_find_table_partitions(&table_ids)
|
||||
.await
|
||||
.context(FindPartitionsSnafu {
|
||||
table: &table_info.name,
|
||||
})?
|
||||
.context(FindPartitionsSnafu)?
|
||||
} else {
|
||||
// Current node must be a standalone instance, contains only one partition by default.
|
||||
// TODO(dennis): change it when we support multi-regions for standalone.
|
||||
vec![PartitionInfo {
|
||||
id: RegionId::new(table_id, 0),
|
||||
partition: PartitionDef::new(vec![], vec![]),
|
||||
}]
|
||||
table_ids
|
||||
.into_iter()
|
||||
.map(|table_id| {
|
||||
(
|
||||
table_id,
|
||||
vec![PartitionInfo {
|
||||
id: RegionId::new(table_id, 0),
|
||||
partition: PartitionDef::new(vec![], vec![]),
|
||||
}],
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
self.add_partitions(
|
||||
&predicates,
|
||||
&table_info,
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
&table_info.name,
|
||||
&partitions,
|
||||
);
|
||||
for table_info in table_infos {
|
||||
let partitions = table_partitions
|
||||
.remove(&table_info.ident.table_id)
|
||||
.unwrap_or(vec![]);
|
||||
|
||||
self.add_partitions(
|
||||
&predicates,
|
||||
&table_info,
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
&table_info.name,
|
||||
&partitions,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
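The rewritten builder above batches its lookups instead of resolving partitions one table at a time: table infos are pulled from the stream in chunks of `BATCH_SIZE` and handed to `batch_find_table_partitions`. A minimal sketch of the chunking pattern, assuming the `tokio` and `futures` crates (the integers stand in for table ids):

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    const BATCH_SIZE: usize = 128;

    // A stream of table ids; in the real code these come from the catalog manager.
    let table_ids = stream::iter(1u32..=1000);

    // `ready_chunks` yields up to BATCH_SIZE already-available items at a time,
    // so each loop iteration can issue a single batched partition lookup.
    let mut chunks = table_ids.ready_chunks(BATCH_SIZE);
    while let Some(batch) = chunks.next().await {
        // Stand-in for `batch_find_table_partitions(&batch)`.
        println!("looking up partitions for {} tables", batch.len());
    }
}
```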
@@ -199,7 +199,7 @@ impl InformationSchemaRegionPeersBuilder {
|
||||
|
||||
let table_routes = if let Some(partition_manager) = &partition_manager {
|
||||
partition_manager
|
||||
.find_region_routes_batch(&table_ids)
|
||||
.batch_find_region_routes(&table_ids)
|
||||
.await
|
||||
.context(FindRegionRoutesSnafu)?
|
||||
} else {
|
||||
|
||||
@@ -82,12 +82,10 @@ impl CachedMetaKvBackendBuilder {
|
||||
let cache_ttl = self.cache_ttl.unwrap_or(DEFAULT_CACHE_TTL);
|
||||
let cache_tti = self.cache_tti.unwrap_or(DEFAULT_CACHE_TTI);
|
||||
|
||||
let cache = Arc::new(
|
||||
CacheBuilder::new(cache_max_capacity)
|
||||
.time_to_live(cache_ttl)
|
||||
.time_to_idle(cache_tti)
|
||||
.build(),
|
||||
);
|
||||
let cache = CacheBuilder::new(cache_max_capacity)
|
||||
.time_to_live(cache_ttl)
|
||||
.time_to_idle(cache_tti)
|
||||
.build();
|
||||
|
||||
let kv_backend = Arc::new(MetaKvBackend {
|
||||
client: self.meta_client,
|
||||
@@ -104,7 +102,7 @@ impl CachedMetaKvBackendBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;
|
||||
pub type CacheBackend = Cache<Vec<u8>, KeyValue>;
|
||||
|
||||
/// A wrapper of `MetaKvBackend` with cache support.
|
||||
///
|
||||
@@ -117,7 +115,7 @@ pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;
|
||||
/// TTL and TTI for cache.
|
||||
pub struct CachedMetaKvBackend {
|
||||
kv_backend: KvBackendRef,
|
||||
cache: CacheBackendRef,
|
||||
cache: CacheBackend,
|
||||
name: String,
|
||||
version: AtomicUsize,
|
||||
}
|
||||
@@ -317,12 +315,10 @@ impl CachedMetaKvBackend {
|
||||
// only for test
|
||||
#[cfg(test)]
|
||||
fn wrap(kv_backend: KvBackendRef) -> Self {
|
||||
let cache = Arc::new(
|
||||
CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
|
||||
.time_to_live(DEFAULT_CACHE_TTL)
|
||||
.time_to_idle(DEFAULT_CACHE_TTI)
|
||||
.build(),
|
||||
);
|
||||
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
|
||||
.time_to_live(DEFAULT_CACHE_TTL)
|
||||
.time_to_idle(DEFAULT_CACHE_TTI)
|
||||
.build();
|
||||
|
||||
let name = format!("CachedKvBackend({})", kv_backend.name());
|
||||
Self {
|
||||
@@ -333,7 +329,7 @@ impl CachedMetaKvBackend {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cache(&self) -> &CacheBackendRef {
|
||||
pub fn cache(&self) -> &CacheBackend {
|
||||
&self.cache
|
||||
}
|
||||
|
||||
|
||||
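The change above drops the `Arc` wrapper around the moka cache (`CacheBackendRef` becomes a plain `CacheBackend`). A short sketch of the design choice, under the assumption that a `moka::future::Cache` is itself cheaply cloneable and internally shared, so an extra `Arc` layer is redundant (key/value types, capacity, and TTLs below are illustrative):

```rust
use std::time::Duration;

use moka::future::{Cache, CacheBuilder};

#[tokio::main]
async fn main() {
    // Built directly, without wrapping in Arc, mirroring the diff.
    let cache: Cache<String, String> = CacheBuilder::new(1024)
        .time_to_live(Duration::from_secs(600))
        .time_to_idle(Duration::from_secs(300))
        .build();

    // Cloning shares the same underlying cache storage.
    let handle = cache.clone();
    handle.insert("key".to_string(), "value".to_string()).await;

    assert_eq!(cache.get(&"key".to_string()).await, Some("value".to_string()));
}
```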
@@ -15,9 +15,13 @@
|
||||
use std::any::Any;
|
||||
use std::collections::BTreeSet;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::Duration;
|
||||
|
||||
use async_stream::try_stream;
|
||||
use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
|
||||
use common_catalog::consts::{
|
||||
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
|
||||
};
|
||||
use common_catalog::format_full_table_name;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
|
||||
use common_meta::error::Result as MetaResult;
|
||||
@@ -30,6 +34,7 @@ use common_meta::kv_backend::KvBackendRef;
|
||||
use common_meta::table_name::TableName;
|
||||
use futures_util::stream::BoxStream;
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use moka::future::{Cache as AsyncCache, CacheBuilder};
|
||||
use moka::sync::Cache;
|
||||
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
|
||||
use snafu::prelude::*;
|
||||
@@ -38,9 +43,10 @@ use table::metadata::TableId;
|
||||
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::Error::{GetTableCache, TableCacheNotGet};
|
||||
use crate::error::{
|
||||
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu,
|
||||
Result as CatalogResult, TableMetadataManagerSnafu,
|
||||
Result as CatalogResult, TableCacheNotGetSnafu, TableMetadataManagerSnafu,
|
||||
};
|
||||
use crate::information_schema::InformationSchemaProvider;
|
||||
use crate::CatalogManager;
|
||||
@@ -60,6 +66,7 @@ pub struct KvBackendCatalogManager {
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
/// A sub-CatalogManager that handles system tables
|
||||
system_catalog: SystemCatalog,
|
||||
table_cache: AsyncCache<String, TableRef>,
|
||||
}
|
||||
|
||||
fn make_table(table_info_value: TableInfoValue) -> CatalogResult<TableRef> {
|
||||
@@ -79,13 +86,24 @@ impl CacheInvalidator for KvBackendCatalogManager {
|
||||
}
|
||||
|
||||
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
|
||||
let table_cache_key = format_full_table_name(
|
||||
&table_name.catalog_name,
|
||||
&table_name.schema_name,
|
||||
&table_name.table_name,
|
||||
);
|
||||
self.cache_invalidator
|
||||
.invalidate_table_name(ctx, table_name)
|
||||
.await
|
||||
.await?;
|
||||
self.table_cache.invalidate(&table_cache_key).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
const DEFAULT_CACHED_CATALOG: u64 = 128;
|
||||
const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
|
||||
const TABLE_CACHE_MAX_CAPACITY: u64 = 65536;
|
||||
const TABLE_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
|
||||
const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
|
||||
|
||||
impl KvBackendCatalogManager {
|
||||
pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc<Self> {
|
||||
@@ -95,13 +113,16 @@ impl KvBackendCatalogManager {
|
||||
cache_invalidator,
|
||||
system_catalog: SystemCatalog {
|
||||
catalog_manager: me.clone(),
|
||||
catalog_cache: Cache::new(DEFAULT_CACHED_CATALOG),
|
||||
catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
|
||||
information_schema_provider: Arc::new(InformationSchemaProvider::new(
|
||||
// The catalog name is not used in system_catalog, so let it empty
|
||||
String::default(),
|
||||
DEFAULT_CATALOG_NAME.to_string(),
|
||||
me.clone(),
|
||||
)),
|
||||
},
|
||||
table_cache: CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY)
|
||||
.time_to_live(TABLE_CACHE_TTL)
|
||||
.time_to_idle(TABLE_CACHE_TTI)
|
||||
.build(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -216,29 +237,52 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
return Ok(Some(table));
|
||||
}
|
||||
|
||||
let key = TableNameKey::new(catalog, schema, table_name);
|
||||
let Some(table_name_value) = self
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.get(key)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
let table_id = table_name_value.table_id();
|
||||
let init = async {
|
||||
let table_name_key = TableNameKey::new(catalog, schema, table_name);
|
||||
let Some(table_name_value) = self
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.get(table_name_key)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
else {
|
||||
return TableCacheNotGetSnafu {
|
||||
key: table_name_key.to_string(),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let table_id = table_name_value.table_id();
|
||||
|
||||
let Some(table_info_value) = self
|
||||
.table_metadata_manager
|
||||
.table_info_manager()
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.map(|v| v.into_inner())
|
||||
else {
|
||||
return Ok(None);
|
||||
let Some(table_info_value) = self
|
||||
.table_metadata_manager
|
||||
.table_info_manager()
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.map(|v| v.into_inner())
|
||||
else {
|
||||
return TableCacheNotGetSnafu {
|
||||
key: table_name_key.to_string(),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
make_table(table_info_value)
|
||||
};
|
||||
make_table(table_info_value).map(Some)
|
||||
|
||||
match self
|
||||
.table_cache
|
||||
.try_get_with_by_ref(&format_full_table_name(catalog, schema, table_name), init)
|
||||
.await
|
||||
{
|
||||
Ok(table) => Ok(Some(table)),
|
||||
Err(err) => match err.as_ref() {
|
||||
TableCacheNotGet { .. } => Ok(None),
|
||||
_ => Err(err),
|
||||
},
|
||||
}
|
||||
.map_err(|err| GetTableCache {
|
||||
err_msg: err.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn tables<'a>(
|
||||
|
||||
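The new `table()` implementation above turns the lookup into a read-through cache: moka's `try_get_with_by_ref` runs the init future only on a miss, shares the result with concurrent callers for the same key, and a "not found" error raised inside init is translated back into `Ok(None)` by the caller. A minimal sketch of that pattern (key, value, and error types are stand-ins, not the catalog types):

```rust
use std::sync::Arc;

use moka::future::Cache;

// Hypothetical error type standing in for the `TableCacheNotGet` error in the diff.
#[derive(Debug)]
enum LookupError {
    NotFound,
}

#[tokio::main]
async fn main() {
    let cache: Cache<String, Arc<String>> = Cache::new(1024);

    let key = "greptime.public.numbers".to_string();
    let init = async {
        // In the real code this resolves the table from the metadata backend and
        // fails with a "not found" error when the table does not exist.
        Ok::<_, LookupError>(Arc::new("table handle".to_string()))
    };

    // Runs `init` only on a cache miss; concurrent lookups for the same key are coalesced.
    match cache.try_get_with_by_ref(&key, init).await {
        Ok(table) => println!("resolved {table}"),
        // A "not found" error from init is mapped back to "no such table" by the caller.
        Err(err) if matches!(err.as_ref(), LookupError::NotFound) => println!("no such table"),
        Err(err) => panic!("lookup failed: {err:?}"),
    }
}
```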
@@ -7,6 +7,9 @@ license.workspace = true
|
||||
[features]
|
||||
testing = []
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
arc-swap = "1.6"
|
||||
@@ -34,6 +37,8 @@ parking_lot = "0.12"
|
||||
prometheus.workspace = true
|
||||
prost.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
session.workspace = true
|
||||
snafu.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
@@ -340,7 +340,7 @@ impl Database {
|
||||
output_ordering: None,
|
||||
metrics: Default::default(),
|
||||
};
|
||||
Ok(Output::Stream(Box::pin(record_batch_stream)))
|
||||
Ok(Output::new_stream(Box::pin(record_batch_stream)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,10 +134,17 @@ impl From<Status> for Error {

 impl Error {
     pub fn should_retry(&self) -> bool {
-        !matches!(
-            self,
-            Self::RegionServer {
-                code: Code::InvalidArgument,
-                ..
-            }
-        )
+        // TODO(weny): figure out each case of these codes.
+        matches!(
+            self,
+            Self::RegionServer {
+                code: Code::Cancelled,
+                ..
+            } | Self::RegionServer {
+                code: Code::DeadlineExceeded,
+                ..
+            } | Self::RegionServer {
+                code: Code::Unavailable,
+                ..
+            }
+        )
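The predicate above now retries only on explicitly transient gRPC codes (`Cancelled`, `DeadlineExceeded`, `Unavailable`) instead of everything except `InvalidArgument`. A sketch of how a caller might drive it; the request and error types here are hypothetical stand-ins, and only the `should_retry` idea mirrors the diff:

```rust
use std::time::Duration;

// Hypothetical stand-in for the client error type.
#[derive(Debug)]
struct Error {
    retryable: bool,
}

impl Error {
    // Mirrors the narrowed predicate: only explicitly transient failures retry.
    fn should_retry(&self) -> bool {
        self.retryable
    }
}

// Hypothetical request that fails transiently on the first attempts, then succeeds.
async fn send_request(attempt: usize) -> Result<&'static str, Error> {
    if attempt < 2 {
        Err(Error { retryable: true })
    } else {
        Ok("ok")
    }
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let max_attempts = 3;
    let mut attempt = 0;
    let resp = loop {
        match send_request(attempt).await {
            Ok(resp) => break resp,
            // Retry only when the error is marked transient and attempts remain.
            Err(err) if err.should_retry() && attempt + 1 < max_attempts => {
                attempt += 1;
                tokio::time::sleep(Duration::from_millis(100 * attempt as u64)).await;
            }
            Err(err) => return Err(err),
        }
    };
    println!("response: {resp}");
    Ok(())
}
```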
@@ -123,8 +123,8 @@ impl RegionRequester {
|
||||
.fail();
|
||||
};
|
||||
|
||||
let metrics_str = Arc::new(ArcSwapOption::from(None));
|
||||
let ref_str = metrics_str.clone();
|
||||
let metrics = Arc::new(ArcSwapOption::from(None));
|
||||
let metrics_ref = metrics.clone();
|
||||
|
||||
let tracing_context = TracingContext::from_current_span();
|
||||
|
||||
@@ -140,7 +140,8 @@ impl RegionRequester {
|
||||
match flight_message {
|
||||
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
|
||||
FlightMessage::Metrics(s) => {
|
||||
ref_str.swap(Some(Arc::new(s)));
|
||||
let m = serde_json::from_str(&s).ok().map(Arc::new);
|
||||
metrics_ref.swap(m);
|
||||
break;
|
||||
}
|
||||
_ => {
|
||||
@@ -159,7 +160,7 @@ impl RegionRequester {
|
||||
schema,
|
||||
stream,
|
||||
output_ordering: None,
|
||||
metrics: metrics_str,
|
||||
metrics,
|
||||
};
|
||||
Ok(Box::pin(record_batch_stream))
|
||||
}
|
||||
@@ -196,7 +197,7 @@ impl RegionRequester {
|
||||
|
||||
check_response_header(header)?;
|
||||
|
||||
Ok(affected_rows)
|
||||
Ok(affected_rows as _)
|
||||
}
|
||||
|
||||
pub async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
|
||||
|
||||
@@ -12,6 +12,9 @@ path = "src/bin/greptime.rs"
|
||||
[features]
|
||||
tokio-console = ["common-telemetry/tokio-console"]
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
anymap = "1.0.0-beta.2"
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -142,7 +142,7 @@ impl Export {
|
||||
.with_context(|_| RequestDatabaseSnafu {
|
||||
sql: "show databases".to_string(),
|
||||
})?;
|
||||
let Output::Stream(stream) = result else {
|
||||
let Output::Stream(stream, _) = result else {
|
||||
NotDataFromOutputSnafu.fail()?
|
||||
};
|
||||
let record_batch = collect(stream)
|
||||
@@ -183,7 +183,7 @@ impl Export {
|
||||
.sql(&sql)
|
||||
.await
|
||||
.with_context(|_| RequestDatabaseSnafu { sql })?;
|
||||
let Output::Stream(stream) = result else {
|
||||
let Output::Stream(stream, _) = result else {
|
||||
NotDataFromOutputSnafu.fail()?
|
||||
};
|
||||
let Some(record_batch) = collect(stream)
|
||||
@@ -235,7 +235,7 @@ impl Export {
|
||||
.sql(&sql)
|
||||
.await
|
||||
.with_context(|_| RequestDatabaseSnafu { sql })?;
|
||||
let Output::Stream(stream) = result else {
|
||||
let Output::Stream(stream, _) = result else {
|
||||
NotDataFromOutputSnafu.fail()?
|
||||
};
|
||||
let record_batch = collect(stream)
|
||||
|
||||
@@ -185,7 +185,7 @@ impl Repl {
|
||||
.context(RequestDatabaseSnafu { sql: &sql })?;
|
||||
|
||||
let either = match output {
|
||||
Output::Stream(s) => {
|
||||
Output::Stream(s, _) => {
|
||||
let x = RecordBatches::try_collect(s)
|
||||
.await
|
||||
.context(CollectRecordBatchesSnafu)?;
|
||||
@@ -260,6 +260,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
|
||||
catalog_list,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
plugins.clone(),
|
||||
));
|
||||
|
||||
@@ -43,6 +43,10 @@ impl Instance {
|
||||
pub fn datanode_mut(&mut self) -> &mut Datanode {
|
||||
&mut self.datanode
|
||||
}
|
||||
|
||||
pub fn datanode(&self) -> &Datanode {
|
||||
&self.datanode
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -235,6 +239,7 @@ impl StartCommand {
|
||||
.with_default_grpc_server(&datanode.region_server())
|
||||
.enable_http_service()
|
||||
.build()
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
datanode.setup_services(services);
|
||||
|
||||
|
||||
@@ -43,13 +43,17 @@ pub struct Instance {
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
fn new(frontend: FeInstance) -> Self {
|
||||
pub fn new(frontend: FeInstance) -> Self {
|
||||
Self { frontend }
|
||||
}
|
||||
|
||||
pub fn mut_inner(&mut self) -> &mut FeInstance {
|
||||
&mut self.frontend
|
||||
}
|
||||
|
||||
pub fn inner(&self) -> &FeInstance {
|
||||
&self.frontend
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -271,6 +275,7 @@ impl StartCommand {
|
||||
|
||||
let servers = Services::new(opts.clone(), Arc::new(instance.clone()), plugins)
|
||||
.build()
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
instance
|
||||
.build_servers(opts, servers)
|
||||
|
||||
@@ -32,11 +32,11 @@ lazy_static::lazy_static! {
 }

 #[async_trait]
-pub trait App {
+pub trait App: Send {
     fn name(&self) -> &str;

     /// A hook for implementor to make something happened before actual startup. Defaults to no-op.
-    fn pre_start(&mut self) -> error::Result<()> {
+    async fn pre_start(&mut self) -> error::Result<()> {
         Ok(())
     }

@@ -46,24 +46,21 @@ pub trait App {
 }

 pub async fn start_app(mut app: Box<dyn App>) -> error::Result<()> {
-    let name = app.name().to_string();
+    info!("Starting app: {}", app.name());

-    app.pre_start()?;
+    app.pre_start().await?;

-    tokio::select! {
-        result = app.start() => {
-            if let Err(err) = result {
-                error!(err; "Failed to start app {name}!");
-            }
-        }
-        _ = tokio::signal::ctrl_c() => {
-            if let Err(err) = app.stop().await {
-                error!(err; "Failed to stop app {name}!");
-            }
-            info!("Goodbye!");
-        }
-    }
+    app.start().await?;
+
+    if let Err(e) = tokio::signal::ctrl_c().await {
+        error!("Failed to listen for ctrl-c signal: {}", e);
+        // It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
+        // the underlying system. So we stop the app instead of running nonetheless to let people
+        // investigate the issue.
+    }
+
+    app.stop().await?;
+    info!("Goodbye!");
     Ok(())
 }
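With the hunk above, `App` implementors must be `Send` and `pre_start` becomes async. A minimal sketch of an implementor against the reworked trait; the trait body below is reconstructed from how `start_app` uses it, so the exact `start`/`stop` signatures and the `error` module are assumptions:

```rust
use async_trait::async_trait;

// Stand-in for the CLI crate's error module.
mod error {
    pub type Result<T> = std::result::Result<T, String>;
}

#[async_trait]
trait App: Send {
    fn name(&self) -> &str;

    /// A hook run before actual startup; defaults to a no-op, as in the diff.
    async fn pre_start(&mut self) -> error::Result<()> {
        Ok(())
    }

    async fn start(&mut self) -> error::Result<()>;

    async fn stop(&self) -> error::Result<()>;
}

struct MyApp;

#[async_trait]
impl App for MyApp {
    fn name(&self) -> &str {
        "my-app"
    }

    async fn start(&mut self) -> error::Result<()> {
        println!("started");
        Ok(())
    }

    async fn stop(&self) -> error::Result<()> {
        println!("stopped");
        Ok(())
    }
}

#[tokio::main]
async fn main() -> error::Result<()> {
    // Mirrors the shape of `start_app(Box<dyn App>)`: pre_start, start, then stop on shutdown.
    let mut app: Box<dyn App> = Box::new(MyApp);
    app.pre_start().await?;
    app.start().await?;
    app.stop().await?;
    Ok(())
}
```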
@@ -21,8 +21,8 @@ use common_catalog::consts::MIN_USER_TABLE_ID;
|
||||
use common_config::{metadata_store_dir, KvBackendConfig};
|
||||
use common_meta::cache_invalidator::DummyCacheInvalidator;
|
||||
use common_meta::datanode_manager::DatanodeManagerRef;
|
||||
use common_meta::ddl::table_meta::TableMetadataAllocator;
|
||||
use common_meta::ddl::DdlTaskExecutorRef;
|
||||
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
|
||||
use common_meta::ddl::ProcedureExecutorRef;
|
||||
use common_meta::ddl_manager::DdlManager;
|
||||
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
@@ -419,11 +419,11 @@ impl StartCommand {
|
||||
let table_metadata_manager =
|
||||
Self::create_table_metadata_manager(kv_backend.clone()).await?;
|
||||
|
||||
let table_meta_allocator = TableMetadataAllocator::new(
|
||||
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
|
||||
table_id_sequence,
|
||||
wal_options_allocator.clone(),
|
||||
table_metadata_manager.clone(),
|
||||
);
|
||||
table_metadata_manager.table_name_manager().clone(),
|
||||
));
|
||||
|
||||
let ddl_task_executor = Self::create_ddl_task_executor(
|
||||
table_metadata_manager,
|
||||
@@ -441,6 +441,7 @@ impl StartCommand {
|
||||
|
||||
let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
|
||||
.build()
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
frontend
|
||||
.build_servers(fe_opts, servers)
|
||||
@@ -458,9 +459,9 @@ impl StartCommand {
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
datanode_manager: DatanodeManagerRef,
|
||||
table_meta_allocator: TableMetadataAllocator,
|
||||
) -> Result<DdlTaskExecutorRef> {
|
||||
let ddl_task_executor: DdlTaskExecutorRef = Arc::new(
|
||||
table_meta_allocator: TableMetadataAllocatorRef,
|
||||
) -> Result<ProcedureExecutorRef> {
|
||||
let procedure_executor: ProcedureExecutorRef = Arc::new(
|
||||
DdlManager::try_new(
|
||||
procedure_manager,
|
||||
datanode_manager,
|
||||
@@ -472,7 +473,7 @@ impl StartCommand {
|
||||
.context(InitDdlManagerSnafu)?,
|
||||
);
|
||||
|
||||
Ok(ddl_task_executor)
|
||||
Ok(procedure_executor)
|
||||
}
|
||||
|
||||
pub async fn create_table_metadata_manager(
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
anymap = "1.0.0-beta.2"
|
||||
bitvec = "1.0"
|
||||
|
||||
@@ -21,6 +21,8 @@ pub mod readable_size;
|
||||
use core::any::Any;
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
|
||||
pub type AffectedRows = usize;
|
||||
|
||||
pub use bit_vec::BitVec;
|
||||
|
||||
/// [`Plugins`] is a wrapper of Arc contents.
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
common-base.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
arrow.workspace = true
|
||||
arrow-schema.workspace = true
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
arrow.workspace = true
|
||||
bigdecimal.workspace = true
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
snafu.workspace = true
|
||||
strum.workspace = true
|
||||
|
||||
@@ -19,7 +19,9 @@ pub mod format;
|
||||
pub mod mock;
|
||||
pub mod status_code;
|
||||
|
||||
pub use snafu;
|
||||
|
||||
// HACK - these headers are here for shared in gRPC services. For common HTTP headers,
|
||||
// please define in `src/servers/src/http/header.rs`.
|
||||
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
|
||||
pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg";
|
||||
|
||||
pub use snafu;
|
||||
|
||||
@@ -4,13 +4,19 @@ edition.workspace = true
|
||||
version.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
arc-swap = "1.0"
|
||||
async-trait.workspace = true
|
||||
chrono-tz = "0.6"
|
||||
common-base.workspace = true
|
||||
common-catalog.workspace = true
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-meta.workspace = true
|
||||
common-query.workspace = true
|
||||
common-runtime.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
@@ -23,9 +29,12 @@ num = "0.4"
|
||||
num-traits = "0.2"
|
||||
once_cell.workspace = true
|
||||
paste = "1.0"
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
session.workspace = true
|
||||
snafu.workspace = true
|
||||
statrs = "0.16"
|
||||
store-api.workspace = true
|
||||
table.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -30,6 +30,17 @@ pub struct FunctionContext {
|
||||
pub state: Arc<FunctionState>,
|
||||
}
|
||||
|
||||
impl FunctionContext {
|
||||
/// Create a mock [`FunctionContext`] for test.
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub fn mock() -> Self {
|
||||
Self {
|
||||
query_ctx: QueryContextBuilder::default().build(),
|
||||
state: Arc::new(FunctionState::mock()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for FunctionContext {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
|
||||
@@ -21,6 +21,7 @@ use once_cell::sync::Lazy;
|
||||
use crate::function::FunctionRef;
|
||||
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
|
||||
use crate::scalars::date::DateFunction;
|
||||
use crate::scalars::expression::ExpressionFunction;
|
||||
use crate::scalars::math::MathFunction;
|
||||
use crate::scalars::numpy::NumpyFunction;
|
||||
use crate::scalars::timestamp::TimestampFunction;
|
||||
@@ -80,6 +81,7 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
|
||||
NumpyFunction::register(&function_registry);
|
||||
TimestampFunction::register(&function_registry);
|
||||
DateFunction::register(&function_registry);
|
||||
ExpressionFunction::register(&function_registry);
|
||||
|
||||
// Aggregate functions
|
||||
AggregateFunctions::register(&function_registry);
|
||||
|
||||
@@ -13,15 +13,14 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::ProcedureStateResponse;
|
||||
use async_trait::async_trait;
|
||||
use common_base::AffectedRows;
|
||||
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
|
||||
use common_query::error::Result;
|
||||
use session::context::QueryContextRef;
|
||||
use table::requests::{DeleteRequest, InsertRequest};
|
||||
|
||||
pub type AffectedRows = usize;
|
||||
use store_api::storage::RegionId;
|
||||
use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
|
||||
|
||||
/// A trait for handling table mutations in `QueryEngine`.
|
||||
#[async_trait]
|
||||
@@ -32,23 +31,39 @@ pub trait TableMutationHandler: Send + Sync {
|
||||
/// Delete rows from the table.
|
||||
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
|
||||
|
||||
/// Migrate a region from source peer to target peer, returns the procedure id if success.
|
||||
async fn migrate_region(
|
||||
/// Trigger a flush task for table.
|
||||
async fn flush(&self, request: FlushTableRequest, ctx: QueryContextRef)
|
||||
-> Result<AffectedRows>;
|
||||
|
||||
/// Trigger a compaction task for table.
|
||||
async fn compact(
|
||||
&self,
|
||||
region_id: u64,
|
||||
from_peer: u64,
|
||||
to_peer: u64,
|
||||
replay_timeout: Duration,
|
||||
) -> Result<String>;
|
||||
request: CompactTableRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<AffectedRows>;
|
||||
|
||||
/// Trigger a flush task for a table region.
|
||||
async fn flush_region(&self, region_id: RegionId, ctx: QueryContextRef)
|
||||
-> Result<AffectedRows>;
|
||||
|
||||
/// Trigger a compaction task for a table region.
|
||||
async fn compact_region(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<AffectedRows>;
|
||||
}
|
||||
|
||||
/// A trait for handling meta service requests in `QueryEngine`.
|
||||
/// A trait for handling procedure service requests in `QueryEngine`.
|
||||
#[async_trait]
|
||||
pub trait MetaServiceHandler: Send + Sync {
|
||||
pub trait ProcedureServiceHandler: Send + Sync {
|
||||
/// Migrate a region from source peer to target peer, returns the procedure id if success.
|
||||
async fn migrate_region(&self, request: MigrateRegionRequest) -> Result<Option<String>>;
|
||||
|
||||
/// Query the procedure's state by its id
|
||||
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
|
||||
}
|
||||
|
||||
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
|
||||
|
||||
pub type MetaServiceHandlerRef = Arc<dyn MetaServiceHandler>;
|
||||
pub type ProcedureServiceHandlerRef = Arc<dyn ProcedureServiceHandler>;
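A hedged sketch of driving the renamed handler end to end: submit a region migration, then poll its state. The peer and region ids are placeholders, and the `MigrateRegionRequest` and `Duration` imports are assumed to be in scope as elsewhere in this diff.

// Sketch only: ids are hypothetical and error handling is minimal.
async fn migrate_and_check(handler: &ProcedureServiceHandlerRef) -> Result<()> {
    let pid = handler
        .migrate_region(MigrateRegionRequest {
            region_id: 1,
            from_peer: 1,
            to_peer: 2,
            replay_timeout: Duration::from_secs(10),
        })
        .await?;

    // `None` means the procedure was not submitted (e.g. standalone mode).
    if let Some(pid) = pid {
        let _state = handler.query_procedure_state(&pid).await?;
    }
    Ok(())
}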
|
||||
|
||||
@@ -12,8 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_query::error::{InvalidInputTypeSnafu, Result};
|
||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::types::cast::cast;
|
||||
use datatypes::value::ValueRef;
|
||||
use snafu::ResultExt;
|
||||
|
||||
/// Create a function signature with oneof signatures of interleaving two arguments.
|
||||
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
|
||||
@@ -27,3 +31,15 @@ pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>)
|
||||
|
||||
Signature::one_of(sigs, Volatility::Immutable)
|
||||
}
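A small illustration of calling the helper above; the concrete types are arbitrary and only show the shape of the call (assuming the usual `ConcreteDataType` constructors from the datatypes crate).

// Sketch: build a one-of signature from two candidate type lists for a
// two-argument function.
let sig = one_of_sigs2(
    vec![ConcreteDataType::string_datatype()],
    vec![
        ConcreteDataType::string_datatype(),
        ConcreteDataType::timestamp_millisecond_datatype(),
    ],
);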
|
||||
|
||||
/// Cast a [`ValueRef`] to u64, returns `None` if fails
|
||||
pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
|
||||
cast((*value).into(), &ConcreteDataType::uint64_datatype())
|
||||
.context(InvalidInputTypeSnafu {
|
||||
err_msg: format!(
|
||||
"Failed to cast input into uint64, actual type: {:#?}",
|
||||
value.data_type(),
|
||||
),
|
||||
})
|
||||
.map(|v| v.as_u64())
|
||||
}
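The following sketch shows the intended call pattern for `cast_u64`, mirroring how the new flush/compact region functions consume it later in this diff; it assumes `RegionId` from `store_api::storage` is in scope.

// Sketch: turn the first SQL argument into a RegionId, treating a
// non-castable value as "no id".
fn region_id_arg(params: &[ValueRef<'_>]) -> Result<Option<RegionId>> {
    Ok(cast_u64(&params[0])?.map(RegionId::from_u64))
}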
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod macros;
|
||||
pub mod scalars;
|
||||
mod system;
|
||||
mod table;
|
||||
|
||||
src/common/function/src/macros.rs (new file, 27 lines)
@@ -0,0 +1,27 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
/// Ensure the current function is invoked under the `greptime` catalog.
|
||||
#[macro_export]
|
||||
macro_rules! ensure_greptime {
|
||||
($func_ctx: expr) => {{
|
||||
use common_catalog::consts::DEFAULT_CATALOG_NAME;
|
||||
snafu::ensure!(
|
||||
$func_ctx.query_ctx.current_catalog() == DEFAULT_CATALOG_NAME,
|
||||
common_query::error::PermissionDeniedSnafu {
|
||||
err_msg: format!("current catalog is not {DEFAULT_CATALOG_NAME}")
|
||||
}
|
||||
);
|
||||
}};
|
||||
}
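A minimal sketch of the macro in use inside a function body; it assumes the enclosing function returns `common_query::error::Result` and that `func_ctx` is a `FunctionContext`, exactly like the admin functions later in this diff.

// Sketch: bail out with PermissionDenied unless the session is in the
// default `greptime` catalog, then continue with privileged work.
fn eval_guarded(func_ctx: &FunctionContext) -> common_query::error::Result<()> {
    ensure_greptime!(func_ctx);
    // ... privileged work ...
    Ok(())
}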
|
||||
@@ -14,8 +14,22 @@
|
||||
|
||||
mod binary;
|
||||
mod ctx;
|
||||
mod is_null;
|
||||
mod unary;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use binary::scalar_binary_op;
|
||||
pub use ctx::EvalContext;
|
||||
pub use unary::scalar_unary_op;
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
use crate::scalars::expression::is_null::IsNullFunction;
|
||||
|
||||
pub(crate) struct ExpressionFunction;
|
||||
|
||||
impl ExpressionFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(IsNullFunction));
|
||||
}
|
||||
}
|
||||
|
||||
src/common/function/src/scalars/expression/is_null.rs (new file, 109 lines)
@@ -0,0 +1,109 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::error;
|
||||
use common_query::error::{ArrowComputeSnafu, InvalidFuncArgsSnafu};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
use datafusion::arrow::array::ArrayRef;
|
||||
use datafusion::arrow::compute::is_null;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::prelude::VectorRef;
|
||||
use datatypes::vectors::Helper;
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
|
||||
const NAME: &str = "isnull";
|
||||
|
||||
/// The function to check whether an expression is NULL
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct IsNullFunction;
|
||||
|
||||
impl Display for IsNullFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", NAME.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
impl Function for IsNullFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _: &[ConcreteDataType]) -> common_query::error::Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::boolean_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::any(1, Volatility::Immutable)
|
||||
}
|
||||
|
||||
fn eval(
|
||||
&self,
|
||||
_func_ctx: FunctionContext,
|
||||
columns: &[VectorRef],
|
||||
) -> common_query::error::Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly one, have: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
let values = &columns[0];
|
||||
let arrow_array = &values.to_arrow_array();
|
||||
let result = is_null(arrow_array).context(ArrowComputeSnafu)?;
|
||||
|
||||
Helper::try_into_vector(Arc::new(result) as ArrayRef).context(error::FromArrowArraySnafu)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::prelude::TypeSignature;
|
||||
use datatypes::scalars::ScalarVector;
|
||||
use datatypes::vectors::{BooleanVector, Float32Vector};
|
||||
|
||||
use super::*;
|
||||
#[test]
|
||||
fn test_is_null_function() {
|
||||
let is_null = IsNullFunction;
|
||||
assert_eq!("isnull", is_null.name());
|
||||
assert_eq!(
|
||||
ConcreteDataType::boolean_datatype(),
|
||||
is_null.return_type(&[]).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
is_null.signature(),
|
||||
Signature {
|
||||
type_signature: TypeSignature::Any(1),
|
||||
volatility: Volatility::Immutable
|
||||
}
|
||||
);
|
||||
let values = vec![None, Some(3.0), None];
|
||||
|
||||
let args: Vec<VectorRef> = vec![Arc::new(Float32Vector::from(values))];
|
||||
let vector = is_null.eval(FunctionContext::default(), &args).unwrap();
|
||||
let expect: VectorRef = Arc::new(BooleanVector::from_vec(vec![true, false, true]));
|
||||
assert_eq!(expect, vector);
|
||||
}
|
||||
}
|
||||
@@ -128,7 +128,7 @@ mod tests {
|
||||
];
|
||||
let result = function.eval(FunctionContext::default(), &args).unwrap();
|
||||
assert_eq!(result.len(), 4);
|
||||
for i in 0..3 {
|
||||
for i in 0..4 {
|
||||
let p: i64 = (nums[i] % divs[i]) as i64;
|
||||
assert!(matches!(result.get(i), Value::Int64(v) if v == p));
|
||||
}
|
||||
@@ -160,7 +160,7 @@ mod tests {
|
||||
];
|
||||
let result = function.eval(FunctionContext::default(), &args).unwrap();
|
||||
assert_eq!(result.len(), 4);
|
||||
for i in 0..3 {
|
||||
for i in 0..4 {
|
||||
let p: u64 = (nums[i] % divs[i]) as u64;
|
||||
assert!(matches!(result.get(i), Value::UInt64(v) if v == p));
|
||||
}
|
||||
@@ -192,7 +192,7 @@ mod tests {
|
||||
];
|
||||
let result = function.eval(FunctionContext::default(), &args).unwrap();
|
||||
assert_eq!(result.len(), 4);
|
||||
for i in 0..3 {
|
||||
for i in 0..4 {
|
||||
let p: f64 = nums[i] % divs[i];
|
||||
assert!(matches!(result.get(i), Value::Float64(v) if v == p));
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};
|
||||
use crate::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
|
||||
|
||||
/// Shared state for SQL functions.
|
||||
/// The handlers in state may be `None` in cli command-line or test cases.
|
||||
@@ -20,6 +20,104 @@ use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};
|
||||
pub struct FunctionState {
|
||||
// The table mutation handler
|
||||
pub table_mutation_handler: Option<TableMutationHandlerRef>,
|
||||
// The meta service handler
|
||||
pub meta_service_handler: Option<MetaServiceHandlerRef>,
|
||||
// The procedure service handler
|
||||
pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
|
||||
}
|
||||
|
||||
impl FunctionState {
|
||||
/// Create a mock [`FunctionState`] for test.
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub fn mock() -> Self {
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::ProcedureStatus;
|
||||
use async_trait::async_trait;
|
||||
use common_base::AffectedRows;
|
||||
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
|
||||
use common_query::error::Result;
|
||||
use session::context::QueryContextRef;
|
||||
use store_api::storage::RegionId;
|
||||
use table::requests::{
|
||||
CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
|
||||
};
|
||||
|
||||
use crate::handlers::{ProcedureServiceHandler, TableMutationHandler};
|
||||
struct MockProcedureServiceHandler;
|
||||
struct MockTableMutationHandler;
|
||||
const ROWS: usize = 42;
|
||||
|
||||
#[async_trait]
|
||||
impl ProcedureServiceHandler for MockProcedureServiceHandler {
|
||||
async fn migrate_region(
|
||||
&self,
|
||||
_request: MigrateRegionRequest,
|
||||
) -> Result<Option<String>> {
|
||||
Ok(Some("test_pid".to_string()))
|
||||
}
|
||||
|
||||
async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
|
||||
Ok(ProcedureStateResponse {
|
||||
status: ProcedureStatus::Done.into(),
|
||||
error: "OK".to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TableMutationHandler for MockTableMutationHandler {
|
||||
async fn insert(
|
||||
&self,
|
||||
_request: InsertRequest,
|
||||
_ctx: QueryContextRef,
|
||||
) -> Result<AffectedRows> {
|
||||
Ok(ROWS)
|
||||
}
|
||||
|
||||
async fn delete(
|
||||
&self,
|
||||
_request: DeleteRequest,
|
||||
_ctx: QueryContextRef,
|
||||
) -> Result<AffectedRows> {
|
||||
Ok(ROWS)
|
||||
}
|
||||
|
||||
async fn flush(
|
||||
&self,
|
||||
_request: FlushTableRequest,
|
||||
_ctx: QueryContextRef,
|
||||
) -> Result<AffectedRows> {
|
||||
Ok(ROWS)
|
||||
}
|
||||
|
||||
async fn compact(
|
||||
&self,
|
||||
_request: CompactTableRequest,
|
||||
_ctx: QueryContextRef,
|
||||
) -> Result<AffectedRows> {
|
||||
Ok(ROWS)
|
||||
}
|
||||
|
||||
async fn flush_region(
|
||||
&self,
|
||||
_region_id: RegionId,
|
||||
_ctx: QueryContextRef,
|
||||
) -> Result<AffectedRows> {
|
||||
Ok(ROWS)
|
||||
}
|
||||
|
||||
async fn compact_region(
|
||||
&self,
|
||||
_region_id: RegionId,
|
||||
_ctx: QueryContextRef,
|
||||
) -> Result<AffectedRows> {
|
||||
Ok(ROWS)
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
|
||||
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
|
||||
}
|
||||
}
|
||||
}
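Outside of tests, the same struct is filled with real handlers supplied by the frontend. A hedged sketch, where the handler values are placeholders for whatever the caller has constructed:

// Sketch: production wiring, with both handlers supplied.
let state = FunctionState {
    table_mutation_handler: Some(table_mutation_handler),
    procedure_service_handler: Some(procedure_service_handler),
};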
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
mod build;
|
||||
mod database;
|
||||
mod procedure_state;
|
||||
mod timezone;
|
||||
mod version;
|
||||
|
||||
@@ -21,6 +22,7 @@ use std::sync::Arc;
|
||||
|
||||
use build::BuildFunction;
|
||||
use database::DatabaseFunction;
|
||||
use procedure_state::ProcedureStateFunction;
|
||||
use timezone::TimezoneFunction;
|
||||
use version::VersionFunction;
|
||||
|
||||
@@ -34,5 +36,6 @@ impl SystemFunction {
|
||||
registry.register(Arc::new(VersionFunction));
|
||||
registry.register(Arc::new(DatabaseFunction));
|
||||
registry.register(Arc::new(TimezoneFunction));
|
||||
registry.register(Arc::new(ProcedureStateFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ use datatypes::vectors::{StringVector, VectorRef};
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
|
||||
/// Generates build information
|
||||
/// Generates build information
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct BuildFunction;
|
||||
|
||||
@@ -42,11 +42,7 @@ impl Function for BuildFunction {
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::uniform(
|
||||
0,
|
||||
vec![ConcreteDataType::string_datatype()],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
Signature::uniform(0, vec![], Volatility::Immutable)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
@@ -75,7 +71,7 @@ mod tests {
|
||||
Signature {
|
||||
type_signature: TypeSignature::Uniform(0, valid_types),
|
||||
volatility: Volatility::Immutable
|
||||
} if valid_types == vec![ConcreteDataType::string_datatype()]
|
||||
} if valid_types.is_empty()
|
||||
));
|
||||
let build_info = common_version::build_info().to_string();
|
||||
let vector = build.eval(FunctionContext::default(), &[]).unwrap();
|
||||
|
||||
src/common/function/src/system/procedure_state.rs (new file, 159 lines)
@@ -0,0 +1,159 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
|
||||
use api::v1::meta::ProcedureStatus;
|
||||
use common_macro::admin_fn;
|
||||
use common_meta::rpc::procedure::ProcedureStateResponse;
|
||||
use common_query::error::Error::ThreadJoin;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
|
||||
UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
use common_telemetry::error;
|
||||
use datatypes::prelude::*;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use serde::Serialize;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, Location, OptionExt};
|
||||
|
||||
use crate::ensure_greptime;
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::handlers::ProcedureServiceHandlerRef;
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct ProcedureStateJson {
|
||||
status: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
/// A function to query procedure state by its id.
|
||||
/// Such as `procedure_state(pid)`.
|
||||
#[admin_fn(
|
||||
name = "ProcedureStateFunction",
|
||||
display_name = "procedure_state",
|
||||
sig_fn = "signature",
|
||||
ret = "string"
|
||||
)]
|
||||
pub(crate) async fn procedure_state(
|
||||
procedure_service_handler: &ProcedureServiceHandlerRef,
|
||||
_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
ensure!(
|
||||
params.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 1, have: {}",
|
||||
params.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let ValueRef::String(pid) = params[0] else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "procedure_state",
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
let ProcedureStateResponse { status, error, .. } =
|
||||
procedure_service_handler.query_procedure_state(pid).await?;
|
||||
let status = ProcedureStatus::try_from(status)
|
||||
.map(|v| v.as_str_name())
|
||||
.unwrap_or("Unknown");
|
||||
|
||||
let state = ProcedureStateJson {
|
||||
status: status.to_string(),
|
||||
error: if error.is_empty() { None } else { Some(error) },
|
||||
};
|
||||
let json = serde_json::to_string(&state).unwrap_or_default();
|
||||
|
||||
Ok(Value::from(json))
|
||||
}
|
||||
|
||||
fn signature() -> Signature {
|
||||
Signature::uniform(
|
||||
1,
|
||||
vec![ConcreteDataType::string_datatype()],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::prelude::TypeSignature;
|
||||
use datatypes::vectors::StringVector;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_procedure_state_misc() {
|
||||
let f = ProcedureStateFunction;
|
||||
assert_eq!("procedure_state", f.name());
|
||||
assert_eq!(
|
||||
ConcreteDataType::string_datatype(),
|
||||
f.return_type(&[]).unwrap()
|
||||
);
|
||||
assert!(matches!(f.signature(),
|
||||
Signature {
|
||||
type_signature: TypeSignature::Uniform(1, valid_types),
|
||||
volatility: Volatility::Immutable
|
||||
} if valid_types == vec![ConcreteDataType::string_datatype()]
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_missing_procedure_service() {
|
||||
let f = ProcedureStateFunction;
|
||||
|
||||
let args = vec!["pid"];
|
||||
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
|
||||
assert_eq!(
|
||||
"Missing ProcedureServiceHandler, not expected",
|
||||
result.to_string()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_procedure_state() {
|
||||
let f = ProcedureStateFunction;
|
||||
|
||||
let args = vec!["pid"];
|
||||
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::mock(), &args).unwrap();
|
||||
|
||||
let expect: VectorRef = Arc::new(StringVector::from(vec![
|
||||
"{\"status\":\"Done\",\"error\":\"OK\"}",
|
||||
]));
|
||||
assert_eq!(expect, result);
|
||||
}
|
||||
}
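For reference, the JSON emitted above drops the `error` field when it is `None`. A small sketch of the two shapes, conceptually living in the same module since `ProcedureStateJson` is private:

// Sketch: `error` is omitted entirely for a clean procedure state.
let running = ProcedureStateJson { status: "Running".to_string(), error: None };
assert_eq!(serde_json::to_string(&running).unwrap(), r#"{"status":"Running"}"#);

let failed = ProcedureStateJson {
    status: "Failed".to_string(),
    error: Some("retry later".to_string()),
};
assert_eq!(
    serde_json::to_string(&failed).unwrap(),
    r#"{"status":"Failed","error":"retry later"}"#
);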
|
||||
@@ -12,10 +12,14 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod flush_compact_region;
|
||||
mod flush_compact_table;
|
||||
mod migrate_region;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
|
||||
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
|
||||
use migrate_region::MigrateRegionFunction;
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
@@ -27,5 +31,9 @@ impl TableFunction {
|
||||
/// Register all table functions to [`FunctionRegistry`].
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(MigrateRegionFunction));
|
||||
registry.register(Arc::new(FlushRegionFunction));
|
||||
registry.register(Arc::new(CompactRegionFunction));
|
||||
registry.register(Arc::new(FlushTableFunction));
|
||||
registry.register(Arc::new(CompactTableFunction));
|
||||
}
|
||||
}
|
||||
|
||||
src/common/function/src/table/flush_compact_region.rs (new file, 148 lines)
@@ -0,0 +1,148 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
|
||||
use common_macro::admin_fn;
|
||||
use common_query::error::Error::ThreadJoin;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
use common_telemetry::error;
|
||||
use datatypes::prelude::*;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, Location, OptionExt};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::ensure_greptime;
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::handlers::TableMutationHandlerRef;
|
||||
use crate::helper::cast_u64;
|
||||
|
||||
macro_rules! define_region_function {
|
||||
($name: expr, $display_name_str: expr, $display_name: ident) => {
|
||||
/// A function to $display_name
|
||||
#[admin_fn(name = $name, display_name = $display_name_str, sig_fn = "signature", ret = "uint64")]
|
||||
pub(crate) async fn $display_name(
|
||||
table_mutation_handler: &TableMutationHandlerRef,
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
ensure!(
|
||||
params.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 1, have: {}",
|
||||
params.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let Some(region_id) = cast_u64(¶ms[0])? else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: $display_name_str,
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
let affected_rows = table_mutation_handler
|
||||
.$display_name(RegionId::from_u64(region_id), query_ctx.clone())
|
||||
.await?;
|
||||
|
||||
Ok(Value::from(affected_rows as u64))
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
define_region_function!("FlushRegionFunction", "flush_region", flush_region);
|
||||
|
||||
define_region_function!("CompactRegionFunction", "compact_region", compact_region);
|
||||
|
||||
fn signature() -> Signature {
|
||||
Signature::uniform(1, ConcreteDataType::numerics(), Volatility::Immutable)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::prelude::TypeSignature;
|
||||
use datatypes::vectors::UInt64Vector;
|
||||
|
||||
use super::*;
|
||||
|
||||
macro_rules! define_region_function_test {
|
||||
($name: ident, $func: ident) => {
|
||||
paste::paste! {
|
||||
#[test]
|
||||
fn [<test_ $name _misc>]() {
|
||||
let f = $func;
|
||||
assert_eq!(stringify!($name), f.name());
|
||||
assert_eq!(
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
f.return_type(&[]).unwrap()
|
||||
);
|
||||
assert!(matches!(f.signature(),
|
||||
Signature {
|
||||
type_signature: TypeSignature::Uniform(1, valid_types),
|
||||
volatility: Volatility::Immutable
|
||||
} if valid_types == ConcreteDataType::numerics()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn [<test_ $name _missing_table_mutation>]() {
|
||||
let f = $func;
|
||||
|
||||
let args = vec![99];
|
||||
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
|
||||
assert_eq!(
|
||||
"Missing TableMutationHandler, not expected",
|
||||
result.to_string()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn [<test_ $name>]() {
|
||||
let f = $func;
|
||||
|
||||
|
||||
let args = vec![99];
|
||||
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::mock(), &args).unwrap();
|
||||
|
||||
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
|
||||
assert_eq!(expect, result);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
define_region_function_test!(flush_region, FlushRegionFunction);
|
||||
|
||||
define_region_function_test!(compact_region, CompactRegionFunction);
|
||||
}
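The numeric argument these functions expect is a packed region id. A hedged sketch of producing one follows; the `RegionId::new`/`as_u64` helpers are assumed to exist in `store_api` alongside the `from_u64` used above, and the table id and region number are made up.

// Sketch: region 0 of table 1024, passed as a plain u64 to
// flush_region()/compact_region().
let region_id = RegionId::new(1024, 0);
let arg = region_id.as_u64();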
|
||||
src/common/function/src/table/flush_compact_table.rs (new file, 178 lines)
@@ -0,0 +1,178 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_macro::admin_fn;
|
||||
use common_query::error::Error::ThreadJoin;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
|
||||
UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
use common_telemetry::error;
|
||||
use datatypes::prelude::*;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use session::context::QueryContextRef;
|
||||
use session::table_name::table_name_to_full_name;
|
||||
use snafu::{ensure, Location, OptionExt, ResultExt};
|
||||
use table::requests::{CompactTableRequest, FlushTableRequest};
|
||||
|
||||
use crate::ensure_greptime;
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::handlers::TableMutationHandlerRef;
|
||||
|
||||
macro_rules! define_table_function {
|
||||
($name: expr, $display_name_str: expr, $display_name: ident, $func: ident, $request: ident) => {
|
||||
/// A function to $func table, such as `$display_name(table_name)`.
|
||||
#[admin_fn(name = $name, display_name = $display_name_str, sig_fn = "signature", ret = "uint64")]
|
||||
pub(crate) async fn $display_name(
|
||||
table_mutation_handler: &TableMutationHandlerRef,
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
ensure!(
|
||||
params.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 1, have: {}",
|
||||
params.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let ValueRef::String(table_name) = params[0] else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: $display_name_str,
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
let (catalog_name, schema_name, table_name) =
|
||||
table_name_to_full_name(table_name, &query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(TableMutationSnafu)?;
|
||||
|
||||
let affected_rows = table_mutation_handler
|
||||
.$func(
|
||||
$request {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
},
|
||||
query_ctx.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Value::from(affected_rows as u64))
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
define_table_function!(
|
||||
"FlushTableFunction",
|
||||
"flush_table",
|
||||
flush_table,
|
||||
flush,
|
||||
FlushTableRequest
|
||||
);
|
||||
|
||||
define_table_function!(
|
||||
"CompactTableFunction",
|
||||
"compact_table",
|
||||
compact_table,
|
||||
compact,
|
||||
CompactTableRequest
|
||||
);
|
||||
|
||||
fn signature() -> Signature {
|
||||
Signature::uniform(
|
||||
1,
|
||||
vec![ConcreteDataType::string_datatype()],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::prelude::TypeSignature;
|
||||
use datatypes::vectors::{StringVector, UInt64Vector};
|
||||
|
||||
use super::*;
|
||||
|
||||
macro_rules! define_table_function_test {
|
||||
($name: ident, $func: ident) => {
|
||||
paste::paste!{
|
||||
#[test]
|
||||
fn [<test_ $name _misc>]() {
|
||||
let f = $func;
|
||||
assert_eq!(stringify!($name), f.name());
|
||||
assert_eq!(
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
f.return_type(&[]).unwrap()
|
||||
);
|
||||
assert!(matches!(f.signature(),
|
||||
Signature {
|
||||
type_signature: TypeSignature::Uniform(1, valid_types),
|
||||
volatility: Volatility::Immutable
|
||||
} if valid_types == vec![ConcreteDataType::string_datatype()]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn [<test_ $name _missing_table_mutation>]() {
|
||||
let f = $func;
|
||||
|
||||
let args = vec!["test"];
|
||||
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
|
||||
assert_eq!(
|
||||
"Missing TableMutationHandler, not expected",
|
||||
result.to_string()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn [<test_ $name>]() {
|
||||
let f = $func;
|
||||
|
||||
|
||||
let args = vec!["test"];
|
||||
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::mock(), &args).unwrap();
|
||||
|
||||
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
|
||||
assert_eq!(expect, result);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
define_table_function_test!(flush_table, FlushTableFunction);
|
||||
|
||||
define_table_function_test!(compact_table, CompactTableFunction);
|
||||
}
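The table variants resolve their single string argument against the session before issuing the request. A rough sketch of that resolution step in isolation, assuming a `QueryContextRef` named `query_ctx` with default catalog and schema; the exact output is session-dependent.

// Sketch: a bare table name picks up the session's catalog and schema.
let (catalog, schema, table) = table_name_to_full_name("monitor", &query_ctx)
    .expect("valid table name");
// e.g. ("greptime", "public", "monitor") for a default session.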
|
||||
@@ -15,18 +15,25 @@
|
||||
use std::fmt::{self};
|
||||
use std::time::Duration;
|
||||
|
||||
use common_macro::admin_fn;
|
||||
use common_meta::rpc::procedure::MigrateRegionRequest;
|
||||
use common_query::error::Error::ThreadJoin;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, InvalidInputTypeSnafu, MissingTableMutationHandlerSnafu, Result,
|
||||
};
|
||||
use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result};
|
||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||
use common_telemetry::logging::error;
|
||||
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{StringVectorBuilder, VectorRef};
|
||||
use snafu::{Location, OptionExt, ResultExt};
|
||||
use datatypes::data_type::DataType;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{Location, OptionExt};
|
||||
|
||||
use crate::ensure_greptime;
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::handlers::ProcedureServiceHandlerRef;
|
||||
use crate::helper::cast_u64;
|
||||
|
||||
const DEFAULT_REPLAY_TIMEOUT_SECS: u64 = 10;
|
||||
|
||||
/// A function to migrate a region from source peer to target peer.
|
||||
/// Returns the submitted procedure id if success. Only available in cluster mode.
|
||||
@@ -38,138 +45,140 @@ use crate::function::{Function, FunctionContext};
|
||||
/// - `region_id`: the region id
|
||||
/// - `from_peer`: the source peer id
|
||||
/// - `to_peer`: the target peer id
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct MigrateRegionFunction;
|
||||
#[admin_fn(
|
||||
name = "MigrateRegionFunction",
|
||||
display_name = "migrate_region",
|
||||
sig_fn = "signature",
|
||||
ret = "string"
|
||||
)]
|
||||
pub(crate) async fn migrate_region(
|
||||
procedure_service_handler: &ProcedureServiceHandlerRef,
|
||||
_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
let (region_id, from_peer, to_peer, replay_timeout) = match params.len() {
|
||||
3 => {
|
||||
let region_id = cast_u64(¶ms[0])?;
|
||||
let from_peer = cast_u64(¶ms[1])?;
|
||||
let to_peer = cast_u64(¶ms[2])?;
|
||||
|
||||
const NAME: &str = "migrate_region";
|
||||
const DEFAULT_REPLAY_TIMEOUT_SECS: u64 = 10;
|
||||
(
|
||||
region_id,
|
||||
from_peer,
|
||||
to_peer,
|
||||
Some(DEFAULT_REPLAY_TIMEOUT_SECS),
|
||||
)
|
||||
}
|
||||
|
||||
fn cast_u64_vector(vector: &VectorRef) -> Result<VectorRef> {
|
||||
vector
|
||||
.cast(&ConcreteDataType::uint64_datatype())
|
||||
.context(InvalidInputTypeSnafu {
|
||||
err_msg: format!(
|
||||
"Failed to cast input into uint64, actual type: {:#?}",
|
||||
vector.data_type(),
|
||||
),
|
||||
})
|
||||
}
|
||||
4 => {
|
||||
let region_id = cast_u64(¶ms[0])?;
|
||||
let from_peer = cast_u64(¶ms[1])?;
|
||||
let to_peer = cast_u64(¶ms[2])?;
|
||||
let replay_timeout = cast_u64(¶ms[3])?;
|
||||
|
||||
impl Function for MigrateRegionFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
(region_id, from_peer, to_peer, replay_timeout)
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::string_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::one_of(
|
||||
vec![
|
||||
// migrate_region(region_id, from_peer, to_peer)
|
||||
TypeSignature::Uniform(3, ConcreteDataType::numerics()),
|
||||
// migrate_region(region_id, from_peer, to_peer, timeout(secs))
|
||||
TypeSignature::Uniform(4, ConcreteDataType::numerics()),
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
let (region_ids, from_peers, to_peers, replay_timeouts) = match columns.len() {
|
||||
3 => {
|
||||
let region_ids = cast_u64_vector(&columns[0])?;
|
||||
let from_peers = cast_u64_vector(&columns[1])?;
|
||||
let to_peers = cast_u64_vector(&columns[2])?;
|
||||
|
||||
(region_ids, from_peers, to_peers, None)
|
||||
size => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly 3 or 4, have: {}",
|
||||
size
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
|
||||
4 => {
|
||||
let region_ids = cast_u64_vector(&columns[0])?;
|
||||
let from_peers = cast_u64_vector(&columns[1])?;
|
||||
let to_peers = cast_u64_vector(&columns[2])?;
|
||||
let replay_timeouts = cast_u64_vector(&columns[3])?;
|
||||
match (region_id, from_peer, to_peer, replay_timeout) {
|
||||
(Some(region_id), Some(from_peer), Some(to_peer), Some(replay_timeout)) => {
|
||||
let pid = procedure_service_handler
|
||||
.migrate_region(MigrateRegionRequest {
|
||||
region_id,
|
||||
from_peer,
|
||||
to_peer,
|
||||
replay_timeout: Duration::from_secs(replay_timeout),
|
||||
})
|
||||
.await?;
|
||||
|
||||
(region_ids, from_peers, to_peers, Some(replay_timeouts))
|
||||
match pid {
|
||||
Some(pid) => Ok(Value::from(pid)),
|
||||
None => Ok(Value::Null),
|
||||
}
|
||||
}
|
||||
|
||||
size => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly 3 or 4, have: {}",
|
||||
size
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
|
||||
std::thread::spawn(move || {
|
||||
let len = region_ids.len();
|
||||
let mut results = StringVectorBuilder::with_capacity(len);
|
||||
|
||||
for index in 0..len {
|
||||
let region_id = region_ids.get(index);
|
||||
let from_peer = from_peers.get(index);
|
||||
let to_peer = to_peers.get(index);
|
||||
let replay_timeout = match &replay_timeouts {
|
||||
Some(replay_timeouts) => replay_timeouts.get(index),
|
||||
None => Value::UInt64(DEFAULT_REPLAY_TIMEOUT_SECS),
|
||||
};
|
||||
|
||||
match (region_id, from_peer, to_peer, replay_timeout) {
|
||||
(
|
||||
Value::UInt64(region_id),
|
||||
Value::UInt64(from_peer),
|
||||
Value::UInt64(to_peer),
|
||||
Value::UInt64(replay_timeout),
|
||||
) => {
|
||||
let func_ctx = func_ctx.clone();
|
||||
|
||||
let pid = common_runtime::block_on_read(async move {
|
||||
func_ctx
|
||||
.state
|
||||
.table_mutation_handler
|
||||
.as_ref()
|
||||
.context(MissingTableMutationHandlerSnafu)?
|
||||
.migrate_region(
|
||||
region_id,
|
||||
from_peer,
|
||||
to_peer,
|
||||
Duration::from_secs(replay_timeout),
|
||||
)
|
||||
.await
|
||||
})?;
|
||||
|
||||
results.push(Some(&pid));
|
||||
}
|
||||
_ => {
|
||||
results.push(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(results.to_vector())
|
||||
})
|
||||
.join()
|
||||
.map_err(|e| {
|
||||
error!(e; "Join thread error");
|
||||
ThreadJoin {
|
||||
location: Location::default(),
|
||||
}
|
||||
})?
|
||||
_ => Ok(Value::Null),
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for MigrateRegionFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "MIGRATE_REGION")
|
||||
}
|
||||
fn signature() -> Signature {
|
||||
Signature::one_of(
|
||||
vec![
|
||||
// migrate_region(region_id, from_peer, to_peer)
|
||||
TypeSignature::Uniform(3, ConcreteDataType::numerics()),
|
||||
// migrate_region(region_id, from_peer, to_peer, timeout(secs))
|
||||
TypeSignature::Uniform(4, ConcreteDataType::numerics()),
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
// FIXME(dennis): test in the following PR.
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::prelude::TypeSignature;
|
||||
use datatypes::vectors::{StringVector, UInt64Vector};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_migrate_region_misc() {
|
||||
let f = MigrateRegionFunction;
|
||||
assert_eq!("migrate_region", f.name());
|
||||
assert_eq!(
|
||||
ConcreteDataType::string_datatype(),
|
||||
f.return_type(&[]).unwrap()
|
||||
);
|
||||
assert!(matches!(f.signature(),
|
||||
Signature {
|
||||
type_signature: TypeSignature::OneOf(sigs),
|
||||
volatility: Volatility::Immutable
|
||||
} if sigs.len() == 2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_missing_procedure_service() {
|
||||
let f = MigrateRegionFunction;
|
||||
|
||||
let args = vec![1, 1, 1];
|
||||
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
|
||||
assert_eq!(
|
||||
"Missing ProcedureServiceHandler, not expected",
|
||||
result.to_string()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_migrate_region() {
|
||||
let f = MigrateRegionFunction;
|
||||
|
||||
let args = vec![1, 1, 1];
|
||||
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::mock(), &args).unwrap();
|
||||
|
||||
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
|
||||
assert_eq!(expect, result);
|
||||
}
|
||||
}
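As a complement to the tests above, a sketch of the four-argument form, where the last value overrides the 10-second default replay timeout. The mock handler and placeholder ids mirror `test_migrate_region`.

// Sketch: migrate_region(region_id, from_peer, to_peer, replay_timeout_secs).
let args: Vec<VectorRef> = [1u64, 1, 2, 30]
    .into_iter()
    .map(|v| Arc::new(UInt64Vector::from_slice([v])) as _)
    .collect();
let result = MigrateRegionFunction
    .eval(FunctionContext::mock(), &args)
    .unwrap();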
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
common-error.workspace = true
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
arrow-flight.workspace = true
|
||||
|
||||
@@ -50,8 +50,12 @@ pub struct FlightEncoder {
|
||||
|
||||
impl Default for FlightEncoder {
|
||||
fn default() -> Self {
|
||||
let write_options = writer::IpcWriteOptions::default()
|
||||
.try_with_compression(Some(arrow::ipc::CompressionType::LZ4_FRAME))
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
write_options: writer::IpcWriteOptions::default(),
|
||||
write_options,
|
||||
data_gen: writer::IpcDataGenerator::default(),
|
||||
dictionary_tracker: writer::DictionaryTracker::new(false),
|
||||
}
|
||||
|
||||
@@ -7,6 +7,9 @@ license.workspace = true
|
||||
[lib]
|
||||
proc-macro = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
proc-macro2 = "1.0.66"
|
||||
quote = "1.0"
|
||||
|
||||
src/common/macro/src/admin_fn.rs (new file, 236 lines)
@@ -0,0 +1,236 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use proc_macro::TokenStream;
|
||||
use quote::quote;
|
||||
use syn::spanned::Spanned;
|
||||
use syn::{
|
||||
parse_macro_input, Attribute, AttributeArgs, Ident, ItemFn, Signature, Type, TypePath,
|
||||
TypeReference, Visibility,
|
||||
};
|
||||
|
||||
use crate::utils::{extract_arg_map, extract_input_types, get_ident};
|
||||
|
||||
/// Internal util macro to early return on error.
|
||||
macro_rules! ok {
|
||||
($item:expr) => {
|
||||
match $item {
|
||||
Ok(item) => item,
|
||||
Err(e) => return e.into_compile_error().into(),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Internal util macro to create an error.
|
||||
macro_rules! error {
|
||||
($span:expr, $msg: expr) => {
|
||||
Err(syn::Error::new($span, $msg))
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) fn process_admin_fn(args: TokenStream, input: TokenStream) -> TokenStream {
|
||||
let mut result = TokenStream::new();
|
||||
|
||||
// extract arg map
|
||||
let arg_pairs = parse_macro_input!(args as AttributeArgs);
|
||||
let arg_span = arg_pairs[0].span();
|
||||
let arg_map = ok!(extract_arg_map(arg_pairs));
|
||||
|
||||
// decompose the fn block
|
||||
let compute_fn = parse_macro_input!(input as ItemFn);
|
||||
let ItemFn {
|
||||
attrs,
|
||||
vis,
|
||||
sig,
|
||||
block,
|
||||
} = compute_fn;
|
||||
|
||||
// extract fn arg list
|
||||
let Signature {
|
||||
inputs,
|
||||
ident: fn_name,
|
||||
..
|
||||
} = &sig;
|
||||
|
||||
let arg_types = ok!(extract_input_types(inputs));
|
||||
if arg_types.len() < 2 {
|
||||
ok!(error!(
|
||||
sig.span(),
|
||||
"Expect at least two argument for admin fn: (handler, query_ctx)"
|
||||
));
|
||||
}
|
||||
let handler_type = ok!(extract_handler_type(&arg_types));
|
||||
|
||||
// build the struct and its impl block
|
||||
// only do this when `display_name` is specified
|
||||
if let Ok(display_name) = get_ident(&arg_map, "display_name", arg_span) {
|
||||
let struct_code = build_struct(
|
||||
attrs,
|
||||
vis,
|
||||
fn_name,
|
||||
ok!(get_ident(&arg_map, "name", arg_span)),
|
||||
ok!(get_ident(&arg_map, "sig_fn", arg_span)),
|
||||
ok!(get_ident(&arg_map, "ret", arg_span)),
|
||||
handler_type,
|
||||
display_name,
|
||||
);
|
||||
result.extend(struct_code);
|
||||
}
|
||||
|
||||
// preserve this fn
|
||||
let input_fn_code: TokenStream = quote! {
|
||||
#sig { #block }
|
||||
}
|
||||
.into();
|
||||
|
||||
result.extend(input_fn_code);
|
||||
result
|
||||
}
|
||||
|
||||
/// Retrieve the handler type, `ProcedureServiceHandlerRef` or `TableMutationHandlerRef`.
|
||||
fn extract_handler_type(arg_types: &[Type]) -> Result<&Ident, syn::Error> {
|
||||
match &arg_types[0] {
|
||||
Type::Reference(TypeReference { elem, .. }) => match &**elem {
|
||||
Type::Path(TypePath { path, .. }) => Ok(&path
|
||||
.segments
|
||||
.first()
|
||||
.expect("Expected a reference of handler")
|
||||
.ident),
|
||||
other => {
|
||||
error!(other.span(), "Expected a reference of handler")
|
||||
}
|
||||
},
|
||||
other => {
|
||||
error!(other.span(), "Expected a reference of handler")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the function struct
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn build_struct(
|
||||
attrs: Vec<Attribute>,
|
||||
vis: Visibility,
|
||||
fn_name: &Ident,
|
||||
name: Ident,
|
||||
sig_fn: Ident,
|
||||
ret: Ident,
|
||||
handler_type: &Ident,
|
||||
display_name_ident: Ident,
|
||||
) -> TokenStream {
|
||||
let display_name = display_name_ident.to_string();
|
||||
let ret = Ident::new(&format!("{ret}_datatype"), ret.span());
|
||||
let uppcase_display_name = display_name.to_uppercase();
|
||||
// Get the handler name in function state by the argument ident
|
||||
let (handler, snafu_type) = match handler_type.to_string().as_str() {
|
||||
"ProcedureServiceHandlerRef" => (
|
||||
Ident::new("procedure_service_handler", handler_type.span()),
|
||||
Ident::new("MissingProcedureServiceHandlerSnafu", handler_type.span()),
|
||||
),
|
||||
|
||||
"TableMutationHandlerRef" => (
|
||||
Ident::new("table_mutation_handler", handler_type.span()),
|
||||
Ident::new("MissingTableMutationHandlerSnafu", handler_type.span()),
|
||||
),
|
||||
handler => ok!(error!(
|
||||
handler_type.span(),
|
||||
format!("Unknown handler type: {handler}")
|
||||
)),
|
||||
};
|
||||
|
||||
quote! {
|
||||
#(#attrs)*
|
||||
#[derive(Debug)]
|
||||
#vis struct #name;
|
||||
|
||||
impl fmt::Display for #name {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, #uppcase_display_name)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl Function for #name {
|
||||
fn name(&self) -> &'static str {
|
||||
#display_name
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::#ret())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
#sig_fn()
|
||||
}
|
||||
|
||||
fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
// Ensure under the `greptime` catalog for security
|
||||
ensure_greptime!(func_ctx);
|
||||
|
||||
let columns_num = columns.len();
|
||||
let rows_num = if columns.is_empty() {
|
||||
1
|
||||
} else {
|
||||
columns[0].len()
|
||||
};
|
||||
let columns = Vec::from(columns);
|
||||
|
||||
// TODO(dennis): DataFusion doesn't support async UDF currently
|
||||
std::thread::spawn(move || {
|
||||
let query_ctx = &func_ctx.query_ctx;
|
||||
let handler = func_ctx
|
||||
.state
|
||||
.#handler
|
||||
.as_ref()
|
||||
.context(#snafu_type)?;
|
||||
|
||||
let mut builder = ConcreteDataType::#ret()
|
||||
.create_mutable_vector(rows_num);
|
||||
|
||||
if columns_num == 0 {
|
||||
let result = common_runtime::block_on_read(async move {
|
||||
#fn_name(handler, query_ctx, &[]).await
|
||||
})?;
|
||||
|
||||
builder.push_value_ref(result.as_value_ref());
|
||||
} else {
|
||||
for i in 0..rows_num {
|
||||
let args: Vec<_> = columns.iter()
|
||||
.map(|vector| vector.get_ref(i))
|
||||
.collect();
|
||||
|
||||
let result = common_runtime::block_on_read(async move {
|
||||
#fn_name(handler, query_ctx, &args).await
|
||||
})?;
|
||||
|
||||
builder.push_value_ref(result.as_value_ref());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(builder.to_vector())
|
||||
})
|
||||
.join()
|
||||
.map_err(|e| {
|
||||
error!(e; "Join thread error");
|
||||
ThreadJoin {
|
||||
location: Location::default(),
|
||||
}
|
||||
})?
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
.into()
|
||||
}
|
||||
@@ -12,17 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod admin_fn;
mod aggr_func;
mod print_caller;
mod range_fn;
mod stack_trace_debug;

mod utils;
use aggr_func::{impl_aggr_func_type_store, impl_as_aggr_func_creator};
use print_caller::process_print_caller;
use proc_macro::TokenStream;
use range_fn::process_range_fn;
use syn::{parse_macro_input, DeriveInput};

use crate::admin_fn::process_admin_fn;
|
||||
|
||||
/// Make a struct implement the trait [AggrFuncTypeStore], which is necessary when writing a UDAF.
/// This derive macro is expected to be used along with the attribute macro [macro@as_aggr_func_creator].
|
||||
#[proc_macro_derive(AggrFuncTypeStore)]
|
||||
@@ -68,6 +71,25 @@ pub fn range_fn(args: TokenStream, input: TokenStream) -> TokenStream {
|
||||
process_range_fn(args, input)
|
||||
}
|
||||
|
||||
/// Attribute macro to convert a normal function into an SQL administration function. The annotated function
/// should accept:
/// - `&ProcedureServiceHandlerRef` or `&TableMutationHandlerRef` as the first argument,
/// - `&QueryContextRef` as the second argument, and
/// - `&[ValueRef<'_>]` as the third argument, which holds the SQL function's input values for each row.
/// The return type must be `common_query::error::Result<Value>`.
///
/// # Example
/// See `common/function/src/system/procedure_state.rs`.
///
/// # Arguments
/// - `name`: The name of the generated `Function` implementation.
/// - `ret`: The return type of the generated SQL function; it is turned into the `ConcreteDataType::{ret}_datatype()` result.
/// - `display_name`: The display name of the generated SQL function.
/// - `sig_fn`: The function that returns the `Signature` of the generated `Function`.
#[proc_macro_attribute]
pub fn admin_fn(args: TokenStream, input: TokenStream) -> TokenStream {
    process_admin_fn(args, input)
}
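For orientation, the sketch below shows how a function annotated with this macro might look, based only on the doc comment above. The function name, the signature helper, and the body are illustrative assumptions rather than code from this changeset.

```rust
// Hypothetical usage of the `admin_fn` attribute macro documented above.
// `MigrateRegionFunction`, `migrate_region_signature` and the body are
// illustrative assumptions; only the argument shape follows the doc comment.
#[admin_fn(
    name = "MigrateRegionFunction",
    display_name = "migrate_region",
    sig_fn = "migrate_region_signature",
    ret = "uint64"
)]
pub(crate) async fn migrate_region(
    procedure_service_handler: &ProcedureServiceHandlerRef,
    _query_ctx: &QueryContextRef,
    params: &[ValueRef<'_>],
) -> Result<Value> {
    // The macro wraps this async body in a synchronous `Function::eval`
    // implementation and calls it once per input row.
    let _ = (procedure_service_handler, params);
    Ok(Value::from(0u64))
}
```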
|
||||
/// Attribute macro to print the caller to the annotated function.
|
||||
/// The caller is printed as its filename and the call site line number.
|
||||
///
|
||||
|
||||
@@ -12,20 +12,16 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use proc_macro::TokenStream;
|
||||
use proc_macro2::Span;
|
||||
use quote::quote;
|
||||
use syn::punctuated::Punctuated;
|
||||
use syn::spanned::Spanned;
|
||||
use syn::token::Comma;
|
||||
use syn::{
|
||||
parse_macro_input, Attribute, AttributeArgs, FnArg, Ident, ItemFn, Meta, MetaNameValue,
|
||||
NestedMeta, Signature, Type, TypeReference, Visibility,
|
||||
parse_macro_input, Attribute, AttributeArgs, Ident, ItemFn, Signature, Type, TypeReference,
|
||||
Visibility,
|
||||
};
|
||||
|
||||
/// Internal util macro to early return on error.
|
||||
use crate::utils::{extract_arg_map, extract_input_types, get_ident};
|
||||
|
||||
macro_rules! ok {
|
||||
($item:expr) => {
|
||||
match $item {
|
||||
@@ -89,48 +85,6 @@ pub(crate) fn process_range_fn(args: TokenStream, input: TokenStream) -> TokenSt
|
||||
result
|
||||
}
|
||||
|
||||
/// Extract a String <-> Ident map from the attribute args.
|
||||
fn extract_arg_map(args: Vec<NestedMeta>) -> Result<HashMap<String, Ident>, syn::Error> {
|
||||
args.into_iter()
|
||||
.map(|meta| {
|
||||
if let NestedMeta::Meta(Meta::NameValue(MetaNameValue { path, lit, .. })) = meta {
|
||||
let name = path.get_ident().unwrap().to_string();
|
||||
let ident = match lit {
|
||||
syn::Lit::Str(lit_str) => lit_str.parse::<Ident>(),
|
||||
_ => Err(syn::Error::new(
|
||||
lit.span(),
|
||||
"Unexpected attribute format. Expected `name = \"value\"`",
|
||||
)),
|
||||
}?;
|
||||
Ok((name, ident))
|
||||
} else {
|
||||
Err(syn::Error::new(
|
||||
meta.span(),
|
||||
"Unexpected attribute format. Expected `name = \"value\"`",
|
||||
))
|
||||
}
|
||||
})
|
||||
.collect::<Result<HashMap<String, Ident>, syn::Error>>()
|
||||
}
|
||||
|
||||
/// Helper function to get an Ident from the previous arg map.
|
||||
fn get_ident(map: &HashMap<String, Ident>, key: &str, span: Span) -> Result<Ident, syn::Error> {
|
||||
map.get(key)
|
||||
.cloned()
|
||||
.ok_or_else(|| syn::Error::new(span, format!("Expect attribute {key} but not found")))
|
||||
}
|
||||
|
||||
/// Extract the argument list from the annotated function.
|
||||
fn extract_input_types(inputs: &Punctuated<FnArg, Comma>) -> Result<Vec<Type>, syn::Error> {
|
||||
inputs
|
||||
.iter()
|
||||
.map(|arg| match arg {
|
||||
FnArg::Receiver(receiver) => Err(syn::Error::new(receiver.span(), "expected bool")),
|
||||
FnArg::Typed(pat_type) => Ok(*pat_type.ty.clone()),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn build_struct(
|
||||
attrs: Vec<Attribute>,
|
||||
vis: Visibility,
|
||||
@@ -214,7 +168,7 @@ fn build_calc_fn(
|
||||
|
||||
#( let #range_array_names = RangeArray::try_new(extract_array(&input[#param_numbers])?.to_data().into())?; )*
|
||||
|
||||
// TODO(ruihang): add ensure!()
|
||||
// TODO(ruihang): add ensure!()
|
||||
|
||||
let mut result_array = Vec::new();
|
||||
for index in 0..#first_range_array_name.len(){
|
||||
|
||||
69
src/common/macro/src/utils.rs
Normal file
@@ -0,0 +1,69 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use proc_macro2::Span;
|
||||
use syn::punctuated::Punctuated;
|
||||
use syn::spanned::Spanned;
|
||||
use syn::token::Comma;
|
||||
use syn::{FnArg, Ident, Meta, MetaNameValue, NestedMeta, Type};
|
||||
|
||||
/// Extract a String <-> Ident map from the attribute args.
|
||||
pub(crate) fn extract_arg_map(args: Vec<NestedMeta>) -> Result<HashMap<String, Ident>, syn::Error> {
|
||||
args.into_iter()
|
||||
.map(|meta| {
|
||||
if let NestedMeta::Meta(Meta::NameValue(MetaNameValue { path, lit, .. })) = meta {
|
||||
let name = path.get_ident().unwrap().to_string();
|
||||
let ident = match lit {
|
||||
syn::Lit::Str(lit_str) => lit_str.parse::<Ident>(),
|
||||
_ => Err(syn::Error::new(
|
||||
lit.span(),
|
||||
"Unexpected attribute format. Expected `name = \"value\"`",
|
||||
)),
|
||||
}?;
|
||||
Ok((name, ident))
|
||||
} else {
|
||||
Err(syn::Error::new(
|
||||
meta.span(),
|
||||
"Unexpected attribute format. Expected `name = \"value\"`",
|
||||
))
|
||||
}
|
||||
})
|
||||
.collect::<Result<HashMap<String, Ident>, syn::Error>>()
|
||||
}
|
||||
|
||||
/// Helper function to get an Ident from the previous arg map.
|
||||
pub(crate) fn get_ident(
|
||||
map: &HashMap<String, Ident>,
|
||||
key: &str,
|
||||
span: Span,
|
||||
) -> Result<Ident, syn::Error> {
|
||||
map.get(key)
|
||||
.cloned()
|
||||
.ok_or_else(|| syn::Error::new(span, format!("Expect attribute {key} but not found")))
|
||||
}
|
||||
|
||||
/// Extract the argument list from the annotated function.
|
||||
pub(crate) fn extract_input_types(
|
||||
inputs: &Punctuated<FnArg, Comma>,
|
||||
) -> Result<Vec<Type>, syn::Error> {
|
||||
inputs
|
||||
.iter()
|
||||
.map(|arg| match arg {
|
||||
FnArg::Receiver(receiver) => Err(syn::Error::new(receiver.span(), "expected bool")),
|
||||
FnArg::Typed(pat_type) => Ok(*pat_type.ty.clone()),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
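To show how these helpers fit together, here is a minimal sketch of an attribute-macro entry point that uses them. The `process_example_fn` flow is an assumption for illustration; only `extract_arg_map`, `get_ident`, and `extract_input_types` come from the file above.

```rust
// Hypothetical attribute-macro entry point; only the three utils helpers
// come from this file, everything else is an assumption for illustration.
use proc_macro::TokenStream;
use proc_macro2::Span;
use syn::{parse_macro_input, AttributeArgs, ItemFn};

use crate::utils::{extract_arg_map, extract_input_types, get_ident};

pub(crate) fn process_example_fn(args: TokenStream, input: TokenStream) -> TokenStream {
    let args = parse_macro_input!(args as AttributeArgs);
    let item_fn = parse_macro_input!(input as ItemFn);

    // `name = "..."` style attribute arguments become a String -> Ident map.
    let arg_map = match extract_arg_map(args) {
        Ok(map) => map,
        Err(e) => return e.to_compile_error().into(),
    };
    let name = match get_ident(&arg_map, "name", Span::call_site()) {
        Ok(ident) => ident,
        Err(e) => return e.to_compile_error().into(),
    };

    // The argument types of the annotated function drive the code generation.
    let _input_types = match extract_input_types(&item_fn.sig.inputs) {
        Ok(types) => types,
        Err(e) => return e.to_compile_error().into(),
    };

    quote::quote!(pub struct #name;).into()
}
```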
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
|
||||
@@ -7,6 +7,9 @@ license.workspace = true
|
||||
[features]
|
||||
testing = []
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
async-recursion = "1.0"
|
||||
@@ -15,11 +18,13 @@ async-trait.workspace = true
|
||||
base64.workspace = true
|
||||
bytes.workspace = true
|
||||
chrono.workspace = true
|
||||
common-base.workspace = true
|
||||
common-catalog.workspace = true
|
||||
common-error.workspace = true
|
||||
common-grpc-expr.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-procedure.workspace = true
|
||||
common-procedure-test.workspace = true
|
||||
common-recordbatch.workspace = true
|
||||
common-runtime.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
|
||||
@@ -15,23 +15,25 @@
use std::sync::Arc;

use api::v1::region::{QueryRequest, RegionRequest};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

use crate::error::Result;
use crate::peer::Peer;

pub type AffectedRows = u64;

/// The trait for handling requests to datanode.
#[async_trait::async_trait]
pub trait Datanode: Send + Sync {
    /// Handles DML and DDL requests.
    async fn handle(&self, request: RegionRequest) -> Result<AffectedRows>;

    /// Handles query requests.
    async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
}

pub type DatanodeRef = Arc<dyn Datanode>;

/// Datanode manager
#[async_trait::async_trait]
pub trait DatanodeManager: Send + Sync {
    /// Retrieves a target `datanode`.
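A minimal sketch of what an implementation of the `Datanode` trait in the hunk above could look like, assuming `AffectedRows` stays a numeric row count; the `LoggingDatanode` type and its behaviour are invented for illustration.

```rust
// Illustrative implementation of the `Datanode` trait above; the
// `LoggingDatanode` type and its behaviour are assumptions for this sketch.
struct LoggingDatanode;

#[async_trait::async_trait]
impl Datanode for LoggingDatanode {
    async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
        // A real datanode would apply the DML/DDL request to its regions.
        let _ = request;
        Ok(0)
    }

    async fn handle_query(&self, _request: QueryRequest) -> Result<SendableRecordBatchStream> {
        // Producing a record batch stream needs a real query engine; out of
        // scope for this sketch.
        unimplemented!("query handling is not part of this sketch")
    }
}
```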
@@ -18,6 +18,7 @@ use std::sync::Arc;
|
||||
use common_telemetry::tracing_context::W3cTrace;
|
||||
use store_api::storage::{RegionNumber, TableId};
|
||||
|
||||
use self::table_meta::TableMetadataAllocatorRef;
|
||||
use crate::cache_invalidator::CacheInvalidatorRef;
|
||||
use crate::datanode_manager::DatanodeManagerRef;
|
||||
use crate::error::Result;
|
||||
@@ -25,6 +26,7 @@ use crate::key::table_route::TableRouteValue;
|
||||
use crate::key::TableMetadataManagerRef;
|
||||
use crate::region_keeper::MemoryRegionKeeperRef;
|
||||
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
|
||||
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
|
||||
|
||||
pub mod alter_table;
|
||||
pub mod create_logical_tables;
|
||||
@@ -32,6 +34,10 @@ pub mod create_table;
|
||||
mod create_table_template;
|
||||
pub mod drop_table;
|
||||
pub mod table_meta;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod test_util;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
pub mod truncate_table;
|
||||
pub mod utils;
|
||||
|
||||
@@ -41,16 +47,32 @@ pub struct ExecutorContext {
    pub tracing_context: Option<W3cTrace>,
}

/// The procedure executor that accepts DDL tasks, region migration tasks, etc.
#[async_trait::async_trait]
pub trait DdlTaskExecutor: Send + Sync {
pub trait ProcedureExecutor: Send + Sync {
    /// Submit a ddl task
    async fn submit_ddl_task(
        &self,
        ctx: &ExecutorContext,
        request: SubmitDdlTaskRequest,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Submit a region migration task
    async fn migrate_region(
        &self,
        ctx: &ExecutorContext,
        request: MigrateRegionRequest,
    ) -> Result<MigrateRegionResponse>;

    /// Query the procedure state by its id
    async fn query_procedure_state(
        &self,
        ctx: &ExecutorContext,
        pid: &str,
    ) -> Result<ProcedureStateResponse>;
}

pub type DdlTaskExecutorRef = Arc<dyn DdlTaskExecutor>;
pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
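For a quick orientation on the renamed trait, a hedged sketch of driving it through the `ProcedureExecutorRef` alias; the function and its arguments are placeholders, not code from this changeset.

```rust
// Hedged sketch: calling the renamed ProcedureExecutor through its Arc alias.
// `executor`, `ctx` and `request` are placeholders, not taken from the diff.
async fn submit_ddl_task_example(
    executor: &ProcedureExecutorRef,
    ctx: &ExecutorContext,
    request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
    executor.submit_ddl_task(ctx, request).await
}
```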
|
||||
pub struct TableMetadataAllocatorContext {
|
||||
pub cluster_id: u64,
|
||||
@@ -73,4 +95,5 @@ pub struct DdlContext {
|
||||
pub cache_invalidator: CacheInvalidatorRef,
|
||||
pub table_metadata_manager: TableMetadataManagerRef,
|
||||
pub memory_region_keeper: MemoryRegionKeeperRef,
|
||||
pub table_metadata_allocator: TableMetadataAllocatorRef,
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ use table::requests::AlterKind;
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::cache_invalidator::Context;
|
||||
use crate::ddl::utils::handle_operate_region_error;
|
||||
use crate::ddl::utils::add_peer_context_if_needed;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{self, ConvertAlterTableRequestSnafu, Error, InvalidProtoMsgSnafu, Result};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
@@ -226,7 +226,7 @@ impl AlterTableProcedure {
|
||||
// The engine will throw this code when the schema version not match.
|
||||
// As this procedure has locked the table, the only reason for this error
|
||||
// is procedure is succeeded before and is retrying.
|
||||
return Err(handle_operate_region_error(datanode)(err));
|
||||
return Err(add_peer_context_if_needed(datanode)(err));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -31,7 +31,7 @@ use strum::AsRefStr;
|
||||
use table::metadata::{RawTableInfo, TableId};
|
||||
|
||||
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
|
||||
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{Result, TableAlreadyExistsSnafu};
|
||||
use crate::key::table_name::TableNameKey;
|
||||
@@ -66,7 +66,16 @@ impl CreateLogicalTablesProcedure {
|
||||
Ok(Self { context, creator })
|
||||
}
|
||||
|
||||
async fn on_prepare(&mut self) -> Result<Status> {
|
||||
/// On the prepare step, it performs:
/// - Checks whether the physical table exists.
/// - Checks whether the logical tables exist.
/// - Allocates the table ids.
///
/// Abort(non-retry):
/// - The physical table does not exist.
/// - Failed to check whether the tables exist.
/// - One of the logical tables already exists and its creation task does not set `create_if_not_exists`.
|
||||
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
|
||||
let manager = &self.context.table_metadata_manager;
|
||||
|
||||
// Sets physical region numbers
|
||||
@@ -80,7 +89,7 @@ impl CreateLogicalTablesProcedure {
|
||||
.data
|
||||
.set_physical_region_numbers(physical_region_numbers);
|
||||
|
||||
// Checks if the tables exists
|
||||
// Checks if the tables exist
|
||||
let table_name_keys = self
|
||||
.creator
|
||||
.data
|
||||
@@ -96,24 +105,9 @@ impl CreateLogicalTablesProcedure {
|
||||
.map(|x| x.map(|x| x.table_id()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Sets table ids already exists
|
||||
self.creator
|
||||
.data
|
||||
.set_table_ids_already_exists(already_exists_tables_ids);
|
||||
|
||||
// If all tables do not exists, we can create them directly.
|
||||
if self.creator.data.is_all_tables_not_exists() {
|
||||
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
|
||||
return Ok(Status::executing(true));
|
||||
}
|
||||
|
||||
// Filter out the tables that already exist.
|
||||
let tasks = &self.creator.data.tasks;
|
||||
let mut filtered_tasks = Vec::with_capacity(tasks.len());
|
||||
for (task, table_id) in tasks
|
||||
.iter()
|
||||
.zip(self.creator.data.table_ids_already_exists().iter())
|
||||
{
|
||||
// Validates the tasks
|
||||
let tasks = &mut self.creator.data.tasks;
|
||||
for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) {
|
||||
if table_id.is_some() {
|
||||
// If a table already exists, we just ignore it.
|
||||
ensure!(
|
||||
@@ -124,17 +118,34 @@ impl CreateLogicalTablesProcedure {
|
||||
);
|
||||
continue;
|
||||
}
|
||||
filtered_tasks.push(task.clone());
|
||||
}
|
||||
|
||||
// Resets tasks
|
||||
self.creator.data.tasks = filtered_tasks;
|
||||
if self.creator.data.tasks.is_empty() {
|
||||
// If all tables already exist, we can skip the `DatanodeCreateRegions` stage.
|
||||
self.creator.data.state = CreateTablesState::CreateMetadata;
|
||||
return Ok(Status::executing(true));
|
||||
// If all tables already exist, returns the table_ids.
|
||||
if already_exists_tables_ids.iter().all(Option::is_some) {
|
||||
return Ok(Status::done_with_output(
|
||||
already_exists_tables_ids
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect::<Vec<_>>(),
|
||||
));
|
||||
}
|
||||
|
||||
// Allocates table ids
|
||||
for (task, table_id) in tasks.iter_mut().zip(already_exists_tables_ids.iter()) {
|
||||
let table_id = if let Some(table_id) = table_id {
|
||||
*table_id
|
||||
} else {
|
||||
self.context
|
||||
.table_metadata_allocator
|
||||
.allocate_table_id(task)
|
||||
.await?
|
||||
};
|
||||
task.set_table_id(table_id);
|
||||
}
|
||||
|
||||
self.creator
|
||||
.data
|
||||
.set_table_ids_already_exists(already_exists_tables_ids);
|
||||
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
@@ -152,17 +163,20 @@ impl CreateLogicalTablesProcedure {
|
||||
self.create_regions(region_routes).await
|
||||
}
|
||||
|
||||
/// Creates table metadata
|
||||
///
|
||||
/// Abort(non-retry):
|
||||
/// - Failed to create table metadata.
|
||||
pub async fn on_create_metadata(&self) -> Result<Status> {
|
||||
let manager = &self.context.table_metadata_manager;
|
||||
|
||||
let physical_table_id = self.creator.data.physical_table_id();
|
||||
let tables_data = self.creator.data.all_tables_data();
|
||||
let num_tables = tables_data.len();
|
||||
let remaining_tasks = self.creator.data.remaining_tasks();
|
||||
let num_tables = remaining_tasks.len();
|
||||
|
||||
if num_tables > 0 {
|
||||
let chunk_size = manager.max_logical_tables_per_batch();
|
||||
if num_tables > chunk_size {
|
||||
let chunks = tables_data
|
||||
let chunks = remaining_tasks
|
||||
.into_iter()
|
||||
.chunks(chunk_size)
|
||||
.into_iter()
|
||||
@@ -172,11 +186,21 @@ impl CreateLogicalTablesProcedure {
|
||||
manager.create_logical_tables_metadata(chunk).await?;
|
||||
}
|
||||
} else {
|
||||
manager.create_logical_tables_metadata(tables_data).await?;
|
||||
manager
|
||||
.create_logical_tables_metadata(remaining_tasks)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
let table_ids = self.creator.data.real_table_ids();
|
||||
// The `table_id` MUST be collected after the [Prepare::Prepare],
|
||||
// ensuring that all `table_id`s have been allocated.
|
||||
let table_ids = self
|
||||
.creator
|
||||
.data
|
||||
.tasks
|
||||
.iter()
|
||||
.map(|task| task.table_info.ident.table_id)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}");
|
||||
|
||||
@@ -238,10 +262,10 @@ impl CreateLogicalTablesProcedure {
|
||||
body: Some(PbRegionRequest::Creates(creates)),
|
||||
};
|
||||
create_region_tasks.push(async move {
|
||||
if let Err(err) = requester.handle(request).await {
|
||||
return Err(handle_operate_region_error(datanode)(err));
|
||||
}
|
||||
Ok(())
|
||||
requester
|
||||
.handle(request)
|
||||
.await
|
||||
.map_err(add_peer_context_if_needed(datanode))
|
||||
});
|
||||
}
|
||||
|
||||
@@ -310,17 +334,13 @@ impl TablesCreator {
|
||||
tasks: Vec<CreateTableTask>,
|
||||
physical_table_id: TableId,
|
||||
) -> Self {
|
||||
let table_ids_from_tasks = tasks
|
||||
.iter()
|
||||
.map(|task| task.table_info.ident.table_id)
|
||||
.collect::<Vec<_>>();
|
||||
let len = table_ids_from_tasks.len();
|
||||
let len = tasks.len();
|
||||
|
||||
Self {
|
||||
data: CreateTablesData {
|
||||
cluster_id,
|
||||
state: CreateTablesState::Prepare,
|
||||
tasks,
|
||||
table_ids_from_tasks,
|
||||
table_ids_already_exists: vec![None; len],
|
||||
physical_table_id,
|
||||
physical_region_numbers: vec![],
|
||||
@@ -334,10 +354,6 @@ pub struct CreateTablesData {
|
||||
cluster_id: ClusterId,
|
||||
state: CreateTablesState,
|
||||
tasks: Vec<CreateTableTask>,
|
||||
table_ids_from_tasks: Vec<TableId>,
|
||||
// Because the table_id is allocated before entering the distributed lock,
|
||||
// it needs to recheck if the table exists when creating a table.
|
||||
// If it does exist, then the table_id needs to be replaced with the existing one.
|
||||
table_ids_already_exists: Vec<Option<TableId>>,
|
||||
physical_table_id: TableId,
|
||||
physical_region_numbers: Vec<RegionNumber>,
|
||||
@@ -360,24 +376,6 @@ impl CreateTablesData {
|
||||
self.table_ids_already_exists = table_ids_already_exists;
|
||||
}
|
||||
|
||||
fn table_ids_already_exists(&self) -> &[Option<TableId>] {
|
||||
&self.table_ids_already_exists
|
||||
}
|
||||
|
||||
fn is_all_tables_not_exists(&self) -> bool {
|
||||
self.table_ids_already_exists.iter().all(Option::is_none)
|
||||
}
|
||||
|
||||
pub fn real_table_ids(&self) -> Vec<TableId> {
|
||||
self.table_ids_from_tasks
|
||||
.iter()
|
||||
.zip(self.table_ids_already_exists.iter())
|
||||
.map(|(table_id_from_task, table_id_already_exists)| {
|
||||
table_id_already_exists.unwrap_or(*table_id_from_task)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
|
||||
self.tasks
|
||||
.iter()
|
||||
@@ -385,18 +383,27 @@ impl CreateTablesData {
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
fn all_tables_data(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
|
||||
/// Returns the remaining tasks.
|
||||
/// The length of tasks must be greater than 0.
|
||||
fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
|
||||
self.tasks
|
||||
.iter()
|
||||
.map(|task| {
|
||||
let table_info = task.table_info.clone();
|
||||
let region_ids = self
|
||||
.physical_region_numbers
|
||||
.iter()
|
||||
.map(|region_number| RegionId::new(table_info.ident.table_id, *region_number))
|
||||
.collect();
|
||||
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
|
||||
(table_info, table_route)
|
||||
.zip(self.table_ids_already_exists.iter())
|
||||
.flat_map(|(task, table_id)| {
|
||||
if table_id.is_none() {
|
||||
let table_info = task.table_info.clone();
|
||||
let region_ids = self
|
||||
.physical_region_numbers
|
||||
.iter()
|
||||
.map(|region_number| {
|
||||
RegionId::new(table_info.ident.table_id, *region_number)
|
||||
})
|
||||
.collect();
|
||||
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
|
||||
Some((table_info, table_route))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
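The `remaining_tasks` change above relies on zipping the tasks with the already-existing table ids and keeping only the unmatched ones. A small, self-contained illustration of that pattern (names and types are simplified stand-ins):

```rust
// Self-contained illustration of the zip + flat_map pattern used by
// `remaining_tasks` above: keep only the tasks whose table id is not yet taken.
fn remaining<'a>(tasks: &'a [&'a str], existing_ids: &[Option<u32>]) -> Vec<&'a str> {
    tasks
        .iter()
        .zip(existing_ids.iter())
        .flat_map(|(task, id)| if id.is_none() { Some(*task) } else { None })
        .collect()
}

#[test]
fn filters_out_existing_tables() {
    let tasks = ["a", "b", "c"];
    let ids = [None, Some(42), None];
    assert_eq!(remaining(&tasks, &ids), vec!["a", "c"]);
}
```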
@@ -33,8 +33,8 @@ use table::metadata::{RawTableInfo, TableId};
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
|
||||
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
|
||||
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
|
||||
use crate::error::{self, Result, TableRouteNotFoundSnafu};
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
@@ -45,7 +45,6 @@ use crate::rpc::router::{
|
||||
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
|
||||
};
|
||||
use crate::{metrics, ClusterId};
|
||||
|
||||
pub struct CreateTableProcedure {
|
||||
pub context: DdlContext,
|
||||
pub creator: TableCreator,
|
||||
@@ -54,16 +53,10 @@ pub struct CreateTableProcedure {
|
||||
impl CreateTableProcedure {
|
||||
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
|
||||
|
||||
pub fn new(
|
||||
cluster_id: ClusterId,
|
||||
task: CreateTableTask,
|
||||
table_route: TableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
context: DdlContext,
|
||||
) -> Self {
|
||||
pub fn new(cluster_id: ClusterId, task: CreateTableTask, context: DdlContext) -> Self {
|
||||
Self {
|
||||
context,
|
||||
creator: TableCreator::new(cluster_id, task, table_route, region_wal_options),
|
||||
creator: TableCreator::new(cluster_id, task),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,7 +68,8 @@ impl CreateTableProcedure {
|
||||
opening_regions: vec![],
|
||||
};
|
||||
|
||||
if let TableRouteValue::Physical(x) = &creator.data.table_route {
|
||||
// Only registers regions if the table route is allocated.
|
||||
if let Some(TableRouteValue::Physical(x)) = &creator.data.table_route {
|
||||
creator.opening_regions = creator
|
||||
.register_opening_regions(&context, &x.region_routes)
|
||||
.map_err(BoxedError::new)
|
||||
@@ -85,20 +79,53 @@ impl CreateTableProcedure {
|
||||
Ok(CreateTableProcedure { context, creator })
|
||||
}
|
||||
|
||||
pub fn table_info(&self) -> &RawTableInfo {
|
||||
fn table_info(&self) -> &RawTableInfo {
|
||||
&self.creator.data.task.table_info
|
||||
}
|
||||
|
||||
fn table_id(&self) -> TableId {
|
||||
pub(crate) fn table_id(&self) -> TableId {
|
||||
self.table_info().ident.table_id
|
||||
}
|
||||
|
||||
pub fn region_wal_options(&self) -> &HashMap<RegionNumber, String> {
|
||||
&self.creator.data.region_wal_options
|
||||
fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
|
||||
self.creator
|
||||
.data
|
||||
.region_wal_options
|
||||
.as_ref()
|
||||
.context(error::UnexpectedSnafu {
|
||||
err_msg: "region_wal_options is not allocated",
|
||||
})
|
||||
}
|
||||
|
||||
/// Checks whether the table exists.
|
||||
async fn on_prepare(&mut self) -> Result<Status> {
|
||||
fn table_route(&self) -> Result<&TableRouteValue> {
|
||||
self.creator
|
||||
.data
|
||||
.table_route
|
||||
.as_ref()
|
||||
.context(error::UnexpectedSnafu {
|
||||
err_msg: "table_route is not allocated",
|
||||
})
|
||||
}
|
||||
|
||||
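The accessors above follow the `Option` plus snafu `context` pattern: the metadata starts unallocated and reading it too early turns into an "unexpected" error. A self-contained sketch of the same pattern with an invented error type:

```rust
// Invented error type; only the Option-plus-context access pattern mirrors
// the accessors above.
use snafu::{OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Unexpected: {err_msg}"))]
    Unexpected { err_msg: String },
}

fn require_route(route: &Option<String>) -> Result<&String, Error> {
    // Turns a missing value into a descriptive error instead of panicking.
    route.as_ref().context(UnexpectedSnafu {
        err_msg: "table_route is not allocated",
    })
}
```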
#[cfg(any(test, feature = "testing"))]
|
||||
pub fn set_allocated_metadata(
|
||||
&mut self,
|
||||
table_id: TableId,
|
||||
table_route: TableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
) {
|
||||
self.creator
|
||||
.set_allocated_metadata(table_id, table_route, region_wal_options)
|
||||
}
|
||||
|
||||
/// On the prepare step, it performs:
|
||||
/// - Checks whether the table exists.
|
||||
/// - Allocates the table id.
|
||||
///
|
||||
/// Abort(non-retry):
|
||||
/// - TableName exists and `create_if_not_exists` is false.
|
||||
/// - Failed to allocate [TableMetadata].
|
||||
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
|
||||
let expr = &self.creator.data.task.create_table;
|
||||
let table_name_value = self
|
||||
.context
|
||||
@@ -124,6 +151,22 @@ impl CreateTableProcedure {
|
||||
}
|
||||
|
||||
self.creator.data.state = CreateTableState::DatanodeCreateRegions;
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
region_wal_options,
|
||||
} = self
|
||||
.context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext {
|
||||
cluster_id: self.creator.data.cluster_id,
|
||||
},
|
||||
&self.creator.data.task,
|
||||
)
|
||||
.await?;
|
||||
self.creator
|
||||
.set_allocated_metadata(table_id, table_route, region_wal_options);
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
@@ -137,8 +180,20 @@ impl CreateTableProcedure {
|
||||
Ok(CreateRequestBuilder::new(template, physical_table_id))
|
||||
}
|
||||
|
||||
/// Creates regions on datanodes
|
||||
///
|
||||
/// Abort(non-retry):
|
||||
/// - Failed to create [CreateRequestBuilder].
|
||||
/// - Failed to get the table route of physical table (for logical table).
|
||||
///
|
||||
/// Retry:
|
||||
/// - If the underlying servers return one of the following [Code](tonic::status::Code):
|
||||
/// - [Code::Cancelled](tonic::status::Code::Cancelled)
|
||||
/// - [Code::DeadlineExceeded](tonic::status::Code::DeadlineExceeded)
|
||||
/// - [Code::Unavailable](tonic::status::Code::Unavailable)
|
||||
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
|
||||
match &self.creator.data.table_route {
|
||||
// Safety: the table route must be allocated.
|
||||
match self.table_route()?.clone() {
|
||||
TableRouteValue::Physical(x) => {
|
||||
let region_routes = x.region_routes.clone();
|
||||
let request_builder = self.new_region_request_builder(None)?;
|
||||
@@ -151,12 +206,12 @@ impl CreateTableProcedure {
|
||||
.context
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get(physical_table_id)
|
||||
.try_get_physical_table_route(physical_table_id)
|
||||
.await?
|
||||
.context(TableRouteNotFoundSnafu {
|
||||
table_id: physical_table_id,
|
||||
})?;
|
||||
let region_routes = physical_table_route.region_routes()?;
|
||||
let region_routes = &physical_table_route.region_routes;
|
||||
|
||||
let request_builder = self.new_region_request_builder(Some(physical_table_id))?;
|
||||
|
||||
@@ -170,7 +225,8 @@ impl CreateTableProcedure {
|
||||
region_routes: &[RegionRoute],
|
||||
request_builder: CreateRequestBuilder,
|
||||
) -> Result<Status> {
|
||||
if self.creator.data.table_route.is_physical() {
|
||||
// Safety: the table_route must be allocated.
|
||||
if self.table_route()?.is_physical() {
|
||||
// Registers opening regions
|
||||
let guards = self
|
||||
.creator
|
||||
@@ -181,13 +237,12 @@ impl CreateTableProcedure {
|
||||
}
|
||||
|
||||
let create_table_data = &self.creator.data;
|
||||
let region_wal_options = &create_table_data.region_wal_options;
|
||||
|
||||
// Safety: the region_wal_options must be allocated
|
||||
let region_wal_options = self.region_wal_options()?;
|
||||
let create_table_expr = &create_table_data.task.create_table;
|
||||
let catalog = &create_table_expr.catalog_name;
|
||||
let schema = &create_table_expr.schema_name;
|
||||
let storage_path = region_storage_path(catalog, schema);
|
||||
|
||||
let leaders = find_leaders(region_routes);
|
||||
let mut create_region_tasks = Vec::with_capacity(leaders.len());
|
||||
|
||||
@@ -203,7 +258,6 @@ impl CreateTableProcedure {
|
||||
storage_path.clone(),
|
||||
region_wal_options,
|
||||
)?;
|
||||
|
||||
requests.push(PbRegionRequest::Create(create_region_request));
|
||||
}
|
||||
|
||||
@@ -218,12 +272,11 @@ impl CreateTableProcedure {
|
||||
|
||||
let datanode = datanode.clone();
|
||||
let requester = requester.clone();
|
||||
|
||||
create_region_tasks.push(async move {
|
||||
if let Err(err) = requester.handle(request).await {
|
||||
return Err(handle_operate_region_error(datanode)(err));
|
||||
}
|
||||
Ok(())
|
||||
requester
|
||||
.handle(request)
|
||||
.await
|
||||
.map_err(add_peer_context_if_needed(datanode))
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -240,18 +293,21 @@ impl CreateTableProcedure {
|
||||
Ok(Status::executing(false))
|
||||
}
|
||||
|
||||
/// Creates table metadata
|
||||
///
|
||||
/// Abort(non-retry):
|
||||
/// - Failed to create table metadata.
|
||||
async fn on_create_metadata(&self) -> Result<Status> {
|
||||
let table_id = self.table_id();
|
||||
let manager = &self.context.table_metadata_manager;
|
||||
|
||||
let raw_table_info = self.table_info().clone();
|
||||
let region_wal_options = self.region_wal_options().clone();
|
||||
// Safety: the region_wal_options must be allocated.
|
||||
let region_wal_options = self.region_wal_options()?.clone();
|
||||
// Safety: the table_route must be allocated.
|
||||
let table_route = self.table_route()?.clone();
|
||||
manager
|
||||
.create_table_metadata(
|
||||
raw_table_info,
|
||||
self.creator.data.table_route.clone(),
|
||||
region_wal_options,
|
||||
)
|
||||
.create_table_metadata(raw_table_info, table_route, region_wal_options)
|
||||
.await?;
|
||||
info!("Created table metadata for table {table_id}");
|
||||
|
||||
@@ -303,19 +359,14 @@ pub struct TableCreator {
|
||||
}
|
||||
|
||||
impl TableCreator {
|
||||
pub fn new(
|
||||
cluster_id: u64,
|
||||
task: CreateTableTask,
|
||||
table_route: TableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
) -> Self {
|
||||
pub fn new(cluster_id: u64, task: CreateTableTask) -> Self {
|
||||
Self {
|
||||
data: CreateTableData {
|
||||
state: CreateTableState::Prepare,
|
||||
cluster_id,
|
||||
task,
|
||||
table_route,
|
||||
region_wal_options,
|
||||
table_route: None,
|
||||
region_wal_options: None,
|
||||
},
|
||||
opening_regions: vec![],
|
||||
}
|
||||
@@ -347,6 +398,17 @@ impl TableCreator {
|
||||
}
|
||||
Ok(opening_region_guards)
|
||||
}
|
||||
|
||||
fn set_allocated_metadata(
|
||||
&mut self,
|
||||
table_id: TableId,
|
||||
table_route: TableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
) {
|
||||
self.data.task.table_info.ident.table_id = table_id;
|
||||
self.data.table_route = Some(table_route);
|
||||
self.data.region_wal_options = Some(region_wal_options);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
|
||||
@@ -363,8 +425,10 @@ pub enum CreateTableState {
|
||||
pub struct CreateTableData {
|
||||
pub state: CreateTableState,
|
||||
pub task: CreateTableTask,
|
||||
table_route: TableRouteValue,
|
||||
pub region_wal_options: HashMap<RegionNumber, String>,
|
||||
/// None stands for not allocated yet.
|
||||
table_route: Option<TableRouteValue>,
|
||||
/// None stands for not allocated yet.
|
||||
pub region_wal_options: Option<HashMap<RegionNumber, String>>,
|
||||
pub cluster_id: ClusterId,
|
||||
}
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ use table::table_reference::TableReference;
|
||||
|
||||
use super::utils::handle_retry_error;
|
||||
use crate::cache_invalidator::Context;
|
||||
use crate::ddl::utils::handle_operate_region_error;
|
||||
use crate::ddl::utils::add_peer_context_if_needed;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
@@ -223,7 +223,7 @@ impl DropTableProcedure {
|
||||
drop_region_tasks.push(async move {
|
||||
if let Err(err) = requester.handle(request).await {
|
||||
if err.status_code() != StatusCode::RegionNotFound {
|
||||
return Err(handle_operate_region_error(datanode)(err));
|
||||
return Err(add_peer_context_if_needed(datanode)(err));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -23,21 +23,22 @@ use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
|
||||
use store_api::storage::{RegionId, RegionNumber, TableId};
|
||||
|
||||
use crate::ddl::{TableMetadata, TableMetadataAllocatorContext};
|
||||
use crate::error::{Result, TableNotFoundSnafu, UnsupportedSnafu};
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::error::{self, Result, TableNotFoundSnafu, UnsupportedSnafu};
|
||||
use crate::key::table_name::{TableNameKey, TableNameManager};
|
||||
use crate::key::table_route::{LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue};
|
||||
use crate::key::TableMetadataManagerRef;
|
||||
use crate::peer::Peer;
|
||||
use crate::rpc::ddl::CreateTableTask;
|
||||
use crate::rpc::router::{Region, RegionRoute};
|
||||
use crate::sequence::SequenceRef;
|
||||
use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocatorRef};
|
||||
|
||||
pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TableMetadataAllocator {
|
||||
table_id_sequence: SequenceRef,
|
||||
wal_options_allocator: WalOptionsAllocatorRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
table_name_manager: TableNameManager,
|
||||
peer_allocator: PeerAllocatorRef,
|
||||
}
|
||||
|
||||
@@ -45,12 +46,12 @@ impl TableMetadataAllocator {
|
||||
pub fn new(
|
||||
table_id_sequence: SequenceRef,
|
||||
wal_options_allocator: WalOptionsAllocatorRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
table_name_manager: TableNameManager,
|
||||
) -> Self {
|
||||
Self::with_peer_allocator(
|
||||
table_id_sequence,
|
||||
wal_options_allocator,
|
||||
table_metadata_manager,
|
||||
table_name_manager,
|
||||
Arc::new(NoopPeerAllocator),
|
||||
)
|
||||
}
|
||||
@@ -58,18 +59,18 @@ impl TableMetadataAllocator {
|
||||
pub fn with_peer_allocator(
|
||||
table_id_sequence: SequenceRef,
|
||||
wal_options_allocator: WalOptionsAllocatorRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
table_name_manager: TableNameManager,
|
||||
peer_allocator: PeerAllocatorRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
table_id_sequence,
|
||||
wal_options_allocator,
|
||||
table_metadata_manager,
|
||||
table_name_manager,
|
||||
peer_allocator,
|
||||
}
|
||||
}
|
||||
|
||||
async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
|
||||
pub(crate) async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
|
||||
let table_id = if let Some(table_id) = &task.create_table.table_id {
|
||||
let table_id = table_id.id;
|
||||
|
||||
@@ -123,6 +124,12 @@ impl TableMetadataAllocator {
|
||||
task: &CreateTableTask,
|
||||
) -> Result<TableRouteValue> {
|
||||
let regions = task.partitions.len();
|
||||
ensure!(
|
||||
regions > 0,
|
||||
error::UnexpectedSnafu {
|
||||
err_msg: "The number of partitions must be greater than 0"
|
||||
}
|
||||
);
|
||||
|
||||
let table_route = if task.create_table.engine == METRIC_ENGINE
|
||||
&& let Some(physical_table_name) = task
|
||||
@@ -131,8 +138,7 @@ impl TableMetadataAllocator {
|
||||
.get(LOGICAL_TABLE_METADATA_KEY)
|
||||
{
|
||||
let physical_table_id = self
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.table_name_manager
|
||||
.get(TableNameKey::new(
|
||||
&task.create_table.catalog_name,
|
||||
&task.create_table.schema_name,
|
||||
|
||||
19
src/common/meta/src/ddl/test_util.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod create_table;
|
||||
|
||||
pub use create_table::{
|
||||
TestColumnDef, TestColumnDefBuilder, TestCreateTableExpr, TestCreateTableExprBuilder,
|
||||
};
|
||||
165
src/common/meta/src/ddl/test_util/create_table.rs
Normal file
@@ -0,0 +1,165 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::column_def::try_as_column_schema;
|
||||
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
|
||||
use chrono::DateTime;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE};
|
||||
use datatypes::schema::RawSchema;
|
||||
use derive_builder::Builder;
|
||||
use store_api::storage::TableId;
|
||||
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
|
||||
use table::requests::TableOptions;
|
||||
|
||||
#[derive(Default, Builder)]
|
||||
pub struct TestColumnDef {
|
||||
#[builder(setter(into), default)]
|
||||
name: String,
|
||||
data_type: ColumnDataType,
|
||||
#[builder(default)]
|
||||
is_nullable: bool,
|
||||
semantic_type: SemanticType,
|
||||
#[builder(setter(into), default)]
|
||||
comment: String,
|
||||
}
|
||||
|
||||
impl From<TestColumnDef> for ColumnDef {
|
||||
fn from(
|
||||
TestColumnDef {
|
||||
name,
|
||||
data_type,
|
||||
is_nullable,
|
||||
semantic_type,
|
||||
comment,
|
||||
}: TestColumnDef,
|
||||
) -> Self {
|
||||
Self {
|
||||
name,
|
||||
data_type: data_type as i32,
|
||||
is_nullable,
|
||||
default_constraint: vec![],
|
||||
semantic_type: semantic_type as i32,
|
||||
comment,
|
||||
datatype_extension: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Builder)]
|
||||
#[builder(default)]
|
||||
pub struct TestCreateTableExpr {
|
||||
#[builder(setter(into), default = "DEFAULT_CATALOG_NAME.to_string()")]
|
||||
catalog_name: String,
|
||||
#[builder(setter(into), default = "DEFAULT_SCHEMA_NAME.to_string()")]
|
||||
schema_name: String,
|
||||
#[builder(setter(into))]
|
||||
table_name: String,
|
||||
#[builder(setter(into))]
|
||||
desc: String,
|
||||
#[builder(setter(into))]
|
||||
column_defs: Vec<ColumnDef>,
|
||||
#[builder(setter(into))]
|
||||
time_index: String,
|
||||
#[builder(setter(into))]
|
||||
primary_keys: Vec<String>,
|
||||
create_if_not_exists: bool,
|
||||
table_options: HashMap<String, String>,
|
||||
table_id: Option<TableId>,
|
||||
#[builder(setter(into), default = "MITO2_ENGINE.to_string()")]
|
||||
engine: String,
|
||||
}
|
||||
|
||||
impl From<TestCreateTableExpr> for CreateTableExpr {
|
||||
fn from(
|
||||
TestCreateTableExpr {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
desc,
|
||||
column_defs,
|
||||
time_index,
|
||||
primary_keys,
|
||||
create_if_not_exists,
|
||||
table_options,
|
||||
table_id,
|
||||
engine,
|
||||
}: TestCreateTableExpr,
|
||||
) -> Self {
|
||||
Self {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
desc,
|
||||
column_defs,
|
||||
time_index,
|
||||
primary_keys,
|
||||
create_if_not_exists,
|
||||
table_options,
|
||||
table_id: table_id.map(|id| api::v1::TableId { id }),
|
||||
engine,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds [RawTableInfo] from [CreateTableExpr].
|
||||
pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
|
||||
RawTableInfo {
|
||||
ident: TableIdent {
|
||||
table_id: expr
|
||||
.table_id
|
||||
.as_ref()
|
||||
.map(|table_id| table_id.id)
|
||||
.unwrap_or(0),
|
||||
version: 1,
|
||||
},
|
||||
name: expr.table_name.to_string(),
|
||||
desc: Some(expr.desc.to_string()),
|
||||
catalog_name: expr.catalog_name.to_string(),
|
||||
schema_name: expr.schema_name.to_string(),
|
||||
meta: RawTableMeta {
|
||||
schema: RawSchema {
|
||||
column_schemas: expr
|
||||
.column_defs
|
||||
.iter()
|
||||
.map(|column| try_as_column_schema(column).unwrap())
|
||||
.collect(),
|
||||
timestamp_index: expr
|
||||
.column_defs
|
||||
.iter()
|
||||
.position(|column| column.semantic_type() == SemanticType::Timestamp),
|
||||
version: 0,
|
||||
},
|
||||
primary_key_indices: expr
|
||||
.primary_keys
|
||||
.iter()
|
||||
.map(|key| {
|
||||
expr.column_defs
|
||||
.iter()
|
||||
.position(|column| &column.name == key)
|
||||
.unwrap()
|
||||
})
|
||||
.collect(),
|
||||
value_indices: vec![],
|
||||
engine: expr.engine.to_string(),
|
||||
next_column_id: expr.column_defs.len() as u32,
|
||||
region_numbers: vec![],
|
||||
options: TableOptions::default(),
|
||||
created_on: DateTime::default(),
|
||||
partition_key_indices: vec![],
|
||||
},
|
||||
table_type: TableType::Base,
|
||||
}
|
||||
}
|
||||
16
src/common/meta/src/ddl/tests.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod create_logical_tables;
|
||||
mod create_table;
|
||||
518
src/common/meta/src/ddl/tests/create_logical_tables.rs
Normal file
@@ -0,0 +1,518 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::Partition;
|
||||
use api::v1::region::{QueryRequest, RegionRequest};
|
||||
use api::v1::{ColumnDataType, SemanticType};
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
|
||||
use common_procedure_test::MockContextProvider;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use common_telemetry::debug;
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::RawTableInfo;
|
||||
|
||||
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
|
||||
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
|
||||
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
|
||||
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
|
||||
use crate::error::{Error, Result};
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::peer::Peer;
|
||||
use crate::rpc::ddl::CreateTableTask;
|
||||
use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager};
|
||||
|
||||
// Note: this code may duplicate other test helpers.
// However, that is by design; it keeps the tests easy to modify and extend.
|
||||
fn test_create_logical_table_task(name: &str) -> CreateTableTask {
|
||||
let create_table = TestCreateTableExprBuilder::default()
|
||||
.column_defs([
|
||||
TestColumnDefBuilder::default()
|
||||
.name("ts")
|
||||
.data_type(ColumnDataType::TimestampMillisecond)
|
||||
.semantic_type(SemanticType::Timestamp)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
TestColumnDefBuilder::default()
|
||||
.name("host")
|
||||
.data_type(ColumnDataType::String)
|
||||
.semantic_type(SemanticType::Tag)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
TestColumnDefBuilder::default()
|
||||
.name("cpu")
|
||||
.data_type(ColumnDataType::Float64)
|
||||
.semantic_type(SemanticType::Field)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
])
|
||||
.time_index("ts")
|
||||
.primary_keys(["host".into()])
|
||||
.table_name(name)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into();
|
||||
let table_info = build_raw_table_info_from_expr(&create_table);
|
||||
CreateTableTask {
|
||||
create_table,
|
||||
// Single region
|
||||
partitions: vec![Partition {
|
||||
column_list: vec![],
|
||||
value_list: vec![],
|
||||
}],
|
||||
table_info,
|
||||
}
|
||||
}
|
||||
|
||||
// Note: this code may duplicate other test helpers.
// However, that is by design; it keeps the tests easy to modify and extend.
|
||||
fn test_create_physical_table_task(name: &str) -> CreateTableTask {
|
||||
let create_table = TestCreateTableExprBuilder::default()
|
||||
.column_defs([
|
||||
TestColumnDefBuilder::default()
|
||||
.name("ts")
|
||||
.data_type(ColumnDataType::TimestampMillisecond)
|
||||
.semantic_type(SemanticType::Timestamp)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
TestColumnDefBuilder::default()
|
||||
.name("value")
|
||||
.data_type(ColumnDataType::Float64)
|
||||
.semantic_type(SemanticType::Field)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
])
|
||||
.time_index("ts")
|
||||
.primary_keys(["value".into()])
|
||||
.table_name(name)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into();
|
||||
let table_info = build_raw_table_info_from_expr(&create_table);
|
||||
CreateTableTask {
|
||||
create_table,
|
||||
// Single region
|
||||
partitions: vec![Partition {
|
||||
column_list: vec![],
|
||||
value_list: vec![],
|
||||
}],
|
||||
table_info,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_physical_table_not_found() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let tasks = vec![test_create_logical_table_task("foo")];
|
||||
let physical_table_id = 1024u32;
|
||||
let mut procedure =
|
||||
CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
assert_matches!(err, Error::TableRouteNotFound { .. });
|
||||
}
|
||||
|
||||
async fn create_physical_table_metadata(
|
||||
ddl_context: &DdlContext,
|
||||
table_info: RawTableInfo,
|
||||
table_route: TableRouteValue,
|
||||
) {
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(table_info, table_route, HashMap::default())
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// The create logical table procedure.
|
||||
let tasks = vec![test_create_logical_table_task("foo")];
|
||||
let physical_table_id = table_id;
|
||||
let mut procedure =
|
||||
CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_logical_table_exists_err() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
let mut task = test_create_logical_table_task("foo");
|
||||
task.set_table_id(1025);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(1025, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
let mut procedure =
|
||||
CreateLogicalTablesProcedure::new(cluster_id, vec![task], physical_table_id, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
assert_matches!(err, Error::TableAlreadyExists { .. });
|
||||
assert_eq!(err.status_code(), StatusCode::TableAlreadyExists);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_with_create_if_table_exists() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
let mut task = test_create_logical_table_task("foo");
|
||||
task.set_table_id(8192);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Sets `create_if_not_exists`
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let mut procedure =
|
||||
CreateLogicalTablesProcedure::new(cluster_id, vec![task], physical_table_id, ddl_context);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
let output = status.downcast_output_ref::<Vec<u32>>().unwrap();
|
||||
assert_eq!(*output, vec![8192]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_part_logical_tables_exist() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
let mut task = test_create_logical_table_task("exists");
|
||||
task.set_table_id(8192);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Sets `create_if_not_exists`
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let non_exist_task = test_create_logical_table_task("non_exists");
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(
|
||||
cluster_id,
|
||||
vec![task, non_exist_task],
|
||||
physical_table_id,
|
||||
ddl_context,
|
||||
);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NaiveDatanodeHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MockDatanodeHandler for NaiveDatanodeHandler {
|
||||
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
|
||||
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
async fn handle_query(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_request: QueryRequest,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_create_metadata() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Creates the logical table metadata.
|
||||
let task = test_create_logical_table_task("foo");
|
||||
let yet_another_task = test_create_logical_table_task("bar");
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(
|
||||
cluster_id,
|
||||
vec![task, yet_another_task],
|
||||
physical_table_id,
|
||||
ddl_context,
|
||||
);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
let status = procedure.execute(&ctx).await.unwrap();
|
||||
let table_ids = status.downcast_output_ref::<Vec<u32>>().unwrap();
|
||||
assert_eq!(*table_ids, vec![1025, 1026]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_create_metadata_part_logical_tables_exist() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
let mut task = test_create_logical_table_task("exists");
|
||||
task.set_table_id(8192);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Sets `create_if_not_exists`
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let non_exist_task = test_create_logical_table_task("non_exists");
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(
|
||||
cluster_id,
|
||||
vec![task, non_exist_task],
|
||||
physical_table_id,
|
||||
ddl_context,
|
||||
);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
let status = procedure.execute(&ctx).await.unwrap();
|
||||
let table_ids = status.downcast_output_ref::<Vec<u32>>().unwrap();
|
||||
assert_eq!(*table_ids, vec![8192, 1025]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_create_metadata_err() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Creates the logical table metadata.
|
||||
let task = test_create_logical_table_task("foo");
|
||||
let yet_another_task = test_create_logical_table_task("bar");
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(
|
||||
cluster_id,
|
||||
vec![task.clone(), yet_another_task],
|
||||
physical_table_id,
|
||||
ddl_context.clone(),
|
||||
);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
// Creates logical table metadata (different from the task's)
|
||||
let mut task = task.clone();
|
||||
task.table_info.ident.table_id = 1025;
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info,
|
||||
TableRouteValue::logical(512, vec![RegionId::new(1026, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
let error = procedure.execute(&ctx).await.unwrap_err();
|
||||
assert!(!error.is_retry_later());
|
||||
}
|
||||
src/common/meta/src/ddl/tests/create_table.rs (new file, 328 lines added)
@@ -0,0 +1,328 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::Partition;
|
||||
use api::v1::region::{QueryRequest, RegionRequest};
|
||||
use api::v1::{ColumnDataType, SemanticType};
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
|
||||
use common_procedure_test::MockContextProvider;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use common_telemetry::debug;
|
||||
|
||||
use crate::ddl::create_table::CreateTableProcedure;
|
||||
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
|
||||
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
|
||||
use crate::error;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::peer::Peer;
|
||||
use crate::rpc::ddl::CreateTableTask;
|
||||
use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager};
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MockDatanodeHandler for () {
|
||||
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<AffectedRows> {
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
async fn handle_query(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_request: QueryRequest,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
fn test_create_table_task(name: &str) -> CreateTableTask {
|
||||
let create_table = TestCreateTableExprBuilder::default()
|
||||
.column_defs([
|
||||
TestColumnDefBuilder::default()
|
||||
.name("ts")
|
||||
.data_type(ColumnDataType::TimestampMillisecond)
|
||||
.semantic_type(SemanticType::Timestamp)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
TestColumnDefBuilder::default()
|
||||
.name("host")
|
||||
.data_type(ColumnDataType::String)
|
||||
.semantic_type(SemanticType::Tag)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
TestColumnDefBuilder::default()
|
||||
.name("cpu")
|
||||
.data_type(ColumnDataType::Float64)
|
||||
.semantic_type(SemanticType::Field)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
])
|
||||
.time_index("ts")
|
||||
.primary_keys(["host".into()])
|
||||
.table_name(name)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into();
|
||||
let table_info = build_raw_table_info_from_expr(&create_table);
|
||||
CreateTableTask {
|
||||
create_table,
|
||||
// Single region
|
||||
partitions: vec![Partition {
|
||||
column_list: vec![],
|
||||
value_list: vec![],
|
||||
}],
|
||||
table_info,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_table_exists_err() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let task = test_create_table_task("foo");
|
||||
assert!(!task.create_table.create_if_not_exists);
|
||||
// Puts a value to table name key.
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::physical(vec![]),
|
||||
HashMap::new(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
assert_matches!(err, Error::TableAlreadyExists { .. });
|
||||
assert_eq!(err.status_code(), StatusCode::TableAlreadyExists);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_with_create_if_table_exists() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let mut task = test_create_table_task("foo");
|
||||
task.create_table.create_if_not_exists = true;
|
||||
task.table_info.ident.table_id = 1024;
|
||||
// Puts a value to table name key.
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::physical(vec![]),
|
||||
HashMap::new(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Done { output: Some(..) });
|
||||
let table_id = *status.downcast_output_ref::<u32>().unwrap();
|
||||
assert_eq!(table_id, 1024);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_without_create_if_table_exists() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let mut task = test_create_table_task("foo");
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
assert_eq!(procedure.table_id(), 1024);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_with_no_partition_err() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let mut task = test_create_table_task("foo");
|
||||
task.partitions = vec![];
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
assert_matches!(err, Error::Unexpected { .. });
|
||||
assert!(err
|
||||
.to_string()
|
||||
.contains("The number of partitions must be greater than 0"),);
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RetryErrorDatanodeHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MockDatanodeHandler for RetryErrorDatanodeHandler {
|
||||
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
|
||||
debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
|
||||
Err(Error::RetryLater {
|
||||
source: BoxedError::new(
|
||||
error::UnexpectedSnafu {
|
||||
err_msg: "retry later",
|
||||
}
|
||||
.build(),
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_query(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_request: QueryRequest,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_datanode_create_regions_should_retry() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let task = test_create_table_task("foo");
|
||||
assert!(!task.create_table.create_if_not_exists);
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
procedure.on_prepare().await.unwrap();
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
let error = procedure.execute(&ctx).await.unwrap_err();
|
||||
assert!(error.is_retry_later());
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct UnexpectedErrorDatanodeHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
|
||||
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
|
||||
debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
|
||||
error::UnexpectedSnafu {
|
||||
err_msg: "mock error",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
async fn handle_query(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_request: QueryRequest,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_datanode_create_regions_should_not_retry() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let task = test_create_table_task("foo");
|
||||
assert!(!task.create_table.create_if_not_exists);
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
procedure.on_prepare().await.unwrap();
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
let error = procedure.execute(&ctx).await.unwrap_err();
|
||||
assert!(!error.is_retry_later());
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NaiveDatanodeHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MockDatanodeHandler for NaiveDatanodeHandler {
|
||||
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
|
||||
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
async fn handle_query(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_request: QueryRequest,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_create_metadata_error() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let task = test_create_table_task("foo");
|
||||
assert!(!task.create_table.create_if_not_exists);
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task.clone(), ddl_context.clone());
|
||||
procedure.on_prepare().await.unwrap();
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
let mut task = task;
|
||||
// Creates table metadata (different from the task's)
|
||||
task.table_info.ident.table_id = procedure.table_id();
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::physical(vec![]),
|
||||
HashMap::new(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
let error = procedure.execute(&ctx).await.unwrap_err();
|
||||
assert!(!error.is_retry_later());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_create_metadata() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let task = test_create_table_task("foo");
|
||||
assert!(!task.create_table.create_if_not_exists);
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
procedure.on_prepare().await.unwrap();
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
let status = procedure.execute(&ctx).await.unwrap();
|
||||
let table_id = status.downcast_output_ref::<u32>().unwrap();
|
||||
assert_eq!(*table_id, 1024);
|
||||
}
|
||||
@@ -31,7 +31,7 @@ use table::metadata::{RawTableInfo, TableId};
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use super::utils::handle_retry_error;
|
||||
use crate::ddl::utils::handle_operate_region_error;
|
||||
use crate::ddl::utils::add_peer_context_if_needed;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{Result, TableNotFoundSnafu};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
@@ -169,10 +169,10 @@ impl TruncateTableProcedure {
             let requester = requester.clone();

             truncate_region_tasks.push(async move {
-                if let Err(err) = requester.handle(request).await {
-                    return Err(handle_operate_region_error(datanode)(err));
-                }
-                Ok(())
+                requester
+                    .handle(request)
+                    .await
+                    .map_err(add_peer_context_if_needed(datanode))
             });
         }
     }
|
||||
|
||||
@@ -27,19 +27,17 @@ use crate::key::TableMetadataManagerRef;
 use crate::peer::Peer;
 use crate::rpc::ddl::CreateTableTask;

-pub fn handle_operate_region_error(datanode: Peer) -> impl FnOnce(Error) -> Error {
+/// Adds [Peer] context if the error is unretryable.
+pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
     move |err| {
-        if matches!(err, Error::RetryLater { .. }) {
-            Error::RetryLater {
-                source: BoxedError::new(err),
-            }
-        } else {
-            Error::OperateDatanode {
+        if !err.is_retry_later() {
+            return Error::OperateDatanode {
                 location: location!(),
                 peer: datanode,
                 source: BoxedError::new(err),
-            }
+            };
         }
+        err
     }
 }
|
||||
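Not part of the diff: a self-contained sketch of the same wrap-unless-retryable closure shape, using a made-up `AppError` enum and a plain `u64` peer id rather than the crate's `Error` and `Peer` types.

#[derive(Debug)]
enum AppError {
    RetryLater { msg: String },
    Unexpected { msg: String },
    OperateDatanode { peer: u64, source: Box<AppError> },
}

impl AppError {
    fn is_retry_later(&self) -> bool {
        matches!(self, AppError::RetryLater { .. })
    }
}

// Returns a closure that attaches the peer id unless the error is retryable,
// mirroring the shape of `add_peer_context_if_needed` above.
fn add_peer_context(peer: u64) -> impl FnOnce(AppError) -> AppError {
    move |err| {
        if !err.is_retry_later() {
            return AppError::OperateDatanode { peer, source: Box::new(err) };
        }
        err
    }
}

fn main() {
    let retryable = AppError::RetryLater { msg: "region busy".into() };
    let unexpected = AppError::Unexpected { msg: "corrupt manifest".into() };
    println!("{:?}", add_peer_context(42)(retryable));   // passes through unchanged
    println!("{:?}", add_peer_context(42)(unexpected));  // wrapped with peer context
}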
|
||||
|
||||
@@ -12,14 +12,13 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId};
|
||||
use common_telemetry::tracing_context::{FutureExt, TracingContext};
|
||||
use common_telemetry::{info, tracing};
|
||||
use common_telemetry::{debug, info, tracing};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::{RegionNumber, TableId};
|
||||
use store_api::storage::TableId;
|
||||
|
||||
use crate::cache_invalidator::CacheInvalidatorRef;
|
||||
use crate::datanode_manager::DatanodeManagerRef;
|
||||
@@ -27,15 +26,12 @@ use crate::ddl::alter_table::AlterTableProcedure;
|
||||
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
|
||||
use crate::ddl::create_table::CreateTableProcedure;
|
||||
use crate::ddl::drop_table::DropTableProcedure;
|
||||
use crate::ddl::table_meta::TableMetadataAllocator;
|
||||
use crate::ddl::table_meta::TableMetadataAllocatorRef;
|
||||
use crate::ddl::truncate_table::TruncateTableProcedure;
|
||||
use crate::ddl::{
|
||||
utils, DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata,
|
||||
TableMetadataAllocatorContext,
|
||||
};
|
||||
use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
|
||||
use crate::error::{
|
||||
self, EmptyCreateTableTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
|
||||
SubmitProcedureSnafu, TableNotFoundSnafu, WaitProcedureSnafu,
|
||||
SubmitProcedureSnafu, TableNotFoundSnafu, UnsupportedSnafu, WaitProcedureSnafu,
|
||||
};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::table_name::TableNameKey;
|
||||
@@ -50,6 +46,8 @@ use crate::rpc::ddl::{
|
||||
AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
|
||||
TruncateTableTask,
|
||||
};
|
||||
use crate::rpc::procedure;
|
||||
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
|
||||
use crate::rpc::router::RegionRoute;
|
||||
use crate::table_name::TableName;
|
||||
use crate::ClusterId;
|
||||
@@ -62,7 +60,7 @@ pub struct DdlManager {
     datanode_manager: DatanodeManagerRef,
     cache_invalidator: CacheInvalidatorRef,
     table_metadata_manager: TableMetadataManagerRef,
-    table_metadata_allocator: TableMetadataAllocator,
+    table_metadata_allocator: TableMetadataAllocatorRef,
     memory_region_keeper: MemoryRegionKeeperRef,
 }
|
||||
|
||||
@@ -73,7 +71,7 @@ impl DdlManager {
         datanode_clients: DatanodeManagerRef,
         cache_invalidator: CacheInvalidatorRef,
         table_metadata_manager: TableMetadataManagerRef,
-        table_metadata_allocator: TableMetadataAllocator,
+        table_metadata_allocator: TableMetadataAllocatorRef,
         memory_region_keeper: MemoryRegionKeeperRef,
     ) -> Result<Self> {
         let manager = Self {
|
||||
@@ -100,6 +98,7 @@ impl DdlManager {
|
||||
cache_invalidator: self.cache_invalidator.clone(),
|
||||
table_metadata_manager: self.table_metadata_manager.clone(),
|
||||
memory_region_keeper: self.memory_region_keeper.clone(),
|
||||
table_metadata_allocator: self.table_metadata_allocator.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -205,18 +204,10 @@ impl DdlManager {
|
||||
&self,
|
||||
cluster_id: ClusterId,
|
||||
create_table_task: CreateTableTask,
|
||||
table_route: TableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
) -> Result<(ProcedureId, Option<Output>)> {
|
||||
let context = self.create_context();
|
||||
|
||||
let procedure = CreateTableProcedure::new(
|
||||
cluster_id,
|
||||
create_table_task,
|
||||
table_route,
|
||||
region_wal_options,
|
||||
context,
|
||||
);
|
||||
let procedure = CreateTableProcedure::new(cluster_id, create_table_task, context);
|
||||
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
@@ -470,31 +461,10 @@ async fn handle_drop_table_task(
|
||||
async fn handle_create_table_task(
|
||||
ddl_manager: &DdlManager,
|
||||
cluster_id: ClusterId,
|
||||
mut create_table_task: CreateTableTask,
|
||||
create_table_task: CreateTableTask,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let table_meta = ddl_manager
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_table_task,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
region_wal_options,
|
||||
} = table_meta;
|
||||
|
||||
create_table_task.table_info.ident.table_id = table_id;
|
||||
|
||||
let (id, output) = ddl_manager
|
||||
.submit_create_table_task(
|
||||
cluster_id,
|
||||
create_table_task,
|
||||
table_route,
|
||||
region_wal_options,
|
||||
)
|
||||
.submit_create_table_task(cluster_id, create_table_task)
|
||||
.await?;
|
||||
|
||||
let procedure_id = id.to_string();
|
||||
@@ -559,8 +529,9 @@ async fn handle_create_logical_table_tasks(
|
||||
})
|
||||
}
|
||||
|
||||
/// TODO(dennis): letting [`DdlManager`] implement [`ProcedureExecutor`] looks weird; find some way to refactor it.
|
||||
#[async_trait::async_trait]
|
||||
impl DdlTaskExecutor for DdlManager {
|
||||
impl ProcedureExecutor for DdlManager {
|
||||
async fn submit_ddl_task(
|
||||
&self,
|
||||
ctx: &ExecutorContext,
|
||||
@@ -574,7 +545,7 @@ impl DdlTaskExecutor for DdlManager {
             .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
         async move {
             let cluster_id = ctx.cluster_id.unwrap_or_default();
-            info!("Submitting Ddl task: {:?}", request.task);
+            debug!("Submitting Ddl task: {:?}", request.task);
             match request.task {
                 CreateTable(create_table_task) => {
                     handle_create_table_task(self, cluster_id, create_table_task).await
|
||||
@@ -598,6 +569,37 @@ impl DdlTaskExecutor for DdlManager {
|
||||
.trace(span)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn migrate_region(
|
||||
&self,
|
||||
_ctx: &ExecutorContext,
|
||||
_request: MigrateRegionRequest,
|
||||
) -> Result<MigrateRegionResponse> {
|
||||
UnsupportedSnafu {
|
||||
operation: "migrate_region",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
async fn query_procedure_state(
|
||||
&self,
|
||||
_ctx: &ExecutorContext,
|
||||
pid: &str,
|
||||
) -> Result<ProcedureStateResponse> {
|
||||
let pid = ProcedureId::parse_str(pid)
|
||||
.with_context(|_| error::ParseProcedureIdSnafu { key: pid })?;
|
||||
|
||||
let state = self
|
||||
.procedure_manager
|
||||
.procedure_state(pid)
|
||||
.await
|
||||
.context(error::QueryProcedureSnafu)?
|
||||
.context(error::ProcedureNotFoundSnafu {
|
||||
pid: pid.to_string(),
|
||||
})?;
|
||||
|
||||
Ok(procedure::procedure_state_to_pb_response(&state))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -644,12 +646,12 @@ mod tests {
|
||||
procedure_manager.clone(),
|
||||
Arc::new(DummyDatanodeManager),
|
||||
Arc::new(DummyCacheInvalidator),
|
||||
table_metadata_manager,
|
||||
TableMetadataAllocator::new(
|
||||
table_metadata_manager.clone(),
|
||||
Arc::new(TableMetadataAllocator::new(
|
||||
Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
|
||||
Arc::new(WalOptionsAllocator::default()),
|
||||
Arc::new(TableMetadataManager::new(kv_backend)),
|
||||
),
|
||||
table_metadata_manager.table_name_manager().clone(),
|
||||
)),
|
||||
Arc::new(MemoryRegionKeeper::default()),
|
||||
);
|
||||
|
||||
|
||||
@@ -100,6 +100,15 @@ pub enum Error {
|
||||
source: common_procedure::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to query procedure"))]
|
||||
QueryProcedure {
|
||||
location: Location,
|
||||
source: common_procedure::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Procedure not found: {pid}"))]
|
||||
ProcedureNotFound { location: Location, pid: String },
|
||||
|
||||
#[snafu(display("Failed to parse procedure id: {key}"))]
|
||||
ParseProcedureId {
|
||||
location: Location,
|
||||
@@ -331,6 +340,9 @@ pub enum Error {
|
||||
error: rskafka::client::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to resolve Kafka broker endpoint."))]
|
||||
ResolveKafkaEndpoint { source: common_wal::error::Error },
|
||||
|
||||
#[snafu(display("Failed to build a Kafka controller client"))]
|
||||
BuildKafkaCtrlClient {
|
||||
location: Location,
|
||||
@@ -416,6 +428,7 @@ impl ErrorExt for Error {
|
||||
| BuildKafkaClient { .. }
|
||||
| BuildKafkaCtrlClient { .. }
|
||||
| BuildKafkaPartitionClient { .. }
|
||||
| ResolveKafkaEndpoint { .. }
|
||||
| ProduceRecord { .. }
|
||||
| CreateKafkaWalTopic { .. }
|
||||
| EmptyTopicPool { .. }
|
||||
@@ -431,14 +444,17 @@ impl ErrorExt for Error {
|
||||
| RenameTable { .. }
|
||||
| Unsupported { .. } => StatusCode::Internal,
|
||||
|
||||
PrimaryKeyNotFound { .. } | EmptyKey { .. } | InvalidEngineType { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
ProcedureNotFound { .. }
|
||||
| PrimaryKeyNotFound { .. }
|
||||
| EmptyKey { .. }
|
||||
| InvalidEngineType { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
TableNotFound { .. } => StatusCode::TableNotFound,
|
||||
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
|
||||
|
||||
SubmitProcedure { source, .. } | WaitProcedure { source, .. } => source.status_code(),
|
||||
SubmitProcedure { source, .. }
|
||||
| QueryProcedure { source, .. }
|
||||
| WaitProcedure { source, .. } => source.status_code(),
|
||||
RegisterProcedureLoader { source, .. } => source.status_code(),
|
||||
External { source, .. } => source.status_code(),
|
||||
OperateDatanode { source, .. } => source.status_code(),
|
||||
|
||||
@@ -363,8 +363,10 @@ impl TableMetadataManager {
|
||||
Option<DeserializedValueWithBytes<TableInfoValue>>,
|
||||
Option<DeserializedValueWithBytes<TableRouteValue>>,
|
||||
)> {
|
||||
let (get_table_route_txn, table_route_decoder) =
|
||||
self.table_route_manager.build_get_txn(table_id);
|
||||
let (get_table_route_txn, table_route_decoder) = self
|
||||
.table_route_manager
|
||||
.table_route_storage()
|
||||
.build_get_txn(table_id);
|
||||
|
||||
let (get_table_info_txn, table_info_decoder) =
|
||||
self.table_info_manager.build_get_txn(table_id);
|
||||
@@ -414,6 +416,7 @@ impl TableMetadataManager {
|
||||
|
||||
let (create_table_route_txn, on_create_table_route_failure) = self
|
||||
.table_route_manager()
|
||||
.table_route_storage()
|
||||
.build_create_txn(table_id, &table_route_value)?;
|
||||
|
||||
let mut txn = Txn::merge_all(vec![
|
||||
@@ -506,6 +509,7 @@ impl TableMetadataManager {
|
||||
|
||||
let (create_table_route_txn, on_create_table_route_failure) = self
|
||||
.table_route_manager()
|
||||
.table_route_storage()
|
||||
.build_create_txn(table_id, &table_route_value)?;
|
||||
txns.push(create_table_route_txn);
|
||||
|
||||
@@ -579,6 +583,7 @@ impl TableMetadataManager {
|
||||
// Deletes table route.
|
||||
let delete_table_route_txn = self
|
||||
.table_route_manager()
|
||||
.table_route_storage()
|
||||
.build_delete_txn(table_id, table_route_value)?;
|
||||
|
||||
let txn = Txn::merge_all(vec![
|
||||
@@ -713,6 +718,7 @@ impl TableMetadataManager {
|
||||
|
||||
let (update_table_route_txn, on_update_table_route_failure) = self
|
||||
.table_route_manager()
|
||||
.table_route_storage()
|
||||
.build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
|
||||
|
||||
let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
|
||||
@@ -765,6 +771,7 @@ impl TableMetadataManager {
|
||||
|
||||
let (update_table_route_txn, on_update_table_route_failure) = self
|
||||
.table_route_manager()
|
||||
.table_route_storage()
|
||||
.build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
|
||||
|
||||
let r = self.kv_backend.txn(update_table_route_txn).await?;
|
||||
@@ -1096,6 +1103,7 @@ mod tests {
|
||||
|
||||
assert!(table_metadata_manager
|
||||
.table_route_manager()
|
||||
.table_route_storage()
|
||||
.get(table_id)
|
||||
.await
|
||||
.unwrap()
|
||||
@@ -1120,7 +1128,8 @@ mod tests {
|
||||
|
||||
let removed_table_route = table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get_removed(table_id)
|
||||
.table_route_storage()
|
||||
.get_raw_removed(table_id)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
@@ -1316,6 +1325,7 @@ mod tests {
|
||||
|
||||
let updated_route_value = table_metadata_manager
|
||||
.table_route_manager()
|
||||
.table_route_storage()
|
||||
.get(table_id)
|
||||
.await
|
||||
.unwrap()
|
||||
|
||||
@@ -144,6 +144,7 @@ impl TableNameValue {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TableNameManager {
|
||||
kv_backend: KvBackendRef,
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ use table::metadata::TableId;
|
||||
|
||||
use super::{txn_helper, DeserializedValueWithBytes, TableMetaValue};
|
||||
use crate::error::{
|
||||
MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu,
|
||||
self, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu,
|
||||
UnexpectedLogicalRouteTableSnafu,
|
||||
};
|
||||
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
|
||||
@@ -77,7 +77,7 @@ impl TableRouteValue {
|
||||
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
|
||||
}
|
||||
);
|
||||
let version = self.physical_table_route().version;
|
||||
let version = self.as_physical_table_route_ref().version;
|
||||
Ok(Self::Physical(PhysicalTableRouteValue {
|
||||
region_routes,
|
||||
version: version + 1,
|
||||
@@ -95,7 +95,7 @@ impl TableRouteValue {
|
||||
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
|
||||
}
|
||||
);
|
||||
Ok(self.physical_table_route().version)
|
||||
Ok(self.as_physical_table_route_ref().version)
|
||||
}
|
||||
|
||||
/// Returns the corresponding [RegionRoute], or `None` if the specific region is not found.
|
||||
@@ -109,7 +109,7 @@ impl TableRouteValue {
|
||||
}
|
||||
);
|
||||
Ok(self
|
||||
.physical_table_route()
|
||||
.as_physical_table_route_ref()
|
||||
.region_routes
|
||||
.iter()
|
||||
.find(|route| route.region.id == region_id)
|
||||
@@ -129,10 +129,25 @@ impl TableRouteValue {
|
||||
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
|
||||
}
|
||||
);
|
||||
Ok(&self.physical_table_route().region_routes)
|
||||
Ok(&self.as_physical_table_route_ref().region_routes)
|
||||
}
|
||||
|
||||
fn physical_table_route(&self) -> &PhysicalTableRouteValue {
|
||||
/// Returns a reference to the [`PhysicalTableRouteValue`].
///
/// # Panics
/// If it is not a [`PhysicalTableRouteValue`].
|
||||
fn as_physical_table_route_ref(&self) -> &PhysicalTableRouteValue {
|
||||
match self {
|
||||
TableRouteValue::Physical(x) => x,
|
||||
_ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts to [`PhysicalTableRouteValue`].
|
||||
///
|
||||
/// # Panic
|
||||
/// If it is not the [`PhysicalTableRouteValue`].
|
||||
fn into_physical_table_route(self) -> PhysicalTableRouteValue {
|
||||
match self {
|
||||
TableRouteValue::Physical(x) => x,
|
||||
_ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
|
||||
@@ -213,111 +228,53 @@ impl Display for TableRouteKey {
|
||||
}
|
||||
|
||||
pub struct TableRouteManager {
|
||||
kv_backend: KvBackendRef,
|
||||
storage: TableRouteStorage,
|
||||
}
|
||||
|
||||
impl TableRouteManager {
|
||||
pub fn new(kv_backend: KvBackendRef) -> Self {
|
||||
Self { kv_backend }
|
||||
Self {
|
||||
storage: TableRouteStorage::new(kv_backend),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn build_get_txn(
|
||||
/// Returns the [`PhysicalTableRouteValue`] at the first level;
/// it won't follow a [`LogicalTableRouteValue`] to find the next-level [`PhysicalTableRouteValue`].
///
/// Returns an error if the first-level value is not a [`PhysicalTableRouteValue`].
|
||||
pub async fn try_get_physical_table_route(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
) -> (
|
||||
Txn,
|
||||
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
|
||||
) {
|
||||
let key = TableRouteKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
let txn = Txn::new().and_then(vec![TxnOp::Get(raw_key.clone())]);
|
||||
|
||||
(txn, txn_helper::build_txn_response_decoder_fn(raw_key))
|
||||
}
|
||||
|
||||
/// Builds a create table route transaction. it expected the `__table_route/{table_id}` wasn't occupied.
|
||||
pub fn build_create_txn(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
table_route_value: &TableRouteValue,
|
||||
) -> Result<(
|
||||
Txn,
|
||||
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
|
||||
)> {
|
||||
let key = TableRouteKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
|
||||
let txn = txn_helper::build_put_if_absent_txn(
|
||||
raw_key.clone(),
|
||||
table_route_value.try_as_raw_value()?,
|
||||
);
|
||||
|
||||
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
|
||||
}
|
||||
|
||||
/// Builds a update table route transaction, it expected the remote value equals the `current_table_route_value`.
|
||||
/// It retrieves the latest value if the comparing failed.
|
||||
pub(crate) fn build_update_txn(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
|
||||
new_table_route_value: &TableRouteValue,
|
||||
) -> Result<(
|
||||
Txn,
|
||||
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
|
||||
)> {
|
||||
let key = TableRouteKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
let raw_value = current_table_route_value.get_raw_bytes();
|
||||
let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
|
||||
|
||||
let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value);
|
||||
|
||||
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
|
||||
}
|
||||
|
||||
/// Builds a delete table route transaction, it expected the remote value equals the `table_route_value`.
|
||||
pub(crate) fn build_delete_txn(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
|
||||
) -> Result<Txn> {
|
||||
let key = TableRouteKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
let raw_value = table_route_value.get_raw_bytes();
|
||||
let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key));
|
||||
|
||||
let txn = Txn::new().and_then(vec![
|
||||
TxnOp::Delete(raw_key),
|
||||
TxnOp::Put(removed_key.into_bytes(), raw_value),
|
||||
]);
|
||||
|
||||
Ok(txn)
|
||||
}
|
||||
|
||||
pub async fn get(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
|
||||
let key = TableRouteKey::new(table_id);
|
||||
self.kv_backend
|
||||
.get(&key.as_raw_key())
|
||||
.await?
|
||||
.map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
|
||||
.transpose()
|
||||
) -> Result<Option<PhysicalTableRouteValue>> {
|
||||
match self.storage.get(table_id).await? {
|
||||
Some(route) => {
|
||||
ensure!(
|
||||
route.is_physical(),
|
||||
error::UnexpectedLogicalRouteTableSnafu {
|
||||
err_msg: format!("{route:?} is a non-physical TableRouteValue.")
|
||||
}
|
||||
);
|
||||
Ok(Some(route.into_physical_table_route()))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the [TableId] recursively.
|
||||
///
|
||||
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
|
||||
/// - the table(`logical_or_physical_table_id`) does not exist.
|
||||
pub async fn get_physical_table_id(
|
||||
&self,
|
||||
logical_or_physical_table_id: TableId,
|
||||
) -> Result<TableId> {
|
||||
let table_route = self
|
||||
.storage
|
||||
.get(logical_or_physical_table_id)
|
||||
.await?
|
||||
.context(TableRouteNotFoundSnafu {
|
||||
table_id: logical_or_physical_table_id,
|
||||
})?
|
||||
.into_inner();
|
||||
})?;
|
||||
|
||||
match table_route {
|
||||
TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
|
||||
@@ -325,41 +282,58 @@ impl TableRouteManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the [TableRouteValue::Physical] recursively.
|
||||
///
|
||||
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
|
||||
/// - the physical table(`logical_or_physical_table_id`) does not exist
|
||||
/// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist.
|
||||
pub async fn get_physical_table_route(
|
||||
&self,
|
||||
logical_or_physical_table_id: TableId,
|
||||
) -> Result<(TableId, PhysicalTableRouteValue)> {
|
||||
let table_route = self
|
||||
.storage
|
||||
.get(logical_or_physical_table_id)
|
||||
.await?
|
||||
.context(TableRouteNotFoundSnafu {
|
||||
table_id: logical_or_physical_table_id,
|
||||
})?
|
||||
.into_inner();
|
||||
})?;
|
||||
|
||||
match table_route {
|
||||
TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
|
||||
TableRouteValue::Logical(x) => {
|
||||
let physical_table_id = x.physical_table_id();
|
||||
let physical_table_route =
|
||||
self.get(physical_table_id)
|
||||
.await?
|
||||
.context(TableRouteNotFoundSnafu {
|
||||
table_id: physical_table_id,
|
||||
})?;
|
||||
Ok((
|
||||
physical_table_id,
|
||||
physical_table_route.physical_table_route().clone(),
|
||||
))
|
||||
let physical_table_route = self.storage.get(physical_table_id).await?.context(
|
||||
TableRouteNotFoundSnafu {
|
||||
table_id: physical_table_id,
|
||||
},
|
||||
)?;
|
||||
let physical_table_route = physical_table_route.into_physical_table_route();
|
||||
Ok((physical_table_id, physical_table_route))
|
||||
}
|
||||
}
|
||||
}
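Not part of the diff: a minimal, self-contained sketch of the logical-to-physical resolution that `get_physical_table_route` performs above, with a plain `HashMap` and a made-up `Route` enum standing in for the kv backend and `TableRouteValue`.

use std::collections::HashMap;

// Made-up, minimal stand-ins for the route values; only the resolution logic
// mirrors `get_physical_table_route` above.
#[derive(Clone, Debug)]
enum Route {
    Physical { regions: Vec<u64> },
    Logical { physical_table_id: u32 },
}

fn resolve_physical(
    routes: &HashMap<u32, Route>,
    table_id: u32,
) -> Result<(u32, Vec<u64>), String> {
    match routes.get(&table_id).ok_or(format!("route for {table_id} not found"))? {
        Route::Physical { regions } => Ok((table_id, regions.clone())),
        Route::Logical { physical_table_id } => {
            // Follow the logical route one level down to its physical table.
            match routes
                .get(physical_table_id)
                .ok_or(format!("route for {physical_table_id} not found"))?
            {
                Route::Physical { regions } => Ok((*physical_table_id, regions.clone())),
                Route::Logical { .. } => Err("physical route expected".to_string()),
            }
        }
    }
}

fn main() {
    let mut routes = HashMap::new();
    routes.insert(1024, Route::Physical { regions: vec![1, 2] });
    routes.insert(8192, Route::Logical { physical_table_id: 1024 });
    // A logical table id resolves to its physical table id and region list.
    assert_eq!(resolve_physical(&routes, 8192).unwrap().0, 1024);
}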
|
||||
|
||||
/// Returns the [TableRouteValue::Physical] recursively.
|
||||
///
|
||||
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
|
||||
/// - one of the logical tables corresponding to the physical table does not exist.
|
||||
///
|
||||
/// **Notes**: it may return a subset of `logical_or_physical_table_ids`.
|
||||
pub async fn batch_get_physical_table_routes(
|
||||
&self,
|
||||
logical_or_physical_table_ids: &[TableId],
|
||||
) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
|
||||
let table_routes = self.batch_get(logical_or_physical_table_ids).await?;
|
||||
let table_routes = self
|
||||
.storage
|
||||
.batch_get(logical_or_physical_table_ids)
|
||||
.await?;
|
||||
// Returns a subset of `logical_or_physical_table_ids`.
|
||||
let table_routes = table_routes
|
||||
.into_iter()
|
||||
.zip(logical_or_physical_table_ids)
|
||||
.filter_map(|(route, id)| route.map(|route| (*id, route)))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let mut physical_table_routes = HashMap::with_capacity(table_routes.len());
|
||||
let mut logical_table_ids = HashMap::with_capacity(table_routes.len());
|
||||
@@ -379,13 +353,22 @@ impl TableRouteManager {
|
||||
return Ok(physical_table_routes);
|
||||
}
|
||||
|
||||
// Finds the logical tables corresponding to the physical tables.
|
||||
let physical_table_ids = logical_table_ids
|
||||
.values()
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>()
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>();
|
||||
let table_routes = self.batch_get(&physical_table_ids).await?;
|
||||
let table_routes = self
|
||||
.table_route_storage()
|
||||
.batch_get(&physical_table_ids)
|
||||
.await?;
|
||||
let table_routes = table_routes
|
||||
.into_iter()
|
||||
.zip(physical_table_ids)
|
||||
.filter_map(|(route, id)| route.map(|route| (id, route)))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
for (logical_table_id, physical_table_id) in logical_table_ids {
|
||||
let table_route =
|
||||
@@ -414,40 +397,114 @@ impl TableRouteManager {
|
||||
Ok(physical_table_routes)
|
||||
}
|
||||
|
||||
/// It may return a subset of the `table_ids`.
|
||||
pub async fn batch_get(
|
||||
/// Returns [`RegionDistribution`] of the table(`table_id`).
|
||||
pub async fn get_region_distribution(
|
||||
&self,
|
||||
table_ids: &[TableId],
|
||||
) -> Result<HashMap<TableId, TableRouteValue>> {
|
||||
let lookup_table = table_ids
|
||||
.iter()
|
||||
.map(|id| (TableRouteKey::new(*id).as_raw_key(), id))
|
||||
.collect::<HashMap<_, _>>();
|
||||
table_id: TableId,
|
||||
) -> Result<Option<RegionDistribution>> {
|
||||
self.storage
|
||||
.get(table_id)
|
||||
.await?
|
||||
.map(|table_route| Ok(region_distribution(table_route.region_routes()?)))
|
||||
.transpose()
|
||||
}
|
||||
|
||||
let resp = self
|
||||
.kv_backend
|
||||
.batch_get(BatchGetRequest {
|
||||
keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
|
||||
})
|
||||
.await?;
|
||||
/// Returns low-level APIs.
|
||||
pub fn table_route_storage(&self) -> &TableRouteStorage {
|
||||
&self.storage
|
||||
}
|
||||
}
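Not part of the diff: a usage sketch of the layering introduced here, assuming a `table_route_manager: TableRouteManager` binding in an async context; `get_physical_table_id` and `table_route_storage().get_raw` are the APIs added in this change.

// High-level helpers live on TableRouteManager; raw key-value access on TableRouteStorage.
let physical_id = table_route_manager.get_physical_table_id(8192).await?;
let raw_route = table_route_manager
    .table_route_storage()
    .get_raw(physical_id)
    .await?;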
|
||||
|
||||
let values = resp
|
||||
.kvs
|
||||
.iter()
|
||||
.map(|kv| {
|
||||
Ok((
|
||||
// Safety: must exist.
|
||||
**lookup_table.get(kv.key()).unwrap(),
|
||||
TableRouteValue::try_from_raw_value(&kv.value)?,
|
||||
))
|
||||
})
|
||||
.collect::<Result<HashMap<_, _>>>()?;
|
||||
/// Low-level operations of [TableRouteValue].
|
||||
pub struct TableRouteStorage {
|
||||
kv_backend: KvBackendRef,
|
||||
}
|
||||
|
||||
Ok(values)
|
||||
impl TableRouteStorage {
|
||||
pub fn new(kv_backend: KvBackendRef) -> Self {
|
||||
Self { kv_backend }
|
||||
}
|
||||
|
||||
/// Builds a get table route transaction(readonly).
|
||||
pub(crate) fn build_get_txn(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
) -> (
|
||||
Txn,
|
||||
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
|
||||
) {
|
||||
let key = TableRouteKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
let txn = Txn::new().and_then(vec![TxnOp::Get(raw_key.clone())]);
|
||||
|
||||
(txn, txn_helper::build_txn_response_decoder_fn(raw_key))
|
||||
}
|
||||
|
||||
/// Builds a create table route transaction;
/// it expects that the `__table_route/{table_id}` key is not already occupied.
|
||||
pub fn build_create_txn(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
table_route_value: &TableRouteValue,
|
||||
) -> Result<(
|
||||
Txn,
|
||||
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
|
||||
)> {
|
||||
let key = TableRouteKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
|
||||
let txn = txn_helper::build_put_if_absent_txn(
|
||||
raw_key.clone(),
|
||||
table_route_value.try_as_raw_value()?,
|
||||
);
|
||||
|
||||
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
|
||||
}
|
||||
|
||||
/// Builds an update table route transaction;
/// it expects that the remote value equals the `current_table_route_value`.
/// It retrieves the latest value if the comparison fails.
|
||||
pub(crate) fn build_update_txn(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
|
||||
new_table_route_value: &TableRouteValue,
|
||||
) -> Result<(
|
||||
Txn,
|
||||
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
|
||||
)> {
|
||||
let key = TableRouteKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
let raw_value = current_table_route_value.get_raw_bytes();
|
||||
let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
|
||||
|
||||
let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value);
|
||||
|
||||
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
|
||||
}
|
||||
|
||||
/// Builds a delete table route transaction;
/// it expects that the remote value equals the `table_route_value`.
|
||||
pub(crate) fn build_delete_txn(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
|
||||
) -> Result<Txn> {
|
||||
let key = TableRouteKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
let raw_value = table_route_value.get_raw_bytes();
|
||||
let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key));
|
||||
|
||||
let txn = Txn::new().and_then(vec![
|
||||
TxnOp::Delete(raw_key),
|
||||
TxnOp::Put(removed_key.into_bytes(), raw_value),
|
||||
]);
|
||||
|
||||
Ok(txn)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub async fn get_removed(
|
||||
pub async fn get_raw_removed(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
|
||||
@@ -460,20 +517,64 @@ impl TableRouteManager {
|
||||
.transpose()
|
||||
}
|
||||
|
||||
pub async fn get_region_distribution(
|
||||
/// Returns the [`TableRouteValue`].
|
||||
pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
|
||||
let key = TableRouteKey::new(table_id);
|
||||
self.kv_backend
|
||||
.get(&key.as_raw_key())
|
||||
.await?
|
||||
.map(|kv| TableRouteValue::try_from_raw_value(&kv.value))
|
||||
.transpose()
|
||||
}
|
||||
|
||||
/// Returns the [`TableRouteValue`] wrapped with [`DeserializedValueWithBytes`].
|
||||
pub async fn get_raw(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
) -> Result<Option<RegionDistribution>> {
|
||||
self.get(table_id)
|
||||
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
|
||||
let key = TableRouteKey::new(table_id);
|
||||
self.kv_backend
|
||||
.get(&key.as_raw_key())
|
||||
.await?
|
||||
.map(|table_route| Ok(region_distribution(table_route.region_routes()?)))
|
||||
.map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
|
||||
.transpose()
|
||||
}
|
||||
|
||||
/// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`.
|
||||
pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
|
||||
let keys = table_ids
|
||||
.iter()
|
||||
.map(|id| TableRouteKey::new(*id).as_raw_key())
|
||||
.collect::<Vec<_>>();
|
||||
let resp = self
|
||||
.kv_backend
|
||||
.batch_get(BatchGetRequest { keys: keys.clone() })
|
||||
.await?;
|
||||
|
||||
let kvs = resp
|
||||
.kvs
|
||||
.into_iter()
|
||||
.map(|kv| (kv.key, kv.value))
|
||||
.collect::<HashMap<_, _>>();
|
||||
keys.into_iter()
|
||||
.map(|key| {
|
||||
if let Some(value) = kvs.get(&key) {
|
||||
Ok(Some(TableRouteValue::try_from_raw_value(value)?))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()
|
||||
}
|
||||
}
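Not part of the diff: a self-contained sketch of the order-preserving lookup that `batch_get` documents above (results line up with the requested ids, `None` for misses), using a plain `HashMap` in place of the kv backend.

use std::collections::HashMap;

// Hypothetical in-memory stand-in for the kv backend; only the "respects the
// order of the requested ids" behaviour of `batch_get` above is illustrated.
fn batch_get_ordered(kv: &HashMap<u32, String>, ids: &[u32]) -> Vec<Option<String>> {
    // A real batch RPC may return hits in any order, so index them by key first...
    let hits: HashMap<u32, &String> = ids
        .iter()
        .filter_map(|id| kv.get(id).map(|v| (*id, v)))
        .collect();
    // ...then walk the requested ids so the output order matches the input order.
    ids.iter().map(|id| hits.get(id).map(|v| (*v).clone())).collect()
}

fn main() {
    let mut kv = HashMap::new();
    kv.insert(1024, "logical route".to_string());
    kv.insert(1025, "logical route".to_string());
    let got = batch_get_ordered(&kv, &[9999, 1025, 8888, 1024]);
    assert!(got[0].is_none() && got[2].is_none());
    assert!(got[1].is_some() && got[3].is_some());
}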
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::*;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
use crate::kv_backend::TxnService;
|
||||
|
||||
#[test]
|
||||
fn test_table_route_compatibility() {
|
||||
@@ -486,4 +587,81 @@ mod tests {
|
||||
r#"Physical(PhysicalTableRouteValue { region_routes: [RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None, leader_down_since: None }, RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None, leader_down_since: None }], version: 0 })"#
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_table_route_storage_get_raw_empty() {
|
||||
let kv = Arc::new(MemoryKvBackend::default());
|
||||
let table_route_storage = TableRouteStorage::new(kv);
|
||||
let table_route = table_route_storage.get_raw(1024).await.unwrap();
|
||||
assert!(table_route.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_table_route_storage_get_raw() {
|
||||
let kv = Arc::new(MemoryKvBackend::default());
|
||||
let table_route_storage = TableRouteStorage::new(kv.clone());
|
||||
let table_route = table_route_storage.get_raw(1024).await.unwrap();
|
||||
assert!(table_route.is_none());
|
||||
let table_route_manager = TableRouteManager::new(kv.clone());
|
||||
let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
|
||||
physical_table_id: 1023,
|
||||
region_ids: vec![RegionId::new(1023, 1)],
|
||||
});
|
||||
let (txn, _) = table_route_manager
|
||||
.table_route_storage()
|
||||
.build_create_txn(1024, &table_route_value)
|
||||
.unwrap();
|
||||
let r = kv.txn(txn).await.unwrap();
|
||||
assert!(r.succeeded);
|
||||
let table_route = table_route_storage.get_raw(1024).await.unwrap();
|
||||
assert!(table_route.is_some());
|
||||
let got = table_route.unwrap().inner;
|
||||
assert_eq!(got, table_route_value);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_table_route_batch_get() {
|
||||
let kv = Arc::new(MemoryKvBackend::default());
|
||||
let table_route_storage = TableRouteStorage::new(kv.clone());
|
||||
let routes = table_route_storage
|
||||
.batch_get(&[1023, 1024, 1025])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(routes.iter().all(Option::is_none));
|
||||
let table_route_manager = TableRouteManager::new(kv.clone());
|
||||
let routes = [
|
||||
(
|
||||
1024,
|
||||
TableRouteValue::Logical(LogicalTableRouteValue {
|
||||
physical_table_id: 1023,
|
||||
region_ids: vec![RegionId::new(1023, 1)],
|
||||
}),
|
||||
),
|
||||
(
|
||||
1025,
|
||||
TableRouteValue::Logical(LogicalTableRouteValue {
|
||||
physical_table_id: 1023,
|
||||
region_ids: vec![RegionId::new(1023, 2)],
|
||||
}),
|
||||
),
|
||||
];
|
||||
for (table_id, route) in &routes {
|
||||
let (txn, _) = table_route_manager
|
||||
.table_route_storage()
|
||||
.build_create_txn(*table_id, route)
|
||||
.unwrap();
|
||||
let r = kv.txn(txn).await.unwrap();
|
||||
assert!(r.succeeded);
|
||||
}
|
||||
|
||||
let results = table_route_storage
|
||||
.batch_get(&[9999, 1025, 8888, 1024])
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(results[0].is_none());
|
||||
assert_eq!(results[1].as_ref().unwrap(), &routes[1].1);
|
||||
assert!(results[2].is_none());
|
||||
assert_eq!(results[3].as_ref().unwrap(), &routes[0].1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,6 +36,8 @@ pub mod rpc;
|
||||
pub mod sequence;
|
||||
pub mod state_store;
|
||||
pub mod table_name;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod test_util;
|
||||
pub mod util;
|
||||
pub mod wal_options_allocator;
|
||||
|
||||
|
||||
@@ -363,6 +363,11 @@ impl CreateTableTask {
|
||||
table: &table.table_name,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the `table_info`'s table_id.
|
||||
pub fn set_table_id(&mut self, table_id: TableId) {
|
||||
self.table_info.ident.table_id = table_id;
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for CreateTableTask {
|
||||
|
||||
@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

pub use api::v1::meta::{MigrateRegionResponse, ProcedureStateResponse};
use api::v1::meta::{
    ProcedureId as PbProcedureId, ProcedureStateResponse as PbProcedureStateResponse,
    ProcedureStatus as PbProcedureStatus,
@@ -21,6 +24,15 @@ use snafu::ResultExt;

use crate::error::{ParseProcedureIdSnafu, Result};

/// A request to migrate region.
#[derive(Clone)]
pub struct MigrateRegionRequest {
    pub region_id: u64,
    pub from_peer: u64,
    pub to_peer: u64,
    pub replay_timeout: Duration,
}

/// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`].
pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result<ProcedureId> {
    ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| {

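The new MigrateRegionRequest simply bundles the parameters a region-migration procedure needs. Constructing one looks roughly like the sketch below; the ids and timeout are made-up placeholder values, MigrateRegionRequest is assumed to be in scope, and the meta-client call that would consume the request is outside this hunk.

use std::time::Duration;

fn example_migrate_request() -> MigrateRegionRequest {
    // Illustrative placeholders only: a u64-encoded region id plus two peer ids.
    MigrateRegionRequest {
        region_id: 4398046511104, // e.g. the u64 form of RegionId(table 1024, region 0)
        from_peer: 1,
        to_peer: 2,
        replay_timeout: Duration::from_secs(10),
    }
}
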
104
src/common/meta/src/test_util.rs
Normal file
@@ -0,0 +1,104 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::region::{QueryRequest, RegionRequest};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

use crate::cache_invalidator::DummyCacheInvalidator;
use crate::datanode_manager::{Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef};
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::WalOptionsAllocator;

#[async_trait::async_trait]
pub trait MockDatanodeHandler: Sync + Send + Clone {
    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows>;

    async fn handle_query(
        &self,
        peer: &Peer,
        request: QueryRequest,
    ) -> Result<SendableRecordBatchStream>;
}

/// A mock struct implements [DatanodeManager].
#[derive(Clone)]
pub struct MockDatanodeManager<T> {
    handler: T,
}

impl<T> MockDatanodeManager<T> {
    pub fn new(handler: T) -> Self {
        Self { handler }
    }
}

/// A mock struct implements [Datanode].
#[derive(Clone)]
struct MockDatanode<T> {
    peer: Peer,
    handler: T,
}

#[async_trait::async_trait]
impl<T: MockDatanodeHandler> Datanode for MockDatanode<T> {
    async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
        self.handler.handle(&self.peer, request).await
    }

    async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
        self.handler.handle_query(&self.peer, request).await
    }
}

#[async_trait::async_trait]
impl<T: MockDatanodeHandler + 'static> DatanodeManager for MockDatanodeManager<T> {
    async fn datanode(&self, peer: &Peer) -> DatanodeRef {
        Arc::new(MockDatanode {
            peer: peer.clone(),
            handler: self.handler.clone(),
        })
    }
}

/// Returns a test purpose [DdlContext].
pub fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext {
    let kv_backend = Arc::new(MemoryKvBackend::new());
    let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

    DdlContext {
        datanode_manager,
        cache_invalidator: Arc::new(DummyCacheInvalidator),
        memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
        table_metadata_allocator: Arc::new(TableMetadataAllocator::new(
            Arc::new(
                SequenceBuilder::new("test", kv_backend)
                    .initial(1024)
                    .build(),
            ),
            Arc::new(WalOptionsAllocator::default()),
            table_metadata_manager.table_name_manager().clone(),
        )),
        table_metadata_manager,
    }
}

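In a downstream unit test, these helpers would typically be combined as in the sketch below: implement MockDatanodeHandler for a small handler type, wrap it in MockDatanodeManager, and hand the result to new_ddl_context. The crate paths mirror the module layout above but the exact re-exports are an assumption, and the handler behavior is deliberately trivial.

use std::sync::Arc;

use api::v1::region::{QueryRequest, RegionRequest};
use common_base::AffectedRows;
use common_meta::error::Result;
use common_meta::peer::Peer;
use common_meta::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
use common_recordbatch::SendableRecordBatchStream;

#[derive(Clone)]
struct NoopHandler;

#[async_trait::async_trait]
impl MockDatanodeHandler for NoopHandler {
    async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<AffectedRows> {
        // Pretend every region request succeeds without affecting any rows.
        Ok(0)
    }

    async fn handle_query(
        &self,
        _peer: &Peer,
        _request: QueryRequest,
    ) -> Result<SendableRecordBatchStream> {
        unimplemented!("queries are not exercised in this sketch")
    }
}

#[tokio::test]
async fn test_ddl_context_with_mock_datanodes() {
    let datanode_manager = Arc::new(MockDatanodeManager::new(NoopHandler));
    let ddl_context = new_ddl_context(datanode_manager);
    // A real test would drive a DDL procedure with `ddl_context` here.
    let _ = ddl_context;
}
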
@@ -30,7 +30,7 @@ use snafu::{ensure, ResultExt};
use crate::error::{
    BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
    CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu,
    ProduceRecordSnafu, Result,
    ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
@@ -117,7 +117,10 @@ impl TopicManager {
            base: self.config.backoff.base as f64,
            deadline: self.config.backoff.deadline,
        };
        let client = ClientBuilder::new(self.config.broker_endpoints.clone())
        let broker_endpoints = common_wal::resolve_to_ipv4(&self.config.broker_endpoints)
            .await
            .context(ResolveKafkaEndpointSnafu)?;
        let client = ClientBuilder::new(broker_endpoints)
            .backoff_config(backoff_config)
            .build()
            .await

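The second hunk resolves broker endpoints through common_wal::resolve_to_ipv4 before building the Kafka client, so hostnames are turned into IPv4 addresses up front and failures surface as ResolveKafkaEndpointSnafu. As a rough, standalone illustration of what such a resolution step does (a blocking sketch on top of std, not the actual async common_wal implementation):

use std::net::{SocketAddr, ToSocketAddrs};

/// Resolves "host:port" endpoints to IPv4 "ip:port" strings, failing if an
/// endpoint cannot be resolved or has no IPv4 address.
fn resolve_endpoints_to_ipv4(endpoints: &[String]) -> std::io::Result<Vec<String>> {
    let mut resolved = Vec::with_capacity(endpoints.len());
    for endpoint in endpoints {
        let addr = endpoint
            .to_socket_addrs()?
            .find(|addr| matches!(addr, SocketAddr::V4(_)))
            .ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::NotFound,
                    format!("no IPv4 address found for {endpoint}"),
                )
            })?;
        resolved.push(addr.to_string());
    }
    Ok(resolved)
}
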
10
src/common/plugins/Cargo.toml
Normal file
@@ -0,0 +1,10 @@
[package]
name = "common-plugins"
version.workspace = true
edition.workspace = true
license.workspace = true

[lints]
workspace = true

[dependencies]

Some files were not shown because too many files have changed in this diff.