mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
Compare commits
39 Commits
feat/merge
...
v0.7.0-nig
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e481f073f5 | ||
|
|
606309f49a | ||
|
|
8059b95e37 | ||
|
|
afe4633320 | ||
|
|
abbfd23d4b | ||
|
|
1df64f294b | ||
|
|
a6564e72b4 | ||
|
|
1f1d1b4f57 | ||
|
|
b144836935 | ||
|
|
93d9f48dd7 | ||
|
|
90e9b69035 | ||
|
|
2035e7bf4c | ||
|
|
7341f23019 | ||
|
|
41ee0cdd5a | ||
|
|
578dd8f87a | ||
|
|
1dc4fec662 | ||
|
|
f26505b625 | ||
|
|
8289b0dec2 | ||
|
|
53105b99e7 | ||
|
|
564fe3beca | ||
|
|
e9a2b0a9ee | ||
|
|
860b1e9d9e | ||
|
|
7c88d721c2 | ||
|
|
90169c868d | ||
|
|
4c07606da6 | ||
|
|
a7bf458a37 | ||
|
|
fa08085119 | ||
|
|
86a98c80f5 | ||
|
|
085a380019 | ||
|
|
d9a96344ee | ||
|
|
41656c8635 | ||
|
|
cf08a3de6b | ||
|
|
f087a843bb | ||
|
|
450dfe324d | ||
|
|
3dfe4a2e5a | ||
|
|
eded08897d | ||
|
|
b1f54d8a03 | ||
|
|
bf5e1905cd | ||
|
|
6628c41c36 |
@@ -3,13 +3,3 @@ linker = "aarch64-linux-gnu-gcc"
|
||||
|
||||
[alias]
|
||||
sqlness = "run --bin sqlness-runner --"
|
||||
|
||||
|
||||
[build]
|
||||
rustflags = [
|
||||
# lints
|
||||
# TODO: use lint configuration in cargo https://github.com/rust-lang/cargo/issues/5034
|
||||
"-Wclippy::print_stdout",
|
||||
"-Wclippy::print_stderr",
|
||||
"-Wclippy::implicit_clone",
|
||||
]
|
||||
|
||||
@@ -34,7 +34,7 @@ runs:
|
||||
|
||||
- name: Upload sqlness logs
|
||||
if: ${{ failure() && inputs.disable-run-tests == 'false' }} # Only upload logs when the integration tests failed.
|
||||
uses: actions/upload-artifact@v3
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: sqlness-logs
|
||||
path: /tmp/greptime-*.log
|
||||
|
||||
@@ -67,7 +67,7 @@ runs:
|
||||
|
||||
- name: Upload sqlness logs
|
||||
if: ${{ failure() }} # Only upload logs when the integration tests failed.
|
||||
uses: actions/upload-artifact@v3
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: sqlness-logs
|
||||
path: /tmp/greptime-*.log
|
||||
|
||||
@@ -62,10 +62,10 @@ runs:
|
||||
|
||||
- name: Upload sqlness logs
|
||||
if: ${{ failure() }} # Only upload logs when the integration tests failed.
|
||||
uses: actions/upload-artifact@v3
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: sqlness-logs
|
||||
path: ${{ runner.temp }}/greptime-*.log
|
||||
path: /tmp/greptime-*.log
|
||||
retention-days: 3
|
||||
|
||||
- name: Build greptime binary
|
||||
|
||||
42
.github/workflows/develop.yml
vendored
42
.github/workflows/develop.yml
vendored
@@ -1,7 +1,7 @@
|
||||
on:
|
||||
merge_group:
|
||||
pull_request:
|
||||
types: [opened, synchronize, reopened, ready_for_review]
|
||||
types: [ opened, synchronize, reopened, ready_for_review ]
|
||||
paths-ignore:
|
||||
- 'docs/**'
|
||||
- 'config/**'
|
||||
@@ -57,7 +57,7 @@ jobs:
|
||||
toolchain: ${{ env.RUST_TOOLCHAIN }}
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
with:
|
||||
# Shares across multiple jobs
|
||||
# Shares with `Clippy` job
|
||||
shared-key: "check-lint"
|
||||
@@ -75,7 +75,7 @@ jobs:
|
||||
toolchain: stable
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
with:
|
||||
# Shares across multiple jobs
|
||||
shared-key: "check-toml"
|
||||
- name: Install taplo
|
||||
@@ -136,13 +136,12 @@ jobs:
|
||||
run: tar -xvf ./bins.tar.gz
|
||||
- name: Run sqlness
|
||||
run: RUST_BACKTRACE=1 ./bins/sqlness-runner -c ./tests/cases --bins-dir ./bins
|
||||
# FIXME: Logs cannot found be on failure (or even success). Need to figure out the cause.
|
||||
- name: Upload sqlness logs
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v3
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: sqlness-logs
|
||||
path: ${{ runner.temp }}/greptime-*.log
|
||||
path: /tmp/greptime-*.log
|
||||
retention-days: 3
|
||||
|
||||
sqlness-kafka-wal:
|
||||
@@ -167,13 +166,12 @@ jobs:
|
||||
run: docker compose -f docker-compose-standalone.yml up -d --wait
|
||||
- name: Run sqlness
|
||||
run: RUST_BACKTRACE=1 ./bins/sqlness-runner -w kafka -k 127.0.0.1:9092 -c ./tests/cases --bins-dir ./bins
|
||||
# FIXME: Logs cannot be found on failure (or even success). Need to figure out the cause.
|
||||
- name: Upload sqlness logs
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v3
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: sqlness-logs
|
||||
path: ${{ runner.temp }}/greptime-*.log
|
||||
name: sqlness-logs-with-kafka-wal
|
||||
path: /tmp/greptime-*.log
|
||||
retention-days: 3
|
||||
|
||||
fmt:
|
||||
@@ -191,7 +189,7 @@ jobs:
|
||||
components: rustfmt
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
with:
|
||||
# Shares across multiple jobs
|
||||
shared-key: "check-rust-fmt"
|
||||
- name: Run cargo fmt
|
||||
@@ -212,7 +210,7 @@ jobs:
|
||||
components: clippy
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
with:
|
||||
# Shares across multiple jobs
|
||||
# Shares with `Check` job
|
||||
shared-key: "check-lint"
|
||||
@@ -271,10 +269,28 @@ jobs:
|
||||
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
|
||||
UNITTEST_LOG_DIR: "__unittest_logs"
|
||||
- name: Codecov upload
|
||||
uses: codecov/codecov-action@v2
|
||||
uses: codecov/codecov-action@v4
|
||||
with:
|
||||
token: ${{ secrets.CODECOV_TOKEN }}
|
||||
files: ./lcov.info
|
||||
flags: rust
|
||||
fail_ci_if_error: false
|
||||
verbose: true
|
||||
|
||||
compat:
|
||||
name: Compatibility Test
|
||||
needs: build
|
||||
runs-on: ubuntu-20.04
|
||||
timeout-minutes: 60
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Download pre-built binaries
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
name: bins
|
||||
path: .
|
||||
- name: Unzip binaries
|
||||
run: |
|
||||
mkdir -p ./bins/current
|
||||
tar -xvf ./bins.tar.gz --strip-components=1 -C ./bins/current
|
||||
- run: ./tests/compat/test-compat.sh 0.6.0
|
||||
|
||||
14
.github/workflows/docs.yml
vendored
14
.github/workflows/docs.yml
vendored
@@ -61,6 +61,18 @@ jobs:
|
||||
|
||||
sqlness:
|
||||
name: Sqlness Test
|
||||
runs-on: ubuntu-20.04
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ ubuntu-20.04 ]
|
||||
steps:
|
||||
- run: 'echo "No action required"'
|
||||
|
||||
sqlness-kafka-wal:
|
||||
name: Sqlness Test with Kafka Wal
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ ubuntu-20.04 ]
|
||||
steps:
|
||||
- run: 'echo "No action required"'
|
||||
|
||||
4
.github/workflows/nightly-ci.yml
vendored
4
.github/workflows/nightly-ci.yml
vendored
@@ -45,10 +45,10 @@ jobs:
|
||||
{"text": "Nightly CI failed for sqlness tests"}
|
||||
- name: Upload sqlness logs
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v3
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: sqlness-logs
|
||||
path: ${{ runner.temp }}/greptime-*.log
|
||||
path: /tmp/greptime-*.log
|
||||
retention-days: 3
|
||||
|
||||
test-on-windows:
|
||||
|
||||
127
Cargo.lock
generated
127
Cargo.lock
generated
@@ -785,23 +785,6 @@ dependencies = [
|
||||
"syn 2.0.43",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum-test-helper"
|
||||
version = "0.1.1"
|
||||
source = "git+https://github.com/sunng87/axum-test-helper.git?branch=patch-1#5aa7843ce2250144ea1b7f589f274c00cf1af4ab"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"bytes",
|
||||
"http",
|
||||
"http-body",
|
||||
"hyper",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum-test-helper"
|
||||
version = "0.3.0"
|
||||
@@ -1557,6 +1540,8 @@ dependencies = [
|
||||
"prometheus",
|
||||
"prost 0.12.3",
|
||||
"rand",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"session",
|
||||
"snafu",
|
||||
"substrait 0.17.1",
|
||||
@@ -1783,8 +1768,10 @@ dependencies = [
|
||||
"arc-swap",
|
||||
"async-trait",
|
||||
"chrono-tz 0.6.3",
|
||||
"common-catalog",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"common-meta",
|
||||
"common-query",
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
@@ -1799,6 +1786,7 @@ dependencies = [
|
||||
"paste",
|
||||
"ron",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"session",
|
||||
"snafu",
|
||||
"statrs",
|
||||
@@ -1917,6 +1905,7 @@ dependencies = [
|
||||
"common-grpc-expr",
|
||||
"common-macro",
|
||||
"common-procedure",
|
||||
"common-procedure-test",
|
||||
"common-recordbatch",
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
@@ -1949,6 +1938,10 @@ dependencies = [
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "common-plugins"
|
||||
version = "0.6.0"
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure"
|
||||
version = "0.6.0"
|
||||
@@ -2048,6 +2041,7 @@ dependencies = [
|
||||
name = "common-telemetry"
|
||||
version = "0.6.0"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"backtrace",
|
||||
"common-error",
|
||||
"console-subscriber",
|
||||
@@ -3415,6 +3409,7 @@ dependencies = [
|
||||
"datatypes",
|
||||
"hydroflow",
|
||||
"itertools 0.10.5",
|
||||
"num-traits",
|
||||
"serde",
|
||||
"servers",
|
||||
"session",
|
||||
@@ -5178,7 +5173,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "meter-core"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=abbd357c1e193cd270ea65ee7652334a150b628f#abbd357c1e193cd270ea65ee7652334a150b628f"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80b72716dcde47ec4161478416a5c6c21343364d#80b72716dcde47ec4161478416a5c6c21343364d"
|
||||
dependencies = [
|
||||
"anymap",
|
||||
"once_cell",
|
||||
@@ -5188,7 +5183,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "meter-macros"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=abbd357c1e193cd270ea65ee7652334a150b628f#abbd357c1e193cd270ea65ee7652334a150b628f"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80b72716dcde47ec4161478416a5c6c21343364d#80b72716dcde47ec4161478416a5c6c21343364d"
|
||||
dependencies = [
|
||||
"meter-core",
|
||||
]
|
||||
@@ -5210,6 +5205,7 @@ dependencies = [
|
||||
"common-time",
|
||||
"datafusion",
|
||||
"datatypes",
|
||||
"itertools 0.10.5",
|
||||
"lazy_static",
|
||||
"mito2",
|
||||
"mur3",
|
||||
@@ -5313,6 +5309,7 @@ dependencies = [
|
||||
"prometheus",
|
||||
"prost 0.12.3",
|
||||
"puffin",
|
||||
"rand",
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -5418,6 +5415,24 @@ dependencies = [
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mysql-common-derive"
|
||||
version = "0.31.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c60492b5eb751e55b42d716b6b26dceb66767996cd7a5560a842fbf613ca2e92"
|
||||
dependencies = [
|
||||
"darling 0.20.3",
|
||||
"heck",
|
||||
"num-bigint",
|
||||
"proc-macro-crate 3.1.0",
|
||||
"proc-macro-error",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.43",
|
||||
"termcolor",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mysql_async"
|
||||
version = "0.33.0"
|
||||
@@ -5434,7 +5449,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"lru",
|
||||
"mio",
|
||||
"mysql_common",
|
||||
"mysql_common 0.31.0",
|
||||
"once_cell",
|
||||
"pem",
|
||||
"percent-encoding",
|
||||
@@ -5470,13 +5485,12 @@ dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"cc",
|
||||
"chrono",
|
||||
"cmake",
|
||||
"crc32fast",
|
||||
"flate2",
|
||||
"frunk",
|
||||
"lazy_static",
|
||||
"mysql-common-derive",
|
||||
"mysql-common-derive 0.30.2",
|
||||
"num-bigint",
|
||||
"num-traits",
|
||||
"rand",
|
||||
@@ -5495,6 +5509,46 @@ dependencies = [
|
||||
"zstd 0.12.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mysql_common"
|
||||
version = "0.32.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b73aacd01475af6d2efbdf489efd60fc519515ffe94edfd74236f954d521e31b"
|
||||
dependencies = [
|
||||
"base64 0.21.5",
|
||||
"bigdecimal",
|
||||
"bindgen",
|
||||
"bitflags 2.4.1",
|
||||
"bitvec",
|
||||
"btoi",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"cc",
|
||||
"chrono",
|
||||
"cmake",
|
||||
"crc32fast",
|
||||
"flate2",
|
||||
"frunk",
|
||||
"lazy_static",
|
||||
"mysql-common-derive 0.31.0",
|
||||
"num-bigint",
|
||||
"num-traits",
|
||||
"rand",
|
||||
"regex",
|
||||
"rust_decimal",
|
||||
"saturating",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha1",
|
||||
"sha2",
|
||||
"smallvec",
|
||||
"subprocess",
|
||||
"thiserror",
|
||||
"time",
|
||||
"uuid",
|
||||
"zstd 0.13.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nalgebra"
|
||||
version = "0.29.0"
|
||||
@@ -5890,13 +5944,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "opensrv-mysql"
|
||||
version = "0.6.0"
|
||||
source = "git+https://github.com/MichaelScofield/opensrv.git?rev=1676c1d#1676c1d166cf33e7745e1b5db54b3fe2b7defec9"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4148ab944991b0a33be74d2636a815268974578812a9e4cf7dc785325e858154"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
"chrono",
|
||||
"mysql_common",
|
||||
"mysql_common 0.32.0",
|
||||
"nom",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
@@ -6069,6 +6124,7 @@ dependencies = [
|
||||
"meter-macros",
|
||||
"object-store",
|
||||
"partition",
|
||||
"path-slash",
|
||||
"prometheus",
|
||||
"query",
|
||||
"regex",
|
||||
@@ -6343,6 +6399,12 @@ version = "1.0.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c"
|
||||
|
||||
[[package]]
|
||||
name = "path-slash"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e91099d4268b0e11973f036e885d652fb0b21fedcf69738c627f94db6a44f42"
|
||||
|
||||
[[package]]
|
||||
name = "pathdiff"
|
||||
version = "0.2.1"
|
||||
@@ -6811,6 +6873,15 @@ dependencies = [
|
||||
"toml_edit 0.20.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-crate"
|
||||
version = "3.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284"
|
||||
dependencies = [
|
||||
"toml_edit 0.21.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error"
|
||||
version = "1.0.4"
|
||||
@@ -7241,6 +7312,7 @@ dependencies = [
|
||||
"common-function",
|
||||
"common-macro",
|
||||
"common-meta",
|
||||
"common-plugins",
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
"common-telemetry",
|
||||
@@ -8883,7 +8955,7 @@ dependencies = [
|
||||
"auth",
|
||||
"axum",
|
||||
"axum-macros",
|
||||
"axum-test-helper 0.3.0",
|
||||
"axum-test-helper",
|
||||
"base64 0.21.5",
|
||||
"bytes",
|
||||
"catalog",
|
||||
@@ -8897,6 +8969,7 @@ dependencies = [
|
||||
"common-macro",
|
||||
"common-mem-prof",
|
||||
"common-meta",
|
||||
"common-plugins",
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
"common-runtime",
|
||||
@@ -9955,7 +10028,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"auth",
|
||||
"axum",
|
||||
"axum-test-helper 0.1.1",
|
||||
"axum-test-helper",
|
||||
"catalog",
|
||||
"chrono",
|
||||
"client",
|
||||
|
||||
12
Cargo.toml
12
Cargo.toml
@@ -18,6 +18,7 @@ members = [
|
||||
"src/common/grpc-expr",
|
||||
"src/common/mem-prof",
|
||||
"src/common/meta",
|
||||
"src/common/plugins",
|
||||
"src/common/procedure",
|
||||
"src/common/procedure-test",
|
||||
"src/common/query",
|
||||
@@ -65,6 +66,12 @@ version = "0.6.0"
|
||||
edition = "2021"
|
||||
license = "Apache-2.0"
|
||||
|
||||
[workspace.lints]
|
||||
clippy.print_stdout = "warn"
|
||||
clippy.print_stderr = "warn"
|
||||
clippy.implicit_clone = "warn"
|
||||
rust.unknown_lints = "deny"
|
||||
|
||||
[workspace.dependencies]
|
||||
ahash = { version = "0.8", features = ["compile-time-rng"] }
|
||||
aquamarine = "0.3"
|
||||
@@ -100,7 +107,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r
|
||||
humantime-serde = "1.1"
|
||||
itertools = "0.10"
|
||||
lazy_static = "1.4"
|
||||
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" }
|
||||
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "80b72716dcde47ec4161478416a5c6c21343364d" }
|
||||
mockall = "0.11.4"
|
||||
moka = "0.12"
|
||||
num_cpus = "1.16"
|
||||
@@ -164,6 +171,7 @@ common-grpc-expr = { path = "src/common/grpc-expr" }
|
||||
common-macro = { path = "src/common/macro" }
|
||||
common-mem-prof = { path = "src/common/mem-prof" }
|
||||
common-meta = { path = "src/common/meta" }
|
||||
common-plugins = { path = "src/common/plugins" }
|
||||
common-procedure = { path = "src/common/procedure" }
|
||||
common-procedure-test = { path = "src/common/procedure-test" }
|
||||
common-query = { path = "src/common/query" }
|
||||
@@ -201,7 +209,7 @@ table = { path = "src/table" }
|
||||
|
||||
[workspace.dependencies.meter-macros]
|
||||
git = "https://github.com/GreptimeTeam/greptime-meter.git"
|
||||
rev = "abbd357c1e193cd270ea65ee7652334a150b628f"
|
||||
rev = "80b72716dcde47ec4161478416a5c6c21343364d"
|
||||
|
||||
[profile.release]
|
||||
debug = 1
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
arrow.workspace = true
|
||||
chrono.workspace = true
|
||||
|
||||
@@ -504,7 +504,7 @@ async fn do_query(num_iter: usize, db: &Database, table_name: &str) {
|
||||
let res = db.sql(&query).await.unwrap();
|
||||
match res {
|
||||
Output::AffectedRows(_) | Output::RecordBatches(_) => (),
|
||||
Output::Stream(stream) => {
|
||||
Output::Stream(stream, _) => {
|
||||
stream.try_collect::<Vec<_>>().await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,13 +13,20 @@ RELEASE_VERSION="$(cat $STATIC_DIR/VERSION | tr -d '\t\r\n ')"
|
||||
|
||||
echo "Downloading assets to dir: $OUT_DIR"
|
||||
cd $OUT_DIR
|
||||
|
||||
if [[ -z "$GITHUB_PROXY_URL" ]]; then
|
||||
GITHUB_URL="https://github.com"
|
||||
else
|
||||
GITHUB_URL="${GITHUB_PROXY_URL%/}"
|
||||
fi
|
||||
|
||||
# Download the SHA256 checksum attached to the release. To verify the integrity
|
||||
# of the download, this checksum will be used to check the download tar file
|
||||
# containing the built dashboard assets.
|
||||
curl -Ls https://github.com/GreptimeTeam/dashboard/releases/download/$RELEASE_VERSION/sha256.txt --output sha256.txt
|
||||
curl -Ls ${GITHUB_URL}/GreptimeTeam/dashboard/releases/download/$RELEASE_VERSION/sha256.txt --output sha256.txt
|
||||
|
||||
# Download the tar file containing the built dashboard assets.
|
||||
curl -L https://github.com/GreptimeTeam/dashboard/releases/download/$RELEASE_VERSION/build.tar.gz --output build.tar.gz
|
||||
curl -L ${GITHUB_URL}/GreptimeTeam/dashboard/releases/download/$RELEASE_VERSION/build.tar.gz --output build.tar.gz
|
||||
|
||||
# Verify the checksums match; exit if they don't.
|
||||
case "$(uname -s)" in
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
common-base.workspace = true
|
||||
common-decimal.workspace = true
|
||||
|
||||
@@ -8,6 +8,9 @@ license.workspace = true
|
||||
default = []
|
||||
testing = []
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -7,6 +7,9 @@ license.workspace = true
|
||||
[features]
|
||||
testing = []
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
arc-swap = "1.0"
|
||||
|
||||
@@ -164,11 +164,8 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to find table partitions: #{table}"))]
|
||||
FindPartitions {
|
||||
source: partition::error::Error,
|
||||
table: String,
|
||||
},
|
||||
#[snafu(display("Failed to find table partitions"))]
|
||||
FindPartitions { source: partition::error::Error },
|
||||
|
||||
#[snafu(display("Failed to find region routes"))]
|
||||
FindRegionRoutes { source: partition::error::Error },
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use core::pin::pin;
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use arrow_schema::SchemaRef as ArrowSchemaRef;
|
||||
@@ -31,7 +32,7 @@ use datatypes::vectors::{
|
||||
ConstantVector, DateTimeVector, DateTimeVectorBuilder, Int64Vector, Int64VectorBuilder,
|
||||
MutableVector, StringVector, StringVectorBuilder, UInt64VectorBuilder,
|
||||
};
|
||||
use futures::TryStreamExt;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use partition::manager::PartitionInfo;
|
||||
use partition::partition::PartitionDef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -240,40 +241,64 @@ impl InformationSchemaPartitionsBuilder {
|
||||
let predicates = Predicates::from_scan_request(&request);
|
||||
|
||||
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
|
||||
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
|
||||
let table_info_stream = catalog_manager
|
||||
.tables(&catalog_name, &schema_name)
|
||||
.await
|
||||
.try_filter_map(|t| async move {
|
||||
let table_info = t.table_info();
|
||||
if table_info.table_type == TableType::Temporary {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(Some(table_info))
|
||||
}
|
||||
});
|
||||
|
||||
while let Some(table) = stream.try_next().await? {
|
||||
let table_info = table.table_info();
|
||||
const BATCH_SIZE: usize = 128;
|
||||
|
||||
if table_info.table_type == TableType::Temporary {
|
||||
continue;
|
||||
}
|
||||
// Split table infos into chunks
|
||||
let mut table_info_chunks = pin!(table_info_stream.ready_chunks(BATCH_SIZE));
|
||||
|
||||
let table_id = table_info.ident.table_id;
|
||||
let partitions = if let Some(partition_manager) = &partition_manager {
|
||||
while let Some(table_infos) = table_info_chunks.next().await {
|
||||
let table_infos = table_infos.into_iter().collect::<Result<Vec<_>>>()?;
|
||||
let table_ids: Vec<TableId> =
|
||||
table_infos.iter().map(|info| info.ident.table_id).collect();
|
||||
|
||||
let mut table_partitions = if let Some(partition_manager) = &partition_manager {
|
||||
partition_manager
|
||||
.find_table_partitions(table_id)
|
||||
.batch_find_table_partitions(&table_ids)
|
||||
.await
|
||||
.context(FindPartitionsSnafu {
|
||||
table: &table_info.name,
|
||||
})?
|
||||
.context(FindPartitionsSnafu)?
|
||||
} else {
|
||||
// Current node must be a standalone instance, contains only one partition by default.
|
||||
// TODO(dennis): change it when we support multi-regions for standalone.
|
||||
vec![PartitionInfo {
|
||||
id: RegionId::new(table_id, 0),
|
||||
partition: PartitionDef::new(vec![], vec![]),
|
||||
}]
|
||||
table_ids
|
||||
.into_iter()
|
||||
.map(|table_id| {
|
||||
(
|
||||
table_id,
|
||||
vec![PartitionInfo {
|
||||
id: RegionId::new(table_id, 0),
|
||||
partition: PartitionDef::new(vec![], vec![]),
|
||||
}],
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
self.add_partitions(
|
||||
&predicates,
|
||||
&table_info,
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
&table_info.name,
|
||||
&partitions,
|
||||
);
|
||||
for table_info in table_infos {
|
||||
let partitions = table_partitions
|
||||
.remove(&table_info.ident.table_id)
|
||||
.unwrap_or(vec![]);
|
||||
|
||||
self.add_partitions(
|
||||
&predicates,
|
||||
&table_info,
|
||||
&catalog_name,
|
||||
&schema_name,
|
||||
&table_info.name,
|
||||
&partitions,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -199,7 +199,7 @@ impl InformationSchemaRegionPeersBuilder {
|
||||
|
||||
let table_routes = if let Some(partition_manager) = &partition_manager {
|
||||
partition_manager
|
||||
.find_region_routes_batch(&table_ids)
|
||||
.batch_find_region_routes(&table_ids)
|
||||
.await
|
||||
.context(FindRegionRoutesSnafu)?
|
||||
} else {
|
||||
|
||||
@@ -7,6 +7,9 @@ license.workspace = true
|
||||
[features]
|
||||
testing = []
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
arc-swap = "1.6"
|
||||
@@ -34,6 +37,8 @@ parking_lot = "0.12"
|
||||
prometheus.workspace = true
|
||||
prost.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
session.workspace = true
|
||||
snafu.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
@@ -340,7 +340,7 @@ impl Database {
|
||||
output_ordering: None,
|
||||
metrics: Default::default(),
|
||||
};
|
||||
Ok(Output::Stream(Box::pin(record_batch_stream)))
|
||||
Ok(Output::new_stream(Box::pin(record_batch_stream)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,10 +134,17 @@ impl From<Status> for Error {
|
||||
|
||||
impl Error {
|
||||
pub fn should_retry(&self) -> bool {
|
||||
!matches!(
|
||||
// TODO(weny): figure out each case of these codes.
|
||||
matches!(
|
||||
self,
|
||||
Self::RegionServer {
|
||||
code: Code::InvalidArgument,
|
||||
code: Code::Cancelled,
|
||||
..
|
||||
} | Self::RegionServer {
|
||||
code: Code::DeadlineExceeded,
|
||||
..
|
||||
} | Self::RegionServer {
|
||||
code: Code::Unavailable,
|
||||
..
|
||||
}
|
||||
)
|
||||
|
||||
@@ -123,8 +123,8 @@ impl RegionRequester {
|
||||
.fail();
|
||||
};
|
||||
|
||||
let metrics_str = Arc::new(ArcSwapOption::from(None));
|
||||
let ref_str = metrics_str.clone();
|
||||
let metrics = Arc::new(ArcSwapOption::from(None));
|
||||
let metrics_ref = metrics.clone();
|
||||
|
||||
let tracing_context = TracingContext::from_current_span();
|
||||
|
||||
@@ -140,7 +140,8 @@ impl RegionRequester {
|
||||
match flight_message {
|
||||
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
|
||||
FlightMessage::Metrics(s) => {
|
||||
ref_str.swap(Some(Arc::new(s)));
|
||||
let m = serde_json::from_str(&s).ok().map(Arc::new);
|
||||
metrics_ref.swap(m);
|
||||
break;
|
||||
}
|
||||
_ => {
|
||||
@@ -159,7 +160,7 @@ impl RegionRequester {
|
||||
schema,
|
||||
stream,
|
||||
output_ordering: None,
|
||||
metrics: metrics_str,
|
||||
metrics,
|
||||
};
|
||||
Ok(Box::pin(record_batch_stream))
|
||||
}
|
||||
|
||||
@@ -12,6 +12,9 @@ path = "src/bin/greptime.rs"
|
||||
[features]
|
||||
tokio-console = ["common-telemetry/tokio-console"]
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
anymap = "1.0.0-beta.2"
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -142,7 +142,7 @@ impl Export {
|
||||
.with_context(|_| RequestDatabaseSnafu {
|
||||
sql: "show databases".to_string(),
|
||||
})?;
|
||||
let Output::Stream(stream) = result else {
|
||||
let Output::Stream(stream, _) = result else {
|
||||
NotDataFromOutputSnafu.fail()?
|
||||
};
|
||||
let record_batch = collect(stream)
|
||||
@@ -183,7 +183,7 @@ impl Export {
|
||||
.sql(&sql)
|
||||
.await
|
||||
.with_context(|_| RequestDatabaseSnafu { sql })?;
|
||||
let Output::Stream(stream) = result else {
|
||||
let Output::Stream(stream, _) = result else {
|
||||
NotDataFromOutputSnafu.fail()?
|
||||
};
|
||||
let Some(record_batch) = collect(stream)
|
||||
@@ -235,7 +235,7 @@ impl Export {
|
||||
.sql(&sql)
|
||||
.await
|
||||
.with_context(|_| RequestDatabaseSnafu { sql })?;
|
||||
let Output::Stream(stream) = result else {
|
||||
let Output::Stream(stream, _) = result else {
|
||||
NotDataFromOutputSnafu.fail()?
|
||||
};
|
||||
let record_batch = collect(stream)
|
||||
|
||||
@@ -185,7 +185,7 @@ impl Repl {
|
||||
.context(RequestDatabaseSnafu { sql: &sql })?;
|
||||
|
||||
let either = match output {
|
||||
Output::Stream(s) => {
|
||||
Output::Stream(s, _) => {
|
||||
let x = RecordBatches::try_collect(s)
|
||||
.await
|
||||
.context(CollectRecordBatchesSnafu)?;
|
||||
@@ -260,6 +260,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
|
||||
catalog_list,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
plugins.clone(),
|
||||
));
|
||||
|
||||
@@ -43,6 +43,10 @@ impl Instance {
|
||||
pub fn datanode_mut(&mut self) -> &mut Datanode {
|
||||
&mut self.datanode
|
||||
}
|
||||
|
||||
pub fn datanode(&self) -> &Datanode {
|
||||
&self.datanode
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -235,6 +239,7 @@ impl StartCommand {
|
||||
.with_default_grpc_server(&datanode.region_server())
|
||||
.enable_http_service()
|
||||
.build()
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
datanode.setup_services(services);
|
||||
|
||||
|
||||
@@ -43,13 +43,17 @@ pub struct Instance {
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
fn new(frontend: FeInstance) -> Self {
|
||||
pub fn new(frontend: FeInstance) -> Self {
|
||||
Self { frontend }
|
||||
}
|
||||
|
||||
pub fn mut_inner(&mut self) -> &mut FeInstance {
|
||||
&mut self.frontend
|
||||
}
|
||||
|
||||
pub fn inner(&self) -> &FeInstance {
|
||||
&self.frontend
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -271,6 +275,7 @@ impl StartCommand {
|
||||
|
||||
let servers = Services::new(opts.clone(), Arc::new(instance.clone()), plugins)
|
||||
.build()
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
instance
|
||||
.build_servers(opts, servers)
|
||||
|
||||
@@ -32,11 +32,11 @@ lazy_static::lazy_static! {
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait App {
|
||||
pub trait App: Send {
|
||||
fn name(&self) -> &str;
|
||||
|
||||
/// A hook for implementor to make something happened before actual startup. Defaults to no-op.
|
||||
fn pre_start(&mut self) -> error::Result<()> {
|
||||
async fn pre_start(&mut self) -> error::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -46,24 +46,21 @@ pub trait App {
|
||||
}
|
||||
|
||||
pub async fn start_app(mut app: Box<dyn App>) -> error::Result<()> {
|
||||
let name = app.name().to_string();
|
||||
info!("Starting app: {}", app.name());
|
||||
|
||||
app.pre_start()?;
|
||||
app.pre_start().await?;
|
||||
|
||||
tokio::select! {
|
||||
result = app.start() => {
|
||||
if let Err(err) = result {
|
||||
error!(err; "Failed to start app {name}!");
|
||||
}
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
if let Err(err) = app.stop().await {
|
||||
error!(err; "Failed to stop app {name}!");
|
||||
}
|
||||
info!("Goodbye!");
|
||||
}
|
||||
app.start().await?;
|
||||
|
||||
if let Err(e) = tokio::signal::ctrl_c().await {
|
||||
error!("Failed to listen for ctrl-c signal: {}", e);
|
||||
// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
|
||||
// the underlying system. So we stop the app instead of running nonetheless to let people
|
||||
// investigate the issue.
|
||||
}
|
||||
|
||||
app.stop().await?;
|
||||
info!("Goodbye!");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -21,8 +21,8 @@ use common_catalog::consts::MIN_USER_TABLE_ID;
|
||||
use common_config::{metadata_store_dir, KvBackendConfig};
|
||||
use common_meta::cache_invalidator::DummyCacheInvalidator;
|
||||
use common_meta::datanode_manager::DatanodeManagerRef;
|
||||
use common_meta::ddl::table_meta::TableMetadataAllocator;
|
||||
use common_meta::ddl::DdlTaskExecutorRef;
|
||||
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
|
||||
use common_meta::ddl::ProcedureExecutorRef;
|
||||
use common_meta::ddl_manager::DdlManager;
|
||||
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
@@ -419,11 +419,11 @@ impl StartCommand {
|
||||
let table_metadata_manager =
|
||||
Self::create_table_metadata_manager(kv_backend.clone()).await?;
|
||||
|
||||
let table_meta_allocator = TableMetadataAllocator::new(
|
||||
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
|
||||
table_id_sequence,
|
||||
wal_options_allocator.clone(),
|
||||
table_metadata_manager.clone(),
|
||||
);
|
||||
table_metadata_manager.table_name_manager().clone(),
|
||||
));
|
||||
|
||||
let ddl_task_executor = Self::create_ddl_task_executor(
|
||||
table_metadata_manager,
|
||||
@@ -441,6 +441,7 @@ impl StartCommand {
|
||||
|
||||
let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
|
||||
.build()
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
frontend
|
||||
.build_servers(fe_opts, servers)
|
||||
@@ -458,9 +459,9 @@ impl StartCommand {
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
datanode_manager: DatanodeManagerRef,
|
||||
table_meta_allocator: TableMetadataAllocator,
|
||||
) -> Result<DdlTaskExecutorRef> {
|
||||
let ddl_task_executor: DdlTaskExecutorRef = Arc::new(
|
||||
table_meta_allocator: TableMetadataAllocatorRef,
|
||||
) -> Result<ProcedureExecutorRef> {
|
||||
let procedure_executor: ProcedureExecutorRef = Arc::new(
|
||||
DdlManager::try_new(
|
||||
procedure_manager,
|
||||
datanode_manager,
|
||||
@@ -472,7 +473,7 @@ impl StartCommand {
|
||||
.context(InitDdlManagerSnafu)?,
|
||||
);
|
||||
|
||||
Ok(ddl_task_executor)
|
||||
Ok(procedure_executor)
|
||||
}
|
||||
|
||||
pub async fn create_table_metadata_manager(
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
anymap = "1.0.0-beta.2"
|
||||
bitvec = "1.0"
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
common-base.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
arrow.workspace = true
|
||||
arrow-schema.workspace = true
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
arrow.workspace = true
|
||||
bigdecimal.workspace = true
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
snafu.workspace = true
|
||||
strum.workspace = true
|
||||
|
||||
@@ -19,7 +19,9 @@ pub mod format;
|
||||
pub mod mock;
|
||||
pub mod status_code;
|
||||
|
||||
pub use snafu;
|
||||
|
||||
// HACK - these headers are here for shared in gRPC services. For common HTTP headers,
|
||||
// please define in `src/servers/src/http/header.rs`.
|
||||
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
|
||||
pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg";
|
||||
|
||||
pub use snafu;
|
||||
|
||||
@@ -4,13 +4,18 @@ edition.workspace = true
|
||||
version.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
arc-swap = "1.0"
|
||||
async-trait.workspace = true
|
||||
chrono-tz = "0.6"
|
||||
common-catalog.workspace = true
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-meta.workspace = true
|
||||
common-query.workspace = true
|
||||
common-runtime.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
@@ -23,6 +28,8 @@ num = "0.4"
|
||||
num-traits = "0.2"
|
||||
once_cell.workspace = true
|
||||
paste = "1.0"
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
session.workspace = true
|
||||
snafu.workspace = true
|
||||
statrs = "0.16"
|
||||
|
||||
@@ -30,6 +30,17 @@ pub struct FunctionContext {
|
||||
pub state: Arc<FunctionState>,
|
||||
}
|
||||
|
||||
impl FunctionContext {
|
||||
/// Create a mock [`FunctionContext`] for test.
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub fn mock() -> Self {
|
||||
Self {
|
||||
query_ctx: QueryContextBuilder::default().build(),
|
||||
state: Arc::new(FunctionState::mock()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for FunctionContext {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
|
||||
@@ -21,6 +21,7 @@ use once_cell::sync::Lazy;
|
||||
use crate::function::FunctionRef;
|
||||
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
|
||||
use crate::scalars::date::DateFunction;
|
||||
use crate::scalars::expression::ExpressionFunction;
|
||||
use crate::scalars::math::MathFunction;
|
||||
use crate::scalars::numpy::NumpyFunction;
|
||||
use crate::scalars::timestamp::TimestampFunction;
|
||||
@@ -80,6 +81,7 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
|
||||
NumpyFunction::register(&function_registry);
|
||||
TimestampFunction::register(&function_registry);
|
||||
DateFunction::register(&function_registry);
|
||||
ExpressionFunction::register(&function_registry);
|
||||
|
||||
// Aggregate functions
|
||||
AggregateFunctions::register(&function_registry);
|
||||
|
||||
@@ -13,10 +13,9 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::ProcedureStateResponse;
|
||||
use async_trait::async_trait;
|
||||
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
|
||||
use common_query::error::Result;
|
||||
use session::context::QueryContextRef;
|
||||
use table::requests::{DeleteRequest, InsertRequest};
|
||||
@@ -31,24 +30,18 @@ pub trait TableMutationHandler: Send + Sync {
|
||||
|
||||
/// Delete rows from the table.
|
||||
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
|
||||
|
||||
/// Migrate a region from source peer to target peer, returns the procedure id if success.
|
||||
async fn migrate_region(
|
||||
&self,
|
||||
region_id: u64,
|
||||
from_peer: u64,
|
||||
to_peer: u64,
|
||||
replay_timeout: Duration,
|
||||
) -> Result<String>;
|
||||
}
|
||||
|
||||
/// A trait for handling meta service requests in `QueryEngine`.
|
||||
/// A trait for handling procedure service requests in `QueryEngine`.
|
||||
#[async_trait]
|
||||
pub trait MetaServiceHandler: Send + Sync {
|
||||
pub trait ProcedureServiceHandler: Send + Sync {
|
||||
/// Migrate a region from source peer to target peer, returns the procedure id if success.
|
||||
async fn migrate_region(&self, request: MigrateRegionRequest) -> Result<Option<String>>;
|
||||
|
||||
/// Query the procedure' state by its id
|
||||
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
|
||||
}
|
||||
|
||||
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
|
||||
|
||||
pub type MetaServiceHandlerRef = Arc<dyn MetaServiceHandler>;
|
||||
pub type ProcedureServiceHandlerRef = Arc<dyn ProcedureServiceHandler>;
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod macros;
|
||||
pub mod scalars;
|
||||
mod system;
|
||||
mod table;
|
||||
|
||||
27
src/common/function/src/macros.rs
Normal file
27
src/common/function/src/macros.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
/// Ensure current function is invokded under `greptime` catalog.
|
||||
#[macro_export]
|
||||
macro_rules! ensure_greptime {
|
||||
($func_ctx: expr) => {{
|
||||
use common_catalog::consts::DEFAULT_CATALOG_NAME;
|
||||
snafu::ensure!(
|
||||
$func_ctx.query_ctx.current_catalog() == DEFAULT_CATALOG_NAME,
|
||||
common_query::error::PermissionDeniedSnafu {
|
||||
err_msg: format!("current catalog is not {DEFAULT_CATALOG_NAME}")
|
||||
}
|
||||
);
|
||||
}};
|
||||
}
|
||||
@@ -14,8 +14,22 @@
|
||||
|
||||
mod binary;
|
||||
mod ctx;
|
||||
mod is_null;
|
||||
mod unary;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use binary::scalar_binary_op;
|
||||
pub use ctx::EvalContext;
|
||||
pub use unary::scalar_unary_op;
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
use crate::scalars::expression::is_null::IsNullFunction;
|
||||
|
||||
pub(crate) struct ExpressionFunction;
|
||||
|
||||
impl ExpressionFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register(Arc::new(IsNullFunction));
|
||||
}
|
||||
}
|
||||
|
||||
109
src/common/function/src/scalars/expression/is_null.rs
Normal file
109
src/common/function/src/scalars/expression/is_null.rs
Normal file
@@ -0,0 +1,109 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::error;
|
||||
use common_query::error::{ArrowComputeSnafu, InvalidFuncArgsSnafu};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
use datafusion::arrow::array::ArrayRef;
|
||||
use datafusion::arrow::compute::is_null;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::prelude::VectorRef;
|
||||
use datatypes::vectors::Helper;
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
|
||||
const NAME: &str = "isnull";
|
||||
|
||||
/// The function to check whether an expression is NULL
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct IsNullFunction;
|
||||
|
||||
impl Display for IsNullFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", NAME.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
impl Function for IsNullFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _: &[ConcreteDataType]) -> common_query::error::Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::boolean_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::any(1, Volatility::Immutable)
|
||||
}
|
||||
|
||||
fn eval(
|
||||
&self,
|
||||
_func_ctx: FunctionContext,
|
||||
columns: &[VectorRef],
|
||||
) -> common_query::error::Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly one, have: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
let values = &columns[0];
|
||||
let arrow_array = &values.to_arrow_array();
|
||||
let result = is_null(arrow_array).context(ArrowComputeSnafu)?;
|
||||
|
||||
Helper::try_into_vector(Arc::new(result) as ArrayRef).context(error::FromArrowArraySnafu)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::prelude::TypeSignature;
|
||||
use datatypes::scalars::ScalarVector;
|
||||
use datatypes::vectors::{BooleanVector, Float32Vector};
|
||||
|
||||
use super::*;
|
||||
#[test]
|
||||
fn test_is_null_function() {
|
||||
let is_null = IsNullFunction;
|
||||
assert_eq!("isnull", is_null.name());
|
||||
assert_eq!(
|
||||
ConcreteDataType::boolean_datatype(),
|
||||
is_null.return_type(&[]).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
is_null.signature(),
|
||||
Signature {
|
||||
type_signature: TypeSignature::Any(1),
|
||||
volatility: Volatility::Immutable
|
||||
}
|
||||
);
|
||||
let values = vec![None, Some(3.0), None];
|
||||
|
||||
let args: Vec<VectorRef> = vec![Arc::new(Float32Vector::from(values))];
|
||||
let vector = is_null.eval(FunctionContext::default(), &args).unwrap();
|
||||
let expect: VectorRef = Arc::new(BooleanVector::from_vec(vec![true, false, true]));
|
||||
assert_eq!(expect, vector);
|
||||
}
|
||||
}
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};
|
||||
use crate::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
|
||||
|
||||
/// Shared state for SQL functions.
|
||||
/// The handlers in state may be `None` in cli command-line or test cases.
|
||||
@@ -20,6 +20,45 @@ use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};
|
||||
pub struct FunctionState {
|
||||
// The table mutation handler
|
||||
pub table_mutation_handler: Option<TableMutationHandlerRef>,
|
||||
// The meta service handler
|
||||
pub meta_service_handler: Option<MetaServiceHandlerRef>,
|
||||
// The procedure service handler
|
||||
pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
|
||||
}
|
||||
|
||||
impl FunctionState {
|
||||
/// Create a mock [`FunctionState`] for test.
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub fn mock() -> Self {
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::ProcedureStatus;
|
||||
use async_trait::async_trait;
|
||||
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
|
||||
use common_query::error::Result;
|
||||
|
||||
use crate::handlers::ProcedureServiceHandler;
|
||||
struct MockProcedureServiceHandler;
|
||||
|
||||
#[async_trait]
|
||||
impl ProcedureServiceHandler for MockProcedureServiceHandler {
|
||||
async fn migrate_region(
|
||||
&self,
|
||||
_request: MigrateRegionRequest,
|
||||
) -> Result<Option<String>> {
|
||||
Ok(Some("test_pid".to_string()))
|
||||
}
|
||||
|
||||
async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
|
||||
Ok(ProcedureStateResponse {
|
||||
status: ProcedureStatus::Done.into(),
|
||||
error: "OK".to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
table_mutation_handler: None,
|
||||
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
mod build;
|
||||
mod database;
|
||||
mod procedure_state;
|
||||
mod timezone;
|
||||
mod version;
|
||||
|
||||
@@ -21,6 +22,7 @@ use std::sync::Arc;
|
||||
|
||||
use build::BuildFunction;
|
||||
use database::DatabaseFunction;
|
||||
use procedure_state::ProcedureStateFunction;
|
||||
use timezone::TimezoneFunction;
|
||||
use version::VersionFunction;
|
||||
|
||||
@@ -34,5 +36,6 @@ impl SystemFunction {
|
||||
registry.register(Arc::new(VersionFunction));
|
||||
registry.register(Arc::new(DatabaseFunction));
|
||||
registry.register(Arc::new(TimezoneFunction));
|
||||
registry.register(Arc::new(ProcedureStateFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ use datatypes::vectors::{StringVector, VectorRef};
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
|
||||
/// Generates build information
|
||||
/// Generates build information
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct BuildFunction;
|
||||
|
||||
@@ -42,11 +42,7 @@ impl Function for BuildFunction {
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::uniform(
|
||||
0,
|
||||
vec![ConcreteDataType::string_datatype()],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
Signature::uniform(0, vec![], Volatility::Immutable)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
@@ -75,7 +71,7 @@ mod tests {
|
||||
Signature {
|
||||
type_signature: TypeSignature::Uniform(0, valid_types),
|
||||
volatility: Volatility::Immutable
|
||||
} if valid_types == vec![ConcreteDataType::string_datatype()]
|
||||
} if valid_types.is_empty()
|
||||
));
|
||||
let build_info = common_version::build_info().to_string();
|
||||
let vector = build.eval(FunctionContext::default(), &[]).unwrap();
|
||||
|
||||
216
src/common/function/src/system/procedure_state.rs
Normal file
216
src/common/function/src/system/procedure_state.rs
Normal file
@@ -0,0 +1,216 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::ProcedureStatus;
|
||||
use common_meta::rpc::procedure::ProcedureStateResponse;
|
||||
use common_query::error::Error::ThreadJoin;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
|
||||
UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
use common_telemetry::error;
|
||||
use datatypes::prelude::*;
|
||||
use datatypes::vectors::{ConstantVector, Helper, StringVector, VectorRef};
|
||||
use serde::Serialize;
|
||||
use snafu::{ensure, Location, OptionExt};
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
|
||||
const NAME: &str = "procedure_state";
|
||||
|
||||
/// A function to query procedure state by its id.
|
||||
/// Such as `procedure_state(pid)`.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct ProcedureStateFunction;
|
||||
|
||||
impl fmt::Display for ProcedureStateFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "PROCEDURE_STATE")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct ProcedureStateJson {
|
||||
status: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
impl Function for ProcedureStateFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::string_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::uniform(
|
||||
1,
|
||||
vec![ConcreteDataType::string_datatype()],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
crate::ensure_greptime!(func_ctx);
|
||||
|
||||
ensure!(
|
||||
columns.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 1, have: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let pids = columns[0].clone();
|
||||
let expect_len = pids.len();
|
||||
let is_const = pids.is_const();
|
||||
|
||||
match pids.data_type() {
|
||||
ConcreteDataType::String(_) => {
|
||||
// TODO(dennis): datafusion UDF doesn't support async function currently
|
||||
std::thread::spawn(move || {
|
||||
let pids: &StringVector = if is_const {
|
||||
let pids: &ConstantVector = unsafe { Helper::static_cast(&pids) };
|
||||
unsafe { Helper::static_cast(pids.inner()) }
|
||||
} else {
|
||||
unsafe { Helper::static_cast(&pids) }
|
||||
};
|
||||
|
||||
let procedure_service_handler = func_ctx
|
||||
.state
|
||||
.procedure_service_handler
|
||||
.as_ref()
|
||||
.context(MissingProcedureServiceHandlerSnafu)?;
|
||||
|
||||
let states = pids
|
||||
.iter_data()
|
||||
.map(|pid| {
|
||||
if let Some(pid) = pid {
|
||||
let ProcedureStateResponse { status, error, .. } =
|
||||
common_runtime::block_on_read(async move {
|
||||
procedure_service_handler.query_procedure_state(pid).await
|
||||
})?;
|
||||
|
||||
let status = ProcedureStatus::try_from(status)
|
||||
.map(|v| v.as_str_name())
|
||||
.unwrap_or("Unknown");
|
||||
|
||||
let state = ProcedureStateJson {
|
||||
status: status.to_string(),
|
||||
error: if error.is_empty() { None } else { Some(error) },
|
||||
};
|
||||
|
||||
Ok(Some(serde_json::to_string(&state).unwrap_or_default()))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let results: VectorRef = Arc::new(StringVector::from(states));
|
||||
|
||||
if is_const {
|
||||
Ok(Arc::new(ConstantVector::new(results, expect_len)) as _)
|
||||
} else {
|
||||
Ok(results)
|
||||
}
|
||||
})
|
||||
.join()
|
||||
.map_err(|e| {
|
||||
error!(e; "Join thread error");
|
||||
ThreadJoin {
|
||||
location: Location::default(),
|
||||
}
|
||||
})?
|
||||
}
|
||||
_ => UnsupportedInputDataTypeSnafu {
|
||||
function: NAME,
|
||||
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::prelude::TypeSignature;
|
||||
use datatypes::vectors::StringVector;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_procedure_state_misc() {
|
||||
let f = ProcedureStateFunction;
|
||||
assert_eq!("procedure_state", f.name());
|
||||
assert_eq!(
|
||||
ConcreteDataType::string_datatype(),
|
||||
f.return_type(&[]).unwrap()
|
||||
);
|
||||
assert!(matches!(f.signature(),
|
||||
Signature {
|
||||
type_signature: TypeSignature::Uniform(1, valid_types),
|
||||
volatility: Volatility::Immutable
|
||||
} if valid_types == vec![ConcreteDataType::string_datatype()]
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_missing_procedure_service() {
|
||||
let f = ProcedureStateFunction;
|
||||
|
||||
let args = vec!["pid"];
|
||||
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
|
||||
assert_eq!(
|
||||
"Missing ProcedureServiceHandler, not expected",
|
||||
result.to_string()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_procedure_state() {
|
||||
let f = ProcedureStateFunction;
|
||||
|
||||
let args = vec!["pid"];
|
||||
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::mock(), &args).unwrap();
|
||||
|
||||
let expect: VectorRef = Arc::new(StringVector::from(vec![
|
||||
"{\"status\":\"Done\",\"error\":\"OK\"}",
|
||||
]));
|
||||
assert_eq!(expect, result);
|
||||
}
|
||||
}
|
||||
@@ -15,9 +15,10 @@
|
||||
use std::fmt::{self};
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::rpc::procedure::MigrateRegionRequest;
|
||||
use common_query::error::Error::ThreadJoin;
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, InvalidInputTypeSnafu, MissingTableMutationHandlerSnafu, Result,
|
||||
InvalidFuncArgsSnafu, InvalidInputTypeSnafu, MissingProcedureServiceHandlerSnafu, Result,
|
||||
};
|
||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||
use common_telemetry::logging::error;
|
||||
@@ -77,6 +78,8 @@ impl Function for MigrateRegionFunction {
|
||||
}
|
||||
|
||||
fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
crate::ensure_greptime!(func_ctx);
|
||||
|
||||
let (region_ids, from_peers, to_peers, replay_timeouts) = match columns.len() {
|
||||
3 => {
|
||||
let region_ids = cast_u64_vector(&columns[0])?;
|
||||
@@ -106,9 +109,15 @@ impl Function for MigrateRegionFunction {
|
||||
}
|
||||
};
|
||||
|
||||
// TODO(dennis): datafusion UDF doesn't support async function currently
|
||||
std::thread::spawn(move || {
|
||||
let len = region_ids.len();
|
||||
let mut results = StringVectorBuilder::with_capacity(len);
|
||||
let procedure_service_handler = func_ctx
|
||||
.state
|
||||
.procedure_service_handler
|
||||
.as_ref()
|
||||
.context(MissingProcedureServiceHandlerSnafu)?;
|
||||
|
||||
for index in 0..len {
|
||||
let region_id = region_ids.get(index);
|
||||
@@ -126,24 +135,18 @@ impl Function for MigrateRegionFunction {
|
||||
Value::UInt64(to_peer),
|
||||
Value::UInt64(replay_timeout),
|
||||
) => {
|
||||
let func_ctx = func_ctx.clone();
|
||||
|
||||
let pid = common_runtime::block_on_read(async move {
|
||||
func_ctx
|
||||
.state
|
||||
.table_mutation_handler
|
||||
.as_ref()
|
||||
.context(MissingTableMutationHandlerSnafu)?
|
||||
.migrate_region(
|
||||
procedure_service_handler
|
||||
.migrate_region(MigrateRegionRequest {
|
||||
region_id,
|
||||
from_peer,
|
||||
to_peer,
|
||||
Duration::from_secs(replay_timeout),
|
||||
)
|
||||
replay_timeout: Duration::from_secs(replay_timeout),
|
||||
})
|
||||
.await
|
||||
})?;
|
||||
|
||||
results.push(Some(&pid));
|
||||
results.push(pid.as_deref())
|
||||
}
|
||||
_ => {
|
||||
results.push(None);
|
||||
@@ -171,5 +174,60 @@ impl fmt::Display for MigrateRegionFunction {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
// FIXME(dennis): test in the following PR.
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::prelude::TypeSignature;
|
||||
use datatypes::vectors::{StringVector, UInt64Vector};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_migrate_region_misc() {
|
||||
let f = MigrateRegionFunction;
|
||||
assert_eq!("migrate_region", f.name());
|
||||
assert_eq!(
|
||||
ConcreteDataType::string_datatype(),
|
||||
f.return_type(&[]).unwrap()
|
||||
);
|
||||
assert!(matches!(f.signature(),
|
||||
Signature {
|
||||
type_signature: TypeSignature::OneOf(sigs),
|
||||
volatility: Volatility::Immutable
|
||||
} if sigs.len() == 2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_missing_procedure_service() {
|
||||
let f = MigrateRegionFunction;
|
||||
|
||||
let args = vec![1, 1, 1];
|
||||
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
|
||||
assert_eq!(
|
||||
"Missing ProcedureServiceHandler, not expected",
|
||||
result.to_string()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_migrate_region() {
|
||||
let f = MigrateRegionFunction;
|
||||
|
||||
let args = vec![1, 1, 1];
|
||||
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = f.eval(FunctionContext::mock(), &args).unwrap();
|
||||
|
||||
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
|
||||
assert_eq!(expect, result);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
common-error.workspace = true
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
arrow-flight.workspace = true
|
||||
|
||||
@@ -7,6 +7,9 @@ license.workspace = true
|
||||
[lib]
|
||||
proc-macro = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
proc-macro2 = "1.0.66"
|
||||
quote = "1.0"
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
|
||||
@@ -7,6 +7,9 @@ license.workspace = true
|
||||
[features]
|
||||
testing = []
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
async-recursion = "1.0"
|
||||
@@ -20,6 +23,7 @@ common-error.workspace = true
|
||||
common-grpc-expr.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-procedure.workspace = true
|
||||
common-procedure-test.workspace = true
|
||||
common-recordbatch.workspace = true
|
||||
common-runtime.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::sync::Arc;
|
||||
use common_telemetry::tracing_context::W3cTrace;
|
||||
use store_api::storage::{RegionNumber, TableId};
|
||||
|
||||
use self::table_meta::TableMetadataAllocatorRef;
|
||||
use crate::cache_invalidator::CacheInvalidatorRef;
|
||||
use crate::datanode_manager::DatanodeManagerRef;
|
||||
use crate::error::Result;
|
||||
@@ -25,6 +26,7 @@ use crate::key::table_route::TableRouteValue;
|
||||
use crate::key::TableMetadataManagerRef;
|
||||
use crate::region_keeper::MemoryRegionKeeperRef;
|
||||
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
|
||||
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
|
||||
|
||||
pub mod alter_table;
|
||||
pub mod create_logical_tables;
|
||||
@@ -32,6 +34,10 @@ pub mod create_table;
|
||||
mod create_table_template;
|
||||
pub mod drop_table;
|
||||
pub mod table_meta;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod test_util;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
pub mod truncate_table;
|
||||
pub mod utils;
|
||||
|
||||
@@ -41,16 +47,32 @@ pub struct ExecutorContext {
|
||||
pub tracing_context: Option<W3cTrace>,
|
||||
}
|
||||
|
||||
/// The procedure executor that accepts ddl, region migration task etc.
|
||||
#[async_trait::async_trait]
|
||||
pub trait DdlTaskExecutor: Send + Sync {
|
||||
pub trait ProcedureExecutor: Send + Sync {
|
||||
/// Submit a ddl task
|
||||
async fn submit_ddl_task(
|
||||
&self,
|
||||
ctx: &ExecutorContext,
|
||||
request: SubmitDdlTaskRequest,
|
||||
) -> Result<SubmitDdlTaskResponse>;
|
||||
|
||||
/// Submit a region migration task
|
||||
async fn migrate_region(
|
||||
&self,
|
||||
ctx: &ExecutorContext,
|
||||
request: MigrateRegionRequest,
|
||||
) -> Result<MigrateRegionResponse>;
|
||||
|
||||
/// Query the procedure state by its id
|
||||
async fn query_procedure_state(
|
||||
&self,
|
||||
ctx: &ExecutorContext,
|
||||
pid: &str,
|
||||
) -> Result<ProcedureStateResponse>;
|
||||
}
|
||||
|
||||
pub type DdlTaskExecutorRef = Arc<dyn DdlTaskExecutor>;
|
||||
pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
|
||||
|
||||
pub struct TableMetadataAllocatorContext {
|
||||
pub cluster_id: u64,
|
||||
@@ -73,4 +95,5 @@ pub struct DdlContext {
|
||||
pub cache_invalidator: CacheInvalidatorRef,
|
||||
pub table_metadata_manager: TableMetadataManagerRef,
|
||||
pub memory_region_keeper: MemoryRegionKeeperRef,
|
||||
pub table_metadata_allocator: TableMetadataAllocatorRef,
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ use table::requests::AlterKind;
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::cache_invalidator::Context;
|
||||
use crate::ddl::utils::handle_operate_region_error;
|
||||
use crate::ddl::utils::add_peer_context_if_needed;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{self, ConvertAlterTableRequestSnafu, Error, InvalidProtoMsgSnafu, Result};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
@@ -226,7 +226,7 @@ impl AlterTableProcedure {
|
||||
// The engine will throw this code when the schema version not match.
|
||||
// As this procedure has locked the table, the only reason for this error
|
||||
// is procedure is succeeded before and is retrying.
|
||||
return Err(handle_operate_region_error(datanode)(err));
|
||||
return Err(add_peer_context_if_needed(datanode)(err));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -31,7 +31,7 @@ use strum::AsRefStr;
|
||||
use table::metadata::{RawTableInfo, TableId};
|
||||
|
||||
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
|
||||
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{Result, TableAlreadyExistsSnafu};
|
||||
use crate::key::table_name::TableNameKey;
|
||||
@@ -66,7 +66,16 @@ impl CreateLogicalTablesProcedure {
|
||||
Ok(Self { context, creator })
|
||||
}
|
||||
|
||||
async fn on_prepare(&mut self) -> Result<Status> {
|
||||
/// On the prepares step, it performs:
|
||||
/// - Checks whether physical table exists.
|
||||
/// - Checks whether logical tables exist.
|
||||
/// - Allocates the table ids.
|
||||
///
|
||||
/// Abort(non-retry):
|
||||
/// - The physical table does not exist.
|
||||
/// - Failed to check whether tables exist.
|
||||
/// - One of logical tables has existing, and the table creation task without setting `create_if_not_exists`.
|
||||
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
|
||||
let manager = &self.context.table_metadata_manager;
|
||||
|
||||
// Sets physical region numbers
|
||||
@@ -80,7 +89,7 @@ impl CreateLogicalTablesProcedure {
|
||||
.data
|
||||
.set_physical_region_numbers(physical_region_numbers);
|
||||
|
||||
// Checks if the tables exists
|
||||
// Checks if the tables exist
|
||||
let table_name_keys = self
|
||||
.creator
|
||||
.data
|
||||
@@ -96,24 +105,9 @@ impl CreateLogicalTablesProcedure {
|
||||
.map(|x| x.map(|x| x.table_id()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Sets table ids already exists
|
||||
self.creator
|
||||
.data
|
||||
.set_table_ids_already_exists(already_exists_tables_ids);
|
||||
|
||||
// If all tables do not exists, we can create them directly.
|
||||
if self.creator.data.is_all_tables_not_exists() {
|
||||
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
|
||||
return Ok(Status::executing(true));
|
||||
}
|
||||
|
||||
// Filter out the tables that already exist.
|
||||
let tasks = &self.creator.data.tasks;
|
||||
let mut filtered_tasks = Vec::with_capacity(tasks.len());
|
||||
for (task, table_id) in tasks
|
||||
.iter()
|
||||
.zip(self.creator.data.table_ids_already_exists().iter())
|
||||
{
|
||||
// Validates the tasks
|
||||
let tasks = &mut self.creator.data.tasks;
|
||||
for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) {
|
||||
if table_id.is_some() {
|
||||
// If a table already exists, we just ignore it.
|
||||
ensure!(
|
||||
@@ -124,17 +118,34 @@ impl CreateLogicalTablesProcedure {
|
||||
);
|
||||
continue;
|
||||
}
|
||||
filtered_tasks.push(task.clone());
|
||||
}
|
||||
|
||||
// Resets tasks
|
||||
self.creator.data.tasks = filtered_tasks;
|
||||
if self.creator.data.tasks.is_empty() {
|
||||
// If all tables already exist, we can skip the `DatanodeCreateRegions` stage.
|
||||
self.creator.data.state = CreateTablesState::CreateMetadata;
|
||||
return Ok(Status::executing(true));
|
||||
// If all tables already exist, returns the table_ids.
|
||||
if already_exists_tables_ids.iter().all(Option::is_some) {
|
||||
return Ok(Status::done_with_output(
|
||||
already_exists_tables_ids
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect::<Vec<_>>(),
|
||||
));
|
||||
}
|
||||
|
||||
// Allocates table ids
|
||||
for (task, table_id) in tasks.iter_mut().zip(already_exists_tables_ids.iter()) {
|
||||
let table_id = if let Some(table_id) = table_id {
|
||||
*table_id
|
||||
} else {
|
||||
self.context
|
||||
.table_metadata_allocator
|
||||
.allocate_table_id(task)
|
||||
.await?
|
||||
};
|
||||
task.set_table_id(table_id);
|
||||
}
|
||||
|
||||
self.creator
|
||||
.data
|
||||
.set_table_ids_already_exists(already_exists_tables_ids);
|
||||
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
@@ -152,17 +163,20 @@ impl CreateLogicalTablesProcedure {
|
||||
self.create_regions(region_routes).await
|
||||
}
|
||||
|
||||
/// Creates table metadata
|
||||
///
|
||||
/// Abort(not-retry):
|
||||
/// - Failed to create table metadata.
|
||||
pub async fn on_create_metadata(&self) -> Result<Status> {
|
||||
let manager = &self.context.table_metadata_manager;
|
||||
|
||||
let physical_table_id = self.creator.data.physical_table_id();
|
||||
let tables_data = self.creator.data.all_tables_data();
|
||||
let num_tables = tables_data.len();
|
||||
let remaining_tasks = self.creator.data.remaining_tasks();
|
||||
let num_tables = remaining_tasks.len();
|
||||
|
||||
if num_tables > 0 {
|
||||
let chunk_size = manager.max_logical_tables_per_batch();
|
||||
if num_tables > chunk_size {
|
||||
let chunks = tables_data
|
||||
let chunks = remaining_tasks
|
||||
.into_iter()
|
||||
.chunks(chunk_size)
|
||||
.into_iter()
|
||||
@@ -172,11 +186,21 @@ impl CreateLogicalTablesProcedure {
|
||||
manager.create_logical_tables_metadata(chunk).await?;
|
||||
}
|
||||
} else {
|
||||
manager.create_logical_tables_metadata(tables_data).await?;
|
||||
manager
|
||||
.create_logical_tables_metadata(remaining_tasks)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
let table_ids = self.creator.data.real_table_ids();
|
||||
// The `table_id` MUST be collected after the [Prepare::Prepare],
|
||||
// ensures the all `table_id`s have been allocated.
|
||||
let table_ids = self
|
||||
.creator
|
||||
.data
|
||||
.tasks
|
||||
.iter()
|
||||
.map(|task| task.table_info.ident.table_id)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}");
|
||||
|
||||
@@ -238,10 +262,10 @@ impl CreateLogicalTablesProcedure {
|
||||
body: Some(PbRegionRequest::Creates(creates)),
|
||||
};
|
||||
create_region_tasks.push(async move {
|
||||
if let Err(err) = requester.handle(request).await {
|
||||
return Err(handle_operate_region_error(datanode)(err));
|
||||
}
|
||||
Ok(())
|
||||
requester
|
||||
.handle(request)
|
||||
.await
|
||||
.map_err(add_peer_context_if_needed(datanode))
|
||||
});
|
||||
}
|
||||
|
||||
@@ -310,17 +334,13 @@ impl TablesCreator {
|
||||
tasks: Vec<CreateTableTask>,
|
||||
physical_table_id: TableId,
|
||||
) -> Self {
|
||||
let table_ids_from_tasks = tasks
|
||||
.iter()
|
||||
.map(|task| task.table_info.ident.table_id)
|
||||
.collect::<Vec<_>>();
|
||||
let len = table_ids_from_tasks.len();
|
||||
let len = tasks.len();
|
||||
|
||||
Self {
|
||||
data: CreateTablesData {
|
||||
cluster_id,
|
||||
state: CreateTablesState::Prepare,
|
||||
tasks,
|
||||
table_ids_from_tasks,
|
||||
table_ids_already_exists: vec![None; len],
|
||||
physical_table_id,
|
||||
physical_region_numbers: vec![],
|
||||
@@ -334,10 +354,6 @@ pub struct CreateTablesData {
|
||||
cluster_id: ClusterId,
|
||||
state: CreateTablesState,
|
||||
tasks: Vec<CreateTableTask>,
|
||||
table_ids_from_tasks: Vec<TableId>,
|
||||
// Because the table_id is allocated before entering the distributed lock,
|
||||
// it needs to recheck if the table exists when creating a table.
|
||||
// If it does exist, then the table_id needs to be replaced with the existing one.
|
||||
table_ids_already_exists: Vec<Option<TableId>>,
|
||||
physical_table_id: TableId,
|
||||
physical_region_numbers: Vec<RegionNumber>,
|
||||
@@ -360,24 +376,6 @@ impl CreateTablesData {
|
||||
self.table_ids_already_exists = table_ids_already_exists;
|
||||
}
|
||||
|
||||
fn table_ids_already_exists(&self) -> &[Option<TableId>] {
|
||||
&self.table_ids_already_exists
|
||||
}
|
||||
|
||||
fn is_all_tables_not_exists(&self) -> bool {
|
||||
self.table_ids_already_exists.iter().all(Option::is_none)
|
||||
}
|
||||
|
||||
pub fn real_table_ids(&self) -> Vec<TableId> {
|
||||
self.table_ids_from_tasks
|
||||
.iter()
|
||||
.zip(self.table_ids_already_exists.iter())
|
||||
.map(|(table_id_from_task, table_id_already_exists)| {
|
||||
table_id_already_exists.unwrap_or(*table_id_from_task)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
|
||||
self.tasks
|
||||
.iter()
|
||||
@@ -385,18 +383,27 @@ impl CreateTablesData {
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
fn all_tables_data(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
|
||||
/// Returns the remaining tasks.
|
||||
/// The length of tasks must be greater than 0.
|
||||
fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
|
||||
self.tasks
|
||||
.iter()
|
||||
.map(|task| {
|
||||
let table_info = task.table_info.clone();
|
||||
let region_ids = self
|
||||
.physical_region_numbers
|
||||
.iter()
|
||||
.map(|region_number| RegionId::new(table_info.ident.table_id, *region_number))
|
||||
.collect();
|
||||
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
|
||||
(table_info, table_route)
|
||||
.zip(self.table_ids_already_exists.iter())
|
||||
.flat_map(|(task, table_id)| {
|
||||
if table_id.is_none() {
|
||||
let table_info = task.table_info.clone();
|
||||
let region_ids = self
|
||||
.physical_region_numbers
|
||||
.iter()
|
||||
.map(|region_number| {
|
||||
RegionId::new(table_info.ident.table_id, *region_number)
|
||||
})
|
||||
.collect();
|
||||
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
|
||||
Some((table_info, table_route))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
@@ -33,8 +33,8 @@ use table::metadata::{RawTableInfo, TableId};
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
|
||||
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
|
||||
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
|
||||
use crate::error::{self, Result, TableRouteNotFoundSnafu};
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
@@ -45,7 +45,6 @@ use crate::rpc::router::{
|
||||
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
|
||||
};
|
||||
use crate::{metrics, ClusterId};
|
||||
|
||||
pub struct CreateTableProcedure {
|
||||
pub context: DdlContext,
|
||||
pub creator: TableCreator,
|
||||
@@ -54,16 +53,10 @@ pub struct CreateTableProcedure {
|
||||
impl CreateTableProcedure {
|
||||
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
|
||||
|
||||
pub fn new(
|
||||
cluster_id: ClusterId,
|
||||
task: CreateTableTask,
|
||||
table_route: TableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
context: DdlContext,
|
||||
) -> Self {
|
||||
pub fn new(cluster_id: ClusterId, task: CreateTableTask, context: DdlContext) -> Self {
|
||||
Self {
|
||||
context,
|
||||
creator: TableCreator::new(cluster_id, task, table_route, region_wal_options),
|
||||
creator: TableCreator::new(cluster_id, task),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,7 +68,8 @@ impl CreateTableProcedure {
|
||||
opening_regions: vec![],
|
||||
};
|
||||
|
||||
if let TableRouteValue::Physical(x) = &creator.data.table_route {
|
||||
// Only registers regions if the table route is allocated.
|
||||
if let Some(TableRouteValue::Physical(x)) = &creator.data.table_route {
|
||||
creator.opening_regions = creator
|
||||
.register_opening_regions(&context, &x.region_routes)
|
||||
.map_err(BoxedError::new)
|
||||
@@ -85,20 +79,41 @@ impl CreateTableProcedure {
|
||||
Ok(CreateTableProcedure { context, creator })
|
||||
}
|
||||
|
||||
pub fn table_info(&self) -> &RawTableInfo {
|
||||
fn table_info(&self) -> &RawTableInfo {
|
||||
&self.creator.data.task.table_info
|
||||
}
|
||||
|
||||
fn table_id(&self) -> TableId {
|
||||
pub(crate) fn table_id(&self) -> TableId {
|
||||
self.table_info().ident.table_id
|
||||
}
|
||||
|
||||
pub fn region_wal_options(&self) -> &HashMap<RegionNumber, String> {
|
||||
&self.creator.data.region_wal_options
|
||||
fn region_wal_options(&self) -> Option<&HashMap<RegionNumber, String>> {
|
||||
self.creator.data.region_wal_options.as_ref()
|
||||
}
|
||||
|
||||
/// Checks whether the table exists.
|
||||
async fn on_prepare(&mut self) -> Result<Status> {
|
||||
fn table_route(&self) -> Option<&TableRouteValue> {
|
||||
self.creator.data.table_route.as_ref()
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub fn set_allocated_metadata(
|
||||
&mut self,
|
||||
table_id: TableId,
|
||||
table_route: TableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
) {
|
||||
self.creator
|
||||
.set_allocated_metadata(table_id, table_route, region_wal_options)
|
||||
}
|
||||
|
||||
/// On the prepare step, it performs:
|
||||
/// - Checks whether the table exists.
|
||||
/// - Allocates the table id.
|
||||
///
|
||||
/// Abort(non-retry):
|
||||
/// - TableName exists and `create_if_not_exists` is false.
|
||||
/// - Failed to allocate [TableMetadata].
|
||||
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
|
||||
let expr = &self.creator.data.task.create_table;
|
||||
let table_name_value = self
|
||||
.context
|
||||
@@ -124,6 +139,22 @@ impl CreateTableProcedure {
|
||||
}
|
||||
|
||||
self.creator.data.state = CreateTableState::DatanodeCreateRegions;
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
region_wal_options,
|
||||
} = self
|
||||
.context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext {
|
||||
cluster_id: self.creator.data.cluster_id,
|
||||
},
|
||||
&self.creator.data.task,
|
||||
)
|
||||
.await?;
|
||||
self.creator
|
||||
.set_allocated_metadata(table_id, table_route, region_wal_options);
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
@@ -137,8 +168,20 @@ impl CreateTableProcedure {
|
||||
Ok(CreateRequestBuilder::new(template, physical_table_id))
|
||||
}
|
||||
|
||||
/// Creates regions on datanodes
|
||||
///
|
||||
/// Abort(non-retry):
|
||||
/// - Failed to create [CreateRequestBuilder].
|
||||
/// - Failed to get the table route of physical table (for logical table).
|
||||
///
|
||||
/// Retry:
|
||||
/// - If the underlying servers returns one of the following [Code](tonic::status::Code):
|
||||
/// - [Code::Cancelled](tonic::status::Code::Cancelled)
|
||||
/// - [Code::DeadlineExceeded](tonic::status::Code::DeadlineExceeded)
|
||||
/// - [Code::Unavailable](tonic::status::Code::Unavailable)
|
||||
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
|
||||
match &self.creator.data.table_route {
|
||||
// Safety: the table route must be allocated.
|
||||
match &self.creator.data.table_route.clone().unwrap() {
|
||||
TableRouteValue::Physical(x) => {
|
||||
let region_routes = x.region_routes.clone();
|
||||
let request_builder = self.new_region_request_builder(None)?;
|
||||
@@ -170,7 +213,8 @@ impl CreateTableProcedure {
|
||||
region_routes: &[RegionRoute],
|
||||
request_builder: CreateRequestBuilder,
|
||||
) -> Result<Status> {
|
||||
if self.creator.data.table_route.is_physical() {
|
||||
// Safety: the table_route must be allocated.
|
||||
if self.table_route().unwrap().is_physical() {
|
||||
// Registers opening regions
|
||||
let guards = self
|
||||
.creator
|
||||
@@ -181,13 +225,12 @@ impl CreateTableProcedure {
|
||||
}
|
||||
|
||||
let create_table_data = &self.creator.data;
|
||||
let region_wal_options = &create_table_data.region_wal_options;
|
||||
|
||||
// Safety: the region_wal_options must be allocated
|
||||
let region_wal_options = self.region_wal_options().unwrap();
|
||||
let create_table_expr = &create_table_data.task.create_table;
|
||||
let catalog = &create_table_expr.catalog_name;
|
||||
let schema = &create_table_expr.schema_name;
|
||||
let storage_path = region_storage_path(catalog, schema);
|
||||
|
||||
let leaders = find_leaders(region_routes);
|
||||
let mut create_region_tasks = Vec::with_capacity(leaders.len());
|
||||
|
||||
@@ -203,7 +246,6 @@ impl CreateTableProcedure {
|
||||
storage_path.clone(),
|
||||
region_wal_options,
|
||||
)?;
|
||||
|
||||
requests.push(PbRegionRequest::Create(create_region_request));
|
||||
}
|
||||
|
||||
@@ -218,12 +260,11 @@ impl CreateTableProcedure {
|
||||
|
||||
let datanode = datanode.clone();
|
||||
let requester = requester.clone();
|
||||
|
||||
create_region_tasks.push(async move {
|
||||
if let Err(err) = requester.handle(request).await {
|
||||
return Err(handle_operate_region_error(datanode)(err));
|
||||
}
|
||||
Ok(())
|
||||
requester
|
||||
.handle(request)
|
||||
.await
|
||||
.map_err(add_peer_context_if_needed(datanode))
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -240,18 +281,21 @@ impl CreateTableProcedure {
|
||||
Ok(Status::executing(false))
|
||||
}
|
||||
|
||||
/// Creates table metadata
|
||||
///
|
||||
/// Abort(not-retry):
|
||||
/// - Failed to create table metadata.
|
||||
async fn on_create_metadata(&self) -> Result<Status> {
|
||||
let table_id = self.table_id();
|
||||
let manager = &self.context.table_metadata_manager;
|
||||
|
||||
let raw_table_info = self.table_info().clone();
|
||||
let region_wal_options = self.region_wal_options().clone();
|
||||
// Safety: the region_wal_options must be allocated.
|
||||
let region_wal_options = self.region_wal_options().unwrap().clone();
|
||||
// Safety: the table_route must be allocated.
|
||||
let table_route = self.table_route().unwrap().clone();
|
||||
manager
|
||||
.create_table_metadata(
|
||||
raw_table_info,
|
||||
self.creator.data.table_route.clone(),
|
||||
region_wal_options,
|
||||
)
|
||||
.create_table_metadata(raw_table_info, table_route, region_wal_options)
|
||||
.await?;
|
||||
info!("Created table metadata for table {table_id}");
|
||||
|
||||
@@ -303,19 +347,14 @@ pub struct TableCreator {
|
||||
}
|
||||
|
||||
impl TableCreator {
|
||||
pub fn new(
|
||||
cluster_id: u64,
|
||||
task: CreateTableTask,
|
||||
table_route: TableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
) -> Self {
|
||||
pub fn new(cluster_id: u64, task: CreateTableTask) -> Self {
|
||||
Self {
|
||||
data: CreateTableData {
|
||||
state: CreateTableState::Prepare,
|
||||
cluster_id,
|
||||
task,
|
||||
table_route,
|
||||
region_wal_options,
|
||||
table_route: None,
|
||||
region_wal_options: None,
|
||||
},
|
||||
opening_regions: vec![],
|
||||
}
|
||||
@@ -347,6 +386,17 @@ impl TableCreator {
|
||||
}
|
||||
Ok(opening_region_guards)
|
||||
}
|
||||
|
||||
fn set_allocated_metadata(
|
||||
&mut self,
|
||||
table_id: TableId,
|
||||
table_route: TableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
) {
|
||||
self.data.task.table_info.ident.table_id = table_id;
|
||||
self.data.table_route = Some(table_route);
|
||||
self.data.region_wal_options = Some(region_wal_options);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
|
||||
@@ -363,8 +413,10 @@ pub enum CreateTableState {
|
||||
pub struct CreateTableData {
|
||||
pub state: CreateTableState,
|
||||
pub task: CreateTableTask,
|
||||
table_route: TableRouteValue,
|
||||
pub region_wal_options: HashMap<RegionNumber, String>,
|
||||
/// None stands for not allocated yet.
|
||||
table_route: Option<TableRouteValue>,
|
||||
/// None stands for not allocated yet.
|
||||
pub region_wal_options: Option<HashMap<RegionNumber, String>>,
|
||||
pub cluster_id: ClusterId,
|
||||
}
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ use table::table_reference::TableReference;
|
||||
|
||||
use super::utils::handle_retry_error;
|
||||
use crate::cache_invalidator::Context;
|
||||
use crate::ddl::utils::handle_operate_region_error;
|
||||
use crate::ddl::utils::add_peer_context_if_needed;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
@@ -223,7 +223,7 @@ impl DropTableProcedure {
|
||||
drop_region_tasks.push(async move {
|
||||
if let Err(err) = requester.handle(request).await {
|
||||
if err.status_code() != StatusCode::RegionNotFound {
|
||||
return Err(handle_operate_region_error(datanode)(err));
|
||||
return Err(add_peer_context_if_needed(datanode)(err));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -23,21 +23,22 @@ use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
|
||||
use store_api::storage::{RegionId, RegionNumber, TableId};
|
||||
|
||||
use crate::ddl::{TableMetadata, TableMetadataAllocatorContext};
|
||||
use crate::error::{Result, TableNotFoundSnafu, UnsupportedSnafu};
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::error::{self, Result, TableNotFoundSnafu, UnsupportedSnafu};
|
||||
use crate::key::table_name::{TableNameKey, TableNameManager};
|
||||
use crate::key::table_route::{LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue};
|
||||
use crate::key::TableMetadataManagerRef;
|
||||
use crate::peer::Peer;
|
||||
use crate::rpc::ddl::CreateTableTask;
|
||||
use crate::rpc::router::{Region, RegionRoute};
|
||||
use crate::sequence::SequenceRef;
|
||||
use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocatorRef};
|
||||
|
||||
pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TableMetadataAllocator {
|
||||
table_id_sequence: SequenceRef,
|
||||
wal_options_allocator: WalOptionsAllocatorRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
table_name_manager: TableNameManager,
|
||||
peer_allocator: PeerAllocatorRef,
|
||||
}
|
||||
|
||||
@@ -45,12 +46,12 @@ impl TableMetadataAllocator {
|
||||
pub fn new(
|
||||
table_id_sequence: SequenceRef,
|
||||
wal_options_allocator: WalOptionsAllocatorRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
table_name_manager: TableNameManager,
|
||||
) -> Self {
|
||||
Self::with_peer_allocator(
|
||||
table_id_sequence,
|
||||
wal_options_allocator,
|
||||
table_metadata_manager,
|
||||
table_name_manager,
|
||||
Arc::new(NoopPeerAllocator),
|
||||
)
|
||||
}
|
||||
@@ -58,18 +59,18 @@ impl TableMetadataAllocator {
|
||||
pub fn with_peer_allocator(
|
||||
table_id_sequence: SequenceRef,
|
||||
wal_options_allocator: WalOptionsAllocatorRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
table_name_manager: TableNameManager,
|
||||
peer_allocator: PeerAllocatorRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
table_id_sequence,
|
||||
wal_options_allocator,
|
||||
table_metadata_manager,
|
||||
table_name_manager,
|
||||
peer_allocator,
|
||||
}
|
||||
}
|
||||
|
||||
async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
|
||||
pub(crate) async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
|
||||
let table_id = if let Some(table_id) = &task.create_table.table_id {
|
||||
let table_id = table_id.id;
|
||||
|
||||
@@ -123,6 +124,12 @@ impl TableMetadataAllocator {
|
||||
task: &CreateTableTask,
|
||||
) -> Result<TableRouteValue> {
|
||||
let regions = task.partitions.len();
|
||||
ensure!(
|
||||
regions > 0,
|
||||
error::UnexpectedSnafu {
|
||||
err_msg: "The number of partitions must be greater than 0"
|
||||
}
|
||||
);
|
||||
|
||||
let table_route = if task.create_table.engine == METRIC_ENGINE
|
||||
&& let Some(physical_table_name) = task
|
||||
@@ -131,8 +138,7 @@ impl TableMetadataAllocator {
|
||||
.get(LOGICAL_TABLE_METADATA_KEY)
|
||||
{
|
||||
let physical_table_id = self
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.table_name_manager
|
||||
.get(TableNameKey::new(
|
||||
&task.create_table.catalog_name,
|
||||
&task.create_table.schema_name,
|
||||
|
||||
19
src/common/meta/src/ddl/test_util.rs
Normal file
19
src/common/meta/src/ddl/test_util.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod create_table;
|
||||
|
||||
pub use create_table::{
|
||||
TestColumnDef, TestColumnDefBuilder, TestCreateTableExpr, TestCreateTableExprBuilder,
|
||||
};
|
||||
165
src/common/meta/src/ddl/test_util/create_table.rs
Normal file
165
src/common/meta/src/ddl/test_util/create_table.rs
Normal file
@@ -0,0 +1,165 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::column_def::try_as_column_schema;
|
||||
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
|
||||
use chrono::DateTime;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE};
|
||||
use datatypes::schema::RawSchema;
|
||||
use derive_builder::Builder;
|
||||
use store_api::storage::TableId;
|
||||
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
|
||||
use table::requests::TableOptions;
|
||||
|
||||
#[derive(Default, Builder)]
|
||||
pub struct TestColumnDef {
|
||||
#[builder(setter(into), default)]
|
||||
name: String,
|
||||
data_type: ColumnDataType,
|
||||
#[builder(default)]
|
||||
is_nullable: bool,
|
||||
semantic_type: SemanticType,
|
||||
#[builder(setter(into), default)]
|
||||
comment: String,
|
||||
}
|
||||
|
||||
impl From<TestColumnDef> for ColumnDef {
|
||||
fn from(
|
||||
TestColumnDef {
|
||||
name,
|
||||
data_type,
|
||||
is_nullable,
|
||||
semantic_type,
|
||||
comment,
|
||||
}: TestColumnDef,
|
||||
) -> Self {
|
||||
Self {
|
||||
name,
|
||||
data_type: data_type as i32,
|
||||
is_nullable,
|
||||
default_constraint: vec![],
|
||||
semantic_type: semantic_type as i32,
|
||||
comment,
|
||||
datatype_extension: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Builder)]
|
||||
#[builder(default)]
|
||||
pub struct TestCreateTableExpr {
|
||||
#[builder(setter(into), default = "DEFAULT_CATALOG_NAME.to_string()")]
|
||||
catalog_name: String,
|
||||
#[builder(setter(into), default = "DEFAULT_SCHEMA_NAME.to_string()")]
|
||||
schema_name: String,
|
||||
#[builder(setter(into))]
|
||||
table_name: String,
|
||||
#[builder(setter(into))]
|
||||
desc: String,
|
||||
#[builder(setter(into))]
|
||||
column_defs: Vec<ColumnDef>,
|
||||
#[builder(setter(into))]
|
||||
time_index: String,
|
||||
#[builder(setter(into))]
|
||||
primary_keys: Vec<String>,
|
||||
create_if_not_exists: bool,
|
||||
table_options: HashMap<String, String>,
|
||||
table_id: Option<TableId>,
|
||||
#[builder(setter(into), default = "MITO2_ENGINE.to_string()")]
|
||||
engine: String,
|
||||
}
|
||||
|
||||
impl From<TestCreateTableExpr> for CreateTableExpr {
|
||||
fn from(
|
||||
TestCreateTableExpr {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
desc,
|
||||
column_defs,
|
||||
time_index,
|
||||
primary_keys,
|
||||
create_if_not_exists,
|
||||
table_options,
|
||||
table_id,
|
||||
engine,
|
||||
}: TestCreateTableExpr,
|
||||
) -> Self {
|
||||
Self {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
desc,
|
||||
column_defs,
|
||||
time_index,
|
||||
primary_keys,
|
||||
create_if_not_exists,
|
||||
table_options,
|
||||
table_id: table_id.map(|id| api::v1::TableId { id }),
|
||||
engine,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds [RawTableInfo] from [CreateTableExpr].
|
||||
pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
|
||||
RawTableInfo {
|
||||
ident: TableIdent {
|
||||
table_id: expr
|
||||
.table_id
|
||||
.as_ref()
|
||||
.map(|table_id| table_id.id)
|
||||
.unwrap_or(0),
|
||||
version: 1,
|
||||
},
|
||||
name: expr.table_name.to_string(),
|
||||
desc: Some(expr.desc.to_string()),
|
||||
catalog_name: expr.catalog_name.to_string(),
|
||||
schema_name: expr.schema_name.to_string(),
|
||||
meta: RawTableMeta {
|
||||
schema: RawSchema {
|
||||
column_schemas: expr
|
||||
.column_defs
|
||||
.iter()
|
||||
.map(|column| try_as_column_schema(column).unwrap())
|
||||
.collect(),
|
||||
timestamp_index: expr
|
||||
.column_defs
|
||||
.iter()
|
||||
.position(|column| column.semantic_type() == SemanticType::Timestamp),
|
||||
version: 0,
|
||||
},
|
||||
primary_key_indices: expr
|
||||
.primary_keys
|
||||
.iter()
|
||||
.map(|key| {
|
||||
expr.column_defs
|
||||
.iter()
|
||||
.position(|column| &column.name == key)
|
||||
.unwrap()
|
||||
})
|
||||
.collect(),
|
||||
value_indices: vec![],
|
||||
engine: expr.engine.to_string(),
|
||||
next_column_id: expr.column_defs.len() as u32,
|
||||
region_numbers: vec![],
|
||||
options: TableOptions::default(),
|
||||
created_on: DateTime::default(),
|
||||
partition_key_indices: vec![],
|
||||
},
|
||||
table_type: TableType::Base,
|
||||
}
|
||||
}
|
||||
16
src/common/meta/src/ddl/tests.rs
Normal file
16
src/common/meta/src/ddl/tests.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod create_logical_tables;
|
||||
mod create_table;
|
||||
518
src/common/meta/src/ddl/tests/create_logical_tables.rs
Normal file
518
src/common/meta/src/ddl/tests/create_logical_tables.rs
Normal file
@@ -0,0 +1,518 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::Partition;
|
||||
use api::v1::region::{QueryRequest, RegionRequest};
|
||||
use api::v1::{ColumnDataType, SemanticType};
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
|
||||
use common_procedure_test::MockContextProvider;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use common_telemetry::debug;
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::RawTableInfo;
|
||||
|
||||
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
|
||||
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
|
||||
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
|
||||
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
|
||||
use crate::error::{Error, Result};
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::peer::Peer;
|
||||
use crate::rpc::ddl::CreateTableTask;
|
||||
use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager};
|
||||
|
||||
// Note: this code may be duplicated with others.
|
||||
// However, it's by design, ensures the tests are easy to be modified or added.
|
||||
fn test_create_logical_table_task(name: &str) -> CreateTableTask {
|
||||
let create_table = TestCreateTableExprBuilder::default()
|
||||
.column_defs([
|
||||
TestColumnDefBuilder::default()
|
||||
.name("ts")
|
||||
.data_type(ColumnDataType::TimestampMillisecond)
|
||||
.semantic_type(SemanticType::Timestamp)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
TestColumnDefBuilder::default()
|
||||
.name("host")
|
||||
.data_type(ColumnDataType::String)
|
||||
.semantic_type(SemanticType::Tag)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
TestColumnDefBuilder::default()
|
||||
.name("cpu")
|
||||
.data_type(ColumnDataType::Float64)
|
||||
.semantic_type(SemanticType::Field)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
])
|
||||
.time_index("ts")
|
||||
.primary_keys(["host".into()])
|
||||
.table_name(name)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into();
|
||||
let table_info = build_raw_table_info_from_expr(&create_table);
|
||||
CreateTableTask {
|
||||
create_table,
|
||||
// Single region
|
||||
partitions: vec![Partition {
|
||||
column_list: vec![],
|
||||
value_list: vec![],
|
||||
}],
|
||||
table_info,
|
||||
}
|
||||
}
|
||||
|
||||
// Note: this code may be duplicated with others.
|
||||
// However, it's by design, ensures the tests are easy to be modified or added.
|
||||
fn test_create_physical_table_task(name: &str) -> CreateTableTask {
|
||||
let create_table = TestCreateTableExprBuilder::default()
|
||||
.column_defs([
|
||||
TestColumnDefBuilder::default()
|
||||
.name("ts")
|
||||
.data_type(ColumnDataType::TimestampMillisecond)
|
||||
.semantic_type(SemanticType::Timestamp)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
TestColumnDefBuilder::default()
|
||||
.name("value")
|
||||
.data_type(ColumnDataType::Float64)
|
||||
.semantic_type(SemanticType::Field)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
])
|
||||
.time_index("ts")
|
||||
.primary_keys(["value".into()])
|
||||
.table_name(name)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into();
|
||||
let table_info = build_raw_table_info_from_expr(&create_table);
|
||||
CreateTableTask {
|
||||
create_table,
|
||||
// Single region
|
||||
partitions: vec![Partition {
|
||||
column_list: vec![],
|
||||
value_list: vec![],
|
||||
}],
|
||||
table_info,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_physical_table_not_found() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let tasks = vec![test_create_logical_table_task("foo")];
|
||||
let physical_table_id = 1024u32;
|
||||
let mut procedure =
|
||||
CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
assert_matches!(err, Error::TableRouteNotFound { .. });
|
||||
}
|
||||
|
||||
async fn create_physical_table_metadata(
|
||||
ddl_context: &DdlContext,
|
||||
table_info: RawTableInfo,
|
||||
table_route: TableRouteValue,
|
||||
) {
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(table_info, table_route, HashMap::default())
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// The create logical table procedure.
|
||||
let tasks = vec![test_create_logical_table_task("foo")];
|
||||
let physical_table_id = table_id;
|
||||
let mut procedure =
|
||||
CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_logical_table_exists_err() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
let mut task = test_create_logical_table_task("foo");
|
||||
task.set_table_id(1025);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(1025, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
let mut procedure =
|
||||
CreateLogicalTablesProcedure::new(cluster_id, vec![task], physical_table_id, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
assert_matches!(err, Error::TableAlreadyExists { .. });
|
||||
assert_eq!(err.status_code(), StatusCode::TableAlreadyExists);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_with_create_if_table_exists() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
let mut task = test_create_logical_table_task("foo");
|
||||
task.set_table_id(8192);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Sets `create_if_not_exists`
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let mut procedure =
|
||||
CreateLogicalTablesProcedure::new(cluster_id, vec![task], physical_table_id, ddl_context);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
let output = status.downcast_output_ref::<Vec<u32>>().unwrap();
|
||||
assert_eq!(*output, vec![8192]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_part_logical_tables_exist() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
let mut task = test_create_logical_table_task("exists");
|
||||
task.set_table_id(8192);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Sets `create_if_not_exists`
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let non_exist_task = test_create_logical_table_task("non_exists");
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(
|
||||
cluster_id,
|
||||
vec![task, non_exist_task],
|
||||
physical_table_id,
|
||||
ddl_context,
|
||||
);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NaiveDatanodeHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MockDatanodeHandler for NaiveDatanodeHandler {
|
||||
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
|
||||
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
async fn handle_query(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_request: QueryRequest,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_create_metadata() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Creates the logical table metadata.
|
||||
let task = test_create_logical_table_task("foo");
|
||||
let yet_another_task = test_create_logical_table_task("bar");
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(
|
||||
cluster_id,
|
||||
vec![task, yet_another_task],
|
||||
physical_table_id,
|
||||
ddl_context,
|
||||
);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
let status = procedure.execute(&ctx).await.unwrap();
|
||||
let table_ids = status.downcast_output_ref::<Vec<u32>>().unwrap();
|
||||
assert_eq!(*table_ids, vec![1025, 1026]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_create_metadata_part_logical_tables_exist() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
let mut task = test_create_logical_table_task("exists");
|
||||
task.set_table_id(8192);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Sets `create_if_not_exists`
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let non_exist_task = test_create_logical_table_task("non_exists");
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(
|
||||
cluster_id,
|
||||
vec![task, non_exist_task],
|
||||
physical_table_id,
|
||||
ddl_context,
|
||||
);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
let status = procedure.execute(&ctx).await.unwrap();
|
||||
let table_ids = status.downcast_output_ref::<Vec<u32>>().unwrap();
|
||||
assert_eq!(*table_ids, vec![8192, 1025]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_create_metadata_err() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Creates the logical table metadata.
|
||||
let task = test_create_logical_table_task("foo");
|
||||
let yet_another_task = test_create_logical_table_task("bar");
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(
|
||||
cluster_id,
|
||||
vec![task.clone(), yet_another_task],
|
||||
physical_table_id,
|
||||
ddl_context.clone(),
|
||||
);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
// Creates logical table metadata(different with the task)
|
||||
let mut task = task.clone();
|
||||
task.table_info.ident.table_id = 1025;
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info,
|
||||
TableRouteValue::logical(512, vec![RegionId::new(1026, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
let error = procedure.execute(&ctx).await.unwrap_err();
|
||||
assert!(!error.is_retry_later());
|
||||
}
|
||||
328
src/common/meta/src/ddl/tests/create_table.rs
Normal file
328
src/common/meta/src/ddl/tests/create_table.rs
Normal file
@@ -0,0 +1,328 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::Partition;
|
||||
use api::v1::region::{QueryRequest, RegionRequest};
|
||||
use api::v1::{ColumnDataType, SemanticType};
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
|
||||
use common_procedure_test::MockContextProvider;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use common_telemetry::debug;
|
||||
|
||||
use crate::ddl::create_table::CreateTableProcedure;
|
||||
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
|
||||
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
|
||||
use crate::error;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::peer::Peer;
|
||||
use crate::rpc::ddl::CreateTableTask;
|
||||
use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager};
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MockDatanodeHandler for () {
|
||||
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<AffectedRows> {
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
async fn handle_query(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_request: QueryRequest,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
fn test_create_table_task(name: &str) -> CreateTableTask {
|
||||
let create_table = TestCreateTableExprBuilder::default()
|
||||
.column_defs([
|
||||
TestColumnDefBuilder::default()
|
||||
.name("ts")
|
||||
.data_type(ColumnDataType::TimestampMillisecond)
|
||||
.semantic_type(SemanticType::Timestamp)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
TestColumnDefBuilder::default()
|
||||
.name("host")
|
||||
.data_type(ColumnDataType::String)
|
||||
.semantic_type(SemanticType::Tag)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
TestColumnDefBuilder::default()
|
||||
.name("cpu")
|
||||
.data_type(ColumnDataType::Float64)
|
||||
.semantic_type(SemanticType::Field)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
])
|
||||
.time_index("ts")
|
||||
.primary_keys(["host".into()])
|
||||
.table_name(name)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into();
|
||||
let table_info = build_raw_table_info_from_expr(&create_table);
|
||||
CreateTableTask {
|
||||
create_table,
|
||||
// Single region
|
||||
partitions: vec![Partition {
|
||||
column_list: vec![],
|
||||
value_list: vec![],
|
||||
}],
|
||||
table_info,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_table_exists_err() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let task = test_create_table_task("foo");
|
||||
assert!(!task.create_table.create_if_not_exists);
|
||||
// Puts a value to table name key.
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::physical(vec![]),
|
||||
HashMap::new(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
assert_matches!(err, Error::TableAlreadyExists { .. });
|
||||
assert_eq!(err.status_code(), StatusCode::TableAlreadyExists);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_with_create_if_table_exists() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let mut task = test_create_table_task("foo");
|
||||
task.create_table.create_if_not_exists = true;
|
||||
task.table_info.ident.table_id = 1024;
|
||||
// Puts a value to table name key.
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::physical(vec![]),
|
||||
HashMap::new(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Done { output: Some(..) });
|
||||
let table_id = *status.downcast_output_ref::<u32>().unwrap();
|
||||
assert_eq!(table_id, 1024);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_without_create_if_table_exists() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let mut task = test_create_table_task("foo");
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
assert_eq!(procedure.table_id(), 1024);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_with_no_partition_err() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let mut task = test_create_table_task("foo");
|
||||
task.partitions = vec![];
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
assert_matches!(err, Error::Unexpected { .. });
|
||||
assert!(err
|
||||
.to_string()
|
||||
.contains("The number of partitions must be greater than 0"),);
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RetryErrorDatanodeHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MockDatanodeHandler for RetryErrorDatanodeHandler {
|
||||
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
|
||||
debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
|
||||
Err(Error::RetryLater {
|
||||
source: BoxedError::new(
|
||||
error::UnexpectedSnafu {
|
||||
err_msg: "retry later",
|
||||
}
|
||||
.build(),
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_query(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_request: QueryRequest,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_datanode_create_regions_should_retry() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let task = test_create_table_task("foo");
|
||||
assert!(!task.create_table.create_if_not_exists);
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
procedure.on_prepare().await.unwrap();
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
let error = procedure.execute(&ctx).await.unwrap_err();
|
||||
assert!(error.is_retry_later());
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct UnexpectedErrorDatanodeHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
|
||||
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
|
||||
debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
|
||||
error::UnexpectedSnafu {
|
||||
err_msg: "mock error",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
async fn handle_query(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_request: QueryRequest,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_datanode_create_regions_should_not_retry() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let task = test_create_table_task("foo");
|
||||
assert!(!task.create_table.create_if_not_exists);
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
procedure.on_prepare().await.unwrap();
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
let error = procedure.execute(&ctx).await.unwrap_err();
|
||||
assert!(!error.is_retry_later());
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NaiveDatanodeHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MockDatanodeHandler for NaiveDatanodeHandler {
|
||||
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
|
||||
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
async fn handle_query(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_request: QueryRequest,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_create_metadata_error() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let task = test_create_table_task("foo");
|
||||
assert!(!task.create_table.create_if_not_exists);
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task.clone(), ddl_context.clone());
|
||||
procedure.on_prepare().await.unwrap();
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
let mut task = task;
|
||||
// Creates table metadata(different with the task)
|
||||
task.table_info.ident.table_id = procedure.table_id();
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::physical(vec![]),
|
||||
HashMap::new(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
let error = procedure.execute(&ctx).await.unwrap_err();
|
||||
assert!(!error.is_retry_later());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_create_metadata() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let task = test_create_table_task("foo");
|
||||
assert!(!task.create_table.create_if_not_exists);
|
||||
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
|
||||
procedure.on_prepare().await.unwrap();
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
let status = procedure.execute(&ctx).await.unwrap();
|
||||
let table_id = status.downcast_output_ref::<u32>().unwrap();
|
||||
assert_eq!(*table_id, 1024);
|
||||
}
|
||||
@@ -31,7 +31,7 @@ use table::metadata::{RawTableInfo, TableId};
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use super::utils::handle_retry_error;
|
||||
use crate::ddl::utils::handle_operate_region_error;
|
||||
use crate::ddl::utils::add_peer_context_if_needed;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{Result, TableNotFoundSnafu};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
@@ -169,10 +169,10 @@ impl TruncateTableProcedure {
|
||||
let requester = requester.clone();
|
||||
|
||||
truncate_region_tasks.push(async move {
|
||||
if let Err(err) = requester.handle(request).await {
|
||||
return Err(handle_operate_region_error(datanode)(err));
|
||||
}
|
||||
Ok(())
|
||||
requester
|
||||
.handle(request)
|
||||
.await
|
||||
.map_err(add_peer_context_if_needed(datanode))
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,19 +27,17 @@ use crate::key::TableMetadataManagerRef;
|
||||
use crate::peer::Peer;
|
||||
use crate::rpc::ddl::CreateTableTask;
|
||||
|
||||
pub fn handle_operate_region_error(datanode: Peer) -> impl FnOnce(Error) -> Error {
|
||||
/// Adds [Peer] context if the error is unretryable.
|
||||
pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
|
||||
move |err| {
|
||||
if matches!(err, Error::RetryLater { .. }) {
|
||||
Error::RetryLater {
|
||||
source: BoxedError::new(err),
|
||||
}
|
||||
} else {
|
||||
Error::OperateDatanode {
|
||||
if !err.is_retry_later() {
|
||||
return Error::OperateDatanode {
|
||||
location: location!(),
|
||||
peer: datanode,
|
||||
source: BoxedError::new(err),
|
||||
}
|
||||
};
|
||||
}
|
||||
err
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,14 +12,13 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId};
|
||||
use common_telemetry::tracing_context::{FutureExt, TracingContext};
|
||||
use common_telemetry::{info, tracing};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::{RegionNumber, TableId};
|
||||
use store_api::storage::TableId;
|
||||
|
||||
use crate::cache_invalidator::CacheInvalidatorRef;
|
||||
use crate::datanode_manager::DatanodeManagerRef;
|
||||
@@ -27,15 +26,12 @@ use crate::ddl::alter_table::AlterTableProcedure;
|
||||
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
|
||||
use crate::ddl::create_table::CreateTableProcedure;
|
||||
use crate::ddl::drop_table::DropTableProcedure;
|
||||
use crate::ddl::table_meta::TableMetadataAllocator;
|
||||
use crate::ddl::table_meta::TableMetadataAllocatorRef;
|
||||
use crate::ddl::truncate_table::TruncateTableProcedure;
|
||||
use crate::ddl::{
|
||||
utils, DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata,
|
||||
TableMetadataAllocatorContext,
|
||||
};
|
||||
use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
|
||||
use crate::error::{
|
||||
self, EmptyCreateTableTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
|
||||
SubmitProcedureSnafu, TableNotFoundSnafu, WaitProcedureSnafu,
|
||||
SubmitProcedureSnafu, TableNotFoundSnafu, UnsupportedSnafu, WaitProcedureSnafu,
|
||||
};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::table_name::TableNameKey;
|
||||
@@ -50,6 +46,8 @@ use crate::rpc::ddl::{
|
||||
AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
|
||||
TruncateTableTask,
|
||||
};
|
||||
use crate::rpc::procedure;
|
||||
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
|
||||
use crate::rpc::router::RegionRoute;
|
||||
use crate::table_name::TableName;
|
||||
use crate::ClusterId;
|
||||
@@ -62,7 +60,7 @@ pub struct DdlManager {
|
||||
datanode_manager: DatanodeManagerRef,
|
||||
cache_invalidator: CacheInvalidatorRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
table_metadata_allocator: TableMetadataAllocator,
|
||||
table_metadata_allocator: TableMetadataAllocatorRef,
|
||||
memory_region_keeper: MemoryRegionKeeperRef,
|
||||
}
|
||||
|
||||
@@ -73,7 +71,7 @@ impl DdlManager {
|
||||
datanode_clients: DatanodeManagerRef,
|
||||
cache_invalidator: CacheInvalidatorRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
table_metadata_allocator: TableMetadataAllocator,
|
||||
table_metadata_allocator: TableMetadataAllocatorRef,
|
||||
memory_region_keeper: MemoryRegionKeeperRef,
|
||||
) -> Result<Self> {
|
||||
let manager = Self {
|
||||
@@ -100,6 +98,7 @@ impl DdlManager {
|
||||
cache_invalidator: self.cache_invalidator.clone(),
|
||||
table_metadata_manager: self.table_metadata_manager.clone(),
|
||||
memory_region_keeper: self.memory_region_keeper.clone(),
|
||||
table_metadata_allocator: self.table_metadata_allocator.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -205,18 +204,10 @@ impl DdlManager {
|
||||
&self,
|
||||
cluster_id: ClusterId,
|
||||
create_table_task: CreateTableTask,
|
||||
table_route: TableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
) -> Result<(ProcedureId, Option<Output>)> {
|
||||
let context = self.create_context();
|
||||
|
||||
let procedure = CreateTableProcedure::new(
|
||||
cluster_id,
|
||||
create_table_task,
|
||||
table_route,
|
||||
region_wal_options,
|
||||
context,
|
||||
);
|
||||
let procedure = CreateTableProcedure::new(cluster_id, create_table_task, context);
|
||||
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
@@ -470,31 +461,10 @@ async fn handle_drop_table_task(
|
||||
async fn handle_create_table_task(
|
||||
ddl_manager: &DdlManager,
|
||||
cluster_id: ClusterId,
|
||||
mut create_table_task: CreateTableTask,
|
||||
create_table_task: CreateTableTask,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let table_meta = ddl_manager
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_table_task,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
region_wal_options,
|
||||
} = table_meta;
|
||||
|
||||
create_table_task.table_info.ident.table_id = table_id;
|
||||
|
||||
let (id, output) = ddl_manager
|
||||
.submit_create_table_task(
|
||||
cluster_id,
|
||||
create_table_task,
|
||||
table_route,
|
||||
region_wal_options,
|
||||
)
|
||||
.submit_create_table_task(cluster_id, create_table_task)
|
||||
.await?;
|
||||
|
||||
let procedure_id = id.to_string();
|
||||
@@ -559,8 +529,9 @@ async fn handle_create_logical_table_tasks(
|
||||
})
|
||||
}
|
||||
|
||||
/// TODO(dennis): let [`DdlManager`] implement [`ProcedureExecutor`] looks weird, find some way to refactor it.
|
||||
#[async_trait::async_trait]
|
||||
impl DdlTaskExecutor for DdlManager {
|
||||
impl ProcedureExecutor for DdlManager {
|
||||
async fn submit_ddl_task(
|
||||
&self,
|
||||
ctx: &ExecutorContext,
|
||||
@@ -598,6 +569,37 @@ impl DdlTaskExecutor for DdlManager {
|
||||
.trace(span)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn migrate_region(
|
||||
&self,
|
||||
_ctx: &ExecutorContext,
|
||||
_request: MigrateRegionRequest,
|
||||
) -> Result<MigrateRegionResponse> {
|
||||
UnsupportedSnafu {
|
||||
operation: "migrate_region",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
async fn query_procedure_state(
|
||||
&self,
|
||||
_ctx: &ExecutorContext,
|
||||
pid: &str,
|
||||
) -> Result<ProcedureStateResponse> {
|
||||
let pid = ProcedureId::parse_str(pid)
|
||||
.with_context(|_| error::ParseProcedureIdSnafu { key: pid })?;
|
||||
|
||||
let state = self
|
||||
.procedure_manager
|
||||
.procedure_state(pid)
|
||||
.await
|
||||
.context(error::QueryProcedureSnafu)?
|
||||
.context(error::ProcedureNotFoundSnafu {
|
||||
pid: pid.to_string(),
|
||||
})?;
|
||||
|
||||
Ok(procedure::procedure_state_to_pb_response(&state))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -644,12 +646,12 @@ mod tests {
|
||||
procedure_manager.clone(),
|
||||
Arc::new(DummyDatanodeManager),
|
||||
Arc::new(DummyCacheInvalidator),
|
||||
table_metadata_manager,
|
||||
TableMetadataAllocator::new(
|
||||
table_metadata_manager.clone(),
|
||||
Arc::new(TableMetadataAllocator::new(
|
||||
Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
|
||||
Arc::new(WalOptionsAllocator::default()),
|
||||
Arc::new(TableMetadataManager::new(kv_backend)),
|
||||
),
|
||||
table_metadata_manager.table_name_manager().clone(),
|
||||
)),
|
||||
Arc::new(MemoryRegionKeeper::default()),
|
||||
);
|
||||
|
||||
|
||||
@@ -100,6 +100,15 @@ pub enum Error {
|
||||
source: common_procedure::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to query procedure"))]
|
||||
QueryProcedure {
|
||||
location: Location,
|
||||
source: common_procedure::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Procedure not found: {pid}"))]
|
||||
ProcedureNotFound { location: Location, pid: String },
|
||||
|
||||
#[snafu(display("Failed to parse procedure id: {key}"))]
|
||||
ParseProcedureId {
|
||||
location: Location,
|
||||
@@ -431,14 +440,17 @@ impl ErrorExt for Error {
|
||||
| RenameTable { .. }
|
||||
| Unsupported { .. } => StatusCode::Internal,
|
||||
|
||||
PrimaryKeyNotFound { .. } | EmptyKey { .. } | InvalidEngineType { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
ProcedureNotFound { .. }
|
||||
| PrimaryKeyNotFound { .. }
|
||||
| EmptyKey { .. }
|
||||
| InvalidEngineType { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
TableNotFound { .. } => StatusCode::TableNotFound,
|
||||
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
|
||||
|
||||
SubmitProcedure { source, .. } | WaitProcedure { source, .. } => source.status_code(),
|
||||
SubmitProcedure { source, .. }
|
||||
| QueryProcedure { source, .. }
|
||||
| WaitProcedure { source, .. } => source.status_code(),
|
||||
RegisterProcedureLoader { source, .. } => source.status_code(),
|
||||
External { source, .. } => source.status_code(),
|
||||
OperateDatanode { source, .. } => source.status_code(),
|
||||
|
||||
@@ -144,6 +144,7 @@ impl TableNameValue {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TableNameManager {
|
||||
kv_backend: KvBackendRef,
|
||||
}
|
||||
|
||||
@@ -325,6 +325,11 @@ impl TableRouteManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the [TableRouteValue::Physical] of table.
|
||||
///
|
||||
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
|
||||
/// - the physical table(`logical_or_physical_table_id`) does not exists
|
||||
/// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exists.
|
||||
pub async fn get_physical_table_route(
|
||||
&self,
|
||||
logical_or_physical_table_id: TableId,
|
||||
|
||||
@@ -36,6 +36,8 @@ pub mod rpc;
|
||||
pub mod sequence;
|
||||
pub mod state_store;
|
||||
pub mod table_name;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod test_util;
|
||||
pub mod util;
|
||||
pub mod wal_options_allocator;
|
||||
|
||||
|
||||
@@ -363,6 +363,11 @@ impl CreateTableTask {
|
||||
table: &table.table_name,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the `table_info`'s table_id.
|
||||
pub fn set_table_id(&mut self, table_id: TableId) {
|
||||
self.table_info.ident.table_id = table_id;
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for CreateTableTask {
|
||||
|
||||
@@ -12,6 +12,9 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
pub use api::v1::meta::{MigrateRegionResponse, ProcedureStateResponse};
|
||||
use api::v1::meta::{
|
||||
ProcedureId as PbProcedureId, ProcedureStateResponse as PbProcedureStateResponse,
|
||||
ProcedureStatus as PbProcedureStatus,
|
||||
@@ -21,6 +24,15 @@ use snafu::ResultExt;
|
||||
|
||||
use crate::error::{ParseProcedureIdSnafu, Result};
|
||||
|
||||
/// A request to migrate region.
|
||||
#[derive(Clone)]
|
||||
pub struct MigrateRegionRequest {
|
||||
pub region_id: u64,
|
||||
pub from_peer: u64,
|
||||
pub to_peer: u64,
|
||||
pub replay_timeout: Duration,
|
||||
}
|
||||
|
||||
/// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`].
|
||||
pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result<ProcedureId> {
|
||||
ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| {
|
||||
|
||||
105
src/common/meta/src/test_util.rs
Normal file
105
src/common/meta/src/test_util.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::region::{QueryRequest, RegionRequest};
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
|
||||
use crate::cache_invalidator::DummyCacheInvalidator;
|
||||
use crate::datanode_manager::{Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef};
|
||||
use crate::ddl::table_meta::TableMetadataAllocator;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::Result;
|
||||
use crate::key::TableMetadataManager;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
use crate::peer::Peer;
|
||||
use crate::region_keeper::MemoryRegionKeeper;
|
||||
use crate::sequence::SequenceBuilder;
|
||||
use crate::wal_options_allocator::WalOptionsAllocator;
|
||||
|
||||
pub type AffectedRows = u64;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait MockDatanodeHandler: Sync + Send + Clone {
|
||||
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows>;
|
||||
|
||||
async fn handle_query(
|
||||
&self,
|
||||
peer: &Peer,
|
||||
request: QueryRequest,
|
||||
) -> Result<SendableRecordBatchStream>;
|
||||
}
|
||||
|
||||
/// A mock struct implements [DatanodeManager].
|
||||
#[derive(Clone)]
|
||||
pub struct MockDatanodeManager<T> {
|
||||
handler: T,
|
||||
}
|
||||
|
||||
impl<T> MockDatanodeManager<T> {
|
||||
pub fn new(handler: T) -> Self {
|
||||
Self { handler }
|
||||
}
|
||||
}
|
||||
|
||||
/// A mock struct implements [Datanode].
|
||||
#[derive(Clone)]
|
||||
struct MockDatanode<T> {
|
||||
peer: Peer,
|
||||
handler: T,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<T: MockDatanodeHandler> Datanode for MockDatanode<T> {
|
||||
async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
|
||||
self.handler.handle(&self.peer, request).await
|
||||
}
|
||||
|
||||
async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
|
||||
self.handler.handle_query(&self.peer, request).await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<T: MockDatanodeHandler + 'static> DatanodeManager for MockDatanodeManager<T> {
|
||||
async fn datanode(&self, peer: &Peer) -> DatanodeRef {
|
||||
Arc::new(MockDatanode {
|
||||
peer: peer.clone(),
|
||||
handler: self.handler.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a test purpose [DdlContext].
|
||||
pub fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext {
|
||||
let kv_backend = Arc::new(MemoryKvBackend::new());
|
||||
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
|
||||
|
||||
DdlContext {
|
||||
datanode_manager,
|
||||
cache_invalidator: Arc::new(DummyCacheInvalidator),
|
||||
memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
|
||||
table_metadata_allocator: Arc::new(TableMetadataAllocator::new(
|
||||
Arc::new(
|
||||
SequenceBuilder::new("test", kv_backend)
|
||||
.initial(1024)
|
||||
.build(),
|
||||
),
|
||||
Arc::new(WalOptionsAllocator::default()),
|
||||
table_metadata_manager.table_name_manager().clone(),
|
||||
)),
|
||||
table_metadata_manager,
|
||||
}
|
||||
}
|
||||
10
src/common/plugins/Cargo.toml
Normal file
10
src/common/plugins/Cargo.toml
Normal file
@@ -0,0 +1,10 @@
|
||||
[package]
|
||||
name = "common-plugins"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
19
src/common/plugins/src/consts.rs
Normal file
19
src/common/plugins/src/consts.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
/// Greptime preserved metrics prefix
|
||||
pub const GREPTIME_EXEC_PREFIX: &str = "greptime_exec_";
|
||||
|
||||
/// Execution cost metrics key
|
||||
pub const GREPTIME_EXEC_COST: &str = "greptime_exec_cost";
|
||||
20
src/common/plugins/src/lib.rs
Normal file
20
src/common/plugins/src/lib.rs
Normal file
@@ -0,0 +1,20 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
/// This crate is designed to be at the bottom of the depencey tree
|
||||
/// to provide common and useful utils and consts to all plugin usage,
|
||||
/// since `plugins` crate is at the top depending on crates like `frontend` and `datanode`
|
||||
mod consts;
|
||||
|
||||
pub use consts::{GREPTIME_EXEC_COST, GREPTIME_EXEC_PREFIX};
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
common-procedure.workspace = true
|
||||
|
||||
@@ -7,6 +7,9 @@ license.workspace = true
|
||||
[features]
|
||||
testing = []
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -57,6 +57,22 @@ impl Status {
|
||||
Status::Done { output: None }
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
/// Downcasts [Status::Done]'s output to &T
|
||||
/// #Panic:
|
||||
/// - if [Status] is not the [Status::Done].
|
||||
/// - if the output is None.
|
||||
pub fn downcast_output_ref<T: 'static>(&self) -> Option<&T> {
|
||||
if let Status::Done { output } = self {
|
||||
output
|
||||
.as_ref()
|
||||
.expect("Try to downcast the output of Status::Done, but the output is None")
|
||||
.downcast_ref()
|
||||
} else {
|
||||
panic!("Expected the Status::Done, but got: {:?}", self)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a [Status::Done] with output.
|
||||
pub fn done_with_output<T: Any + Send + Sync>(output: T) -> Status {
|
||||
Status::Done {
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -178,14 +178,23 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to do procedure task"))]
|
||||
ProcedureService {
|
||||
source: BoxedError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Missing TableMutationHandler, not expected"))]
|
||||
MissingTableMutationHandler { location: Location },
|
||||
|
||||
#[snafu(display("Missing MetaServiceHandler, not expected"))]
|
||||
MissingMetaServiceHandler { location: Location },
|
||||
#[snafu(display("Missing ProcedureServiceHandler, not expected"))]
|
||||
MissingProcedureServiceHandler { location: Location },
|
||||
|
||||
#[snafu(display("Invalid function args: {}", err_msg))]
|
||||
InvalidFuncArgs { err_msg: String, location: Location },
|
||||
|
||||
#[snafu(display("Permission denied: {}", err_msg))]
|
||||
PermissionDenied { err_msg: String, location: Location },
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -213,7 +222,7 @@ impl ErrorExt for Error {
|
||||
| Error::FromArrowArray { source, .. } => source.status_code(),
|
||||
|
||||
Error::MissingTableMutationHandler { .. }
|
||||
| Error::MissingMetaServiceHandler { .. }
|
||||
| Error::MissingProcedureServiceHandler { .. }
|
||||
| Error::ExecuteRepeatedly { .. }
|
||||
| Error::ThreadJoin { .. }
|
||||
| Error::GeneralDataFusion { .. } => StatusCode::Unexpected,
|
||||
@@ -225,7 +234,11 @@ impl ErrorExt for Error {
|
||||
Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
|
||||
Error::ExecutePhysicalPlan { source, .. } => source.status_code(),
|
||||
Error::Execute { source, .. } => source.status_code(),
|
||||
Error::TableMutation { source, .. } => source.status_code(),
|
||||
Error::ProcedureService { source, .. } | Error::TableMutation { source, .. } => {
|
||||
source.status_code()
|
||||
}
|
||||
|
||||
Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,10 +13,12 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::greptime_proto::v1::add_column_location::LocationType;
|
||||
use api::greptime_proto::v1::AddColumnLocation as Location;
|
||||
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
|
||||
use physical_plan::PhysicalPlan;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub mod columnar_value;
|
||||
@@ -32,7 +34,14 @@ use sqlparser_derive::{Visit, VisitMut};
|
||||
pub enum Output {
|
||||
AffectedRows(usize),
|
||||
RecordBatches(RecordBatches),
|
||||
Stream(SendableRecordBatchStream),
|
||||
Stream(SendableRecordBatchStream, Option<Arc<dyn PhysicalPlan>>),
|
||||
}
|
||||
|
||||
impl Output {
|
||||
// helper function to build original `Output::Stream`
|
||||
pub fn new_stream(stream: SendableRecordBatchStream) -> Self {
|
||||
Output::Stream(stream, None)
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for Output {
|
||||
@@ -42,7 +51,13 @@ impl Debug for Output {
|
||||
Output::RecordBatches(recordbatches) => {
|
||||
write!(f, "Output::RecordBatches({recordbatches:?})")
|
||||
}
|
||||
Output::Stream(_) => write!(f, "Output::Stream(<stream>)"),
|
||||
Output::Stream(_, df) => {
|
||||
if df.is_some() {
|
||||
write!(f, "Output::Stream(<stream>, Some<physical_plan>)")
|
||||
} else {
|
||||
write!(f, "Output::Stream(<stream>)")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
arc-swap = "1.6"
|
||||
common-base.workspace = true
|
||||
|
||||
@@ -182,7 +182,7 @@ pub struct RecordBatchStreamAdapter {
|
||||
enum Metrics {
|
||||
Unavailable,
|
||||
Unresolved(Arc<dyn ExecutionPlan>),
|
||||
Resolved(String),
|
||||
Resolved(RecordBatchMetrics),
|
||||
}
|
||||
|
||||
impl RecordBatchStreamAdapter {
|
||||
@@ -222,9 +222,9 @@ impl RecordBatchStream for RecordBatchStreamAdapter {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn metrics(&self) -> Option<String> {
|
||||
fn metrics(&self) -> Option<RecordBatchMetrics> {
|
||||
match &self.metrics_2 {
|
||||
Metrics::Resolved(metrics) => Some(metrics.clone()),
|
||||
Metrics::Resolved(metrics) => Some(*metrics),
|
||||
Metrics::Unavailable | Metrics::Unresolved(_) => None,
|
||||
}
|
||||
}
|
||||
@@ -254,8 +254,7 @@ impl Stream for RecordBatchStreamAdapter {
|
||||
let mut metrics_holder = RecordBatchMetrics::default();
|
||||
collect_metrics(df_plan, &mut metrics_holder);
|
||||
if metrics_holder.elapsed_compute != 0 || metrics_holder.memory_usage != 0 {
|
||||
self.metrics_2 =
|
||||
Metrics::Resolved(serde_json::to_string(&metrics_holder).unwrap());
|
||||
self.metrics_2 = Metrics::Resolved(metrics_holder);
|
||||
}
|
||||
}
|
||||
Poll::Ready(None)
|
||||
@@ -285,7 +284,7 @@ fn collect_metrics(df_plan: &Arc<dyn ExecutionPlan>, result: &mut RecordBatchMet
|
||||
|
||||
/// [`RecordBatchMetrics`] carrys metrics value
|
||||
/// from datanode to frontend through gRPC
|
||||
#[derive(serde::Serialize, serde::Deserialize, Default, Debug)]
|
||||
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone, Copy)]
|
||||
pub struct RecordBatchMetrics {
|
||||
// cpu consumption in nanoseconds
|
||||
pub elapsed_compute: usize,
|
||||
|
||||
@@ -21,6 +21,7 @@ pub mod util;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use adapter::RecordBatchMetrics;
|
||||
use arc_swap::ArcSwapOption;
|
||||
use datafusion::physical_plan::memory::MemoryStream;
|
||||
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
|
||||
@@ -42,7 +43,7 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
|
||||
None
|
||||
}
|
||||
|
||||
fn metrics(&self) -> Option<String> {
|
||||
fn metrics(&self) -> Option<RecordBatchMetrics> {
|
||||
None
|
||||
}
|
||||
}
|
||||
@@ -212,7 +213,7 @@ pub struct RecordBatchStreamWrapper<S> {
|
||||
pub schema: SchemaRef,
|
||||
pub stream: S,
|
||||
pub output_ordering: Option<Vec<OrderOption>>,
|
||||
pub metrics: Arc<ArcSwapOption<String>>,
|
||||
pub metrics: Arc<ArcSwapOption<RecordBatchMetrics>>,
|
||||
}
|
||||
|
||||
impl<S> RecordBatchStreamWrapper<S> {
|
||||
@@ -238,8 +239,8 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
|
||||
self.output_ordering.as_deref()
|
||||
}
|
||||
|
||||
fn metrics(&self) -> Option<String> {
|
||||
self.metrics.load().as_ref().map(|s| s.as_ref().clone())
|
||||
fn metrics(&self) -> Option<RecordBatchMetrics> {
|
||||
self.metrics.load().as_ref().map(|s| *s.as_ref())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
common-error.workspace = true
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-recursion = "1.0"
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -8,7 +8,11 @@ license.workspace = true
|
||||
tokio-console = ["console-subscriber", "tokio/tracing"]
|
||||
deadlock_detection = ["parking_lot/deadlock_detection"]
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
atty = "0.2"
|
||||
backtrace = "0.3"
|
||||
common-error.workspace = true
|
||||
console-subscriber = { version = "0.1", optional = true }
|
||||
|
||||
@@ -132,27 +132,34 @@ pub fn init_global_logging(
|
||||
// Enable log compatible layer to convert log record to tracing span.
|
||||
LogTracer::init().expect("log tracer must be valid");
|
||||
|
||||
// stdout log layer.
|
||||
let stdout_logging_layer = if opts.append_stdout {
|
||||
let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
|
||||
guards.push(stdout_guard);
|
||||
|
||||
Some(Layer::new().with_writer(stdout_writer))
|
||||
Some(
|
||||
Layer::new()
|
||||
.with_writer(stdout_writer)
|
||||
.with_ansi(atty::is(atty::Stream::Stdout)),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// JSON log layer.
|
||||
// file log layer.
|
||||
let rolling_appender = RollingFileAppender::new(Rotation::HOURLY, dir, app_name);
|
||||
let (rolling_writer, rolling_writer_guard) = tracing_appender::non_blocking(rolling_appender);
|
||||
let file_logging_layer = Layer::new().with_writer(rolling_writer);
|
||||
let file_logging_layer = Layer::new().with_writer(rolling_writer).with_ansi(false);
|
||||
guards.push(rolling_writer_guard);
|
||||
|
||||
// error JSON log layer.
|
||||
// error file log layer.
|
||||
let err_rolling_appender =
|
||||
RollingFileAppender::new(Rotation::HOURLY, dir, format!("{}-{}", app_name, "err"));
|
||||
let (err_rolling_writer, err_rolling_writer_guard) =
|
||||
tracing_appender::non_blocking(err_rolling_appender);
|
||||
let err_file_logging_layer = Layer::new().with_writer(err_rolling_writer);
|
||||
let err_file_logging_layer = Layer::new()
|
||||
.with_writer(err_rolling_writer)
|
||||
.with_ansi(false);
|
||||
guards.push(err_rolling_writer_guard);
|
||||
|
||||
// resolve log level settings from:
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
client.workspace = true
|
||||
common-query.workspace = true
|
||||
|
||||
@@ -28,7 +28,7 @@ pub async fn execute_and_check_output(db: &Database, sql: &str, expected: Expect
|
||||
assert_eq!(*x, y, "actual: \n{}", x)
|
||||
}
|
||||
(Output::RecordBatches(_), ExpectedOutput::QueryResult(x))
|
||||
| (Output::Stream(_), ExpectedOutput::QueryResult(x)) => {
|
||||
| (Output::Stream(_, _), ExpectedOutput::QueryResult(x)) => {
|
||||
check_output_stream(output, x).await
|
||||
}
|
||||
_ => panic!(),
|
||||
@@ -37,7 +37,7 @@ pub async fn execute_and_check_output(db: &Database, sql: &str, expected: Expect
|
||||
|
||||
pub async fn check_output_stream(output: Output, expected: &str) {
|
||||
let recordbatches = match output {
|
||||
Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
|
||||
Output::Stream(stream, _) => util::collect_batches(stream).await.unwrap(),
|
||||
Output::RecordBatches(recordbatches) => recordbatches,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
arrow.workspace = true
|
||||
chrono.workspace = true
|
||||
|
||||
@@ -4,5 +4,8 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
build-data = "0.1.4"
|
||||
|
||||
@@ -7,6 +7,9 @@ license.workspace = true
|
||||
[features]
|
||||
testing = []
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
common-base.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
|
||||
@@ -7,6 +7,9 @@ license.workspace = true
|
||||
[features]
|
||||
testing = []
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
api.workspace = true
|
||||
arrow-flight.workspace = true
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
//! Datanode implementation.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -32,7 +31,6 @@ use common_wal::config::kafka::DatanodeKafkaConfig;
|
||||
use common_wal::config::raft_engine::RaftEngineConfig;
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
use file_engine::engine::FileRegionEngine;
|
||||
use futures::future;
|
||||
use futures_util::future::try_join_all;
|
||||
use futures_util::TryStreamExt;
|
||||
use log_store::kafka::log_store::KafkaLogStore;
|
||||
@@ -45,7 +43,7 @@ use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
|
||||
use object_store::util::normalize_dir;
|
||||
use query::QueryEngineFactory;
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::server::{start_server, ServerHandlers};
|
||||
use servers::server::ServerHandlers;
|
||||
use servers::Mode;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::path_utils::{region_dir, WAL_DIR};
|
||||
@@ -97,7 +95,11 @@ impl Datanode {
|
||||
t.start(None).context(StartServerSnafu)?
|
||||
}
|
||||
|
||||
self.start_services().await
|
||||
self.services.start_all().await.context(StartServerSnafu)
|
||||
}
|
||||
|
||||
pub fn server_handlers(&self) -> &ServerHandlers {
|
||||
&self.services
|
||||
}
|
||||
|
||||
pub fn start_telemetry(&self) {
|
||||
@@ -127,24 +129,12 @@ impl Datanode {
|
||||
self.services = services;
|
||||
}
|
||||
|
||||
/// Start services of datanode. This method call will block until services are shutdown.
|
||||
pub async fn start_services(&mut self) -> Result<()> {
|
||||
let _ = future::try_join_all(self.services.values().map(start_server))
|
||||
.await
|
||||
.context(StartServerSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn shutdown_services(&self) -> Result<()> {
|
||||
let _ = future::try_join_all(self.services.values().map(|server| server.0.shutdown()))
|
||||
pub async fn shutdown(&self) -> Result<()> {
|
||||
self.services
|
||||
.shutdown_all()
|
||||
.await
|
||||
.context(ShutdownServerSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) -> Result<()> {
|
||||
// We must shutdown services first
|
||||
self.shutdown_services().await?;
|
||||
let _ = self.greptimedb_telemetry_task.stop().await;
|
||||
if let Some(heartbeat_task) = &self.heartbeat_task {
|
||||
heartbeat_task
|
||||
@@ -268,7 +258,7 @@ impl DatanodeBuilder {
|
||||
.context(StartServerSnafu)?;
|
||||
|
||||
Ok(Datanode {
|
||||
services: HashMap::new(),
|
||||
services: ServerHandlers::default(),
|
||||
heartbeat_task,
|
||||
region_server,
|
||||
greptimedb_telemetry_task,
|
||||
@@ -310,6 +300,7 @@ impl DatanodeBuilder {
|
||||
MemoryCatalogManager::with_default_setup(),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
self.plugins.clone(),
|
||||
);
|
||||
|
||||
@@ -655,7 +655,7 @@ impl RegionServerInner {
|
||||
Output::AffectedRows(_) | Output::RecordBatches(_) => {
|
||||
UnsupportedOutputSnafu { expected: "stream" }.fail()
|
||||
}
|
||||
Output::Stream(stream) => Ok(stream),
|
||||
Output::Stream(stream, _) => Ok(stream),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user