Compare commits


22 Commits

Author SHA1 Message Date
Weny Xu
4eb0771afe feat: introduce install_manifest_to for RegionManifestManager (#5742)
* feat: introduce `install_manifest_changes` for `RegionManifestManager`

* chore: rename function to `install_manifest_to`

* Apply suggestions from code review

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

* chore: add comments

* chore: add comments

* chore: update logic and add comments

* chore: add more check

* Update src/mito2/src/manifest/manager.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Co-authored-by: jeremyhi <jiachun_feng@proton.me>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-03-21 05:19:23 +00:00
Yohan Wal
a0739a96e4 fix: wrap table name with `` (#5748)
* fix: wrap table name with quotes

* fix: minor fix
2025-03-20 09:38:54 +00:00
Ning Sun
77ccf1eac8 chore: add datanode write rows to grafana dashboard (#5745) 2025-03-20 03:39:40 +00:00
Yohan Wal
1dc4a196bf feat: add mysql election logic (#5694)
* feat: add mysql election

* feat: add mysql election

* chore: fix deps

* chore: fix deps

* fix: duplicate container

* fix: duplicate setup for sqlness

* fix: call once

* fix: do not use NOWAIT for mysql 5.7

* chore: apply comments

* fix: no parallel sqlness for mysql

* chore: comments and minor revert

* chore: apply comments

* chore: apply comments

* chore: add  to table name

* ci: use 2 metasrv to detect election bugs

* refactor: better election logic

* chore: apply comments

* chore: apply comments

* feat: version check before startup
2025-03-19 11:31:18 +00:00
shuiyisong
2431cd3bdf chore: merge error files under pipeline crate (#5738) 2025-03-19 09:55:51 +00:00
discord9
cd730e0486 fix: mysql prepare limit&offset param (#5734)
* fix: prepare limit&offset param

* test: sqlness

* chore: per review

* chore: per review
2025-03-19 07:49:26 +00:00
zyy17
a19441bed8 refactor: remove trace id from primary key in opentelemetry_traces table (#5733)
* refactor: remove trace id in primary key

* refactor: remove trace id in primary key in v0 model

* refactor: add span id in v1

* fix: integration test
2025-03-19 06:17:58 +00:00
dennis zhuang
162e3b8620 docs: adds news to readme (#5735) 2025-03-19 01:33:46 +00:00
Wenbin
83642dab87 feat: remove duplicated peer definition (#5728)
* remove duplicate peer

* fix
2025-03-18 11:30:25 +00:00
discord9
46070958c9 fix: mysql prepare bool value (#5732) 2025-03-18 10:50:45 +00:00
pikady
eea8b1c730 feat: add vec_kth_elem function (#5674)
* feat: add vec_kth_elem function

Signed-off-by: pikady <2652917633@qq.com>

* code format

Signed-off-by: pikady <2652917633@qq.com>

* add test sql

Signed-off-by: pikady <2652917633@qq.com>

* change indexing from 1-based to 0-based

Signed-off-by: pikady <2652917633@qq.com>

* improve code formatting and correct spelling errors

Signed-off-by: pikady <2652917633@qq.com>

* Update tests/cases/standalone/common/function/vector/vector.sql

I noticed the two lines are identical. Could you clarify the reason for the change? Thanks!

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: pikady <2652917633@qq.com>
Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
2025-03-18 07:25:53 +00:00
Ning Sun
1ab4ddab8d feat: update pipeline header name to x-greptime-pipeline-name (#5710)
* feat: update pipeline header name to x-greptime-pipeline-name

* refactor: update string_value_from_header
2025-03-18 02:39:54 +00:00
Ning Sun
9e63018198 feat: disable http timeout (#5721)
* feat: update to disable http timeout by default

* feat: make http timeout default to 0

* test: correct test case

* chore: generate new config doc

* test: correct tests
2025-03-18 01:18:56 +00:00
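After #5721, HTTP requests no longer time out unless a timeout is configured explicitly. A minimal config sketch of the new behavior, assuming the `http` section keys documented in the config reference further down in this diff:

```toml
# Sketch only: illustrates the post-#5721 default, not a full config.
[http]
addr = "127.0.0.1:4000"
# "0s" disables the HTTP request timeout (the new default);
# set e.g. "30s" to restore the previous bounded behavior.
timeout = "0s"
```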
discord9
594bec8c36 feat: load manifest manually in mito engine (#5725)
* feat: load manifest and some

* chore: per review
2025-03-18 01:18:08 +00:00
localhost
1586732d20 chore: add some method for log query handler (#5685)
* chore: add some method for log query handler

* chore: make clippy happy

* chore: add some method for log query handler

* Update src/frontend/src/instance/logs.rs

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2025-03-17 18:36:43 +00:00
yihong
16fddd97a7 chore: revert commit "chore: update flate2 version (#5706)" (#5715)
Revert "chore: update flate2 version (#5706)"

This reverts commit a5df3954f3.
2025-03-17 12:16:26 +00:00
Ning Sun
2260782c12 refactor: update jaeger api implementation for new trace modeling (#5655)
* refactor: update jaeger api implementation

* test: add tests for v1 data model

* feat: customize trace table name

* fix: update column requirements to use Column type instead of String

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: lint fix

* refactor: accumulate resource attributes for v1

* fix: add empty check for additional string

* feat: add table option to mark data model version

* fix: do not overwrite all tags

* feat: use table option to mark table data model version and process accordingly

* chore: update comments to reflect query changes

* feat: use header for jaeger table name

* feat: update index for service_name, drop index for span_name

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: zyy17 <zyylsxm@gmail.com>
2025-03-17 07:31:32 +00:00
Sicong Hu
09dacc8e9b feat: add vec_subvector function (#5683)
* feat: add vec_subvector function

* change datatype of arg1 and arg2 from u64 to i64

* add sqlness test

* improve description comments
2025-03-16 10:43:53 +00:00
Ruihang Xia
dec439db2b chore: bump version to 0.14.0 (#5711)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-16 09:58:19 +00:00
Ning Sun
dc76571166 feat: move default data path from /tmp to current directory (#5719) 2025-03-16 09:57:46 +00:00
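With #5719, the default data home moves from `/tmp` to the current working directory. A hedged sketch of pinning these paths explicitly, using the `storage` and `logging` keys shown in the config reference later in this diff:

```toml
# Sketch only: the defaults changed from /tmp/greptimedb/ to
# ./greptimedb_data/; set them explicitly to avoid surprises
# when the process is started from a different directory.
[storage]
data_home = "./greptimedb_data/"
type = "File"

[logging]
dir = "./greptimedb_data/logs"
```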
shuiyisong
3e17f8c426 chore: use Bytes instead of string in bulk ingestion (#5717)
chore: use bytes instead of string in bulk log ingestion
2025-03-14 09:31:35 +00:00
yihong
a5df3954f3 chore: update flate2 version (#5706)
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2025-03-14 02:15:27 +00:00
181 changed files with 4119 additions and 4191 deletions


@@ -47,6 +47,7 @@ runs:
shell: pwsh
run: make test sqlness-test
env:
RUSTUP_WINDOWS_PATH_ADD_BIN: 1 # Workaround for https://github.com/nextest-rs/nextest/issues/1493
RUST_BACKTRACE: 1
SQLNESS_OPTS: "--preserve-state"


@@ -8,7 +8,7 @@ inputs:
default: 2
description: "Number of Datanode replicas"
meta-replicas:
default: 1
default: 2
description: "Number of Metasrv replicas"
image-registry:
default: "docker.io"


@@ -576,9 +576,12 @@ jobs:
- name: "Remote WAL"
opts: "-w kafka -k 127.0.0.1:9092"
kafka: true
- name: "Pg Kvbackend"
- name: "PostgreSQL KvBackend"
opts: "--setup-pg"
kafka: false
- name: "MySQL Kvbackend"
opts: "--setup-mysql"
kafka: false
timeout-minutes: 60
steps:
- uses: actions/checkout@v4


@@ -107,6 +107,7 @@ jobs:
CARGO_BUILD_RUSTFLAGS: "-C linker=lld-link"
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
RUSTUP_WINDOWS_PATH_ADD_BIN: 1 # Workaround for https://github.com/nextest-rs/nextest/issues/1493
GT_S3_BUCKET: ${{ vars.AWS_CI_TEST_BUCKET }}
GT_S3_ACCESS_KEY_ID: ${{ secrets.AWS_CI_TEST_ACCESS_KEY_ID }}
GT_S3_ACCESS_KEY: ${{ secrets.AWS_CI_TEST_SECRET_ACCESS_KEY }}


@@ -91,7 +91,7 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nightly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.13.0
NEXT_RELEASE_VERSION: v0.14.0
jobs:
allocate-runners:

.gitignore

@@ -54,3 +54,6 @@ tests-fuzz/corpus/
# Nix
.direnv
.envrc
## default data home
greptimedb_data

Cargo.lock

@@ -185,7 +185,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"common-base",
"common-decimal",
@@ -710,7 +710,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -1324,7 +1324,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"catalog",
"common-error",
@@ -1348,7 +1348,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"arrow",
@@ -1661,7 +1661,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"async-trait",
"auth",
@@ -1704,7 +1704,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.13.0",
"substrait 0.14.0",
"table",
"tempfile",
"tokio",
@@ -1713,7 +1713,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"arc-swap",
@@ -1740,7 +1740,7 @@ dependencies = [
"rand",
"serde_json",
"snafu 0.8.5",
"substrait 0.13.0",
"substrait 0.14.0",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -1781,7 +1781,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"async-trait",
"auth",
@@ -1842,7 +1842,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
"substrait 0.13.0",
"substrait 0.14.0",
"table",
"temp-env",
"tempfile",
@@ -1888,7 +1888,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"anymap2",
"async-trait",
@@ -1910,11 +1910,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.13.0"
version = "0.14.0"
[[package]]
name = "common-config"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"common-base",
"common-error",
@@ -1939,7 +1939,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"arrow",
"arrow-schema",
@@ -1975,7 +1975,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"bigdecimal 0.4.5",
"common-error",
@@ -1988,7 +1988,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"http 1.1.0",
"snafu 0.8.5",
@@ -1998,7 +1998,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"async-trait",
"common-error",
@@ -2008,7 +2008,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -2059,7 +2059,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"async-trait",
"common-runtime",
@@ -2076,7 +2076,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"arrow-flight",
@@ -2104,7 +2104,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"common-base",
@@ -2123,7 +2123,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"arc-swap",
"common-query",
@@ -2137,7 +2137,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"common-error",
"common-macro",
@@ -2150,7 +2150,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"anymap2",
"api",
@@ -2211,7 +2211,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2220,11 +2220,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.13.0"
version = "0.14.0"
[[package]]
name = "common-pprof"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"common-error",
"common-macro",
@@ -2236,7 +2236,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"async-stream",
"async-trait",
@@ -2263,16 +2263,15 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"async-trait",
"common-procedure",
"snafu 0.8.5",
]
[[package]]
name = "common-query"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -2298,7 +2297,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"arc-swap",
"common-error",
@@ -2317,7 +2316,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2347,7 +2346,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"atty",
"backtrace",
@@ -2375,7 +2374,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"client",
"common-query",
@@ -2387,7 +2386,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"arrow",
"chrono",
@@ -2405,7 +2404,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"build-data",
"const_format",
@@ -2415,7 +2414,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"common-base",
"common-error",
@@ -3346,7 +3345,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"arrow-flight",
@@ -3398,7 +3397,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.13.0",
"substrait 0.14.0",
"table",
"tokio",
"toml 0.8.19",
@@ -3407,7 +3406,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"arrow",
"arrow-array",
@@ -4051,7 +4050,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -4161,7 +4160,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"arrow",
@@ -4223,7 +4222,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.25.0",
"substrait 0.13.0",
"substrait 0.14.0",
"table",
"tokio",
"tonic 0.12.3",
@@ -4278,7 +4277,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"arc-swap",
@@ -4706,7 +4705,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2be0f36b3264e28ab0e1c22a980d0bb634eb3a77#2be0f36b3264e28ab0e1c22a980d0bb634eb3a77"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a7274ddce299f33d23dbe8af5bbe6219f07c559a#a7274ddce299f33d23dbe8af5bbe6219f07c559a"
dependencies = [
"prost 0.13.3",
"serde",
@@ -5546,7 +5545,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6339,7 +6338,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"chrono",
"common-error",
@@ -6351,7 +6350,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"async-stream",
"async-trait",
@@ -6644,7 +6643,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -6671,7 +6670,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -6721,6 +6720,7 @@ dependencies = [
"servers",
"session",
"snafu 0.8.5",
"sqlx",
"store-api",
"strum 0.25.0",
"table",
@@ -6758,7 +6758,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"aquamarine",
@@ -6856,7 +6856,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"aquamarine",
@@ -7553,7 +7553,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"anyhow",
"bytes",
@@ -7802,7 +7802,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -7850,7 +7850,7 @@ dependencies = [
"sql",
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
"store-api",
"substrait 0.13.0",
"substrait 0.14.0",
"table",
"tokio",
"tokio-util",
@@ -8087,7 +8087,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -8355,7 +8355,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8495,7 +8495,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"auth",
"clap 4.5.19",
@@ -8757,7 +8757,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -9004,7 +9004,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9045,7 +9045,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9110,7 +9110,7 @@ dependencies = [
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
"statrs",
"store-api",
"substrait 0.13.0",
"substrait 0.14.0",
"table",
"tokio",
"tokio-stream",
@@ -10465,7 +10465,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -10582,7 +10582,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"arc-swap",
@@ -10891,7 +10891,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"chrono",
@@ -10945,7 +10945,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11262,7 +11262,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"aquamarine",
@@ -11392,7 +11392,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"async-trait",
"bytes",
@@ -11573,7 +11573,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -11824,7 +11824,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"arbitrary",
"async-trait",
@@ -11868,7 +11868,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"api",
"arrow-flight",
@@ -11921,6 +11921,7 @@ dependencies = [
"operator",
"partition",
"paste",
"pipeline",
"prost 0.13.3",
"query",
"rand",
@@ -11934,7 +11935,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.13.0",
"substrait 0.14.0",
"table",
"tempfile",
"time",


@@ -67,7 +67,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.13.0"
version = "0.14.0"
edition = "2021"
license = "Apache-2.0"
@@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2be0f36b3264e28ab0e1c22a980d0bb634eb3a77" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a7274ddce299f33d23dbe8af5bbe6219f07c559a" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -191,6 +191,8 @@ snafu = "0.8"
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"mysql",
"postgres",
"chrono",
] }
sysinfo = "0.30"
# on branch v0.52.x


@@ -6,7 +6,7 @@
</picture>
</p>
<h2 align="center">Unified & Cost-Effective Time Series Database for Metrics, Logs, and Events</h2>
<h2 align="center">Unified & Cost-Effective Observability Database for Metrics, Logs, and Events</h2>
<div align="center">
<h3 align="center">
@@ -62,15 +62,19 @@
## Introduction
**GreptimeDB** is an open-source unified & cost-effective time-series database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at Any Scale.
**GreptimeDB** is an open-source unified & cost-effective observability database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at Any Scale.
## News
**[GreptimeDB achieves #1 in JSONBench's 1 billion row cold run!](https://greptime.com/blogs/2025-03-18-jsonbench-greptimedb-performance)**
## Why GreptimeDB
Our core developers have been building time-series data platforms for years. Based on our best practices, GreptimeDB was born to give you:
Our core developers have been building observability data platforms for years. Based on our best practices, GreptimeDB was born to give you:
* **Unified Processing of Metrics, Logs, and Events**
GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
GreptimeDB unifies observability data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
* **Cloud-native Distributed Database**
@@ -112,7 +116,7 @@ Start a GreptimeDB container with:
```shell
docker run -p 127.0.0.1:4000-4003:4000-4003 \
-v "$(pwd)/greptimedb:/tmp/greptimedb" \
-v "$(pwd)/greptimedb:./greptimedb_data" \
--name greptime --rm \
greptime/greptimedb:latest standalone start \
--http-addr 0.0.0.0:4000 \


@@ -24,7 +24,7 @@
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
@@ -101,7 +101,7 @@
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.<br/>Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
@@ -181,7 +181,7 @@
| `region_engine.metric` | -- | -- | Metric engine options. |
| `region_engine.metric.experimental_sparse_primary_key_encoding` | Bool | `false` | Whether to enable the experimental sparse primary key encoding. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
@@ -222,7 +222,7 @@
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
@@ -279,7 +279,7 @@
| `datanode.client.connect_timeout` | String | `10s` | -- |
| `datanode.client.tcp_nodelay` | Bool | `true` | -- |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
@@ -308,7 +308,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `data_home` | String | `/tmp/metasrv/` | The working home directory. |
| `data_home` | String | `./greptimedb_data/metasrv/` | The working home directory. |
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
@@ -352,7 +352,7 @@
| `wal.backoff_base` | Integer | `2` | Exponential backoff rate, i.e. next backoff = base * current backoff. |
| `wal.backoff_deadline` | String | `5mins` | Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
@@ -390,7 +390,7 @@
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. |
@@ -442,7 +442,7 @@
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If disk space permits, a larger value is recommended. |
@@ -522,7 +522,7 @@
| `region_engine.metric` | -- | -- | Metric engine options. |
| `region_engine.metric.experimental_sparse_primary_key_encoding` | Bool | `false` | Whether to enable the experimental sparse primary key encoding. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
@@ -563,7 +563,7 @@
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
@@ -579,7 +579,7 @@
| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. |
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |

View File
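The `http.body_limit` entry in the table above accepts both decimal and binary byte units. As a rough illustration of how such values map onto byte counts (a hypothetical `parse_size` helper, not GreptimeDB's actual `ReadableSize` type, whose behavior may differ):

```rust
// Parse a human-readable size such as "64MB" or "2GiB" into bytes.
// Decimal units (KB, MB, ...) are powers of 1000; binary units
// (KiB, MiB, ...) are powers of 1024, matching the documented unit list.
fn parse_size(s: &str) -> Option<u64> {
    let s = s.trim();
    // Split the numeric prefix from the unit suffix.
    let idx = s.find(|c: char| !c.is_ascii_digit()).unwrap_or(s.len());
    let (num, unit) = s.split_at(idx);
    let num: u64 = num.parse().ok()?;
    let mult: u64 = match unit {
        "" | "B" => 1,
        "KB" => 1_000,
        "KiB" => 1 << 10,
        "MB" => 1_000_000,
        "MiB" => 1 << 20,
        "GB" => 1_000_000_000,
        "GiB" => 1 << 30,
        "TB" => 1_000_000_000_000,
        "TiB" => 1 << 40,
        "PB" => 1_000_000_000_000_000,
        "PiB" => 1 << 50,
        _ => return None,
    };
    Some(num * mult)
}

fn main() {
    assert_eq!(parse_size("64MB"), Some(64_000_000));
    assert_eq!(parse_size("2GiB"), Some(2 << 30));
    // A value of 0 disables the limit, as the config comment notes.
    assert_eq!(parse_size("0"), Some(0));
    println!("ok");
}
```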

@@ -27,7 +27,7 @@ max_concurrent_queries = 0
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
@@ -119,7 +119,7 @@ provider = "raft_engine"
## The directory to store the WAL files.
## **It's only used when the provider is `raft_engine`**.
## @toml2docs:none-default
dir = "/tmp/greptimedb/wal"
dir = "./greptimedb_data/wal"
## The size of the WAL segment file.
## **It's only used when the provider is `raft_engine`**.
@@ -265,7 +265,7 @@ overwrite_entry_start_id = false
## The data storage options.
[storage]
## The working home directory.
data_home = "/tmp/greptimedb/"
data_home = "./greptimedb_data/"
## The storage type used to store the data.
## - `File`: the data is stored in the local file system.
@@ -618,7 +618,7 @@ experimental_sparse_primary_key_encoding = false
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
dir = "./greptimedb_data/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default

View File

@@ -30,7 +30,7 @@ max_send_message_size = "512MB"
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
@@ -76,7 +76,7 @@ retry_interval = "3s"
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
dir = "./greptimedb_data/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default
@@ -121,4 +121,3 @@ sample_ratio = 1.0
## The tokio console address.
## @toml2docs:none-default
#+ tokio_console_addr = "127.0.0.1"

View File

@@ -26,7 +26,7 @@ retry_interval = "3s"
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
@@ -189,7 +189,7 @@ tcp_nodelay = true
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
dir = "./greptimedb_data/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default

View File

@@ -1,5 +1,5 @@
## The working home directory.
data_home = "/tmp/metasrv/"
data_home = "./greptimedb_data/metasrv/"
## The bind address of metasrv.
bind_addr = "127.0.0.1:3002"
@@ -177,7 +177,7 @@ backoff_deadline = "5mins"
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
dir = "./greptimedb_data/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default

View File

@@ -34,7 +34,7 @@ max_concurrent_queries = 0
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
@@ -164,7 +164,7 @@ provider = "raft_engine"
## The directory to store the WAL files.
## **It's only used when the provider is `raft_engine`**.
## @toml2docs:none-default
dir = "/tmp/greptimedb/wal"
dir = "./greptimedb_data/wal"
## The size of the WAL segment file.
## **It's only used when the provider is `raft_engine`**.
@@ -352,7 +352,7 @@ retry_delay = "500ms"
## The data storage options.
[storage]
## The working home directory.
data_home = "/tmp/greptimedb/"
data_home = "./greptimedb_data/"
## The storage type used to store the data.
## - `File`: the data is stored in the local file system.
@@ -705,7 +705,7 @@ experimental_sparse_primary_key_encoding = false
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
dir = "./greptimedb_data/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default

View File

@@ -25,7 +25,7 @@ services:
- --initial-cluster-state=new
- *etcd_initial_cluster_token
volumes:
- /tmp/greptimedb-cluster-docker-compose/etcd0:/var/lib/etcd
- ./greptimedb-cluster-docker-compose/etcd0:/var/lib/etcd
healthcheck:
test: [ "CMD", "etcdctl", "--endpoints=http://etcd0:2379", "endpoint", "health" ]
interval: 5s
@@ -68,12 +68,13 @@ services:
- datanode
- start
- --node-id=0
- --data-home=/greptimedb_data
- --rpc-bind-addr=0.0.0.0:3001
- --rpc-server-addr=datanode0:3001
- --metasrv-addrs=metasrv:3002
- --http-addr=0.0.0.0:5000
volumes:
- /tmp/greptimedb-cluster-docker-compose/datanode0:/tmp/greptimedb
- ./greptimedb-cluster-docker-compose/datanode0:/greptimedb_data
healthcheck:
test: [ "CMD", "curl", "-fv", "http://datanode0:5000/health" ]
interval: 5s

View File

@@ -4782,7 +4782,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Current counts for stalled write requests by instance\n\nWrite stalls when memtable is full and pending for flush\n\n",
"description": "Ingestion size by row counts.",
"fieldConfig": {
"defaults": {
"color": {
@@ -4844,7 +4844,7 @@
"x": 12,
"y": 138
},
"id": 221,
"id": 277,
"options": {
"legend": {
"calcs": [],
@@ -4864,14 +4864,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum by(pod) (greptime_mito_write_stall_total{pod=~\"$datanode\"})",
"expr": "rate(greptime_mito_write_rows_total{pod=~\"$datanode\"}[$__rate_interval])",
"instant": false,
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Write Stall per Instance",
"title": "Write Rows per Instance",
"type": "timeseries"
},
{
@@ -4976,7 +4976,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Cache size by instance.\n",
"description": "Current counts for stalled write requests by instance\n\nWrite stalls when memtable is full and pending for flush\n\n",
"fieldConfig": {
"defaults": {
"color": {
@@ -5028,7 +5028,7 @@
}
]
},
"unit": "decbytes"
"unit": "none"
},
"overrides": []
},
@@ -5038,7 +5038,7 @@
"x": 12,
"y": 146
},
"id": 229,
"id": 221,
"options": {
"legend": {
"calcs": [],
@@ -5058,14 +5058,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "greptime_mito_cache_bytes{pod=~\"$datanode\"}",
"expr": "sum by(pod) (greptime_mito_write_stall_total{pod=~\"$datanode\"})",
"instant": false,
"legendFormat": "{{pod}}-{{type}}",
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Cached Bytes per Instance",
"title": "Write Stall per Instance",
"type": "timeseries"
},
{
@@ -5172,7 +5172,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "P99 latency of each type of reads by instance",
"description": "Cache size by instance.\n",
"fieldConfig": {
"defaults": {
"color": {
@@ -5224,7 +5224,7 @@
}
]
},
"unit": "s"
"unit": "decbytes"
},
"overrides": []
},
@@ -5234,17 +5234,13 @@
"x": 12,
"y": 154
},
"id": 228,
"id": 229,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"calcs": [],
"displayMode": "table",
"placement": "bottom",
"showLegend": true,
"sortBy": "Last *",
"sortDesc": true
"showLegend": true
},
"tooltip": {
"mode": "single",
@@ -5258,14 +5254,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"expr": "greptime_mito_cache_bytes{pod=~\"$datanode\"}",
"instant": false,
"legendFormat": "{{pod}}-{{stage}}-p99",
"legendFormat": "{{pod}}-{{type}}",
"range": true,
"refId": "A"
}
],
"title": "Read Stage P99 per Instance",
"title": "Cached Bytes per Instance",
"type": "timeseries"
},
{
@@ -5317,7 +5313,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@@ -5370,7 +5367,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Latency of compaction task, at p99",
"description": "P99 latency of each type of reads by instance",
"fieldConfig": {
"defaults": {
"color": {
@@ -5414,7 +5411,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@@ -5432,7 +5430,7 @@
"x": 12,
"y": 162
},
"id": 230,
"id": 228,
"options": {
"legend": {
"calcs": [
@@ -5440,7 +5438,9 @@
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
"showLegend": true,
"sortBy": "Last *",
"sortDesc": true
},
"tooltip": {
"mode": "single",
@@ -5454,14 +5454,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(pod, le) (rate(greptime_mito_compaction_total_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "[{{pod}}]-compaction-p99",
"legendFormat": "{{pod}}-{{stage}}-p99",
"range": true,
"refId": "A"
}
],
"title": "Compaction P99 per Instance",
"title": "Read Stage P99 per Instance",
"type": "timeseries"
},
{
@@ -5570,7 +5570,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Compaction latency by stage",
"description": "Latency of compaction task, at p99",
"fieldConfig": {
"defaults": {
"color": {
@@ -5632,7 +5632,7 @@
"x": 12,
"y": 170
},
"id": 232,
"id": 230,
"options": {
"legend": {
"calcs": [
@@ -5654,9 +5654,9 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"expr": "histogram_quantile(0.99, sum by(pod, le) (rate(greptime_mito_compaction_total_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "{{pod}}-{{stage}}-p99",
"legendFormat": "[{{pod}}]-compaction-p99",
"range": true,
"refId": "A"
}
@@ -5794,7 +5794,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Write-ahead log operations latency at p99",
"description": "Compaction latency by stage",
"fieldConfig": {
"defaults": {
"color": {
@@ -5856,13 +5856,13 @@
"x": 12,
"y": 178
},
"id": 269,
"id": 232,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "list",
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
@@ -5878,14 +5878,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(le,logstore,optype,pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "{{pod}}-{{logstore}}-{{optype}}-p99",
"legendFormat": "{{pod}}-{{stage}}-p99",
"range": true,
"refId": "A"
}
],
"title": "Log Store op duration seconds",
"title": "Compaction P99 per Instance",
"type": "timeseries"
},
{
@@ -5993,7 +5993,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Ongoing compaction task count",
"description": "Write-ahead log operations latency at p99",
"fieldConfig": {
"defaults": {
"color": {
@@ -6045,7 +6045,7 @@
}
]
},
"unit": "none"
"unit": "s"
},
"overrides": []
},
@@ -6055,13 +6055,13 @@
"x": 12,
"y": 186
},
"id": 271,
"id": 269,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "table",
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
@@ -6078,14 +6078,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "greptime_mito_inflight_compaction_count",
"expr": "histogram_quantile(0.99, sum by(le,logstore,optype,pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))",
"instant": false,
"legendFormat": "{{pod}}",
"legendFormat": "{{pod}}-{{logstore}}-{{optype}}-p99",
"range": true,
"refId": "A"
}
],
"title": "Inflight Compaction",
"title": "Log Store op duration seconds",
"type": "timeseries"
},
{
@@ -6188,6 +6188,105 @@
"title": "Inflight Flush",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Ongoing compaction task count",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "points",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 194
},
"id": 271,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.1.3",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "greptime_mito_inflight_compaction_count",
"instant": false,
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Inflight Compaction",
"type": "timeseries"
},
{
"collapsed": false,
"gridPos": {

View File
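Several of the reworked dashboard panels above compute p99 latency with `histogram_quantile(0.99, ...)` over `_bucket` rates. A minimal sketch of the estimate Prometheus produces — linear interpolation inside the cumulative bucket containing the target rank — using made-up bucket data (this is a simplified model, not Prometheus's exact implementation):

```rust
// Estimate a quantile from cumulative histogram buckets, mimicking
// PromQL's `histogram_quantile` linear interpolation within a bucket.
// `buckets` holds (upper_bound, cumulative_count), sorted by bound.
fn histogram_quantile(q: f64, buckets: &[(f64, f64)]) -> f64 {
    let total = buckets.last().map(|b| b.1).unwrap_or(0.0);
    let rank = q * total;
    let (mut prev_bound, mut prev_count) = (0.0, 0.0);
    for &(bound, count) in buckets {
        if count >= rank {
            let in_bucket = count - prev_count;
            if in_bucket == 0.0 {
                return bound;
            }
            // Interpolate linearly between the bucket's bounds.
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / in_bucket;
        }
        prev_bound = bound;
        prev_count = count;
    }
    prev_bound
}

fn main() {
    // Latency buckets in seconds: le=0.1 -> 50, le=0.5 -> 90, le=1.0 -> 100.
    let buckets = [(0.1, 50.0), (0.5, 90.0), (1.0, 100.0)];
    // p99: rank 99 lies in the (0.5, 1.0] bucket => 0.5 + 0.5 * 9/10 = 0.95.
    assert!((histogram_quantile(0.99, &buckets) - 0.95).abs() < 1e-9);
    println!("p99 = {}", histogram_quantile(0.99, &buckets));
}
```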

@@ -406,7 +406,7 @@ mod tests {
sync_write = false
[storage]
data_home = "/tmp/greptimedb/"
data_home = "./greptimedb_data/"
type = "File"
[[storage.providers]]
@@ -420,7 +420,7 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
@@ -467,7 +467,7 @@ mod tests {
assert_eq!(10000, ddl_timeout.as_millis());
assert_eq!(3000, timeout.as_millis());
assert!(tcp_nodelay);
assert_eq!("/tmp/greptimedb/", options.storage.data_home);
assert_eq!("./greptimedb_data/", options.storage.data_home);
assert!(matches!(
&options.storage.store,
ObjectStoreConfig::File(FileConfig { .. })
@@ -483,7 +483,10 @@ mod tests {
));
assert_eq!("debug", options.logging.level.unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), options.logging.dir);
assert_eq!(
"./greptimedb_data/test/logs".to_string(),
options.logging.dir
);
}
#[test]
@@ -526,7 +529,7 @@ mod tests {
let options = cmd
.load_options(&GlobalOptions {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
@@ -536,7 +539,7 @@ mod tests {
.component;
let logging_opt = options.logging;
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
assert_eq!("debug", logging_opt.level.as_ref().unwrap());
}
@@ -565,11 +568,11 @@ mod tests {
[storage]
type = "File"
data_home = "/tmp/greptimedb/"
data_home = "./greptimedb_data/"
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();

View File

@@ -440,7 +440,7 @@ mod tests {
[http]
addr = "127.0.0.1:4000"
timeout = "30s"
timeout = "0s"
body_limit = "2GB"
[opentsdb]
@@ -448,7 +448,7 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
@@ -461,12 +461,15 @@ mod tests {
let fe_opts = command.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
assert_eq!(Duration::from_secs(30), fe_opts.http.timeout);
assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), fe_opts.logging.dir);
assert_eq!(
"./greptimedb_data/test/logs".to_string(),
fe_opts.logging.dir
);
assert!(!fe_opts.opentsdb.enable);
}
@@ -505,7 +508,7 @@ mod tests {
let options = cmd
.load_options(&GlobalOptions {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
@@ -515,7 +518,7 @@ mod tests {
.component;
let logging_opt = options.logging;
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
assert_eq!("debug", logging_opt.level.as_ref().unwrap());
}

View File

@@ -337,7 +337,7 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
[failure_detector]
threshold = 8.0
@@ -358,7 +358,10 @@ mod tests {
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
assert_eq!(SelectorType::LeaseBased, options.selector);
assert_eq!("debug", options.logging.level.as_ref().unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), options.logging.dir);
assert_eq!(
"./greptimedb_data/test/logs".to_string(),
options.logging.dir
);
assert_eq!(8.0, options.failure_detector.threshold);
assert_eq!(
100.0,
@@ -396,7 +399,7 @@ mod tests {
let options = cmd
.load_options(&GlobalOptions {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
@@ -406,7 +409,7 @@ mod tests {
.component;
let logging_opt = options.logging;
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
assert_eq!("debug", logging_opt.level.as_ref().unwrap());
}
@@ -424,7 +427,7 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();

View File

@@ -852,7 +852,7 @@ mod tests {
[wal]
provider = "raft_engine"
dir = "/tmp/greptimedb/test/wal"
dir = "./greptimedb_data/test/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
@@ -860,7 +860,7 @@ mod tests {
sync_write = false
[storage]
data_home = "/tmp/greptimedb/"
data_home = "./greptimedb_data/"
type = "File"
[[storage.providers]]
@@ -892,7 +892,7 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
let cmd = StartCommand {
@@ -922,7 +922,10 @@ mod tests {
let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
unreachable!()
};
assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap());
assert_eq!(
"./greptimedb_data/test/wal",
raft_engine_config.dir.unwrap()
);
assert!(matches!(
&dn_opts.storage.store,
@@ -946,7 +949,7 @@ mod tests {
}
assert_eq!("debug", logging_opts.level.as_ref().unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), logging_opts.dir);
assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
}
#[test]
@@ -958,7 +961,7 @@ mod tests {
let opts = cmd
.load_options(&GlobalOptions {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
@@ -967,7 +970,7 @@ mod tests {
.unwrap()
.component;
assert_eq!("/tmp/greptimedb/test/logs", opts.logging.dir);
assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
assert_eq!("debug", opts.logging.level.unwrap());
}

View File

@@ -56,13 +56,13 @@ fn test_load_datanode_example_config() {
metadata_cache_tti: Duration::from_secs(300),
}),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
dir: Some("./greptimedb_data/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
}),
storage: StorageConfig {
data_home: "/tmp/greptimedb/".to_string(),
data_home: "./greptimedb_data/".to_string(),
..Default::default()
},
region_engine: vec![
@@ -159,10 +159,10 @@ fn test_load_metasrv_example_config() {
let expected = GreptimeOptions::<MetasrvOptions> {
component: MetasrvOptions {
selector: SelectorType::default(),
data_home: "/tmp/metasrv/".to_string(),
data_home: "./greptimedb_data/metasrv/".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
logging: LoggingOptions {
dir: "/tmp/greptimedb/logs".to_string(),
dir: "./greptimedb_data/logs".to_string(),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
@@ -202,7 +202,7 @@ fn test_load_standalone_example_config() {
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
dir: Some("./greptimedb_data/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
@@ -219,7 +219,7 @@ fn test_load_standalone_example_config() {
}),
],
storage: StorageConfig {
data_home: "/tmp/greptimedb/".to_string(),
data_home: "./greptimedb_data/".to_string(),
..Default::default()
},
logging: LoggingOptions {

View File

@@ -135,5 +135,6 @@ pub fn is_readonly_schema(schema: &str) -> bool {
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const SERVICE_NAME_COLUMN: &str = "service_name";
pub const PARENT_SPAN_ID_COLUMN: &str = "parent_span_id";
// ---- End of special table and fields ----

View File

@@ -161,7 +161,7 @@ mod tests {
[wal]
provider = "raft_engine"
dir = "/tmp/greptimedb/wal"
dir = "./greptimedb_data/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
@@ -170,7 +170,7 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
@@ -246,7 +246,7 @@ mod tests {
let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
unreachable!()
};
assert_eq!(raft_engine_config.dir.unwrap(), "/tmp/greptimedb/wal");
assert_eq!(raft_engine_config.dir.unwrap(), "./greptimedb_data/wal");
// Should be default values.
assert_eq!(opts.node_id, None);

View File

@@ -24,9 +24,11 @@ pub(crate) mod sum;
mod vector_add;
mod vector_dim;
mod vector_div;
mod vector_kth_elem;
mod vector_mul;
mod vector_norm;
mod vector_sub;
mod vector_subvector;
use std::sync::Arc;
@@ -56,6 +58,8 @@ impl VectorFunction {
registry.register(Arc::new(vector_div::VectorDivFunction));
registry.register(Arc::new(vector_norm::VectorNormFunction));
registry.register(Arc::new(vector_dim::VectorDimFunction));
registry.register(Arc::new(vector_kth_elem::VectorKthElemFunction));
registry.register(Arc::new(vector_subvector::VectorSubvectorFunction));
registry.register(Arc::new(elem_sum::ElemSumFunction));
registry.register(Arc::new(elem_product::ElemProductFunction));
}

View File

@@ -0,0 +1,211 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::helper;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const};
const NAME: &str = "vec_kth_elem";
/// Returns the k-th (0-based index) element of the vector.
///
/// # Example
///
/// ```sql
/// SELECT vec_kth_elem("[2, 4, 6]",1) as result;
///
/// +---------+
/// | result  |
/// +---------+
/// | 4       |
/// +---------+
///
/// ```
///
#[derive(Debug, Clone, Default)]
pub struct VectorKthElemFunction;
impl Function for VectorKthElemFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(
&self,
_input_types: &[ConcreteDataType],
) -> common_query::error::Result<ConcreteDataType> {
Ok(ConcreteDataType::float32_datatype())
}
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![ConcreteDataType::int64_datatype()],
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let arg0 = &columns[0];
let arg1 = &columns[1];
let len = arg0.len();
let mut result = Float32VectorBuilder::with_capacity(len);
if len == 0 {
return Ok(result.to_vector());
};
let arg0_const = as_veclit_if_const(arg0)?;
for i in 0..len {
let arg0 = match arg0_const.as_ref() {
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};
let Some(arg0) = arg0 else {
result.push_null();
continue;
};
let arg1 = arg1.get(i).as_f64_lossy();
let Some(arg1) = arg1 else {
result.push_null();
continue;
};
ensure!(
arg1 >= 0.0 && arg1.fract() == 0.0,
InvalidFuncArgsSnafu {
err_msg: format!(
"Invalid argument: k must be a non-negative integer, but got k = {}.",
arg1
),
}
);
let k = arg1 as usize;
ensure!(
k < arg0.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"Out of range: k must be in the range [0, {}], but got k = {}.",
arg0.len() - 1,
k
),
}
);
let value = arg0[k];
result.push(Some(value));
}
Ok(result.to_vector())
}
}
impl Display for VectorKthElemFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::error;
use datatypes::vectors::{Int64Vector, StringVector};
use super::*;
#[test]
fn test_vec_kth_elem() {
let func = VectorKthElemFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
None,
]));
let input1 = Arc::new(Int64Vector::from(vec![Some(0), Some(2), None, Some(1)]));
let result = func
.eval(&FunctionContext::default(), &[input0, input1])
.unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 4);
assert_eq!(result.get_ref(0).as_f32().unwrap(), Some(1.0));
assert_eq!(result.get_ref(1).as_f32().unwrap(), Some(6.0));
assert!(result.get_ref(2).is_null());
assert!(result.get_ref(3).is_null());
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,2.0,3.0]".to_string())]));
let input1 = Arc::new(Int64Vector::from(vec![Some(3)]));
let err = func
.eval(&FunctionContext::default(), &[input0, input1])
.unwrap_err();
match err {
error::Error::InvalidFuncArgs { err_msg, .. } => {
assert_eq!(
err_msg,
format!("Out of range: k must be in the range [0, 2], but got k = 3.")
)
}
_ => unreachable!(),
}
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,2.0,3.0]".to_string())]));
let input1 = Arc::new(Int64Vector::from(vec![Some(-1)]));
let err = func
.eval(&FunctionContext::default(), &[input0, input1])
.unwrap_err();
match err {
error::Error::InvalidFuncArgs { err_msg, .. } => {
assert_eq!(
err_msg,
format!("Invalid argument: k must be a non-negative integer, but got k = -1.")
)
}
_ => unreachable!(),
}
}
}

View File
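The bounds checks in the `vec_kth_elem` implementation above can be exercised in isolation. A minimal standalone sketch of the function's semantics — parsing the vector literal naively, whereas the real implementation goes through `as_veclit` and the typed vector builders:

```rust
// Illustration of vec_kth_elem semantics: parse a vector literal like
// "[2, 4, 6]" and return the 0-based k-th element, with the same
// non-negative and in-range checks as the function above.
fn vec_kth_elem(lit: &str, k: i64) -> Result<f32, String> {
    let elems: Vec<f32> = lit
        .trim_matches(|c| c == '[' || c == ']')
        .split(',')
        .map(|s| s.trim().parse::<f32>())
        .collect::<Result<_, _>>()
        .map_err(|e| e.to_string())?;
    if k < 0 {
        return Err(format!(
            "Invalid argument: k must be a non-negative integer, but got k = {}.",
            k
        ));
    }
    let k = k as usize;
    if k >= elems.len() {
        return Err(format!(
            "Out of range: k must be in the range [0, {}], but got k = {}.",
            elems.len() - 1,
            k
        ));
    }
    Ok(elems[k])
}

fn main() {
    assert_eq!(vec_kth_elem("[2, 4, 6]", 1), Ok(4.0));
    assert!(vec_kth_elem("[2, 4, 6]", 3).is_err());
    assert!(vec_kth_elem("[2, 4, 6]", -1).is_err());
    println!("ok");
}
```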

@@ -0,0 +1,240 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion_expr::Volatility;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
const NAME: &str = "vec_subvector";
/// Returns the subvector from the start index (inclusive) to the end index (exclusive).
///
/// # Example
///
/// ```sql
/// SELECT vec_to_string(vec_subvector("[1, 2, 3, 4, 5]", 1, 3)) as result;
///
/// +---------+
/// | result |
/// +---------+
/// | [2, 3] |
/// +---------+
///
/// ```
///
#[derive(Debug, Clone, Default)]
pub struct VectorSubvectorFunction;
impl Function for VectorSubvectorFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::binary_datatype())
}
fn signature(&self) -> Signature {
Signature::one_of(
vec![
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::int64_datatype(),
]),
TypeSignature::Exact(vec![
ConcreteDataType::binary_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::int64_datatype(),
]),
],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly three, have: {}",
columns.len()
)
}
);
let arg0 = &columns[0];
let arg1 = &columns[1];
let arg2 = &columns[2];
ensure!(
arg0.len() == arg1.len() && arg1.len() == arg2.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"The lengths of the vector are not aligned, args 0: {}, args 1: {}, args 2: {}",
arg0.len(),
arg1.len(),
arg2.len()
)
}
);
let len = arg0.len();
let mut result = BinaryVectorBuilder::with_capacity(len);
if len == 0 {
return Ok(result.to_vector());
}
let arg0_const = as_veclit_if_const(arg0)?;
for i in 0..len {
let arg0 = match arg0_const.as_ref() {
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};
let arg1 = arg1.get(i).as_i64();
let arg2 = arg2.get(i).as_i64();
let (Some(arg0), Some(arg1), Some(arg2)) = (arg0, arg1, arg2) else {
result.push_null();
continue;
};
ensure!(
0 <= arg1 && arg1 <= arg2 && arg2 as usize <= arg0.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"Invalid start and end indices: start={}, end={}, vec_len={}",
arg1,
arg2,
arg0.len()
)
}
);
let subvector = &arg0[arg1 as usize..arg2 as usize];
let binlit = veclit_to_binlit(subvector);
result.push(Some(&binlit));
}
Ok(result.to_vector())
}
}
impl Display for VectorSubvectorFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::error::Error;
use datatypes::vectors::{Int64Vector, StringVector};
use super::*;
use crate::function::FunctionContext;
#[test]
fn test_subvector() {
let func = VectorSubvectorFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0, 2.0, 3.0, 4.0, 5.0]".to_string()),
Some("[6.0, 7.0, 8.0, 9.0, 10.0]".to_string()),
None,
Some("[11.0, 12.0, 13.0]".to_string()),
]));
let input1 = Arc::new(Int64Vector::from(vec![Some(1), Some(0), Some(0), Some(1)]));
let input2 = Arc::new(Int64Vector::from(vec![Some(3), Some(5), Some(2), Some(3)]));
let result = func
.eval(&FunctionContext::default(), &[input0, input1, input2])
.unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 4);
assert_eq!(
result.get_ref(0).as_binary().unwrap(),
Some(veclit_to_binlit(&[2.0, 3.0]).as_slice())
);
assert_eq!(
result.get_ref(1).as_binary().unwrap(),
Some(veclit_to_binlit(&[6.0, 7.0, 8.0, 9.0, 10.0]).as_slice())
);
assert!(result.get_ref(2).is_null());
assert_eq!(
result.get_ref(3).as_binary().unwrap(),
Some(veclit_to_binlit(&[12.0, 13.0]).as_slice())
);
}
#[test]
fn test_subvector_error() {
let func = VectorSubvectorFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0, 2.0, 3.0]".to_string()),
Some("[4.0, 5.0, 6.0]".to_string()),
]));
let input1 = Arc::new(Int64Vector::from(vec![Some(1), Some(2)]));
let input2 = Arc::new(Int64Vector::from(vec![Some(3)]));
let result = func.eval(&FunctionContext::default(), &[input0, input1, input2]);
match result {
Err(Error::InvalidFuncArgs { err_msg, .. }) => {
assert_eq!(
err_msg,
"The lengths of the vector are not aligned, args 0: 2, args 1: 2, args 2: 1"
)
}
_ => unreachable!(),
}
}
#[test]
fn test_subvector_invalid_indices() {
let func = VectorSubvectorFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0, 2.0, 3.0]".to_string()),
Some("[4.0, 5.0, 6.0]".to_string()),
]));
let input1 = Arc::new(Int64Vector::from(vec![Some(1), Some(3)]));
let input2 = Arc::new(Int64Vector::from(vec![Some(3), Some(4)]));
let result = func.eval(&FunctionContext::default(), &[input0, input1, input2]);
match result {
Err(Error::InvalidFuncArgs { err_msg, .. }) => {
assert_eq!(
err_msg,
"Invalid start and end indices: start=3, end=4, vec_len=3"
)
}
_ => unreachable!(),
}
}
}
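The validation in `vec_subvector` above boils down to one rule: the indices must satisfy `0 <= start <= end <= len`, and the result is the half-open slice `[start, end)`. A minimal standalone sketch of just that rule (the names `subvector` and the plain `Vec<f32>` return type are illustrative, not the crate's actual API):

```rust
// Sketch of vec_subvector's index checks: start must be non-negative,
// start <= end, and end must not exceed the vector length. The returned
// slice is [start, end), matching the "start included, end excluded" docs.
fn subvector(v: &[f32], start: i64, end: i64) -> Result<Vec<f32>, String> {
    if !(0 <= start && start <= end && end as usize <= v.len()) {
        return Err(format!(
            "Invalid start and end indices: start={}, end={}, vec_len={}",
            start,
            end,
            v.len()
        ));
    }
    Ok(v[start as usize..end as usize].to_vec())
}

fn main() {
    // Mirrors the happy-path test: [1..5][1, 3) == [2.0, 3.0].
    assert_eq!(subvector(&[1.0, 2.0, 3.0, 4.0, 5.0], 1, 3).unwrap(), vec![2.0, 3.0]);
    // Mirrors the invalid-indices test: end past the vector length fails.
    assert!(subvector(&[1.0, 2.0, 3.0], 3, 4).is_err());
}
```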


@@ -22,31 +22,30 @@ use std::vec;
use api::v1::alter_table_expr::Kind;
use api::v1::RenameTable;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
PoisonKeys, Procedure, ProcedureId, Status, StringKey,
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, StringKey,
};
use common_telemetry::{debug, error, info};
use futures::future;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
use crate::ddl::utils::{add_peer_context_if_needed, handle_multiple_results, MultipleResults};
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{AbortProcedureSnafu, Error, NoLeaderSnafu, PutPoisonSnafu, Result};
use crate::error::{Error, Result};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::{DeserializedValueWithBytes, RegionDistribution};
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::metrics;
use crate::poison_key::table_poison_key;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution};
@@ -105,27 +104,7 @@ impl AlterTableProcedure {
Ok(Status::executing(true))
}
fn table_poison_key(&self) -> PoisonKey {
table_poison_key(self.data.table_id())
}
async fn put_poison(
&self,
ctx_provider: &dyn ContextProvider,
procedure_id: ProcedureId,
) -> Result<()> {
let poison_key = self.table_poison_key();
ctx_provider
.try_put_poison(&poison_key, procedure_id)
.await
.context(PutPoisonSnafu)
}
pub async fn submit_alter_region_requests(
&mut self,
procedure_id: ProcedureId,
ctx_provider: &dyn ContextProvider,
) -> Result<Status> {
pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let (_, physical_table_route) = self
.context
@@ -148,9 +127,6 @@ impl AlterTableProcedure {
alter_kind,
);
ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
// Puts the poison before submitting alter region requests to datanodes.
self.put_poison(ctx_provider, procedure_id).await?;
for datanode in leaders {
let requester = self.context.node_manager.datanode(&datanode).await;
let regions = find_leader_regions(&physical_table_route.region_routes, &datanode);
@@ -164,51 +140,28 @@ impl AlterTableProcedure {
let requester = requester.clone();
alter_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::RequestOutdated {
// Treat `RequestOutdated` as success.
// The engine returns this code when the schema version does not match.
// As this procedure has locked the table, the only reason for this error
// is that the procedure already succeeded before and is retrying.
return Err(add_peer_context_if_needed(datanode)(err));
}
}
Ok(())
});
}
}
let results = future::join_all(alter_region_tasks)
future::join_all(alter_region_tasks)
.await
.into_iter()
.collect::<Vec<_>>();
.collect::<Result<Vec<_>>>()?;
match handle_multiple_results(results) {
MultipleResults::PartialRetryable(error) => {
// Just returns the error, and wait for the next try.
Err(error)
}
MultipleResults::PartialNonRetryable(error) => {
error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
// No retry will be done.
Ok(Status::poisoned(
Some(self.table_poison_key()),
ProcedureError::external(error),
))
}
MultipleResults::AllRetryable(error) => {
// Just returns the error, and wait for the next try.
Err(error)
}
MultipleResults::Ok => {
self.data.state = AlterTableState::UpdateMetadata;
Ok(Status::executing_with_clean_poisons(true))
}
MultipleResults::AllNonRetryable(error) => {
error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
// It assumes the metadata on datanode is not changed.
// Case: The alter region request is sent but not applied. (e.g., InvalidArgument)
self.data.state = AlterTableState::UpdateMetadata;
let err = BoxedError::new(error);
Err(err).context(AbortProcedureSnafu {
clean_poisons: true,
})
}
}
Ok(Status::executing(true))
}
/// Update table metadata.
@@ -297,12 +250,10 @@ impl Procedure for AlterTableProcedure {
Self::TYPE_NAME
}
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let error_handler = |e: Error| {
if e.is_retry_later() {
ProcedureError::retry_later(e)
} else if e.need_clean_poisons() {
ProcedureError::external_and_clean_poisons(e)
} else {
ProcedureError::external(e)
}
@@ -318,10 +269,7 @@ impl Procedure for AlterTableProcedure {
match state {
AlterTableState::Prepare => self.on_prepare().await,
AlterTableState::SubmitAlterRegionRequests => {
self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref())
.await
}
AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await,
AlterTableState::UpdateMetadata => self.on_update_metadata().await,
AlterTableState::InvalidateTableCache => self.on_broadcast().await,
}
@@ -337,10 +285,6 @@ impl Procedure for AlterTableProcedure {
LockKey::new(key)
}
fn poison_keys(&self) -> PoisonKeys {
PoisonKeys::new(vec![self.table_poison_key()])
}
}
#[derive(Debug, Serialize, Deserialize, AsRefStr)]

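The retry handling in `submit_alter_region_requests` rests on idempotency: because the procedure holds the table lock, a `RequestOutdated` error can only mean a previous attempt already applied the change, so it is mapped to success. A standalone sketch of that rule (the local `StatusCode` enum and `normalize` function are illustrative stand-ins, not the real `common_error` types):

```rust
// Idempotent-retry rule: an error carrying `RequestOutdated` is swallowed,
// because under the table lock it can only mean an earlier attempt of this
// same procedure already succeeded. Any other error is propagated.
#[derive(Debug, PartialEq, Clone, Copy)]
enum StatusCode {
    RequestOutdated,
    Internal,
}

fn normalize(result: Result<(), StatusCode>) -> Result<(), StatusCode> {
    match result {
        // A genuine failure is kept as-is.
        Err(code) if code != StatusCode::RequestOutdated => Err(code),
        // Success, or an outdated request whose work was already applied.
        _ => Ok(()),
    }
}

fn main() {
    assert_eq!(normalize(Err(StatusCode::RequestOutdated)), Ok(()));
    assert_eq!(normalize(Err(StatusCode::Internal)), Err(StatusCode::Internal));
    assert_eq!(normalize(Ok(())), Ok(()));
}
```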

@@ -299,9 +299,7 @@ impl Procedure for CreateTableProcedure {
.creator
.register_opening_regions(&self.context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu {
clean_poisons: false,
})?;
.context(ExternalSnafu)?;
}
Ok(())


@@ -130,9 +130,7 @@ impl Procedure for DropDatabaseProcedure {
self.state
.recover(&self.runtime_context)
.map_err(BoxedError::new)
.context(ExternalSnafu {
clean_poisons: false,
})
.context(ExternalSnafu)
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {


@@ -200,9 +200,7 @@ impl Procedure for DropTableProcedure {
if register_operating_regions {
self.register_dropping_regions()
.map_err(BoxedError::new)
.context(ExternalSnafu {
clean_poisons: false,
})?;
.context(ExternalSnafu)?;
}
Ok(())


@@ -80,13 +80,7 @@ pub async fn create_logical_table(
let tasks = vec![test_create_logical_table_task(table_name)];
let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
let status = procedure.on_create_metadata().await.unwrap();
assert_matches!(status, Status::Done { .. });


@@ -171,74 +171,3 @@ impl MockDatanodeHandler for NaiveDatanodeHandler {
unreachable!()
}
}
#[derive(Clone)]
pub struct PartialSuccessDatanodeHandler {
pub retryable: bool,
}
#[async_trait::async_trait]
impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
async fn handle(&self, peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
let success = peer.id % 2 == 0;
if success {
Ok(RegionResponse::new(0))
} else if self.retryable {
Err(Error::RetryLater {
source: BoxedError::new(
error::UnexpectedSnafu {
err_msg: "retry later",
}
.build(),
),
})
} else {
error::UnexpectedSnafu {
err_msg: "mock error",
}
.fail()
}
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
#[derive(Clone)]
pub struct AllFailureDatanodeHandler {
pub retryable: bool,
}
#[async_trait::async_trait]
impl MockDatanodeHandler for AllFailureDatanodeHandler {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
if self.retryable {
Err(Error::RetryLater {
source: BoxedError::new(
error::UnexpectedSnafu {
err_msg: "retry later",
}
.build(),
),
})
} else {
error::UnexpectedSnafu {
err_msg: "mock error",
}
.fail()
}
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}


@@ -180,13 +180,7 @@ async fn test_on_prepare() {
let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context);
let result = procedure.on_prepare().await;
assert_matches!(
result,
Ok(Status::Executing {
persist: true,
clean_poisons: false
})
);
assert_matches!(result, Ok(Status::Executing { persist: true }));
}
#[tokio::test]
@@ -211,13 +205,7 @@ async fn test_on_update_metadata() {
let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context);
let mut status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
let ctx = common_procedure::Context {
procedure_id: ProcedureId::random(),
@@ -225,22 +213,10 @@ async fn test_on_update_metadata() {
};
// on_submit_alter_region_requests
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
// on_update_metadata
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
}
#[tokio::test]
@@ -261,13 +237,7 @@ async fn test_on_part_duplicate_alter_request() {
let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context.clone());
let mut status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
let ctx = common_procedure::Context {
procedure_id: ProcedureId::random(),
@@ -275,22 +245,10 @@ async fn test_on_part_duplicate_alter_request() {
};
// on_submit_alter_region_requests
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
// on_update_metadata
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
// re-alter
let tasks = vec![
@@ -312,13 +270,7 @@ async fn test_on_part_duplicate_alter_request() {
let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context.clone());
let mut status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
let ctx = common_procedure::Context {
procedure_id: ProcedureId::random(),
@@ -326,22 +278,10 @@ async fn test_on_part_duplicate_alter_request() {
};
// on_submit_alter_region_requests
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
// on_update_metadata
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
let table_name_keys = vec![
TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "table1"),

View File

@@ -25,9 +25,6 @@ use api::v1::{
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::store::poison_store::PoisonStore;
use common_procedure::{ProcedureId, Status};
use common_procedure_test::MockContextProvider;
use store_api::storage::RegionId;
use table::requests::TTL_KEY;
use tokio::sync::mpsc::{self};
@@ -36,46 +33,16 @@ use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::test_util::datanode_handler::{
AllFailureDatanodeHandler, DatanodeWatcher, PartialSuccessDatanodeHandler,
RequestOutdatedErrorDatanodeHandler,
DatanodeWatcher, RequestOutdatedErrorDatanodeHandler,
};
use crate::error::Error;
use crate::key::datanode_table::DatanodeTableKey;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::node_manager::NodeManagerRef;
use crate::peer::Peer;
use crate::poison_key::table_poison_key;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::test_util::{new_ddl_context, MockDatanodeManager};
fn prepare_table_route(table_id: u32) -> TableRouteValue {
TableRouteValue::physical(vec![
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
leader_peer: Some(Peer::empty(2)),
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
},
])
}
fn test_rename_alter_table_task(table_name: &str, new_table_name: &str) -> AlterTableTask {
let builder = TestAlterTableExprBuilder::default()
.table_name(table_name)
@@ -134,7 +101,29 @@ async fn test_on_submit_alter_request() {
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
prepare_table_route(table_id),
TableRouteValue::physical(vec![
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
leader_peer: Some(Peer::empty(2)),
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
},
]),
HashMap::new(),
)
.await
@@ -152,15 +141,9 @@ async fn test_on_submit_alter_request() {
})),
},
};
let procedure_id = ProcedureId::random();
let provider = Arc::new(MockContextProvider::default());
let mut procedure =
AlterTableProcedure::new(table_id, alter_table_task, ddl_context.clone()).unwrap();
let mut procedure = AlterTableProcedure::new(table_id, alter_table_task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap();
procedure.submit_alter_region_requests().await.unwrap();
let check = |peer: Peer,
request: RegionRequest,
@@ -202,7 +185,29 @@ async fn test_on_submit_alter_request_with_outdated_request() {
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
prepare_table_route(table_id),
TableRouteValue::physical(vec![
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
leader_peer: Some(Peer::empty(2)),
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
},
]),
HashMap::new(),
)
.await
@@ -220,15 +225,9 @@ async fn test_on_submit_alter_request_with_outdated_request() {
})),
},
};
let procedure_id = ProcedureId::random();
let provider = Arc::new(MockContextProvider::default());
let mut procedure = AlterTableProcedure::new(table_id, alter_table_task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let err = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert!(!err.is_retry_later());
procedure.submit_alter_region_requests().await.unwrap();
}
#[tokio::test]
@@ -327,14 +326,9 @@ async fn test_on_update_metadata_add_columns() {
})),
},
};
let procedure_id = ProcedureId::random();
let provider = Arc::new(MockContextProvider::default());
let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap();
procedure.submit_alter_region_requests().await.unwrap();
procedure.on_update_metadata().await.unwrap();
let table_info = ddl_context
@@ -393,14 +387,9 @@ async fn test_on_update_table_options() {
})),
},
};
let procedure_id = ProcedureId::random();
let provider = Arc::new(MockContextProvider::default());
let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap();
procedure.submit_alter_region_requests().await.unwrap();
procedure.on_update_metadata().await.unwrap();
let table_info = ddl_context
@@ -428,156 +417,3 @@ async fn test_on_update_table_options() {
HashMap::from(&table_info.meta.options)
);
}
async fn prepare_alter_table_procedure(
node_manager: NodeManagerRef,
) -> (AlterTableProcedure, ProcedureId) {
common_telemetry::init_default_ut_logging();
let ddl_context = new_ddl_context(node_manager);
let table_id = 1024;
let table_name = "foo";
let task = test_create_table_task(table_name, table_id);
// Puts a value to table name key.
ddl_context
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
prepare_table_route(table_id),
HashMap::new(),
)
.await
.unwrap();
let alter_table_task = AlterTableTask {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
kind: Some(Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: "cpu".to_string(),
}],
})),
},
};
let procedure_id = ProcedureId::random();
let mut procedure =
AlterTableProcedure::new(table_id, alter_table_task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
(procedure, procedure_id)
}
#[tokio::test]
async fn test_on_submit_alter_request_with_partial_success_retryable() {
let node_manager = Arc::new(MockDatanodeManager::new(PartialSuccessDatanodeHandler {
retryable: true,
}));
let provider = Arc::new(MockContextProvider::default());
let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await;
let result = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert!(result.is_retry_later());
// Submits again
let result = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert!(result.is_retry_later());
}
#[tokio::test]
async fn test_on_submit_alter_request_with_partial_success_non_retryable() {
let node_manager = Arc::new(MockDatanodeManager::new(PartialSuccessDatanodeHandler {
retryable: false,
}));
let provider = Arc::new(MockContextProvider::default());
let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await;
let result = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap();
assert_matches!(result, Status::Poisoned { .. });
// submits again
let result = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap();
assert_matches!(result, Status::Poisoned { .. });
}
#[tokio::test]
async fn test_on_submit_alter_request_with_all_failure_retryable() {
common_telemetry::init_default_ut_logging();
let node_manager = Arc::new(MockDatanodeManager::new(AllFailureDatanodeHandler {
retryable: true,
}));
let provider = Arc::new(MockContextProvider::default());
let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await;
let err = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert!(err.is_retry_later());
// submits again
let err = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert!(err.is_retry_later());
}
#[tokio::test]
async fn test_on_submit_alter_request_with_all_failure_non_retryable() {
common_telemetry::init_default_ut_logging();
let node_manager = Arc::new(MockDatanodeManager::new(AllFailureDatanodeHandler {
retryable: false,
}));
let provider = Arc::new(MockContextProvider::default());
let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await;
let err = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert_matches!(err, Error::AbortProcedure { .. });
assert!(!err.is_retry_later());
assert!(err.need_clean_poisons());
// submits again
let err = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert_matches!(err, Error::AbortProcedure { .. });
assert!(!err.is_retry_later());
assert!(err.need_clean_poisons());
}
#[tokio::test]
async fn test_on_submit_alter_request_with_exist_poison() {
common_telemetry::init_default_ut_logging();
let node_manager = Arc::new(MockDatanodeManager::new(AllFailureDatanodeHandler {
retryable: false,
}));
let provider = Arc::new(MockContextProvider::default());
let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await;
let table_id = 1024;
let key = table_poison_key(table_id).to_string();
let another_procedure_id = ProcedureId::random();
provider
.poison_manager()
.try_put_poison(key, another_procedure_id.to_string())
.await
.unwrap();
procedure.on_prepare().await.unwrap();
let err = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert_matches!(err, Error::PutPoison { .. });
}


@@ -69,13 +69,7 @@ async fn test_on_prepare() {
let physical_table_id = table_id;
let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
}
#[tokio::test]
@@ -208,13 +202,7 @@ async fn test_on_prepare_part_logical_tables_exist() {
ddl_context,
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
}
#[tokio::test]
@@ -250,13 +238,7 @@ async fn test_on_create_metadata() {
ddl_context,
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
@@ -312,13 +294,7 @@ async fn test_on_create_metadata_part_logical_tables_exist() {
ddl_context,
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
@@ -363,13 +339,7 @@ async fn test_on_create_metadata_err() {
ddl_context.clone(),
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),


@@ -137,13 +137,7 @@ async fn test_on_prepare_without_create_if_table_exists() {
task.create_table.create_if_not_exists = true;
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
assert_eq!(procedure.table_id(), 1024);
}


@@ -153,13 +153,7 @@ async fn test_on_prepare_without_create_if_table_exists() {
task.create_view.create_if_not_exists = true;
let mut procedure = CreateViewProcedure::new(task, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(status, Status::Executing { persist: true });
assert_eq!(procedure.view_id(), 1024);
}


@@ -13,12 +13,10 @@
// limitations under the License.
use std::collections::HashMap;
use std::fmt::Debug;
use common_catalog::consts::METRIC_ENGINE;
use common_error::ext::BoxedError;
use common_procedure::error::Error as ProcedureError;
use common_telemetry::{error, warn};
use common_wal::options::WalOptions;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
@@ -40,7 +38,6 @@ use crate::rpc::router::RegionRoute;
/// Adds [Peer] context if the error is unretryable.
pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
move |err| {
error!(err; "Failed to operate datanode, peer: {}", datanode);
if !err.is_retry_later() {
return Err::<(), BoxedError>(BoxedError::new(err))
.context(OperateDatanodeSnafu { peer: datanode })
@@ -185,85 +182,6 @@ pub fn extract_region_wal_options(
Ok(region_wal_options)
}
/// The result of multiple operations.
///
/// - Ok: all operations are successful.
/// - PartialRetryable: some operations failed with retryable errors and none failed with a non-retryable error.
/// - PartialNonRetryable: at least one operation failed with a non-retryable error.
/// - AllRetryable: all operations failed with retryable errors.
/// - AllNonRetryable: all operations failed with non-retryable errors.
pub enum MultipleResults {
Ok,
PartialRetryable(Error),
PartialNonRetryable(Error),
AllRetryable(Error),
AllNonRetryable(Error),
}
/// Handles the results of alter region requests.
///
/// For partial success, we check whether the errors are retryable.
/// If every error is retryable, the result is retryable.
/// Otherwise, we return the first non-retryable error.
pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults {
if results.is_empty() {
return MultipleResults::Ok;
}
let num_results = results.len();
let mut retryable_results = Vec::new();
let mut non_retryable_results = Vec::new();
let mut ok_results = Vec::new();
for result in results {
match result {
Ok(_) => ok_results.push(result),
Err(err) => {
if err.is_retry_later() {
retryable_results.push(err);
} else {
non_retryable_results.push(err);
}
}
}
}
common_telemetry::debug!(
"retryable_results: {}, non_retryable_results: {}, ok_results: {}",
retryable_results.len(),
non_retryable_results.len(),
ok_results.len()
);
if retryable_results.len() == num_results {
return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
} else if non_retryable_results.len() == num_results {
warn!("all non retryable results: {}", non_retryable_results.len());
for err in &non_retryable_results {
error!(err; "non retryable error");
}
return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
} else if ok_results.len() == num_results {
return MultipleResults::Ok;
} else if !retryable_results.is_empty()
&& !ok_results.is_empty()
&& non_retryable_results.is_empty()
{
return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
}
warn!(
"partial non retryable results: {}, retryable results: {}, ok results: {}",
non_retryable_results.len(),
retryable_results.len(),
ok_results.len()
);
for err in &non_retryable_results {
error!(err; "non retryable error");
}
// non_retryable_results.len() > 0
MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
}
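The decision order in `handle_multiple_results` can be sketched independently of the crate's `Error` type. This is a simplified model, not the actual API: `Err(true)` stands in for a retryable error (`Error::is_retry_later`) and `Err(false)` for a non-retryable one.

```rust
/// Outcome categories mirroring `MultipleResults` (simplified sketch).
#[derive(Debug, PartialEq)]
enum Outcome {
    Ok,
    PartialRetryable,
    PartialNonRetryable,
    AllRetryable,
    AllNonRetryable,
}

/// Classifies a batch of results; `Err(true)` marks a retryable failure.
fn classify(results: &[Result<(), bool>]) -> Outcome {
    let total = results.len();
    let retryable = results.iter().filter(|r| matches!(r, Err(true))).count();
    let non_retryable = results.iter().filter(|r| matches!(r, Err(false))).count();
    let ok = total - retryable - non_retryable;

    if total == 0 || ok == total {
        // An empty batch or all successes is a plain success.
        Outcome::Ok
    } else if retryable == total {
        Outcome::AllRetryable
    } else if non_retryable == total {
        Outcome::AllNonRetryable
    } else if non_retryable == 0 {
        // Mix of successes and retryable errors: the batch can be retried.
        Outcome::PartialRetryable
    } else {
        // Any non-retryable error in a mixed batch poisons the whole batch.
        Outcome::PartialNonRetryable
    }
}

fn main() {
    assert_eq!(classify(&[]), Outcome::Ok);
    assert_eq!(classify(&[Ok(()), Err(true)]), Outcome::PartialRetryable);
    assert_eq!(classify(&[Err(true), Err(true)]), Outcome::AllRetryable);
    assert_eq!(classify(&[Ok(()), Err(false), Err(true)]), Outcome::PartialNonRetryable);
}
```

Note that a single non-retryable error in a mixed batch makes the whole batch non-retryable, matching the warning path at the end of the original function.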
#[cfg(test)]
mod tests {
use super::*;


@@ -834,7 +834,6 @@ mod tests {
use std::sync::Arc;
use common_procedure::local::LocalManager;
use common_procedure::test_util::InMemoryPoisonStore;
use super::DdlManager;
use crate::cache_invalidator::DummyCacheInvalidator;
@@ -883,12 +882,7 @@ mod tests {
));
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let procedure_manager = Arc::new(LocalManager::new(
Default::default(),
state_store,
poison_manager,
));
let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store));
let _ = DdlManager::try_new(
DdlContext {


@@ -449,14 +449,6 @@ pub enum Error {
#[snafu(display("Retry later"))]
RetryLater { source: BoxedError },
#[snafu(display("Abort procedure"))]
AbortProcedure {
#[snafu(implicit)]
location: Location,
source: BoxedError,
clean_poisons: bool,
},
#[snafu(display(
"Failed to encode a wal options to json string, wal_options: {:?}",
wal_options
@@ -756,33 +748,6 @@ pub enum Error {
#[snafu(source)]
error: serde_json::Error,
},
#[snafu(display("No leader found for table_id: {}", table_id))]
NoLeader {
table_id: TableId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Procedure poison key already exists with a different value, key: {}, value: {}",
key,
value
))]
ProcedurePoisonConflict {
key: String,
value: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to put poison, table metadata may be corrupted"))]
PutPoison {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: common_procedure::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -801,8 +766,7 @@ impl ErrorExt for Error {
| SerializeToJson { .. }
| DeserializeFromJson { .. } => StatusCode::Internal,
NoLeader { .. } => StatusCode::TableUnavailable,
ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected,
ValueNotExist { .. } => StatusCode::Unexpected,
Unsupported { .. } => StatusCode::Unsupported,
@@ -873,9 +837,7 @@ impl ErrorExt for Error {
OperateDatanode { source, .. } => source.status_code(),
Table { source, .. } => source.status_code(),
RetryLater { source, .. } => source.status_code(),
AbortProcedure { source, .. } => source.status_code(),
ConvertAlterTableRequest { source, .. } => source.status_code(),
PutPoison { source, .. } => source.status_code(),
ParseProcedureId { .. }
| InvalidNumTopics { .. }
@@ -946,11 +908,6 @@ impl Error {
matches!(self, Error::RetryLater { .. })
}
/// Determine whether it needs to clean poisons.
pub fn need_clean_poisons(&self) -> bool {
matches!(self, Error::AbortProcedure { clean_poisons, .. } if *clean_poisons)
}
/// Returns true if the response exceeds the size limit.
pub fn is_exceeded_size_limit(&self) -> bool {
match self {


@@ -156,7 +156,6 @@ use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, LeaderState, RegionRoute};
use crate::rpc::store::BatchDeleteRequest;
use crate::state_store::PoisonValue;
use crate::DatanodeId;
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
@@ -1321,8 +1320,7 @@ impl_metadata_value! {
TableFlowValue,
NodeAddressValue,
SchemaNameValue,
FlowStateValue,
PoisonValue
FlowStateValue
}
impl_optional_metadata_value! {


@@ -155,7 +155,7 @@ impl<'a> MySqlTemplateFactory<'a> {
table_name: table_name.to_string(),
create_table_statement: format!(
// Cannot be more than 3072 bytes in PRIMARY KEY
"CREATE TABLE IF NOT EXISTS {table_name}(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
"CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
),
range_template: RangeTemplate {
point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),

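Backtick-quoting in the hunk above matters when the configured table name contains characters MySQL does not allow in unquoted identifiers (for example `-`). A minimal quoting helper, including the standard doubling of embedded backticks, might look like this (`quote_ident` is an illustrative helper, not part of this patch):

```rust
/// Quotes a MySQL identifier, doubling any embedded backticks.
fn quote_ident(name: &str) -> String {
    format!("`{}`", name.replace('`', "``"))
}

fn main() {
    assert_eq!(quote_ident("greptime_metakv"), "`greptime_metakv`");
    assert_eq!(quote_ident("meta-kv"), "`meta-kv`");
    assert_eq!(quote_ident("we`ird"), "`we``ird`");
}
```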

@@ -37,7 +37,6 @@ pub mod metrics;
pub mod node_expiry_listener;
pub mod node_manager;
pub mod peer;
pub mod poison_key;
pub mod range_stream;
pub mod region_keeper;
pub mod rpc;


@@ -12,63 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use api::v1::meta::Peer as PbPeer;
use serde::{Deserialize, Serialize};
pub use api::v1::meta::Peer;
use crate::error::Error;
use crate::{DatanodeId, FlownodeId};
#[derive(Debug, Default, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct Peer {
/// Node identifier. Unique in a cluster.
pub id: u64,
pub addr: String,
}
impl From<PbPeer> for Peer {
fn from(p: PbPeer) -> Self {
Self {
id: p.id,
addr: p.addr,
}
}
}
impl From<Peer> for PbPeer {
fn from(p: Peer) -> Self {
Self {
id: p.id,
addr: p.addr,
}
}
}
impl Peer {
pub fn new(id: u64, addr: impl Into<String>) -> Self {
Self {
id,
addr: addr.into(),
}
}
#[cfg(any(test, feature = "testing"))]
pub fn empty(id: u64) -> Self {
Self {
id,
addr: String::new(),
}
}
}
impl Display for Peer {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "peer-{}({})", self.id, self.addr)
}
}
/// A service that can look up a peer given a node id.
#[async_trait::async_trait]
pub trait PeerLookupService {


@@ -1,22 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_procedure::PoisonKey;
use store_api::storage::TableId;
/// Returns the poison key for the table.
pub fn table_poison_key(table_id: TableId) -> PoisonKey {
let key = format!("table/{}", table_id);
PoisonKey::new(&key)
}


@@ -1240,7 +1240,6 @@ impl From<QueryContext> for PbQueryContext {
extensions,
channel: channel as u32,
snapshot_seqs: None,
explain: None,
}
}
}


@@ -81,7 +81,6 @@ pub fn procedure_state_to_pb_state(state: &ProcedureState) -> (PbProcedureStatus
ProcedureState::RollingBack { error } => {
(PbProcedureStatus::RollingBack, error.to_string())
}
ProcedureState::Poisoned { error, .. } => (PbProcedureStatus::Poisoned, error.to_string()),
}
}


@@ -14,23 +14,16 @@
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_procedure::error::{
DeletePoisonSnafu, DeleteStatesSnafu, GetPoisonSnafu, ListStateSnafu, PutPoisonSnafu,
PutStateSnafu, Result as ProcedureResult,
};
use common_procedure::store::poison_store::PoisonStore;
use common_procedure::error::{DeleteStatesSnafu, ListStateSnafu, PutStateSnafu};
use common_procedure::store::state_store::{KeySet, KeyValueStream, StateStore};
use common_procedure::store::util::multiple_value_stream;
use common_procedure::Result as ProcedureResult;
use futures::future::try_join_all;
use futures::StreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::ResultExt;
use crate::error::{ProcedurePoisonConflictSnafu, Result, UnexpectedSnafu};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{DeserializedValueWithBytes, MetadataValue};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::error::Result;
use crate::kv_backend::KvBackendRef;
use crate::range_stream::PaginationStream;
use crate::rpc::store::{BatchDeleteRequest, PutRequest, RangeRequest};
@@ -39,16 +32,11 @@ use crate::rpc::KeyValue;
const DELIMITER: &str = "/";
const PROCEDURE_PREFIX: &str = "/__procedure__/";
const PROCEDURE_POISON_KEY_PREFIX: &str = "/__procedure_poison/";
fn with_prefix(key: &str) -> String {
format!("{PROCEDURE_PREFIX}{key}")
}
fn with_poison_prefix(key: &str) -> String {
format!("{}{}", PROCEDURE_POISON_KEY_PREFIX, key)
}
fn strip_prefix(key: &str) -> String {
key.trim_start_matches(PROCEDURE_PREFIX).to_string()
}
@@ -219,168 +207,8 @@ impl StateStore for KvStateStore {
}
}
/// The value of the poison key.
///
/// Each poison value contains a unique token that identifies the procedure.
/// While multiple procedures may use the same poison key (representing the same resource),
/// each procedure will have a distinct token value to differentiate its ownership.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoisonValue {
token: String,
}
type PoisonDecodeResult = Result<Option<DeserializedValueWithBytes<PoisonValue>>>;
impl KvStateStore {
/// Builds a transaction that creates the poison;
/// it expects that `__procedure_poison/{key}` is not occupied.
fn build_create_poison_txn(
&self,
key: &str,
value: &PoisonValue,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> PoisonDecodeResult,
)> {
let key = key.as_bytes().to_vec();
let value = value.try_as_raw_value()?;
let txn = Txn::put_if_not_exists(key.clone(), value);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}
/// Builds a transaction that deletes the poison;
/// it expects that `__procedure_poison/{key}` is occupied by the same value.
fn build_delete_poison_txn(
&self,
key: &str,
value: PoisonValue,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> PoisonDecodeResult,
)> {
let key = key.as_bytes().to_vec();
let value = value.try_as_raw_value()?;
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Equal,
value,
)])
.and_then(vec![TxnOp::Delete(key.clone())])
.or_else(vec![TxnOp::Get(key.clone())]);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}
async fn get_poison_inner(&self, key: &str) -> Result<Option<PoisonValue>> {
let key = with_poison_prefix(key);
let value = self.kv_backend.get(key.as_bytes()).await?;
value
.map(|v| PoisonValue::try_from_raw_value(&v.value))
.transpose()
}
/// Puts the poison.
///
/// Returns an error if the poison was already put by another procedure.
async fn set_poison_inner(&self, key: &str, token: &str) -> Result<()> {
let key = with_poison_prefix(key);
let (txn, on_failure) = self.build_create_poison_txn(
&key,
&PoisonValue {
token: token.to_string(),
},
)?;
let mut resp = self.kv_backend.txn(txn).await?;
if !resp.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
let remote_value = on_failure(&mut set)?
.context(UnexpectedSnafu {
err_msg: "Reads the empty poison value in comparing operation of the put consistency poison",
})?
.into_inner();
ensure!(
remote_value.token == token,
ProcedurePoisonConflictSnafu {
key: &key,
value: &remote_value.token,
}
);
}
Ok(())
}
/// Deletes the poison.
///
/// Returns an error if the poison was put by a different procedure; deleting an absent poison is a no-op.
async fn delete_poison_inner(&self, key: &str, token: &str) -> Result<()> {
let key = with_poison_prefix(key);
let (txn, on_failure) = self.build_delete_poison_txn(
&key,
PoisonValue {
token: token.to_string(),
},
)?;
let mut resp = self.kv_backend.txn(txn).await?;
if !resp.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
let remote_value = on_failure(&mut set)?;
ensure!(
remote_value.is_none(),
ProcedurePoisonConflictSnafu {
key: &key,
value: &remote_value.unwrap().into_inner().token,
}
);
}
Ok(())
}
}
#[async_trait]
impl PoisonStore for KvStateStore {
async fn try_put_poison(&self, key: String, token: String) -> ProcedureResult<()> {
self.set_poison_inner(&key, &token)
.await
.map_err(BoxedError::new)
.context(PutPoisonSnafu { key, token })
}
async fn delete_poison(&self, key: String, token: String) -> ProcedureResult<()> {
self.delete_poison_inner(&key, &token)
.await
.map_err(BoxedError::new)
.context(DeletePoisonSnafu { key, token })
}
async fn get_poison(&self, key: &str) -> ProcedureResult<Option<String>> {
self.get_poison_inner(key)
.await
.map(|v| v.map(|v| v.token))
.map_err(BoxedError::new)
.context(GetPoisonSnafu { key })
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::env;
use std::sync::Arc;
@@ -391,7 +219,6 @@ mod tests {
use uuid::Uuid;
use super::*;
use crate::error::Error;
use crate::kv_backend::chroot::ChrootKvBackend;
use crate::kv_backend::etcd::EtcdStore;
use crate::kv_backend::memory::MemoryKvBackend;
@@ -570,73 +397,4 @@ mod tests {
)
.await;
}
#[tokio::test]
async fn test_poison() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let poison_manager = KvStateStore::new(mem_kv.clone());
let key = "table/1";
let token = "expected_token";
poison_manager.set_poison_inner(key, token).await.unwrap();
// Put again, should be ok.
poison_manager.set_poison_inner(key, token).await.unwrap();
// Delete, should be ok.
poison_manager
.delete_poison_inner(key, token)
.await
.unwrap();
// Delete again, should be ok.
poison_manager
.delete_poison_inner(key, token)
.await
.unwrap();
}
#[tokio::test]
async fn test_consistency_poison_failed() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let poison_manager = KvStateStore::new(mem_kv.clone());
let key = "table/1";
let token = "expected_token";
let token2 = "expected_token2";
poison_manager.set_poison_inner(key, token).await.unwrap();
let err = poison_manager
.set_poison_inner(key, token2)
.await
.unwrap_err();
assert_matches!(err, Error::ProcedurePoisonConflict { .. });
let err = poison_manager
.delete_poison_inner(key, token2)
.await
.unwrap_err();
assert_matches!(err, Error::ProcedurePoisonConflict { .. });
}
#[test]
fn test_serialize_deserialize() {
let key = "table/1";
let value = PoisonValue {
token: "expected_token".to_string(),
};
let serialized_key = with_poison_prefix(key).as_bytes().to_vec();
let serialized_value = value.try_as_raw_value().unwrap();
let expected_key = "/__procedure_poison/table/1";
let expected_value = r#"{"token":"expected_token"}"#;
assert_eq!(expected_key.as_bytes(), serialized_key);
assert_eq!(expected_value.as_bytes(), serialized_value);
}
}
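The token-based semantics exercised by `test_poison` and `test_consistency_poison_failed` above (puts and deletes are idempotent for the owning token, conflicting for any other) can be sketched with an in-memory map. `PoisonMap`, `try_put`, and `delete` are illustrative names standing in for the KV-backed transactions, not the crate's API:

```rust
use std::collections::HashMap;

/// In-memory stand-in for the KV-backed poison store (sketch only).
#[derive(Default)]
struct PoisonMap {
    entries: HashMap<String, String>, // poison key -> owning token
}

impl PoisonMap {
    /// Put succeeds if the key is free or already owned by `token`,
    /// mirroring the `put_if_not_exists` transaction plus token check.
    fn try_put(&mut self, key: &str, token: &str) -> Result<(), String> {
        match self.entries.get(key) {
            None => {
                self.entries.insert(key.to_string(), token.to_string());
                Ok(())
            }
            Some(owner) if owner == token => Ok(()), // idempotent re-put
            Some(owner) => Err(format!("conflict: owned by {owner}")),
        }
    }

    /// Delete succeeds if the key is absent or owned by `token`,
    /// mirroring the compare-and-delete transaction.
    fn delete(&mut self, key: &str, token: &str) -> Result<(), String> {
        match self.entries.get(key) {
            None => Ok(()), // deleting a missing poison is a no-op
            Some(owner) if owner == token => {
                self.entries.remove(key);
                Ok(())
            }
            Some(owner) => Err(format!("conflict: owned by {owner}")),
        }
    }
}

fn main() {
    let mut map = PoisonMap::default();
    assert!(map.try_put("table/1", "t1").is_ok());
    assert!(map.try_put("table/1", "t1").is_ok()); // same token: ok
    assert!(map.try_put("table/1", "t2").is_err()); // other token: conflict
    assert!(map.delete("table/1", "t2").is_err()); // other token: conflict
    assert!(map.delete("table/1", "t1").is_ok());
    assert!(map.delete("table/1", "t1").is_ok()); // already gone: ok
}
```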


@@ -9,5 +9,4 @@ workspace = true
[dependencies]
async-trait.workspace = true
common-procedure = { workspace = true, features = ["testing"] }
snafu.workspace = true
common-procedure.workspace = true


@@ -18,32 +18,21 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use common_procedure::store::poison_store::PoisonStore;
use common_procedure::test_util::InMemoryPoisonStore;
use common_procedure::{
Context, ContextProvider, Output, PoisonKey, Procedure, ProcedureId, ProcedureState,
ProcedureWithId, Result, Status,
Context, ContextProvider, Output, Procedure, ProcedureId, ProcedureState, ProcedureWithId,
Result, Status,
};
/// A Mock [ContextProvider].
#[derive(Default)]
pub struct MockContextProvider {
states: HashMap<ProcedureId, ProcedureState>,
poison_manager: InMemoryPoisonStore,
}
impl MockContextProvider {
/// Returns a new provider.
pub fn new(states: HashMap<ProcedureId, ProcedureState>) -> MockContextProvider {
MockContextProvider {
states,
poison_manager: InMemoryPoisonStore::default(),
}
}
/// Returns a reference to the poison manager.
pub fn poison_manager(&self) -> &InMemoryPoisonStore {
&self.poison_manager
MockContextProvider { states }
}
}
@@ -52,12 +41,6 @@ impl ContextProvider for MockContextProvider {
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
Ok(self.states.get(&procedure_id).cloned())
}
async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
self.poison_manager
.try_put_poison(key.to_string(), procedure_id.to_string())
.await
}
}
/// Executes a procedure until it returns [Status::Done].
@@ -78,7 +61,6 @@ pub async fn execute_procedure_until_done(procedure: &mut dyn Procedure) -> Opti
"Executing subprocedure is unsupported"
),
Status::Done { output } => return output,
Status::Poisoned { .. } => return None,
}
}
}
@@ -106,7 +88,6 @@ pub async fn execute_procedure_once(
false
}
Status::Done { .. } => true,
Status::Poisoned { .. } => false,
}
}
@@ -128,7 +109,6 @@ pub async fn execute_until_suspended_or_done(
Status::Executing { .. } => (),
Status::Suspended { subprocedures, .. } => return Some(subprocedures),
Status::Done { .. } => break,
Status::Poisoned { .. } => unreachable!(),
}
}


@@ -21,21 +21,14 @@ use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use crate::procedure::ProcedureId;
use crate::PoisonKey;
/// Procedure error.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display(
"Failed to execute procedure due to external error, clean poisons: {}",
clean_poisons
))]
External {
source: BoxedError,
clean_poisons: bool,
},
#[snafu(display("Failed to execute procedure due to external error"))]
External { source: BoxedError },
#[snafu(display("Loader {} is already registered", name))]
LoaderConflict {
@@ -73,32 +66,6 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Failed to put poison, key: '{key}', token: '{token}'"))]
PutPoison {
key: String,
token: String,
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to get poison, key: '{key}'"))]
GetPoison {
key: String,
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to delete poison, key: '{key}', token: '{token}'"))]
DeletePoison {
key: String,
token: String,
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to delete {}", key))]
DeleteState {
key: String,
@@ -208,21 +175,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Procedure not found, procedure_id: {}", procedure_id))]
ProcedureNotFound {
procedure_id: ProcedureId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Poison key not defined, key: '{key}', procedure_id: '{procedure_id}'"))]
PoisonKeyNotDefined {
key: PoisonKey,
procedure_id: ProcedureId,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -233,18 +185,14 @@ impl ErrorExt for Error {
Error::External { source, .. }
| Error::PutState { source, .. }
| Error::DeleteStates { source, .. }
| Error::ListState { source, .. }
| Error::PutPoison { source, .. }
| Error::DeletePoison { source, .. }
| Error::GetPoison { source, .. } => source.status_code(),
| Error::ListState { source, .. } => source.status_code(),
Error::ToJson { .. }
| Error::DeleteState { .. }
| Error::FromJson { .. }
| Error::WaitWatcher { .. }
| Error::RetryLater { .. }
| Error::RollbackProcedureRecovered { .. }
| Error::PoisonKeyNotDefined { .. } => StatusCode::Internal,
| Error::RollbackProcedureRecovered { .. } => StatusCode::Internal,
Error::RetryTimesExceeded { .. }
| Error::RollbackTimesExceeded { .. }
@@ -256,8 +204,7 @@ impl ErrorExt for Error {
}
Error::ProcedurePanic { .. }
| Error::ParseSegmentKey { .. }
| Error::Unexpected { .. }
| &Error::ProcedureNotFound { .. } => StatusCode::Unexpected,
| Error::Unexpected { .. } => StatusCode::Unexpected,
Error::ProcedureExec { source, .. } => source.status_code(),
Error::StartRemoveOutdatedMetaTask { source, .. }
| Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(),
@@ -274,15 +221,6 @@ impl Error {
pub fn external<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
Error::External {
source: BoxedError::new(err),
clean_poisons: false,
}
}
/// Creates a new [Error::External] error from source `err` and clean poisons.
pub fn external_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
Error::External {
source: BoxedError::new(err),
clean_poisons: true,
}
}
@@ -298,11 +236,6 @@ impl Error {
matches!(self, Error::RetryLater { .. })
}
/// Determine whether it needs to clean poisons.
pub fn need_clean_poisons(&self) -> bool {
matches!(self, Error::External { clean_poisons, .. } if *clean_poisons)
}
/// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according
/// to its [StatusCode].
pub fn from_error_ext<E: ErrorExt + Send + Sync + 'static>(err: E) -> Self {


@@ -23,13 +23,10 @@ mod procedure;
pub mod store;
pub mod watcher;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub use crate::error::{Error, Result};
pub use crate::procedure::{
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, LockKey, Output, ParseIdError,
PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo, ProcedureManager,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
Procedure, ProcedureId, ProcedureInfo, ProcedureManager, ProcedureManagerRef, ProcedureState,
ProcedureWithId, Status, StringKey,
};
pub use crate::watcher::Watcher;


@@ -25,23 +25,21 @@ use backon::ExponentialBuilder;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{error, info, tracing};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};
use self::rwlock::KeyRwLock;
use crate::error::{
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu,
PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
StopRemoveOutdatedMetaTaskSnafu,
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result,
StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
};
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
use crate::store::poison_store::PoisonStoreRef;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, ProcedureInfo};
use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
ProcedureState, ProcedureWithId, Watcher,
BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState,
ProcedureWithId, Watcher,
};
/// The expired time of a procedure's metadata.
@@ -67,8 +65,6 @@ pub(crate) struct ProcedureMeta {
child_notify: Notify,
/// Lock required by this procedure.
lock_key: LockKey,
/// Poison keys that may cause this procedure to become poisoned during execution.
poison_keys: PoisonKeys,
/// Sender to notify the procedure state.
state_sender: Sender<ProcedureState>,
/// Receiver to watch the procedure state.
@@ -87,7 +83,6 @@ impl ProcedureMeta {
procedure_state: ProcedureState,
parent_id: Option<ProcedureId>,
lock_key: LockKey,
poison_keys: PoisonKeys,
type_name: &str,
) -> ProcedureMeta {
let (state_sender, state_receiver) = watch::channel(procedure_state);
@@ -96,7 +91,6 @@ impl ProcedureMeta {
parent_id,
child_notify: Notify::new(),
lock_key,
poison_keys,
state_sender,
state_receiver,
children: Mutex::new(Vec::new()),
@@ -169,8 +163,6 @@ pub(crate) struct ManagerContext {
finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
/// Running flag.
running: Arc<AtomicBool>,
/// Poison manager.
poison_manager: PoisonStoreRef,
}
#[async_trait]
@@ -178,33 +170,11 @@ impl ContextProvider for ManagerContext {
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
Ok(self.state(procedure_id))
}
async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
{
// validate the procedure exists
let procedures = self.procedures.read().unwrap();
let procedure = procedures
.get(&procedure_id)
.context(ProcedureNotFoundSnafu { procedure_id })?;
// validate the poison key is defined
ensure!(
procedure.poison_keys.contains(key),
PoisonKeyNotDefinedSnafu {
key: key.clone(),
procedure_id
}
);
}
let key = key.to_string();
let procedure_id = procedure_id.to_string();
self.poison_manager.try_put_poison(key, procedure_id).await
}
}
impl ManagerContext {
/// Returns a new [ManagerContext].
fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
fn new() -> ManagerContext {
ManagerContext {
key_lock: KeyRwLock::new(),
loaders: Mutex::new(HashMap::new()),
@@ -212,7 +182,6 @@ impl ManagerContext {
messages: Mutex::new(HashMap::new()),
finished_procedures: Mutex::new(VecDeque::new()),
running: Arc::new(AtomicBool::new(false)),
poison_manager,
}
}
@@ -468,12 +437,8 @@ pub struct LocalManager {
impl LocalManager {
/// Create a new [LocalManager] with specific `config`.
pub fn new(
config: ManagerConfig,
state_store: StateStoreRef,
poison_store: PoisonStoreRef,
) -> LocalManager {
let manager_ctx = Arc::new(ManagerContext::new(poison_store));
pub fn new(config: ManagerConfig, state_store: StateStoreRef) -> LocalManager {
let manager_ctx = Arc::new(ManagerContext::new());
LocalManager {
manager_ctx,
@@ -511,7 +476,6 @@ impl LocalManager {
procedure_state,
None,
procedure.lock_key(),
procedure.poison_keys(),
procedure.type_name(),
));
let runner = Runner {
@@ -754,7 +718,6 @@ pub(crate) mod test_util {
ProcedureState::Running,
None,
LockKey::default(),
PoisonKeys::default(),
"ProcedureAdapter",
)
}
@@ -778,17 +741,11 @@ mod tests {
use super::*;
use crate::error::{self, Error};
use crate::store::state_store::ObjectStateStore;
use crate::test_util::InMemoryPoisonStore;
use crate::{Context, Procedure, Status};
fn new_test_manager_context() -> ManagerContext {
let poison_manager = Arc::new(InMemoryPoisonStore::default());
ManagerContext::new(poison_manager)
}
#[test]
fn test_manager_context() {
let ctx = new_test_manager_context();
let ctx = ManagerContext::new();
let meta = Arc::new(test_util::procedure_meta_for_test());
assert!(!ctx.contains_procedure(meta.id));
@@ -804,7 +761,7 @@ mod tests {
#[test]
fn test_manager_context_insert_duplicate() {
let ctx = new_test_manager_context();
let ctx = ManagerContext::new();
let meta = Arc::new(test_util::procedure_meta_for_test());
assert!(ctx.try_insert_procedure(meta.clone()));
@@ -826,7 +783,7 @@ mod tests {
#[test]
fn test_procedures_in_tree() {
let ctx = new_test_manager_context();
let ctx = ManagerContext::new();
let root = Arc::new(test_util::procedure_meta_for_test());
assert!(ctx.try_insert_procedure(root.clone()));
@@ -850,7 +807,6 @@ mod tests {
struct ProcedureToLoad {
content: String,
lock_key: LockKey,
poison_keys: PoisonKeys,
}
#[async_trait]
@@ -870,10 +826,6 @@ mod tests {
fn lock_key(&self) -> LockKey {
self.lock_key.clone()
}
fn poison_keys(&self) -> PoisonKeys {
self.poison_keys.clone()
}
}
impl ProcedureToLoad {
@@ -881,7 +833,6 @@ mod tests {
ProcedureToLoad {
content: content.to_string(),
lock_key: LockKey::default(),
poison_keys: PoisonKeys::default(),
}
}
@@ -904,8 +855,7 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store);
manager.manager_ctx.start();
manager
@@ -929,8 +879,7 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store);
manager.manager_ctx.start();
manager
@@ -983,8 +932,7 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store);
manager.manager_ctx.start();
let procedure_id = ProcedureId::random();
@@ -1035,8 +983,7 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store);
manager.manager_ctx.start();
#[derive(Debug)]
@@ -1075,10 +1022,6 @@ mod tests {
fn lock_key(&self) -> LockKey {
LockKey::single_exclusive("test.submit")
}
fn poison_keys(&self) -> PoisonKeys {
PoisonKeys::default()
}
}
let check_procedure = |procedure| async {
@@ -1116,8 +1059,7 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store);
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
@@ -1144,8 +1086,7 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store);
manager.start().await.unwrap();
manager.stop().await.unwrap();
@@ -1180,8 +1121,7 @@ mod tests {
remove_outdated_meta_ttl: Duration::from_millis(1),
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store);
manager.manager_ctx.set_running();
let mut procedure = ProcedureToLoad::new("submit");
@@ -1256,7 +1196,6 @@ mod tests {
content: String,
lock_key: LockKey,
notify: Option<Arc<Notify>>,
poison_keys: PoisonKeys,
}
#[async_trait]
@@ -1281,10 +1220,6 @@ mod tests {
self.notify.as_ref().unwrap().notify_one();
Ok(())
}
fn poison_keys(&self) -> PoisonKeys {
self.poison_keys.clone()
}
}
impl ProcedureToRecover {
@@ -1292,7 +1227,6 @@ mod tests {
ProcedureToRecover {
content: content.to_string(),
lock_key: LockKey::default(),
poison_keys: PoisonKeys::default(),
notify: None,
}
}
@@ -1302,7 +1236,6 @@ mod tests {
let procedure = ProcedureToRecover {
content: json.to_string(),
lock_key: LockKey::default(),
poison_keys: PoisonKeys::default(),
notify: Some(notify.clone()),
};
Ok(Box::new(procedure) as _)
@@ -1323,8 +1256,7 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store);
manager.manager_ctx.start();
let notify = Arc::new(Notify::new());


@@ -238,34 +238,11 @@ impl Runner {
}
ProcedureState::Done { .. } => return,
ProcedureState::Failed { .. } => return,
ProcedureState::Poisoned { .. } => return,
}
self.execute_once(ctx).await;
}
}
async fn clean_poisons(&mut self) -> Result<()> {
let mut error = None;
for key in self.meta.poison_keys.iter() {
let key = key.to_string();
if let Err(e) = self
.manager_ctx
.poison_manager
.delete_poison(key, self.meta.id.to_string())
.await
{
error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
error = Some(e);
}
}
// Returns the last error, if any.
if let Some(e) = error {
return Err(e);
}
Ok(())
}
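The loop above logs each failed deletion but keeps going, surfacing only the last error. A minimal, synchronous sketch of that pattern (the `delete_poison` stub here is hypothetical, purely for illustration; it is not the crate's API):

```rust
// Hypothetical stand-in for PoisonStore::delete_poison, for this sketch only.
fn delete_poison(key: &str) -> Result<(), String> {
    if key.contains("stuck") {
        Err(format!("cannot delete poison for {key}"))
    } else {
        Ok(())
    }
}

// Attempts every deletion, logs failures, and returns only the last error.
fn clean_poisons(keys: &[&str]) -> Result<(), String> {
    let mut last_err = None;
    for key in keys {
        if let Err(e) = delete_poison(key) {
            eprintln!("Failed to clean poison for {key}: {e}");
            last_err = Some(e);
        }
    }
    match last_err {
        Some(e) => Err(e),
        None => Ok(()),
    }
}

fn main() {
    assert!(clean_poisons(&["table/1", "table/2"]).is_ok());
    // One failure does not short-circuit the remaining deletions,
    // but the batch still reports an error.
    assert!(clean_poisons(&["table/1", "stuck/2", "table/3"]).is_err());
}
```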
async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
if self.procedure.rollback_supported() {
if let Err(e) = self.procedure.rollback(ctx).await {
@@ -278,7 +255,7 @@ impl Runner {
}
async fn prepare_rollback(&mut self, err: Arc<Error>) {
if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
if let Err(e) = self.write_procedure_state(err.to_string()).await {
self.meta
.set_state(ProcedureState::prepare_rollback(Arc::new(e)));
return;
@@ -311,48 +288,26 @@ impl Runner {
return;
}
// Cleans poisons before persisting.
if status.need_clean_poisons() {
if let Err(e) = self.clean_poisons().await {
error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return;
}
}
if status.need_persist() {
if let Err(e) = self.persist_procedure().await {
error!(e; "Failed to persist procedure: {}", self.meta.id);
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
if let Err(err) = self.persist_procedure().await {
self.meta.set_state(ProcedureState::retrying(Arc::new(err)));
return;
}
}
match status {
Status::Executing { .. } => {}
Status::Executing { .. } => (),
Status::Suspended { subprocedures, .. } => {
self.on_suspended(subprocedures).await;
}
Status::Done { output } => {
if let Err(e) = self.commit_procedure().await {
error!(e; "Failed to commit procedure: {}", self.meta.id);
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return;
}
self.done(output);
}
Status::Poisoned { error, keys } => {
error!(
error;
"Procedure {}-{} is poisoned, keys: {:?}",
self.procedure.type_name(),
self.meta.id,
keys,
);
self.meta
.set_state(ProcedureState::poisoned(keys, Arc::new(error)));
}
}
}
Err(e) => {
@@ -372,14 +327,6 @@ impl Runner {
return;
}
if e.need_clean_poisons() {
if let Err(e) = self.clean_poisons().await {
error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return;
}
}
if e.is_retry_later() {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return;
@@ -392,9 +339,7 @@ impl Runner {
}
ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
ProcedureState::Failed { .. }
| ProcedureState::Done { .. }
| ProcedureState::Poisoned { .. } => (),
ProcedureState::Failed { .. } | ProcedureState::Done { .. } => (),
}
}
@@ -427,7 +372,6 @@ impl Runner {
procedure_state,
Some(self.meta.id),
procedure.lock_key(),
procedure.poison_keys(),
procedure.type_name(),
));
let runner = Runner {
@@ -550,7 +494,7 @@ impl Runner {
Ok(())
}
async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
async fn write_procedure_state(&mut self, error: String) -> Result<()> {
// Persists procedure state
let type_name = self.procedure.type_name().to_string();
let data = self.procedure.dump()?;
@@ -605,10 +549,8 @@ mod tests {
use super::*;
use crate::local::test_util;
use crate::procedure::PoisonKeys;
use crate::store::proc_path;
use crate::test_util::InMemoryPoisonStore;
use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
use crate::{ContextProvider, Error, LockKey, Procedure};
const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
@@ -620,9 +562,7 @@ mod tests {
Runner {
meta,
procedure,
manager_ctx: Arc::new(ManagerContext::new(
Arc::new(InMemoryPoisonStore::default()),
)),
manager_ctx: Arc::new(ManagerContext::new()),
step: 0,
exponential_builder: ExponentialBuilder::default(),
store,
@@ -647,16 +587,6 @@ mod tests {
assert_eq!(files, files_in_dir);
}
fn context_with_provider(
procedure_id: ProcedureId,
provider: Arc<dyn ContextProvider>,
) -> Context {
Context {
procedure_id,
provider,
}
}
fn context_without_provider(procedure_id: ProcedureId) -> Context {
struct MockProvider;
@@ -668,14 +598,6 @@ mod tests {
) -> Result<Option<ProcedureState>> {
unimplemented!()
}
async fn try_put_poison(
&self,
_key: &PoisonKey,
_procedure_id: ProcedureId,
) -> Result<()> {
unimplemented!()
}
}
Context {
@@ -689,7 +611,6 @@ mod tests {
struct ProcedureAdapter<F> {
data: String,
lock_key: LockKey,
poison_keys: PoisonKeys,
exec_fn: F,
rollback_fn: Option<RollbackFn>,
}
@@ -699,7 +620,6 @@ mod tests {
let mut meta = test_util::procedure_meta_for_test();
meta.id = ProcedureId::parse_str(uuid).unwrap();
meta.lock_key = self.lock_key.clone();
meta.poison_keys = self.poison_keys.clone();
Arc::new(meta)
}
@@ -737,10 +657,6 @@ mod tests {
fn lock_key(&self) -> LockKey {
self.lock_key.clone()
}
fn poison_keys(&self) -> PoisonKeys {
self.poison_keys.clone()
}
}
async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
@@ -749,7 +665,7 @@ mod tests {
times += 1;
async move {
if times == 1 {
Ok(Status::executing(persist))
Ok(Status::Executing { persist })
} else {
Ok(Status::done())
}
@@ -759,7 +675,6 @@ mod tests {
let normal = ProcedureAdapter {
data: "normal".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -824,7 +739,6 @@ mod tests {
let suspend = ProcedureAdapter {
data: "suspend".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -849,7 +763,7 @@ mod tests {
async move {
if times == 1 {
time::sleep(Duration::from_millis(200)).await;
Ok(Status::executing(true))
Ok(Status::Executing { persist: true })
} else {
Ok(Status::done())
}
@@ -859,7 +773,6 @@ mod tests {
let child = ProcedureAdapter {
data: "child".to_string(),
lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -929,7 +842,6 @@ mod tests {
let parent = ProcedureAdapter {
data: "parent".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -941,8 +853,7 @@ mod tests {
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
let manager_ctx = Arc::new(ManagerContext::new());
manager_ctx.start();
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta));
@@ -974,11 +885,10 @@ mod tests {
#[tokio::test]
async fn test_running_is_stopped() {
let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
let exec_fn = move |_| async move { Ok(Status::Executing { persist: true }) }.boxed();
let normal = ProcedureAdapter {
data: "normal".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -1023,7 +933,6 @@ mod tests {
let normal = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -1050,7 +959,6 @@ mod tests {
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -1087,7 +995,6 @@ mod tests {
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: Some(Box::new(rollback_fn)),
};
@@ -1139,7 +1046,6 @@ mod tests {
let retry_later = ProcedureAdapter {
data: "retry_later".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -1176,7 +1082,6 @@ mod tests {
let exceed_max_retry_later = ProcedureAdapter {
data: "exceed_max_retry_later".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -1212,7 +1117,6 @@ mod tests {
let exceed_max_retry_later = ProcedureAdapter {
data: "exceed_max_rollback".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: Some(Box::new(rollback_fn)),
};
@@ -1255,7 +1159,6 @@ mod tests {
let retry_later = ProcedureAdapter {
data: "rollback_after_retry_fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: Some(Box::new(rollback_fn)),
};
@@ -1300,7 +1203,6 @@ mod tests {
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -1336,7 +1238,6 @@ mod tests {
let parent = ProcedureAdapter {
data: "parent".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -1347,8 +1248,7 @@ mod tests {
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
let manager_ctx = Arc::new(ManagerContext::new());
manager_ctx.start();
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta.clone()));
@@ -1361,327 +1261,4 @@ mod tests {
let err = meta.state().error().unwrap().output_msg();
assert!(err.contains("subprocedure failed"), "{err}");
}
#[tokio::test]
async fn test_execute_with_clean_poisons() {
common_telemetry::init_default_ut_logging();
let mut times = 0;
let poison_key = PoisonKey::new("table/1024");
let moved_poison_key = poison_key.clone();
let exec_fn = move |ctx: Context| {
times += 1;
let poison_key = moved_poison_key.clone();
async move {
if times == 1 {
// Put the poison via the context provider.
ctx.provider
.try_put_poison(&poison_key, ctx.procedure_id)
.await
.unwrap();
Ok(Status::executing(true))
} else {
Ok(Status::executing_with_clean_poisons(true))
}
}
.boxed()
};
let poison = ProcedureAdapter {
data: "poison".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("clean_poisons");
let meta = poison.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
// Use the manager ctx as the context provider.
let ctx = context_with_provider(
meta.id,
runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
// Manually add this procedure to the manager ctx.
runner
.manager_ctx
.procedures
.write()
.unwrap()
.insert(meta.id, runner.meta.clone());
runner.manager_ctx.start();
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
let procedure_id = runner
.manager_ctx
.poison_manager
.get_poison(&poison_key.to_string())
.await
.unwrap();
// The poison key should exist.
assert!(procedure_id.is_some());
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
let procedure_id = runner
.manager_ctx
.poison_manager
.get_poison(&poison_key.to_string())
.await
.unwrap();
// The poison key should have been deleted.
assert!(procedure_id.is_none());
}
#[tokio::test]
async fn test_execute_error_with_clean_poisons() {
common_telemetry::init_default_ut_logging();
let mut times = 0;
let poison_key = PoisonKey::new("table/1024");
let moved_poison_key = poison_key.clone();
let exec_fn = move |ctx: Context| {
times += 1;
let poison_key = moved_poison_key.clone();
async move {
if times == 1 {
// Put the poison via the context provider.
ctx.provider
.try_put_poison(&poison_key, ctx.procedure_id)
.await
.unwrap();
Ok(Status::executing(true))
} else {
Err(Error::external_and_clean_poisons(MockError::new(
StatusCode::Unexpected,
)))
}
}
.boxed()
};
let poison = ProcedureAdapter {
data: "poison".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("error_with_clean_poisons");
let meta = poison.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
// Use the manager ctx as the context provider.
let ctx = context_with_provider(
meta.id,
runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
// Manually add this procedure to the manager ctx.
runner
.manager_ctx
.procedures
.write()
.unwrap()
.insert(meta.id, runner.meta.clone());
runner.manager_ctx.start();
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
let procedure_id = runner
.manager_ctx
.poison_manager
.get_poison(&poison_key.to_string())
.await
.unwrap();
// The poison key should exist.
assert!(procedure_id.is_some());
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_prepare_rollback(), "{state:?}");
let procedure_id = runner
.manager_ctx
.poison_manager
.get_poison(&poison_key.to_string())
.await
.unwrap();
// The poison key should have been deleted.
assert!(procedure_id.is_none());
}
#[tokio::test]
async fn test_execute_failed_after_set_poison() {
let mut times = 0;
let poison_key = PoisonKey::new("table/1024");
let moved_poison_key = poison_key.clone();
let exec_fn = move |ctx: Context| {
times += 1;
let poison_key = moved_poison_key.clone();
async move {
if times == 1 {
Ok(Status::executing(true))
} else {
// Put the poison via the context provider.
ctx.provider
.try_put_poison(&poison_key, ctx.procedure_id)
.await
.unwrap();
Err(Error::external(MockError::new(StatusCode::Unexpected)))
}
}
.boxed()
};
let poison = ProcedureAdapter {
data: "poison".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("poison");
let meta = poison.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
// Use the manager ctx as the context provider.
let ctx = context_with_provider(
meta.id,
runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
// Manually add this procedure to the manager ctx.
runner
.manager_ctx
.procedures
.write()
.unwrap()
.insert(meta.id, runner.meta.clone());
runner.manager_ctx.start();
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_prepare_rollback(), "{state:?}");
assert!(meta.state().is_prepare_rollback());
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_failed(), "{state:?}");
assert!(meta.state().is_failed());
// Check the poison is set.
let procedure_id = runner
.manager_ctx
.poison_manager
.get_poison(&poison_key.to_string())
.await
.unwrap()
.unwrap();
// If the procedure is poisoned, the poison key shouldn't be deleted.
assert_eq!(&procedure_id.to_string(), ROOT_ID);
}
#[tokio::test]
async fn test_execute_poisoned() {
let mut times = 0;
let poison_key = PoisonKey::new("table/1024");
let moved_poison_key = poison_key.clone();
let exec_fn = move |ctx: Context| {
times += 1;
let poison_key = moved_poison_key.clone();
async move {
if times == 1 {
Ok(Status::executing(true))
} else {
// Put the poison via the context provider.
ctx.provider
.try_put_poison(&poison_key, ctx.procedure_id)
.await
.unwrap();
Ok(Status::Poisoned {
keys: PoisonKeys::new(vec![poison_key.clone()]),
error: Error::external(MockError::new(StatusCode::Unexpected)),
})
}
}
.boxed()
};
let poison = ProcedureAdapter {
data: "poison".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("poison");
let meta = poison.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
// Use the manager ctx as the context provider.
let ctx = context_with_provider(
meta.id,
runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
// Manually add this procedure to the manager ctx.
runner
.manager_ctx
.procedures
.write()
.unwrap()
.insert(meta.id, runner.meta.clone());
runner.manager_ctx.start();
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_poisoned(), "{state:?}");
assert!(meta.state().is_poisoned());
check_files(
&object_store,
&procedure_store,
ctx.procedure_id,
&["0000000000.step"],
)
.await;
// Check the poison is set.
let procedure_id = runner
.manager_ctx
.poison_manager
.get_poison(&poison_key.to_string())
.await
.unwrap()
.unwrap();
// If the procedure is poisoned, the poison key shouldn't be deleted.
assert_eq!(procedure_id, ROOT_ID);
}
}


@@ -14,7 +14,6 @@
use std::any::Any;
use std::fmt;
use std::fmt::Display;
use std::str::FromStr;
use std::sync::Arc;
@@ -36,8 +35,6 @@ pub enum Status {
Executing {
/// Whether the framework needs to persist the procedure.
persist: bool,
/// Whether the framework needs to clean the poisons.
clean_poisons: bool,
},
/// The procedure has suspended itself and is waiting for subprocedures.
Suspended {
@@ -45,40 +42,14 @@ pub enum Status {
/// Whether the framework needs to persist the procedure.
persist: bool,
},
/// The procedure is poisoned.
Poisoned {
/// The keys that cause the procedure to be poisoned.
keys: PoisonKeys,
/// The error that caused the procedure to be poisoned.
error: Error,
},
/// The procedure is done.
Done { output: Option<Output> },
}
impl Status {
/// Returns a [Status::Poisoned] with given `keys` and `error`.
pub fn poisoned(keys: impl IntoIterator<Item = PoisonKey>, error: Error) -> Status {
Status::Poisoned {
keys: PoisonKeys::new(keys),
error,
}
}
/// Returns a [Status::Executing] with given `persist` flag.
pub fn executing(persist: bool) -> Status {
Status::Executing {
persist,
clean_poisons: false,
}
}
/// Returns a [Status::Executing] with given `persist` flag and clean poisons.
pub fn executing_with_clean_poisons(persist: bool) -> Status {
Status::Executing {
persist,
clean_poisons: true,
}
Status::Executing { persist }
}
/// Returns a [Status::Done] without output.
@@ -115,20 +86,11 @@ impl Status {
/// Returns `true` if the procedure needs the framework to persist its intermediate state.
pub fn need_persist(&self) -> bool {
// If the procedure is done, the framework doesn't need to persist the procedure
// anymore. It only needs to mark the procedure as committed.
match self {
// If the procedure is done/poisoned, the framework doesn't need to persist the procedure
// anymore. It only needs to mark the procedure as committed.
Status::Executing { persist, .. } | Status::Suspended { persist, .. } => *persist,
Status::Done { .. } | Status::Poisoned { .. } => false,
}
}
/// Returns `true` if the framework needs to clean the poisons.
pub fn need_clean_poisons(&self) -> bool {
match self {
Status::Executing { clean_poisons, .. } => *clean_poisons,
Status::Done { .. } => true,
_ => false,
Status::Executing { persist } | Status::Suspended { persist, .. } => *persist,
Status::Done { .. } => false,
}
}
}
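For reference, the flag semantics this diff removes can be mirrored in a standalone sketch (assumption: simplified types covering only the `Executing` and `Done` variants, not the crate's actual `Status`):

```rust
// Simplified mirror of the pre-change Status flags (illustration only).
enum Status {
    Executing { persist: bool, clean_poisons: bool },
    Done,
}

impl Status {
    // Done never needs another persist; it is only marked as committed.
    fn need_persist(&self) -> bool {
        match self {
            Status::Executing { persist, .. } => *persist,
            Status::Done => false,
        }
    }

    // Done always triggers poison cleanup; Executing opts in explicitly.
    fn need_clean_poisons(&self) -> bool {
        match self {
            Status::Executing { clean_poisons, .. } => *clean_poisons,
            Status::Done => true,
        }
    }
}

fn main() {
    let s = Status::Executing { persist: true, clean_poisons: false };
    assert!(s.need_persist());
    assert!(!s.need_clean_poisons());
    assert!(!Status::Done.need_persist());
    assert!(Status::Done.need_clean_poisons());
}
```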
@@ -138,12 +100,6 @@ impl Status {
pub trait ContextProvider: Send + Sync {
/// Query the procedure state.
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>>;
/// Try to put a poison key for a procedure.
///
/// This method is used to mark a resource as being operated on by a procedure.
/// If the poison key already exists with a different value, the operation will fail.
async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()>;
}
/// Reference-counted pointer to [ContextProvider].
@@ -191,11 +147,6 @@ pub trait Procedure: Send {
/// Returns the [LockKey] that this procedure needs to acquire.
fn lock_key(&self) -> LockKey;
/// Returns the [PoisonKeys] that may cause this procedure to become poisoned during execution.
fn poison_keys(&self) -> PoisonKeys {
PoisonKeys::default()
}
}
#[async_trait]
@@ -223,54 +174,6 @@ impl<T: Procedure + ?Sized> Procedure for Box<T> {
fn lock_key(&self) -> LockKey {
(**self).lock_key()
}
fn poison_keys(&self) -> PoisonKeys {
(**self).poison_keys()
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct PoisonKey(String);
impl Display for PoisonKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl PoisonKey {
/// Creates a new [PoisonKey] from a [String].
pub fn new(key: impl Into<String>) -> Self {
Self(key.into())
}
}
/// A collection of [PoisonKey]s.
///
/// This type is used to represent the keys that may cause the procedure to become poisoned during execution.
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Default)]
pub struct PoisonKeys(SmallVec<[PoisonKey; 2]>);
impl PoisonKeys {
/// Creates a new [PoisonKeys] containing a single key.
pub fn single(key: impl Into<String>) -> Self {
Self(smallvec![PoisonKey::new(key)])
}
/// Creates a new [PoisonKeys] from an iterator of [PoisonKey]s.
pub fn new(keys: impl IntoIterator<Item = PoisonKey>) -> Self {
Self(keys.into_iter().collect())
}
/// Returns `true` if the [PoisonKeys] contains the given [PoisonKey].
pub fn contains(&self, key: &PoisonKey) -> bool {
self.0.contains(key)
}
/// Returns an iterator over the [PoisonKey]s.
pub fn iter(&self) -> impl Iterator<Item = &PoisonKey> {
self.0.iter()
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
@@ -422,8 +325,6 @@ pub enum ProcedureState {
RollingBack { error: Arc<Error> },
/// The procedure is failed and cannot proceed anymore.
Failed { error: Arc<Error> },
/// The procedure is poisoned.
Poisoned { keys: PoisonKeys, error: Arc<Error> },
}
impl ProcedureState {
@@ -447,11 +348,6 @@ impl ProcedureState {
ProcedureState::Retrying { error }
}
/// Returns a [ProcedureState] with poisoned state.
pub fn poisoned(keys: PoisonKeys, error: Arc<Error>) -> ProcedureState {
ProcedureState::Poisoned { keys, error }
}
/// Returns true if the procedure state is running.
pub fn is_running(&self) -> bool {
matches!(self, ProcedureState::Running)
@@ -462,11 +358,6 @@ impl ProcedureState {
matches!(self, ProcedureState::Done { .. })
}
/// Returns true if the procedure state is poisoned.
pub fn is_poisoned(&self) -> bool {
matches!(self, ProcedureState::Poisoned { .. })
}
/// Returns true if the procedure state failed.
pub fn is_failed(&self) -> bool {
matches!(self, ProcedureState::Failed { .. })
@@ -493,7 +384,6 @@ impl ProcedureState {
ProcedureState::Failed { error } => Some(error),
ProcedureState::Retrying { error } => Some(error),
ProcedureState::RollingBack { error } => Some(error),
ProcedureState::Poisoned { error, .. } => Some(error),
_ => None,
}
}
@@ -507,7 +397,6 @@ impl ProcedureState {
ProcedureState::Failed { .. } => "Failed",
ProcedureState::PrepareRollback { .. } => "PrepareRollback",
ProcedureState::RollingBack { .. } => "RollingBack",
ProcedureState::Poisoned { .. } => "Poisoned",
}
}
}
@@ -581,18 +470,12 @@ mod tests {
#[test]
fn test_status() {
let status = Status::executing(false);
let status = Status::Executing { persist: false };
assert!(!status.need_persist());
let status = Status::executing(true);
let status = Status::Executing { persist: true };
assert!(status.need_persist());
let status = Status::executing_with_clean_poisons(false);
assert!(status.need_clean_poisons());
let status = Status::executing_with_clean_poisons(true);
assert!(status.need_clean_poisons());
let status = Status::Suspended {
subprocedures: Vec::new(),
persist: false,
@@ -607,7 +490,6 @@ mod tests {
let status = Status::done();
assert!(!status.need_persist());
assert!(status.need_clean_poisons());
}
#[test]


@@ -24,7 +24,6 @@ use crate::error::{Result, ToJsonSnafu};
pub(crate) use crate::store::state_store::StateStoreRef;
use crate::ProcedureId;
pub mod poison_store;
pub mod state_store;
pub mod util;
@@ -342,7 +341,6 @@ mod tests {
use object_store::ObjectStore;
use crate::procedure::PoisonKeys;
use crate::store::state_store::ObjectStateStore;
use crate::BoxedProcedure;
@@ -505,10 +503,6 @@ mod tests {
fn lock_key(&self) -> LockKey {
LockKey::default()
}
fn poison_keys(&self) -> PoisonKeys {
PoisonKeys::default()
}
}
#[tokio::test]


@@ -1,59 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use crate::error::Result;
pub type PoisonStoreRef = Arc<dyn PoisonStore>;
/// Poison store.
///
/// This trait is used to manage the state of operations on resources, particularly
/// when an operation encounters an unrecoverable error, potentially leading to
/// metadata inconsistency. In such cases, manual intervention is required to
/// resolve the issue before any further operations can be performed on the resource.
///
/// ## Behavior:
/// - **Insertion**: When an operation begins on a resource, a "poison" key is inserted
/// into the state store to indicate the operation is in progress.
/// - **Deletion**: If the operation completes successfully, or the resource can
/// otherwise be guaranteed to be in a consistent state, the poison key is removed
/// from the state store.
/// - **Failure Handling**:
/// - If the operation fails, or may otherwise leave the metadata inconsistent,
/// the poison key remains in the state store.
/// - The presence of this key indicates that the resource has encountered an
/// unrecoverable error and the metadata may be inconsistent.
/// - New operations on the same resource are rejected until the resource is
/// manually recovered and the poison key is removed.
#[async_trait]
pub trait PoisonStore: Send + Sync {
/// Try to put the poison key.
///
/// If the poison key already exists with a different value, the operation will fail.
async fn try_put_poison(&self, key: String, token: String) -> Result<()>;
/// Delete the poison key.
///
/// If the poison key exists with a different value, the operation will fail.
async fn delete_poison(&self, key: String, token: String) -> Result<()>;
/// Get the poison key.
///
/// If the poison key does not exist, the operation will return `None`.
async fn get_poison(&self, key: &str) -> Result<Option<String>>;
}


@@ -1,85 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use snafu::ensure;
use super::*;
use crate::error;
use crate::store::poison_store::PoisonStore;
/// A poison store that uses an in-memory map to store the poison state.
#[derive(Debug, Default)]
pub struct InMemoryPoisonStore {
map: Arc<RwLock<HashMap<String, String>>>,
}
impl InMemoryPoisonStore {
/// Creates a new in-memory poison store.
pub fn new() -> Self {
Self::default()
}
}
#[async_trait::async_trait]
impl PoisonStore for InMemoryPoisonStore {
async fn try_put_poison(&self, key: String, token: String) -> Result<()> {
let mut map = self.map.write().unwrap();
match map.entry(key) {
Entry::Vacant(v) => {
v.insert(token.to_string());
}
Entry::Occupied(o) => {
let value = o.get();
ensure!(
value == &token,
error::UnexpectedSnafu {
err_msg: format!("The poison is already set by another token {}", value)
}
);
}
}
Ok(())
}
async fn delete_poison(&self, key: String, token: String) -> Result<()> {
let mut map = self.map.write().unwrap();
match map.entry(key) {
Entry::Vacant(_) => {
// do nothing
}
Entry::Occupied(o) => {
let value = o.get();
ensure!(
value == &token,
error::UnexpectedSnafu {
err_msg: format!("The poison was set by a different token {}", value)
}
);
o.remove();
}
}
Ok(())
}
async fn get_poison(&self, key: &str) -> Result<Option<String>> {
let map = self.map.read().unwrap();
let key = key.to_string();
Ok(map.get(&key).cloned())
}
}
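The token checks in `try_put_poison` and `delete_poison` above amount to a compare-on-owner map: a key may only be re-put or removed by the token that set it. A self-contained sketch of those semantics (simplified and synchronous; not the crate's actual API):

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

// Simplified, synchronous sketch of the token-guarded poison map above.
#[derive(Default)]
struct PoisonMap(HashMap<String, String>);

impl PoisonMap {
    // Succeeds if the key is vacant or already owned by the same token.
    fn try_put(&mut self, key: &str, token: &str) -> Result<(), String> {
        match self.0.entry(key.to_string()) {
            Entry::Vacant(v) => {
                v.insert(token.to_string());
                Ok(())
            }
            Entry::Occupied(o) if o.get().as_str() == token => Ok(()),
            Entry::Occupied(o) => {
                Err(format!("poison already set by another token {}", o.get()))
            }
        }
    }

    // Removes the key only when the caller presents the token that set it.
    fn delete(&mut self, key: &str, token: &str) -> Result<(), String> {
        match self.0.entry(key.to_string()) {
            Entry::Vacant(_) => Ok(()),
            Entry::Occupied(o) if o.get().as_str() == token => {
                o.remove();
                Ok(())
            }
            Entry::Occupied(o) => {
                Err(format!("poison was set by a different token {}", o.get()))
            }
        }
    }
}

fn main() {
    let mut store = PoisonMap::default();
    assert!(store.try_put("table/1024", "proc-a").is_ok());
    // Re-putting with the owning token is idempotent.
    assert!(store.try_put("table/1024", "proc-a").is_ok());
    // Another procedure cannot claim the poisoned resource...
    assert!(store.try_put("table/1024", "proc-b").is_err());
    // ...nor clear a poison it does not own.
    assert!(store.delete("table/1024", "proc-b").is_err());
    assert!(store.delete("table/1024", "proc-a").is_ok());
}
```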


@@ -43,10 +43,6 @@ pub async fn wait(watcher: &mut Watcher) -> Result<Option<Output>> {
ProcedureState::PrepareRollback { error } => {
debug!("commit rollback, source: {}", error)
}
ProcedureState::Poisoned { error, .. } => {
debug!("poisoned, source: {}", error);
return Err(error.clone()).context(ProcedureExecSnafu);
}
}
}
}
@@ -65,9 +61,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::local::{test_util, LocalManager, ManagerConfig};
use crate::procedure::PoisonKeys;
use crate::store::state_store::ObjectStateStore;
use crate::test_util::InMemoryPoisonStore;
use crate::{
Context, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureWithId, Status,
};
@@ -82,8 +76,7 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store);
manager.start().await.unwrap();
#[derive(Debug)]
@@ -113,10 +106,6 @@ mod tests {
fn lock_key(&self) -> LockKey {
LockKey::single_exclusive("test.submit")
}
fn poison_keys(&self) -> PoisonKeys {
PoisonKeys::default()
}
}
let procedure_id = ProcedureId::random();


@@ -111,7 +111,7 @@ impl Eq for LoggingOptions {}
impl Default for LoggingOptions {
fn default() -> Self {
Self {
dir: "/tmp/greptimedb/logs".to_string(),
dir: "./greptimedb_data/logs".to_string(),
level: None,
log_format: LogFormat::Text,
enable_otlp_tracing: false,


@@ -468,7 +468,6 @@ mod test {
&[GrantedRegion {
region_id: region_id.as_u64(),
role: api::v1::meta::RegionRole::Leader.into(),
extensions: HashMap::new(),
}],
Instant::now() + Duration::from_millis(200),
)


@@ -36,7 +36,7 @@ use servers::Mode;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
/// Default data home in file storage
const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb";
const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
/// Object storage config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]


@@ -25,6 +25,6 @@ pub mod heartbeat;
pub mod metrics;
pub mod region_server;
pub mod service;
mod store;
pub mod store;
#[cfg(any(test, feature = "testing"))]
pub mod tests;


@@ -15,7 +15,7 @@
//! object storage utilities
mod azblob;
mod fs;
pub mod fs;
mod gcs;
mod oss;
mod s3;


@@ -24,7 +24,8 @@ use crate::config::FileConfig;
use crate::error::{self, Result};
use crate::store;
pub(crate) async fn new_fs_object_store(
/// A helper function to create a file system object store.
pub async fn new_fs_object_store(
data_home: &str,
_file_config: &FileConfig,
) -> Result<ObjectStore> {


@@ -285,6 +285,20 @@ impl Value {
}
}
/// Cast Value to i64. Return None if value is not a valid int64 data type.
pub fn as_i64(&self) -> Option<i64> {
match self {
Value::Int8(v) => Some(*v as _),
Value::Int16(v) => Some(*v as _),
Value::Int32(v) => Some(*v as _),
Value::Int64(v) => Some(*v),
Value::UInt8(v) => Some(*v as _),
Value::UInt16(v) => Some(*v as _),
Value::UInt32(v) => Some(*v as _),
_ => None,
}
}
/// Cast Value to u64. Return None if value is not a valid uint64 data type.
pub fn as_u64(&self) -> Option<u64> {
match self {
@@ -295,7 +309,6 @@ impl Value {
_ => None,
}
}
/// Cast Value to f64. Return None if it's not castable.
pub fn as_f64_lossy(&self) -> Option<f64> {
match self {

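The `as_i64` helper added above relies on every integer variant up to 32 bits widening losslessly into `i64`, while `UInt64` is excluded because values above `i64::MAX` would not fit. A minimal sketch of that pattern (`Num` is an illustrative stand-in for the crate's `Value` enum; unlike the original, which simply returns `None` for u64, this sketch converts a u64 when it fits):

```rust
/// Toy value enum: signed and unsigned variants up to 32 bits always
/// fit in i64; u64 only sometimes does.
pub enum Num {
    I32(i32),
    U32(u32),
    U64(u64),
}

pub fn as_i64(n: &Num) -> Option<i64> {
    match n {
        Num::I32(v) => Some(*v as i64),
        Num::U32(v) => Some(*v as i64),
        // A u64 may exceed i64::MAX, so convert only when it fits.
        Num::U64(v) => i64::try_from(*v).ok(),
    }
}
```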

@@ -20,7 +20,7 @@ use api::v1::{ColumnDataType, ColumnDataTypeExtension, CreateTableExpr, Semantic
use common_error::ext::BoxedError;
use common_meta::key::table_info::TableInfoValue;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::schema::ColumnSchema;
use itertools::Itertools;
use operator::expr_helper;
use session::context::QueryContextBuilder;
@@ -174,15 +174,7 @@ pub fn table_info_value_to_relation_desc(
let default_values = raw_schema
.column_schemas
.iter()
.map(|c| {
c.default_constraint().cloned().or_else(|| {
if c.is_nullable() {
Some(ColumnDefaultConstraint::null_value())
} else {
None
}
})
})
.map(|c| c.default_constraint().cloned())
.collect_vec();
Ok(TableDesc::new(relation_desc, default_values))


@@ -151,12 +151,12 @@ impl ScalarExpr {
/// apply optimization to the expression, like flatten variadic function
pub fn optimize(&mut self) {
self.flatten_variadic_fn();
self.flatten_varidic_fn();
}
/// Because Substrait's `And`/`Or` function is binary, but FlowPlan's
/// `And`/`Or` function is variadic, we need to flatten the `And` function if multiple `And`/`Or` functions are nested.
fn flatten_variadic_fn(&mut self) {
fn flatten_varidic_fn(&mut self) {
if let ScalarExpr::CallVariadic { func, exprs } = self {
let mut new_exprs = vec![];
for expr in std::mem::take(exprs) {
@@ -167,7 +167,7 @@ impl ScalarExpr {
{
if *func == inner_func {
for inner_expr in inner_exprs.iter_mut() {
inner_expr.flatten_variadic_fn();
inner_expr.flatten_varidic_fn();
}
new_exprs.extend(inner_exprs);
}


@@ -145,18 +145,14 @@ impl Instance {
.context(error::OpenRaftEngineBackendSnafu)?;
let kv_backend = Arc::new(kv_backend);
let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let manager_config = ManagerConfig {
max_retry_times: procedure_config.max_retry_times,
retry_delay: procedure_config.retry_delay,
..Default::default()
};
let procedure_manager = Arc::new(LocalManager::new(
manager_config,
kv_state_store.clone(),
kv_state_store,
));
let procedure_manager = Arc::new(LocalManager::new(manager_config, state_store));
Ok((kv_backend, procedure_manager))
}


@@ -28,14 +28,14 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use datafusion::dataframe::DataFrame;
use datafusion::execution::context::SessionContext;
use datafusion::execution::SessionStateBuilder;
use datafusion_expr::{col, lit, lit_timestamp_nano, Expr};
use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr};
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
TableNotFoundSnafu,
};
use servers::http::jaeger::{QueryTraceParams, FIND_TRACES_COLS};
use servers::http::jaeger::{QueryTraceParams, JAEGER_QUERY_TABLE_NAME_KEY};
use servers::otlp::trace::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
@@ -43,6 +43,7 @@ use servers::otlp::trace::{
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
use table::table::adapter::DfTableProviderAdapter;
use super::Instance;
@@ -82,7 +83,19 @@ impl JaegerQueryHandler for Instance {
))));
}
// It's equivalent to `SELECT span_name, span_kind FROM {db}.{trace_table} WHERE service_name = '{service_name}'`.
// It's equivalent to
//
// ```
// SELECT
// span_name,
// span_kind
// FROM
// {db}.{trace_table}
// WHERE
// service_name = '{service_name}'
// ORDER BY
// timestamp
// ```.
Ok(query_trace_table(
ctx,
self.catalog_manager(),
@@ -101,9 +114,19 @@ impl JaegerQueryHandler for Instance {
}
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
// It's equivalent to `SELECT trace_id, timestamp, duration_nano, service_name, span_name, span_id, span_attributes, resource_attributes, parent_span_id
// FROM {db}.{trace_table} WHERE trace_id = '{trace_id}'`.
let selects: Vec<Expr> = FIND_TRACES_COLS.clone();
// It's equivalent to
//
// ```
// SELECT
// *
// FROM
// {db}.{trace_table}
// WHERE
// trace_id = '{trace_id}'
// ORDER BY
// timestamp
// ```.
let selects = vec![wildcard()];
let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
@@ -125,7 +148,7 @@ impl JaegerQueryHandler for Instance {
ctx: QueryContextRef,
query_params: QueryTraceParams,
) -> ServerResult<Output> {
let selects: Vec<Expr> = FIND_TRACES_COLS.clone();
let selects = vec![wildcard()];
let mut filters = vec![];
@@ -174,17 +197,34 @@ async fn query_trace_table(
tags: Option<HashMap<String, JsonValue>>,
distinct: bool,
) -> ServerResult<Output> {
let db = ctx.get_db_string();
let table_name = ctx
.extension(JAEGER_QUERY_TABLE_NAME_KEY)
.unwrap_or(TRACE_TABLE_NAME);
let table = catalog_manager
.table(ctx.current_catalog(), &db, TRACE_TABLE_NAME, Some(&ctx))
.table(
ctx.current_catalog(),
&ctx.current_schema(),
table_name,
Some(&ctx),
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table: TRACE_TABLE_NAME,
table: table_name,
catalog: ctx.current_catalog(),
schema: db,
schema: ctx.current_schema(),
})?;
let is_data_model_v1 = table
.table_info()
.meta
.options
.extra_options
.get(TABLE_DATA_MODEL)
.map(|s| s.as_str())
== Some(TABLE_DATA_MODEL_TRACE_V1);
let df_context = create_df_context(query_engine, ctx.clone())?;
let dataframe = df_context
@@ -196,7 +236,9 @@ async fn query_trace_table(
// Apply all filters.
let dataframe = filters
.into_iter()
.chain(tags.map_or(Ok(vec![]), |t| tags_filters(&dataframe, t))?)
.chain(tags.map_or(Ok(vec![]), |t| {
tags_filters(&dataframe, t, is_data_model_v1)
})?)
.try_fold(dataframe, |df, expr| {
df.filter(expr).context(DataFusionSnafu)
})?;
@@ -205,7 +247,10 @@ async fn query_trace_table(
let dataframe = if distinct {
dataframe.distinct().context(DataFusionSnafu)?
} else {
// for non distinct query, sort by timestamp to make results stable
dataframe
.sort_by(vec![col(TIMESTAMP_COLUMN)])
.context(DataFusionSnafu)?
};
// Apply the limit if needed.
@@ -237,7 +282,7 @@ fn create_df_context(
SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
);
// The following JSON UDFs will be used for tags filters.
// The following JSON UDFs will be used for tags filters on v0 data model.
let udfs: Vec<FunctionRef> = vec![
Arc::new(JsonGetInt),
Arc::new(JsonGetFloat),
@@ -256,7 +301,7 @@ fn create_df_context(
Ok(df_context)
}
fn tags_filters(
fn json_tag_filters(
dataframe: &DataFrame,
tags: HashMap<String, JsonValue>,
) -> ServerResult<Vec<Expr>> {
@@ -322,3 +367,41 @@ fn tags_filters(
Ok(filters)
}
fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
let filters = tags
.into_iter()
.filter_map(|(key, value)| {
let key = format!("\"span_attributes.{}\"", key);
match value {
JsonValue::String(value) => Some(col(key).eq(lit(value))),
JsonValue::Number(value) => {
if value.is_f64() {
// safe to unwrap as checked previously
Some(col(key).eq(lit(value.as_f64().unwrap())))
} else {
Some(col(key).eq(lit(value.as_i64().unwrap())))
}
}
JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
JsonValue::Null => Some(col(key).is_null()),
// not supported at the moment
JsonValue::Array(_value) => None,
JsonValue::Object(_value) => None,
}
})
.collect();
Ok(filters)
}
fn tags_filters(
dataframe: &DataFrame,
tags: HashMap<String, JsonValue>,
is_data_model_v1: bool,
) -> ServerResult<Vec<Expr>> {
if is_data_model_v1 {
flatten_tag_filters(tags)
} else {
json_tag_filters(dataframe, tags)
}
}
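For the v1 data model, `flatten_tag_filters` above maps each tag to an equality (or `IS NULL`) predicate on a quoted `span_attributes.<key>` column. A string-level sketch of that mapping (`TagValue` and `filter_expr` are illustrative; the real code builds DataFusion `Expr`s, not SQL strings):

```rust
/// Toy stand-in for serde_json::Value, limited to the supported cases.
pub enum TagValue {
    Str(String),
    Int(i64),
    Bool(bool),
    Null,
}

/// Builds one predicate per tag against the flattened v1 column layout.
pub fn filter_expr(key: &str, value: &TagValue) -> String {
    // The column name is quoted because it contains a dot.
    let col = format!("\"span_attributes.{}\"", key);
    match value {
        TagValue::Str(s) => format!("{} = '{}'", col, s),
        TagValue::Int(i) => format!("{} = {}", col, i),
        TagValue::Bool(b) => format!("{} = {}", col, b),
        TagValue::Null => format!("{} IS NULL", col),
    }
}
```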


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Deref;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
@@ -20,7 +22,7 @@ use server_error::Result as ServerResult;
use servers::error::{self as server_error, AuthSnafu, ExecuteQuerySnafu};
use servers::interceptor::{LogQueryInterceptor, LogQueryInterceptorRef};
use servers::query_handler::LogQueryHandler;
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextRef};
use snafu::ResultExt;
use tonic::async_trait;
@@ -64,4 +66,8 @@ impl LogQueryHandler for Instance {
Ok(interceptor.as_ref().post_query(output, ctx.clone())?)
}
fn catalog_manager(&self, _ctx: &QueryContext) -> ServerResult<&dyn catalog::CatalogManager> {
Ok(self.catalog_manager.deref())
}
}


@@ -90,6 +90,8 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
let (requests, rows) = otlp::trace::to_grpc_insert_requests(
request,
pipeline,
@@ -101,10 +103,17 @@ impl OpenTelemetryProtocolHandler for Instance {
OTLP_TRACES_ROWS.inc_by(rows as u64);
self.handle_trace_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
if is_trace_v1_model {
self.handle_trace_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
} else {
self.handle_log_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
}
}
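The branch above routes requests by data model: traces produced under the v1 model go through the trace-insert path, everything else through the log-insert path. A toy sketch of that dispatch (the `OtlpTraceDirectV0` variant and `insert_path` name are illustrative, not the crate's API):

```rust
/// Toy stand-in for the pipeline selector.
pub enum PipelineWay {
    OtlpTraceDirectV1,
    OtlpTraceDirectV0,
}

/// Picks the insert path the same way the handler above does.
pub fn insert_path(pipeline: &PipelineWay) -> &'static str {
    if matches!(pipeline, PipelineWay::OtlpTraceDirectV1) {
        "handle_trace_inserts"
    } else {
        "handle_log_inserts"
    }
}
```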
#[tracing::instrument(skip_all)]


@@ -284,7 +284,7 @@ impl ClusterInfo for MetaClient {
followers
.into_iter()
.map(|node| NodeInfo {
peer: node.peer.map(|p| p.into()).unwrap_or_default(),
peer: node.peer.unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
version: node.version,
@@ -292,7 +292,7 @@ impl ClusterInfo for MetaClient {
start_time_ms: node.start_time_ms,
})
.chain(leader.into_iter().map(|node| NodeInfo {
peer: node.peer.map(|p| p.into()).unwrap_or_default(),
peer: node.peer.unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
version: node.version,


@@ -6,7 +6,8 @@ license.workspace = true
[features]
mock = []
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"]
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend", "dep:deadpool-postgres", "dep:deadpool"]
mysql_kvbackend = ["dep:sqlx", "common-meta/mysql_kvbackend"]
[lints]
workspace = true
@@ -38,8 +39,8 @@ common-version.workspace = true
common-wal.workspace = true
dashmap.workspace = true
datatypes.workspace = true
deadpool.workspace = true
deadpool-postgres.workspace = true
deadpool = { workspace = true, optional = true }
deadpool-postgres = { workspace = true, optional = true }
derive_builder.workspace = true
etcd-client.workspace = true
futures.workspace = true
@@ -60,6 +61,7 @@ serde.workspace = true
serde_json.workspace = true
servers.workspace = true
snafu.workspace = true
sqlx = { workspace = true, optional = true }
store-api.workspace = true
strum.workspace = true
table.workspace = true


@@ -23,6 +23,8 @@ use common_config::Configurable;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
#[cfg(feature = "mysql_kvbackend")]
use common_meta::kv_backend::rds::MySqlStore;
#[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::rds::PgStore;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
@@ -38,9 +40,15 @@ use servers::export_metrics::ExportMetricsTask;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use snafu::OptionExt;
use snafu::ResultExt;
#[cfg(feature = "mysql_kvbackend")]
use sqlx::mysql::MySqlConnectOptions;
#[cfg(feature = "mysql_kvbackend")]
use sqlx::mysql::{MySqlConnection, MySqlPool};
#[cfg(feature = "mysql_kvbackend")]
use sqlx::Connection;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Receiver, Sender};
#[cfg(feature = "pg_kvbackend")]
@@ -49,9 +57,11 @@ use tonic::codec::CompressionEncoding;
use tonic::transport::server::{Router, TcpIncoming};
use crate::election::etcd::EtcdElection;
#[cfg(feature = "mysql_kvbackend")]
use crate::election::mysql::MySqlElection;
#[cfg(feature = "pg_kvbackend")]
use crate::election::postgres::PgElection;
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use crate::election::CANDIDATE_LEASE_SECS;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
@@ -229,7 +239,6 @@ pub async fn metasrv_builder(
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
let pool = create_postgres_pool(opts).await?;
// TODO(CookiePie): use table name from config.
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
@@ -246,6 +255,26 @@ pub async fn metasrv_builder(
.await?;
(kv_backend, Some(election))
}
#[cfg(feature = "mysql_kvbackend")]
(None, BackendImpl::MysqlStore) => {
let pool = create_mysql_pool(opts).await?;
let kv_backend =
MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
// Since election will acquire a lock of the table, we need a separate table for election.
let election_table_name = opts.meta_table_name.clone() + "_election";
let election_client = create_mysql_client(opts).await?;
let election = MySqlElection::with_mysql_client(
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
&election_table_name,
)
.await?;
(kv_backend, Some(election))
}
};
if !opts.store_key_prefix.is_empty() {
@@ -323,3 +352,41 @@ async fn create_postgres_pool(opts: &MetasrvOptions) -> Result<deadpool_postgres
.context(error::CreatePostgresPoolSnafu)?;
Ok(pool)
}
#[cfg(feature = "mysql_kvbackend")]
async fn setup_mysql_options(opts: &MetasrvOptions) -> Result<MySqlConnectOptions> {
let mysql_url = opts
.store_addrs
.first()
.context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
// Avoid `SET` commands in sqlx
let opts: MySqlConnectOptions = mysql_url
.parse()
.context(error::ParseMySqlUrlSnafu { mysql_url })?;
let opts = opts
.no_engine_substitution(false)
.pipes_as_concat(false)
.timezone(None)
.set_names(false);
Ok(opts)
}
#[cfg(feature = "mysql_kvbackend")]
async fn create_mysql_pool(opts: &MetasrvOptions) -> Result<MySqlPool> {
let opts = setup_mysql_options(opts).await?;
let pool = MySqlPool::connect_with(opts)
.await
.context(error::CreateMySqlPoolSnafu)?;
Ok(pool)
}
#[cfg(feature = "mysql_kvbackend")]
async fn create_mysql_client(opts: &MetasrvOptions) -> Result<MySqlConnection> {
let opts = setup_mysql_options(opts).await?;
let client = MySqlConnection::connect_with(&opts)
.await
.context(error::ConnectMySqlSnafu)?;
Ok(client)
}


@@ -13,6 +13,8 @@
// limitations under the License.
pub mod etcd;
#[cfg(feature = "mysql_kvbackend")]
pub mod mysql;
#[cfg(feature = "pg_kvbackend")]
pub mod postgres;


@@ -0,0 +1,800 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, warn};
use common_time::Timestamp;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use sqlx::mysql::{MySqlArguments, MySqlRow};
use sqlx::query::Query;
use sqlx::{MySql, MySqlConnection, MySqlTransaction, Row};
use tokio::sync::{broadcast, Mutex, MutexGuard};
use tokio::time::{Interval, MissedTickBehavior};
use crate::election::{
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY,
};
use crate::error::{
DeserializeFromJsonSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result, SerializeToJsonSnafu,
UnexpectedSnafu,
};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
// Separator between value and expire time.
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
/// Lease information.
/// TODO(CookiePie): PgElection can also use this struct. Refactor it to a common module.
#[derive(Default, Clone)]
struct Lease {
leader_value: String,
expire_time: Timestamp,
current: Timestamp,
// origin is the origin value of the lease, used for CAS.
origin: String,
}
struct ElectionSqlFactory<'a> {
table_name: &'a str,
}
struct ElectionSqlSet {
campaign: String,
// SQL to put a value with expire time.
//
// Parameters for the query:
// `$1`: key,
// `$2`: value,
// `$3`: lease time in seconds
//
// Returns:
// If the key already exists, return the previous value.
put_value_with_lease: String,
// SQL to update a value with expire time.
//
// Parameters for the query:
// `$1`: updated value,
// `$2`: lease time in seconds
// `$3`: key,
// `$4`: previous value,
update_value_with_lease: String,
// SQL to get a value with expire time.
//
// Parameters:
// `$1`: key
get_value_with_lease: String,
// SQL to get all values with expire time with the given key prefix.
//
// Parameters:
// `$1`: key prefix like 'prefix%'
//
// Returns:
// column 0: value,
// column 1: current timestamp
get_value_with_lease_by_prefix: String,
// SQL to delete a value.
//
// Parameters:
// `?`: key
//
// Returns:
// Rows affected
delete_value: String,
}
impl<'a> ElectionSqlFactory<'a> {
fn new(table_name: &'a str) -> Self {
Self { table_name }
}
fn build(self) -> ElectionSqlSet {
ElectionSqlSet {
campaign: self.campaign_sql(),
put_value_with_lease: self.put_value_with_lease_sql(),
update_value_with_lease: self.update_value_with_lease_sql(),
get_value_with_lease: self.get_value_with_lease_sql(),
get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
delete_value: self.delete_value_sql(),
}
}
// Currently the session timeout is longer than the leader lease time.
// So the leader will renew the lease twice before the session timeout if everything goes well.
fn set_idle_session_timeout_sql(&self) -> String {
format!("SET SESSION wait_timeout = {};", META_LEASE_SECS + 1)
}
fn set_lock_wait_timeout_sql(&self) -> &str {
"SET SESSION innodb_lock_wait_timeout = 1;"
}
fn create_table_sql(&self) -> String {
format!(
r#"
CREATE TABLE IF NOT EXISTS `{}` (
k VARBINARY(3072) PRIMARY KEY,
v BLOB
);
"#,
self.table_name
)
}
fn insert_once(&self) -> String {
format!(
"INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');",
self.table_name
)
}
fn check_version(&self) -> &str {
"SELECT @@version;"
}
fn campaign_sql(&self) -> String {
format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name)
}
fn put_value_with_lease_sql(&self) -> String {
format!(
r#"
INSERT INTO `{}` (k, v) VALUES (
?,
CONCAT(
?,
'{}',
DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f')
)
)
ON DUPLICATE KEY UPDATE v = VALUES(v);
"#,
self.table_name, LEASE_SEP
)
}
fn update_value_with_lease_sql(&self) -> String {
format!(
r#"UPDATE `{}`
SET v = CONCAT(?, '{}', DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f'))
WHERE k = ? AND v = ?"#,
self.table_name, LEASE_SEP
)
}
fn get_value_with_lease_sql(&self) -> String {
format!(
r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k = ?"#,
self.table_name
)
}
fn get_value_with_lease_by_prefix_sql(&self) -> String {
format!(
r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k LIKE ?"#,
self.table_name
)
}
fn delete_value_sql(&self) -> String {
format!("DELETE FROM {} WHERE k = ?;", self.table_name)
}
}
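The factory above interpolates a single table name into every statement so that one deployment can use a dedicated election table (note the separate `<meta_table>_election` table created in `metasrv_builder`). A reduced sketch of the same pattern, limited to two statements:

```rust
/// Simplified per-table SQL factory; the real one builds the full
/// ElectionSqlSet (campaign, put/update/get/delete with lease handling).
pub struct SqlFactory<'a> {
    table_name: &'a str,
}

impl<'a> SqlFactory<'a> {
    pub fn new(table_name: &'a str) -> Self {
        Self { table_name }
    }

    /// Locks every row of the election table, serializing campaigns.
    pub fn campaign_sql(&self) -> String {
        format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name)
    }

    pub fn delete_value_sql(&self) -> String {
        format!("DELETE FROM `{}` WHERE k = ?;", self.table_name)
    }
}
```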
/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time".
fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
let (value, expire_time) =
value
.split(LEASE_SEP)
.collect_tuple()
.with_context(|| UnexpectedSnafu {
violated: format!(
"Invalid value {}, expect node info || {} || expire time",
value, LEASE_SEP
),
})?;
// The expire_time is expected in the format 'YYYY-MM-DD HH:MM:SS.ffffff'
// (produced by DATE_FORMAT(..., '%Y-%m-%d %T.%f')).
let expire_time = match Timestamp::from_str(expire_time, None) {
Ok(ts) => ts,
Err(_) => UnexpectedSnafu {
violated: format!("Invalid timestamp: {}", expire_time),
}
.fail()?,
};
Ok((value.to_string(), expire_time))
}
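As the parser above shows, a stored lease is the concatenation `<node info><LEASE_SEP><expire time>`, and anything without exactly one separator is rejected. A self-contained sketch of that split (returning `Option` instead of the crate's `Result`, and leaving the expire time as a string rather than a `Timestamp`):

```rust
// Separator between value and expire time, as in the election module.
const LEASE_SEP: &str = "||__metadata_lease_sep||";

/// Splits a stored lease into (payload, expire-time string).
/// Returns None unless the input contains exactly one separator.
pub fn parse_lease(value: &str) -> Option<(String, String)> {
    let parts: Vec<&str> = value.split(LEASE_SEP).collect();
    match parts.as_slice() {
        [payload, expire] => Some((payload.to_string(), expire.to_string())),
        _ => None,
    }
}
```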
#[derive(Debug, Clone, Default)]
struct MySqlLeaderKey {
name: Vec<u8>,
key: Vec<u8>,
rev: i64,
lease: i64,
}
impl LeaderKey for MySqlLeaderKey {
fn name(&self) -> &[u8] {
&self.name
}
fn key(&self) -> &[u8] {
&self.key
}
fn revision(&self) -> i64 {
self.rev
}
fn lease_id(&self) -> i64 {
self.lease
}
}
enum Executor<'a> {
Default(MutexGuard<'a, MySqlConnection>),
Txn(MySqlTransaction<'a>),
}
impl Executor<'_> {
async fn query(
&mut self,
query: Query<'_, MySql, MySqlArguments>,
sql: &str,
) -> Result<Vec<MySqlRow>> {
match self {
Executor::Default(client) => {
let res = query
.fetch_all(&mut **client)
.await
.context(MySqlExecutionSnafu { sql })?;
Ok(res)
}
Executor::Txn(txn) => {
let res = query
.fetch_all(&mut **txn)
.await
.context(MySqlExecutionSnafu { sql })?;
Ok(res)
}
}
}
async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
match self {
Executor::Default(client) => {
let res = query
.execute(&mut **client)
.await
.context(MySqlExecutionSnafu { sql })?;
Ok(res.rows_affected())
}
Executor::Txn(txn) => {
let res = query
.execute(&mut **txn)
.await
.context(MySqlExecutionSnafu { sql })?;
Ok(res.rows_affected())
}
}
}
async fn commit(self) -> Result<()> {
match self {
Executor::Txn(txn) => {
txn.commit()
.await
.context(MySqlExecutionSnafu { sql: "COMMIT" })?;
Ok(())
}
_ => Ok(()),
}
}
}
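The `Executor` enum above gives both execution modes one interface: statements on the shared connection take effect immediately, while statements inside a transaction only land on `commit` (which is a no-op in the direct case). A toy sketch of those semantics (`MockExecutor` is illustrative; the real code wraps `sqlx` connections and transactions):

```rust
/// Toy executor: Direct applies statements immediately, Txn buffers them.
pub enum MockExecutor {
    Direct(Vec<String>),
    Txn { pending: Vec<String>, committed: Vec<String> },
}

impl MockExecutor {
    pub fn execute(&mut self, sql: &str) {
        match self {
            MockExecutor::Direct(log) => log.push(sql.to_string()),
            MockExecutor::Txn { pending, .. } => pending.push(sql.to_string()),
        }
    }

    /// No-op for direct execution; flushes pending statements for a txn.
    pub fn commit(&mut self) {
        if let MockExecutor::Txn { pending, committed } = self {
            committed.append(pending);
        }
    }
}
```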
/// MySQL implementation of Election.
pub struct MySqlElection {
leader_value: String,
client: Mutex<MySqlConnection>,
is_leader: AtomicBool,
leader_infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
sql_set: ElectionSqlSet,
}
impl MySqlElection {
pub async fn with_mysql_client(
leader_value: String,
mut client: sqlx::MySqlConnection,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
table_name: &str,
) -> Result<ElectionRef> {
let sql_factory = ElectionSqlFactory::new(table_name);
sqlx::query(&sql_factory.create_table_sql())
.execute(&mut client)
.await
.context(MySqlExecutionSnafu {
sql: &sql_factory.create_table_sql(),
})?;
// Set the idle session timeout (slightly longer than the lease time) to avoid deadlock.
sqlx::query(&sql_factory.set_idle_session_timeout_sql())
.execute(&mut client)
.await
.context(MySqlExecutionSnafu {
sql: &sql_factory.set_idle_session_timeout_sql(),
})?;
// Set lock wait timeout to LOCK_WAIT_TIMEOUT to avoid waiting too long.
sqlx::query(sql_factory.set_lock_wait_timeout_sql())
.execute(&mut client)
.await
.context(MySqlExecutionSnafu {
sql: sql_factory.set_lock_wait_timeout_sql(),
})?;
// Insert at least one row for `SELECT * FOR UPDATE` to work.
sqlx::query(&sql_factory.insert_once())
.execute(&mut client)
.await
.context(MySqlExecutionSnafu {
sql: &sql_factory.insert_once(),
})?;
// Check MySQL version
Self::check_version(&mut client, sql_factory.check_version()).await?;
let tx = listen_leader_change(leader_value.clone());
Ok(Arc::new(Self {
leader_value,
client: Mutex::new(client),
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(false),
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
sql_set: sql_factory.build(),
}))
}
fn election_key(&self) -> String {
format!("{}{}", self.store_key_prefix, ELECTION_KEY)
}
fn candidate_root(&self) -> String {
format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
}
fn candidate_key(&self) -> String {
format!("{}{}", self.candidate_root(), self.leader_value)
}
}
#[async_trait::async_trait]
impl Election for MySqlElection {
type Leader = LeaderValue;
fn is_leader(&self) -> bool {
self.is_leader.load(Ordering::Relaxed)
}
fn in_leader_infancy(&self) -> bool {
self.leader_infancy
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
}
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
let key = self.candidate_key();
let node_info =
serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
input: format!("{node_info:?}"),
})?;
{
let client = self.client.lock().await;
let mut executor = Executor::Default(client);
let res = self
.put_value_with_lease(
&key,
&node_info,
self.candidate_lease_ttl_secs,
&mut executor,
)
.await?;
// May have been registered before; just update the lease.
if !res {
warn!("Candidate already registered, update the lease");
self.delete_value(&key, &mut executor).await?;
self.put_value_with_lease(
&key,
&node_info,
self.candidate_lease_ttl_secs,
&mut executor,
)
.await?;
}
}
// Check if the current lease has expired and renew the lease.
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
loop {
let _ = keep_alive_interval.tick().await;
let client = self.client.lock().await;
let mut executor = Executor::Default(client);
let lease = self
.get_value_with_lease(&key, &mut executor)
.await?
.unwrap_or_default();
ensure!(
lease.expire_time > lease.current,
UnexpectedSnafu {
violated: format!(
"Candidate lease expired at {:?} (current time: {:?}), key: {:?}",
lease.expire_time,
lease.current,
String::from_utf8_lossy(&key.into_bytes())
),
}
);
self.update_value_with_lease(&key, &lease.origin, &node_info, &mut executor)
.await?;
std::mem::drop(executor);
}
}
async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
let key_prefix = self.candidate_root();
let client = self.client.lock().await;
let mut executor = Executor::Default(client);
let (mut candidates, current) = self
.get_value_with_lease_by_prefix(&key_prefix, &mut executor)
.await?;
// Remove expired candidates
candidates.retain(|c| c.1 > current);
let mut valid_candidates = Vec::with_capacity(candidates.len());
for (c, _) in candidates {
let node_info: MetasrvNodeInfo =
serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
input: format!("{:?}", c),
})?;
valid_candidates.push(node_info);
}
Ok(valid_candidates)
}
async fn campaign(&self) -> Result<()> {
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
let _ = self.do_campaign(&mut keep_alive_interval).await;
}
}
async fn leader(&self) -> Result<Self::Leader> {
if self.is_leader.load(Ordering::Relaxed) {
Ok(self.leader_value.as_bytes().into())
} else {
let key = self.election_key();
let client = self.client.lock().await;
let mut executor = Executor::Default(client);
if let Some(lease) = self.get_value_with_lease(&key, &mut executor).await? {
ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
Ok(lease.leader_value.as_bytes().into())
} else {
NoLeaderSnafu.fail()
}
}
}
async fn resign(&self) -> Result<()> {
todo!()
}
fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
self.leader_watcher.subscribe()
}
}
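Both `all_candidates` and the leader checks above compare each lease's expire time against the database's current time, dropping any lease that has reached it. The `retain` step can be sketched with plain integers standing in for `Timestamp` (the function name is illustrative):

```rust
/// Keeps only candidates whose lease expires strictly after `current`,
/// mirroring `candidates.retain(|c| c.1 > current)` above.
pub fn retain_live(mut candidates: Vec<(String, u64)>, current: u64) -> Vec<(String, u64)> {
    candidates.retain(|(_, expire)| *expire > current);
    candidates
}
```

Note that a lease expiring exactly at `current` is treated as expired, which errs on the safe side for leadership.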
impl MySqlElection {
/// Returns the lease for the given key: the leader value, expire time, current time,
/// and the raw stored string (`origin`, used for CAS).
async fn get_value_with_lease(
&self,
key: &str,
executor: &mut Executor<'_>,
) -> Result<Option<Lease>> {
let key = key.as_bytes();
let query = sqlx::query(&self.sql_set.get_value_with_lease).bind(key);
let res = executor
.query(query, &self.sql_set.get_value_with_lease)
.await?;
if res.is_empty() {
return Ok(None);
}
// Safety: Checked if res is empty above.
let current_time_str = String::from_utf8_lossy(res[0].try_get(1).unwrap());
let current_time = match Timestamp::from_str(&current_time_str, None) {
Ok(ts) => ts,
Err(_) => UnexpectedSnafu {
violated: format!("Invalid timestamp: {}", current_time_str),
}
.fail()?,
};
// Safety: Checked if res is empty above.
let value_and_expire_time = String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
Ok(Some(Lease {
leader_value: value,
expire_time,
current: current_time,
origin: value_and_expire_time.to_string(),
}))
}
/// Returns all values and expire time with the given key prefix. Also returns the current time.
async fn get_value_with_lease_by_prefix(
&self,
key_prefix: &str,
executor: &mut Executor<'_>,
) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
let query = sqlx::query(&self.sql_set.get_value_with_lease_by_prefix).bind(key_prefix);
let res = executor
.query(query, &self.sql_set.get_value_with_lease_by_prefix)
.await?;
let mut values_with_leases = vec![];
let mut current = Timestamp::default();
for row in res {
let current_time_str = row.try_get(1).unwrap_or_default();
current = match Timestamp::from_str(current_time_str, None) {
Ok(ts) => ts,
Err(_) => UnexpectedSnafu {
violated: format!("Invalid timestamp: {}", current_time_str),
}
.fail()?,
};
let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
values_with_leases.push((value, expire_time));
}
Ok((values_with_leases, current))
}
async fn update_value_with_lease(
&self,
key: &str,
prev: &str,
updated: &str,
executor: &mut Executor<'_>,
) -> Result<()> {
let key = key.as_bytes();
let prev = prev.as_bytes();
let updated = updated.as_bytes();
let query = sqlx::query(&self.sql_set.update_value_with_lease)
.bind(updated)
.bind(self.candidate_lease_ttl_secs as f64)
.bind(key)
.bind(prev);
let res = executor
.execute(query, &self.sql_set.update_value_with_lease)
.await?;
ensure!(
res == 1,
UnexpectedSnafu {
violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
}
);
Ok(())
}
/// Returns `true` if the insertion is successful
async fn put_value_with_lease(
&self,
key: &str,
value: &str,
lease_ttl_secs: u64,
executor: &mut Executor<'_>,
) -> Result<bool> {
let key = key.as_bytes();
let lease_ttl_secs = lease_ttl_secs as f64;
let query = sqlx::query(&self.sql_set.put_value_with_lease)
.bind(key)
.bind(value)
.bind(lease_ttl_secs);
let res = executor
.query(query, &self.sql_set.put_value_with_lease)
.await?;
Ok(res.is_empty())
}
/// Returns `true` if the deletion is successful.
/// Caution: Should only delete the key if the lease is expired.
async fn delete_value(&self, key: &str, executor: &mut Executor<'_>) -> Result<bool> {
let key = key.as_bytes();
let query = sqlx::query(&self.sql_set.delete_value).bind(key);
let res = executor.execute(query, &self.sql_set.delete_value).await?;
Ok(res == 1)
}
/// Attempts to acquire leadership by executing a campaign. This function continuously checks
/// if the current lease is still valid.
async fn do_campaign(&self, interval: &mut Interval) -> Result<()> {
// Scope the `sqlx::Acquire` import to this function to avoid ambiguous method resolution.
use sqlx::Acquire;
loop {
let client = self.client.lock().await;
let executor = Executor::Default(client);
let mut lease = Lease::default();
match (
self.lease_check(executor, &mut lease).await,
self.is_leader(),
) {
// If the leader lease is valid and I'm the leader, renew the lease.
(Ok(_), true) => {
let mut client = self.client.lock().await;
let txn = client
.begin()
.await
.context(MySqlExecutionSnafu { sql: "BEGIN" })?;
let mut executor = Executor::Txn(txn);
let query = sqlx::query(&self.sql_set.campaign);
executor.query(query, &self.sql_set.campaign).await?;
self.renew_lease(executor, lease).await?;
}
// If the leader lease expires and I'm the leader, notify the leader watcher and step down.
// Another instance should be elected as the leader in this case.
(Err(_), true) => {
warn!("Leader lease expired, re-initiate the campaign");
self.step_down_without_lock().await?;
}
// If the leader lease expires and I'm not the leader, elect myself.
(Err(_), false) => {
warn!("Leader lease expired, re-initiate the campaign");
let mut client = self.client.lock().await;
let txn = client
.begin()
.await
.context(MySqlExecutionSnafu { sql: "BEGIN" })?;
let mut executor = Executor::Txn(txn);
let query = sqlx::query(&self.sql_set.campaign);
executor.query(query, &self.sql_set.campaign).await?;
self.elected(&mut executor).await?;
executor.commit().await?;
}
// If the leader lease is valid and I'm not the leader, do nothing.
(Ok(_), false) => {}
}
interval.tick().await;
}
}
/// Renew the lease
async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> {
let key = self.election_key();
self.update_value_with_lease(&key, &lease.origin, &self.leader_value, &mut executor)
.await?;
executor.commit().await?;
Ok(())
}
/// Performs a lease check during the election process.
///
/// This function performs the following checks and actions:
///
/// - **Case 1**: If the current instance is not the leader but the lease has expired, it raises an error
/// to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
/// will be released.
/// - **Case 2**: If all checks pass, the function returns without performing any actions.
async fn lease_check(&self, mut executor: Executor<'_>, lease: &mut Lease) -> Result<()> {
let key = self.election_key();
let check_lease = self
.get_value_with_lease(&key, &mut executor)
.await?
.context(NoLeaderSnafu)?;
*lease = check_lease;
// Case 1: Lease expired
ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
// Case 2: Everything is fine
Ok(())
}
/// Still considers itself the leader locally but failed to acquire the lock. Steps down without deleting the key.
async fn step_down_without_lock(&self) -> Result<()> {
let key = self.election_key().into_bytes();
let leader_key = MySqlLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone(),
..Default::default()
};
if self
.is_leader
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
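The `compare_exchange` guard in `step_down_without_lock` (and its mirror in `elected`) is what makes the watcher notification fire at most once even if two step-downs race: only the call that actually flips the flag sends a message. A self-contained sketch of that pattern:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Returns true only for the caller that wins the true -> false transition;
// a racing second step-down observes `false` and becomes a no-op.
fn try_step_down(is_leader: &AtomicBool) -> bool {
    is_leader
        .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
        .is_ok()
}
```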
/// Elected as leader. The leader should put the key and notify the leader watcher.
/// Caution: Should only be called while holding the lock.
async fn elected(&self, executor: &mut Executor<'_>) -> Result<()> {
let key = self.election_key();
let leader_key = MySqlLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone().into_bytes(),
..Default::default()
};
self.delete_value(&key, executor).await?;
self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS, executor)
.await?;
if self
.is_leader
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.leader_infancy.store(true, Ordering::Relaxed);
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
/// Check if the MySQL version is supported.
async fn check_version(client: &mut MySqlConnection, sql: &str) -> Result<()> {
let query = sqlx::query(sql);
match query.fetch_one(client).await {
Ok(row) => {
let version: String = row.try_get(0).unwrap();
if !version.starts_with("8.0") && !version.starts_with("5.7") {
warn!(
"Unsupported MySQL version: {}, expected: [5.7, 8.0]",
version
);
}
}
Err(e) => {
warn!(e; "Failed to check MySQL version through sql: {}", sql);
}
}
Ok(())
}
}
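The supported-version predicate in `check_version` boils down to a simple prefix check, shown here as a standalone sketch (note that the two conditions must be combined with `&&`; with `||` the warning would fire for every version, including supported ones):

```rust
// Only MySQL 5.7.x and 8.0.x are considered supported; anything else should
// trigger the warning in `check_version`.
fn is_supported_mysql_version(version: &str) -> bool {
    version.starts_with("5.7") || version.starts_with("8.0")
}
```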

View File

@@ -109,10 +109,10 @@ impl<'a> ElectionSqlFactory<'a> {
}
}
// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive.
// Either the leader reconnects and step down or the session expires and the lock is released.
fn set_idle_session_timeout_sql(&self) -> &str {
"SET idle_session_timeout = '10s';"
// Currently the session timeout is longer than the leader lease time.
// So the leader will renew the lease twice before the session timeout if everything goes well.
fn set_idle_session_timeout_sql(&self) -> String {
format!("SET idle_session_timeout = '{}s';", META_LEASE_SECS + 1)
}
fn campaign_sql(&self) -> String {
@@ -241,7 +241,7 @@ impl PgElection {
let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
client
.execute(sql_factory.set_idle_session_timeout_sql(), &[])
.execute(&sql_factory.set_idle_session_timeout_sql(), &[])
.await
.context(PostgresExecutionSnafu)?;
@@ -316,7 +316,9 @@ impl Election for PgElection {
prev_expire_time > current_time,
UnexpectedSnafu {
violated: format!(
"Candidate lease expired, key: {:?}",
"Candidate lease expired at {:?} (current time {:?}), key: {:?}",
prev_expire_time,
current_time,
String::from_utf8_lossy(&key.into_bytes())
),
}
@@ -368,23 +370,19 @@ impl Election for PgElection {
.query(&self.sql_set.campaign, &[])
.await
.context(PostgresExecutionSnafu)?;
if let Some(row) = res.first() {
match row.try_get(0) {
Ok(true) => self.leader_action().await?,
Ok(false) => self.follower_action().await?,
Err(_) => {
return UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock"
.to_string(),
}
.fail();
}
let row = res.first().context(UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock",
})?;
let is_leader = row.try_get(0).map_err(|_| {
UnexpectedSnafu {
violated: "Failed to get the result of get lock",
}
.build()
})?;
if is_leader {
self.leader_action().await?;
} else {
self.follower_action().await?;
}
let _ = keep_alive_interval.tick().await;
}

View File

@@ -343,6 +343,16 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to parse mysql url: {}", mysql_url))]
ParseMySqlUrl {
#[snafu(source)]
error: sqlx::error::Error,
mysql_url: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to find table route for {table_id}"))]
TableRouteNotFound {
table_id: TableId,
@@ -729,6 +739,34 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to execute via mysql, sql: {}", sql))]
MySqlExecution {
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
location: Location,
sql: String,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to create mysql pool"))]
CreateMySqlPool {
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to connect to mysql"))]
ConnectMySql {
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Handler not found: {}", name))]
HandlerNotFound {
name: String,
@@ -911,6 +949,11 @@ impl ErrorExt for Error {
| Error::GetPostgresConnection { .. }
| Error::PostgresExecution { .. }
| Error::ConnectPostgres { .. } => StatusCode::Internal,
#[cfg(feature = "mysql_kvbackend")]
Error::MySqlExecution { .. }
| Error::CreateMySqlPool { .. }
| Error::ConnectMySql { .. }
| Error::ParseMySqlUrl { .. } => StatusCode::Internal,
}
}

View File

@@ -153,7 +153,7 @@ fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, P
return None;
};
Some((key, Peer::from(peer.clone()), info.clone()))
Some((key, peer.clone(), info.clone()))
}
async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {

View File

@@ -1,231 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Role};
use common_meta::region_registry::LeaderRegion;
use store_api::region_engine::RegionRole;
use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
pub struct CollectLeaderRegionHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for CollectLeaderRegionHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}
async fn handle(
&self,
_req: &HeartbeatRequest,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let Some(current_stat) = acc.stat.as_ref() else {
return Ok(HandleControl::Continue);
};
let mut key_values = Vec::with_capacity(current_stat.region_stats.len());
for stat in current_stat.region_stats.iter() {
if stat.role != RegionRole::Leader {
continue;
}
let manifest = stat.region_manifest.into();
let value = LeaderRegion {
datanode_id: current_stat.id,
manifest,
};
key_values.push((stat.id, value));
}
ctx.leader_region_registry.batch_put(key_values);
Ok(HandleControl::Continue)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::{LeaderRegionManifestInfo, LeaderRegionRegistry};
use common_meta::sequence::SequenceBuilder;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use super::*;
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::service::store::cached_kv::LeaderCachedKvBackend;
fn mock_ctx() -> Context {
let in_memory = Arc::new(MemoryKvBackend::new());
let kv_backend = Arc::new(MemoryKvBackend::new());
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
kv_backend.clone(),
));
let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap();
Context {
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_backend: kv_backend.clone(),
leader_cached_kv_backend,
meta_peer_client,
mailbox,
election: None,
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
}
}
fn new_region_stat(id: RegionId, manifest_version: u64, role: RegionRole) -> RegionStat {
RegionStat {
id,
region_manifest: RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id: 0,
},
rcus: 0,
wcus: 0,
approximate_bytes: 0,
engine: "mito".to_string(),
role,
num_rows: 0,
memtable_size: 0,
manifest_size: 0,
sst_size: 0,
index_size: 0,
}
}
#[tokio::test]
async fn test_handle_collect_leader_region() {
let mut ctx = mock_ctx();
let mut acc = HeartbeatAccumulator {
stat: Some(Stat {
id: 1,
region_stats: vec![
new_region_stat(RegionId::new(1, 1), 1, RegionRole::Leader),
new_region_stat(RegionId::new(1, 2), 2, RegionRole::Follower),
],
addr: "127.0.0.1:0000".to_string(),
region_num: 2,
..Default::default()
}),
..Default::default()
};
let handler = CollectLeaderRegionHandler;
let control = handler
.handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
.await
.unwrap();
assert_eq!(control, HandleControl::Continue);
let regions = ctx
.leader_region_registry
.batch_get(vec![RegionId::new(1, 1), RegionId::new(1, 2)].into_iter());
assert_eq!(regions.len(), 1);
assert_eq!(
regions.get(&RegionId::new(1, 1)),
Some(&LeaderRegion {
datanode_id: 1,
manifest: LeaderRegionManifestInfo::Mito {
manifest_version: 1,
flushed_entry_id: 0,
},
})
);
// New heartbeat with new manifest version
acc.stat = Some(Stat {
id: 1,
region_stats: vec![new_region_stat(RegionId::new(1, 1), 2, RegionRole::Leader)],
timestamp_millis: 0,
addr: "127.0.0.1:0000".to_string(),
region_num: 1,
node_epoch: 0,
..Default::default()
});
let control = handler
.handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
.await
.unwrap();
assert_eq!(control, HandleControl::Continue);
let regions = ctx
.leader_region_registry
.batch_get(vec![RegionId::new(1, 1)].into_iter());
assert_eq!(regions.len(), 1);
assert_eq!(
regions.get(&RegionId::new(1, 1)),
Some(&LeaderRegion {
datanode_id: 1,
manifest: LeaderRegionManifestInfo::Mito {
manifest_version: 2,
flushed_entry_id: 0,
},
})
);
// New heartbeat with old manifest version
acc.stat = Some(Stat {
id: 1,
region_stats: vec![new_region_stat(RegionId::new(1, 1), 1, RegionRole::Leader)],
timestamp_millis: 0,
addr: "127.0.0.1:0000".to_string(),
region_num: 1,
node_epoch: 0,
..Default::default()
});
let control = handler
.handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
.await
.unwrap();
assert_eq!(control, HandleControl::Continue);
let regions = ctx
.leader_region_registry
.batch_get(vec![RegionId::new(1, 1)].into_iter());
assert_eq!(regions.len(), 1);
assert_eq!(
regions.get(&RegionId::new(1, 1)),
// The manifest version is not updated
Some(&LeaderRegion {
datanode_id: 1,
manifest: LeaderRegionManifestInfo::Mito {
manifest_version: 2,
flushed_entry_id: 0,
},
})
);
}
}

View File

@@ -70,7 +70,7 @@ impl HeartbeatHandler for RemapFlowPeerHandler {
async fn rewrite_node_address(ctx: &mut Context, peer: &Peer) {
let key = NodeAddressKey::with_flownode(peer.id).to_bytes();
if let Ok(value) = NodeAddressValue::new(peer.clone().into()).try_as_raw_value() {
if let Ok(value) = NodeAddressValue::new(peer.clone()).try_as_raw_value() {
let put = PutRequest {
key,
value,

View File

@@ -70,11 +70,11 @@ use crate::state::{become_follower, become_leader, StateRef};
pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_HOME: &str = "/tmp/metasrv";
pub const METASRV_HOME: &str = "./greptimedb_data/metasrv";
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
// The datastores that implements metadata kvbackend.
@@ -89,6 +89,9 @@ pub enum BackendImpl {
#[cfg(feature = "pg_kvbackend")]
// Postgres as metadata storage.
PostgresStore,
#[cfg(feature = "mysql_kvbackend")]
// MySql as metadata storage.
MysqlStore,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -146,7 +149,7 @@ pub struct MetasrvOptions {
pub tracing: TracingOptions,
/// The datastore for kv metadata.
pub backend: BackendImpl,
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
/// Table name of rds kv backend.
pub meta_table_name: String,
#[cfg(feature = "pg_kvbackend")]
@@ -191,7 +194,7 @@ impl Default for MetasrvOptions {
flush_stats_factor: 3,
tracing: TracingOptions::default(),
backend: BackendImpl::EtcdStore,
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
#[cfg(feature = "pg_kvbackend")]
meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,

View File

@@ -447,20 +447,13 @@ fn build_procedure_manager(
retry_delay: options.procedure.retry_delay,
..Default::default()
};
let kv_state_store = Arc::new(
KvStateStore::new(kv_backend.clone()).with_max_value_size(
options
.procedure
.max_metadata_value_size
.map(|v| v.as_bytes() as usize),
),
let state_store = KvStateStore::new(kv_backend.clone()).with_max_value_size(
options
.procedure
.max_metadata_value_size
.map(|v| v.as_bytes() as usize),
);
Arc::new(LocalManager::new(
manager_config,
kv_state_store.clone(),
kv_state_store,
))
Arc::new(LocalManager::new(manager_config, Arc::new(state_store)))
}
impl Default for MetasrvBuilder {

View File

@@ -55,11 +55,7 @@ impl TestingEnv {
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
let state_store = Arc::new(KvStateStore::new(kv_backend));
let procedure_manager = Arc::new(LocalManager::new(
ManagerConfig::default(),
state_store.clone(),
state_store,
));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
Self {
table_metadata_manager,

View File

@@ -31,7 +31,6 @@ use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::DatanodeId;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::test_util::InMemoryPoisonStore;
use common_procedure::{Context as ProcedureContext, ProcedureId, ProcedureManagerRef, Status};
use common_procedure_test::MockContextProvider;
use common_telemetry::debug;
@@ -86,12 +85,7 @@ impl TestingEnv {
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let procedure_manager = Arc::new(LocalManager::new(
ManagerConfig::default(),
state_store,
poison_manager,
));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
Self {
table_metadata_manager,

View File

@@ -224,13 +224,7 @@ async fn test_on_datanode_create_regions() {
});
let status = procedure.on_datanode_create_regions().await.unwrap();
assert!(matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false,
}
));
assert!(matches!(status, Status::Executing { persist: true }));
assert!(matches!(
procedure.creator.data.state,
CreateTableState::CreateMetadata
@@ -297,13 +291,7 @@ async fn test_on_datanode_create_logical_regions() {
procedure.check_tables_already_exist().await.unwrap();
let status = procedure.on_datanode_create_regions().await.unwrap();
assert!(matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false,
}
));
assert!(matches!(status, Status::Executing { persist: true }));
assert!(matches!(
procedure.data.state(),
&CreateTablesState::CreateMetadata

View File

@@ -1,440 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_meta::lock_key::RemoteWalLock;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_procedure::error::ToJsonSnafu;
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status, StringKey,
};
use common_telemetry::warn;
use log_store::kafka::DEFAULT_PARTITION;
use rskafka::client::partition::UnknownTopicHandling;
use rskafka::client::Client;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::EntryId;
use store_api::storage::RegionId;
use crate::error::{self, BuildPartitionClientSnafu, DeleteRecordSnafu, TableMetadataManagerSnafu};
use crate::Result;
type KafkaClientRef = Arc<Client>;
const TIMEOUT: i32 = 1000;
/// The state of WAL pruning.
#[derive(Debug, Serialize, Deserialize)]
pub enum WalPruneState {
Prepare,
Prune,
}
pub struct Context {
/// The Kafka client.
client: KafkaClientRef,
/// The table metadata manager.
table_metadata_manager: TableMetadataManagerRef,
leader_region_registry: LeaderRegionRegistryRef,
}
/// The data of WAL pruning.
#[derive(Serialize, Deserialize)]
pub struct WalPruneData {
/// The topic name to prune.
pub topic: String,
/// The minimum flush entry id for topic, which is used to prune the WAL.
/// If the topic has no region, the value is set to `None`.
pub min_flushed_entry_id: EntryId,
/// The state.
pub state: WalPruneState,
}
/// The procedure to prune WAL.
pub struct WalPruneProcedure {
pub data: WalPruneData,
pub context: Context,
}
impl WalPruneProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
pub fn new(topic: String, context: Context) -> Self {
Self {
data: WalPruneData {
topic,
min_flushed_entry_id: 0,
state: WalPruneState::Prepare,
},
context,
}
}
pub fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
let data: WalPruneData = serde_json::from_str(json).context(ToJsonSnafu)?;
Ok(Self { data, context })
}
/// Calculate the last entry id to prune for each topic.
pub async fn on_prepare(&mut self) -> Result<Status> {
let region_ids = self
.context
.table_metadata_manager
.topic_region_manager()
.regions(&self.data.topic)
.await
.context(TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: "Failed to get topic-region map",
})?;
let flush_entry_ids_map: HashMap<_, _> = self
.context
.leader_region_registry
.batch_get(region_ids.iter().cloned())
.into_iter()
.map(|(region_id, region)| {
let flushed_entry_id = region.manifest.min_flushed_entry_id();
(region_id, flushed_entry_id)
})
.collect();
if region_ids.is_empty() {
// No regions to prune.
return Ok(Status::done());
}
// Check if the `flush_entry_ids_map` contains all region ids.
let non_collected_region_ids =
check_heartbeat_collected_region_ids(&region_ids, &flush_entry_ids_map);
if !non_collected_region_ids.is_empty() {
// The heartbeat collected region ids do not contain all region ids in the topic-region map.
// In this case, we should not prune the WAL.
warn!("The heartbeat collected region ids do not contain all region ids in the topic-region map. Aborting the WAL prune procedure.
topic: {}, non-collected region ids: {:?}", self.data.topic, non_collected_region_ids);
return Ok(Status::done());
}
// Safety: `flush_entry_ids_map` are not empty.
self.data.min_flushed_entry_id = *(flush_entry_ids_map.values().min().unwrap());
self.data.state = WalPruneState::Prune;
Ok(Status::executing(true))
}
/// Prune the WAL.
pub async fn on_prune(&mut self) -> Result<Status> {
// Safety: last_entry_ids are loaded in on_prepare.
let partition_client = self
.context
.client
.partition_client(
self.data.topic.clone(),
DEFAULT_PARTITION,
UnknownTopicHandling::Retry,
)
.await
.context(BuildPartitionClientSnafu {
topic: self.data.topic.clone(),
partition: DEFAULT_PARTITION,
})?;
partition_client
.delete_records(self.data.min_flushed_entry_id as i64, TIMEOUT)
.await
.context(DeleteRecordSnafu {
topic: self.data.topic.clone(),
partition: DEFAULT_PARTITION,
offset: self.data.min_flushed_entry_id,
})
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: "Failed to delete records",
})?;
// TODO(CookiePie): Persist the minimum flushed entry id to the table metadata manager.
Ok(Status::done())
}
}
#[async_trait::async_trait]
impl Procedure for WalPruneProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
fn rollback_supported(&self) -> bool {
false
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
match state {
WalPruneState::Prepare => self.on_prepare().await,
WalPruneState::Prune => self.on_prune().await,
}
.map_err(|e| {
if e.is_retryable() {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
/// WAL prune procedure will read the topic-region map from the table metadata manager,
/// which are modified by `DROP [TABLE|DATABASE]` and `CREATE [TABLE]` operations.
/// But the modifications are atomic, so it does not conflict with the procedure.
/// At most, it occasionally aborts the procedure when `check_heartbeat_collected_region_ids` fails.
fn lock_key(&self) -> LockKey {
let lock_key: StringKey = RemoteWalLock::Write(self.data.topic.clone()).into();
LockKey::new(vec![lock_key])
}
}
/// Check if the heartbeat collected region ids contains all region ids in the topic-region map.
fn check_heartbeat_collected_region_ids(
region_ids: &[RegionId],
heartbeat_collected_region_ids: &HashMap<RegionId, u64>,
) -> Vec<RegionId> {
let mut non_collected_region_ids = Vec::new();
for region_id in region_ids {
if !heartbeat_collected_region_ids.contains_key(region_id) {
non_collected_region_ids.push(*region_id);
}
}
non_collected_region_ids
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::wal_options_allocator::build_kafka_topic_creator;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use rskafka::record::Record;
use super::*;
use crate::procedure::test_util::new_wal_prune_metadata;
struct TestEnv {
table_metadata_manager: TableMetadataManagerRef,
leader_region_registry: LeaderRegionRegistryRef,
}
impl TestEnv {
fn new() -> Self {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
Self {
table_metadata_manager,
leader_region_registry,
}
}
fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}
fn leader_region_registry(&self) -> &LeaderRegionRegistryRef {
&self.leader_region_registry
}
}
/// Mock a test env for testing.
/// Including:
/// 1. Create a test env with a mailbox, a table metadata manager and an in-memory kv backend.
/// 2. Prepare some data in the table metadata manager and in-memory kv backend.
/// 3. Generate a `WalPruneProcedure` with the test env.
/// 4. Return the test env, the procedure, the minimum last entry id to prune and the regions to flush.
async fn mock_test_env(
topic: String,
broker_endpoints: Vec<String>,
env: &TestEnv,
) -> (WalPruneProcedure, u64, Vec<RegionId>) {
// Creates a topic manager.
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
let table_metadata_manager = env.table_metadata_manager().clone();
let leader_region_registry = env.leader_region_registry().clone();
let offsets = mock_wal_entries(topic_creator.client().clone(), &topic, 10).await;
let (min_last_entry_id, regions_to_flush) = new_wal_prune_metadata(
table_metadata_manager.clone(),
leader_region_registry.clone(),
10,
5,
&offsets,
10,
topic.clone(),
)
.await;
let context = Context {
client: topic_creator.client().clone(),
table_metadata_manager,
leader_region_registry,
};
let wal_prune_procedure = WalPruneProcedure::new(topic, context);
(wal_prune_procedure, min_last_entry_id, regions_to_flush)
}
fn record(i: usize) -> Record {
let key = format!("key_{i}");
let value = format!("value_{i}");
Record {
key: Some(key.into()),
value: Some(value.into()),
timestamp: chrono::Utc::now(),
headers: Default::default(),
}
}
async fn mock_wal_entries(
client: KafkaClientRef,
topic_name: &str,
n_entries: usize,
) -> Vec<i64> {
let controller_client = client.controller_client().unwrap();
let _ = controller_client
.create_topic(topic_name, 1, 1, 5_000)
.await;
let partition_client = client
.partition_client(topic_name, 0, UnknownTopicHandling::Retry)
.await
.unwrap();
let mut offsets = Vec::with_capacity(n_entries);
for i in 0..n_entries {
let record = vec![record(i)];
let offset = partition_client
.produce(
record,
rskafka::client::partition::Compression::NoCompression,
)
.await
.unwrap();
offsets.extend(offset);
}
offsets
}
async fn check_entry_id_existence(
client: KafkaClientRef,
topic_name: &str,
entry_id: i64,
) -> bool {
let partition_client = client
.partition_client(topic_name, 0, UnknownTopicHandling::Retry)
.await
.unwrap();
let (records, _high_watermark) = partition_client
.fetch_records(entry_id, 0..10001, 5_000)
.await
.unwrap();
!records.is_empty()
}
async fn delete_topic(client: KafkaClientRef, topic_name: &str) {
let controller_client = client.controller_client().unwrap();
controller_client
.delete_topic(topic_name, 5_000)
.await
.unwrap();
}
#[tokio::test]
async fn test_procedure_execution() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
common_telemetry::init_default_ut_logging();
let topic_name = "greptime_test_topic".to_string();
let env = TestEnv::new();
let (mut procedure, min_last_entry_id, _) =
mock_test_env(topic_name.clone(), broker_endpoints, &env).await;
// Step 1: Test `on_prepare`.
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(procedure.data.state, WalPruneState::Prune);
assert_eq!(procedure.data.min_flushed_entry_id, min_last_entry_id);
// Step 2: Test `on_prune`.
let status = procedure.on_prune().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Check if the entry ids after `min_flushed_entry_id` still exist.
assert!(
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.min_flushed_entry_id as i64,
)
.await
);
// Check if the entry ids before `min_flushed_entry_id` are deleted.
assert!(
procedure.data.min_flushed_entry_id == 0
|| !check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.min_flushed_entry_id as i64 - 1,
)
.await
);
// `check_heartbeat_collected_region_ids` fails.
// Should log a warning and return `Status::Done`.
procedure.context.leader_region_registry.reset();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Clean up the topic.
delete_topic(procedure.context.client, &topic_name).await;
})
})
.await;
}
}

View File

@@ -103,6 +103,7 @@ impl DataRegion {
.get_metadata(region_id)
.await
.context(MitoReadOperationSnafu)?;
let version = region_metadata.schema_version;
// find the max column id
let new_column_id_start = 1 + region_metadata
@@ -165,6 +166,7 @@ impl DataRegion {
debug!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}");
// assemble alter request
let alter_request = RegionRequest::Alter(RegionAlterRequest {
schema_version: version,
kind: AlterKind::AddColumns {
columns: new_columns,
},
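The `schema_version` threaded into the alter request above lets the region engine reject requests that were built against a stale schema (surfacing as `InvalidRegionRequestSchemaVersion`, which maps to `StatusCode::RequestOutdated`). A minimal sketch of that guard, with illustrative stand-in types (`check_schema_version` and these structs are hypothetical, not the actual mito2 API):

```rust
/// Hypothetical stand-ins; the real request/metadata types live in
/// `store_api` and `mito2`.
struct RegionMetadata {
    schema_version: u64,
}

struct AlterRequest {
    schema_version: u64,
}

#[derive(Debug, PartialEq)]
enum CheckError {
    /// Mirrors `InvalidRegionRequestSchemaVersion`, which is mapped to
    /// `StatusCode::RequestOutdated`.
    RequestOutdated { expect: u64, actual: u64 },
}

/// Rejects an alter request whose schema version no longer matches the
/// region's current schema version.
fn check_schema_version(meta: &RegionMetadata, req: &AlterRequest) -> Result<(), CheckError> {
    if meta.schema_version != req.schema_version {
        return Err(CheckError::RequestOutdated {
            expect: meta.schema_version,
            actual: req.schema_version,
        });
    }
    Ok(())
}

fn main() {
    let meta = RegionMetadata { schema_version: 1 };
    // A request built against the current schema passes.
    assert!(check_schema_version(&meta, &AlterRequest { schema_version: 1 }).is_ok());
    // A request built against an older schema is rejected as outdated.
    assert!(check_schema_version(&meta, &AlterRequest { schema_version: 0 }).is_err());
}
```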

View File

@@ -234,6 +234,7 @@ mod test {
// alter physical region
let physical_region_id = env.default_physical_region_id();
let request = RegionAlterRequest {
schema_version: 0,
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
@@ -261,6 +262,7 @@ mod test {
// alter physical region's option should work
let alter_region_option_request = RegionAlterRequest {
schema_version: 0,
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))],
},

View File

@@ -59,7 +59,7 @@ pub mod engine;
pub mod error;
mod metadata_region;
mod metrics;
mod row_modifier;
pub mod row_modifier;
#[cfg(test)]
mod test_util;
mod utils;

View File

@@ -40,7 +40,7 @@ const TSID_HASH_SEED: u32 = 846793005;
///
/// - For [`PrimaryKeyEncoding::Dense`] encoding,
/// it adds two columns(`__table_id`, `__tsid`) to the row.
pub struct RowModifier {
pub(crate) struct RowModifier {
codec: SparsePrimaryKeyCodec,
}
@@ -52,7 +52,7 @@ impl RowModifier {
}
/// Modify rows with the given primary key encoding.
pub fn modify_rows(
pub(crate) fn modify_rows(
&self,
iter: RowsIter,
table_id: TableId,
@@ -145,16 +145,14 @@ impl RowModifier {
/// Fills internal columns of a row with the table id and a hash of tag values.
fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
let mut hasher = TsidGenerator::default();
for (name, value) in iter.primary_keys_with_name() {
// The type is checked before. So only null is ignored.
if let Some(ValueData::StringValue(string)) = &value.value_data {
name.hash(&mut hasher);
string.hash(&mut hasher);
hasher.write_label(name, string);
}
}
// TSID is 64 bits, simply truncate the 128 bits hash
let (hash, _) = hasher.finish128();
let hash = hasher.finish();
(
ValueData::U32Value(table_id).into(),
@@ -163,6 +161,34 @@ impl RowModifier {
}
}
/// Tsid generator.
pub struct TsidGenerator {
hasher: mur3::Hasher128,
}
impl Default for TsidGenerator {
fn default() -> Self {
Self {
hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED),
}
}
}
impl TsidGenerator {
/// Writes a label pair to the generator.
pub fn write_label(&mut self, name: &str, value: &str) {
name.hash(&mut self.hasher);
value.hash(&mut self.hasher);
}
/// Generates a new TSID.
pub fn finish(&mut self) -> u64 {
// TSID is 64 bits; simply truncate the 128-bit hash
let (hash, _) = self.hasher.finish128();
hash
}
}
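The `TsidGenerator` above folds each `(name, value)` label pair into a 128-bit MurmurHash3 state and truncates the result to 64 bits. A self-contained sketch of the same pattern using the standard library's hasher (the real code uses the `mur3` crate, so concrete hash values differ; `SketchTsidGenerator` is an illustrative stand-in):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative stand-in for the mur3-backed `TsidGenerator`: same API
/// shape (write label pairs, then finish to a 64-bit id), different hash.
struct SketchTsidGenerator {
    hasher: DefaultHasher,
}

impl SketchTsidGenerator {
    fn new() -> Self {
        Self { hasher: DefaultHasher::new() }
    }

    /// Writes a label pair, mirroring `TsidGenerator::write_label`.
    fn write_label(&mut self, name: &str, value: &str) {
        name.hash(&mut self.hasher);
        value.hash(&mut self.hasher);
    }

    /// Produces the 64-bit id, mirroring `TsidGenerator::finish`.
    fn finish(&self) -> u64 {
        self.hasher.finish()
    }
}

fn main() {
    let mut a = SketchTsidGenerator::new();
    a.write_label("host", "web-1");
    a.write_label("region", "us-east");

    let mut b = SketchTsidGenerator::new();
    b.write_label("host", "web-1");
    b.write_label("region", "us-east");

    // Identical label pairs written in the same order yield the same id.
    assert_eq!(a.finish(), b.finish());

    let mut c = SketchTsidGenerator::new();
    c.write_label("host", "web-2");
    c.write_label("region", "us-east");
    // Different label values yield a different id.
    assert_ne!(a.finish(), c.finish());
}
```

Note that ordering matters: the hash is over the sequence of writes, which is why `fill_internal_columns` iterates primary keys in a fixed order.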
/// Index of a value.
#[derive(Debug, Clone, Copy)]
struct ValueIndex {

View File

@@ -177,6 +177,7 @@ pub fn alter_logical_region_add_tag_columns(
});
}
RegionAlterRequest {
schema_version: 0,
kind: AlterKind::AddColumns {
columns: new_columns,
},

View File

@@ -121,7 +121,7 @@ impl AccessLayer {
/// Writes a SST with specific `file_id` and `metadata` to the layer.
///
/// Returns the info of the SST. If no data is written, returns `None`.
pub(crate) async fn write_sst(
pub async fn write_sst(
&self,
request: SstWriteRequest,
write_opts: &WriteOptions,
@@ -191,26 +191,26 @@ impl AccessLayer {
/// `OperationType` represents the origin of the `SstWriteRequest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum OperationType {
pub enum OperationType {
Flush,
Compact,
}
/// Contents to build a SST.
pub(crate) struct SstWriteRequest {
pub(crate) op_type: OperationType,
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
pub struct SstWriteRequest {
pub op_type: OperationType,
pub metadata: RegionMetadataRef,
pub source: Source,
pub cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub(crate) storage: Option<String>,
pub(crate) max_sequence: Option<SequenceNumber>,
pub storage: Option<String>,
pub max_sequence: Option<SequenceNumber>,
/// Configs for index
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
pub(crate) bloom_filter_index_config: BloomFilterConfig,
pub index_options: IndexOptions,
pub inverted_index_config: InvertedIndexConfig,
pub fulltext_index_config: FulltextIndexConfig,
pub bloom_filter_index_config: BloomFilterConfig,
}
pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {

View File

@@ -46,6 +46,7 @@ const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
/// Before using the config, make sure to call `MitoConfig::validate()` to check if the config is valid.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
@@ -20,6 +19,7 @@ use std::time::Duration;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, Row, Rows, SemanticType};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextOptions};
@@ -34,7 +34,6 @@ use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::listener::{AlterFlushListener, NotifyRegionChangeResultListener};
use crate::engine::MitoEngine;
use crate::error;
use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
TestEnv,
@@ -52,6 +51,7 @@ async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expect
fn add_tag1() -> RegionAlterRequest {
RegionAlterRequest {
schema_version: 0,
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
@@ -71,6 +71,7 @@ fn add_tag1() -> RegionAlterRequest {
fn alter_column_inverted_index() -> RegionAlterRequest {
RegionAlterRequest {
schema_version: 0,
kind: AlterKind::SetIndex {
options: ApiSetIndexOptions::Inverted {
column_name: "tag_0".to_string(),
@@ -81,6 +82,7 @@ fn alter_column_inverted_index() -> RegionAlterRequest {
fn alter_column_fulltext_options() -> RegionAlterRequest {
RegionAlterRequest {
schema_version: 0,
kind: AlterKind::SetIndex {
options: ApiSetIndexOptions::Fulltext {
column_name: "tag_0".to_string(),
@@ -355,8 +357,7 @@ async fn test_alter_region_retry() {
.handle_request(region_id, RegionRequest::Alter(request))
.await
.unwrap_err();
let err = err.as_any().downcast_ref::<error::Error>().unwrap();
assert_matches!(err, &error::Error::InvalidRegionRequest { .. });
assert_eq!(err.status_code(), StatusCode::RequestOutdated);
let expected = "\
+-------+-------+---------+---------------------+
@@ -730,6 +731,7 @@ async fn test_alter_region_ttl_options() {
.unwrap();
let engine_cloned = engine.clone();
let alter_ttl_request = RegionAlterRequest {
schema_version: 0,
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))],
},

View File

@@ -535,6 +535,7 @@ async fn test_change_region_compaction_window() {
// Change compaction window.
let request = RegionRequest::Alter(RegionAlterRequest {
schema_version: region.metadata().schema_version,
kind: SetRegionOptions {
options: vec![SetRegionOption::Twsc(
"compaction.twcs.time_window".to_string(),

View File

@@ -1,246 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use api::v1::{Rows, SemanticType};
use common_error::ext::ErrorExt;
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::{RegionEngine, RegionManifestInfo};
use store_api::region_request::{
AddColumn, AddColumnLocation, AlterKind, RegionAlterRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use super::MitoEngine;
use crate::config::MitoConfig;
use crate::error::Error;
use crate::test_util::{
build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};
fn add_tag1() -> RegionAlterRequest {
RegionAlterRequest {
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_1",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 3,
},
location: Some(AddColumnLocation::First),
}],
},
}
}
async fn scan_check(
engine: &MitoEngine,
region_id: RegionId,
expected: &str,
num_memtable: usize,
num_files: usize,
) {
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(num_memtable, scanner.num_memtables());
assert_eq!(num_files, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_sync_after_flush_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Open the region on the follower engine
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
follower_engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: Default::default(),
// Ensure the region is not replayed from the WAL.
skip_wal_replay: true,
}),
)
.await
.unwrap();
flush_region(&engine, region_id, None).await;
// Scan the region on the leader engine
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
scan_check(&engine, region_id, expected, 0, 1).await;
common_telemetry::info!("Scan the region on the follower engine");
// Scan the region on the follower engine
let expected = "++\n++";
scan_check(&follower_engine, region_id, expected, 0, 0).await;
// Returns an error since the max available manifest version is 1
let manifest_info = RegionManifestInfo::mito(2, 0);
let err = follower_engine
.sync_region(region_id, manifest_info)
.await
.unwrap_err();
let err = err.as_any().downcast_ref::<Error>().unwrap();
assert_matches!(err, Error::InstallManifestTo { .. });
let manifest_info = RegionManifestInfo::mito(1, 0);
follower_engine
.sync_region(region_id, manifest_info)
.await
.unwrap();
common_telemetry::info!("Scan the region on the follower engine after sync");
// Scan the region on the follower engine
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
scan_check(&follower_engine, region_id, expected, 0, 1).await;
}
#[tokio::test]
async fn test_sync_after_alter_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let column_schemas = rows_schema(&request);
let region_dir = request.region_dir.clone();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Open the region on the follower engine
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
follower_engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: Default::default(),
// Ensure the region is not replayed from the WAL.
skip_wal_replay: true,
}),
)
.await
.unwrap();
let request = add_tag1();
engine
.handle_request(region_id, RegionRequest::Alter(request))
.await
.unwrap();
let expected = "\
+-------+-------+---------+---------------------+
| tag_1 | tag_0 | field_0 | ts |
+-------+-------+---------+---------------------+
| | 0 | 0.0 | 1970-01-01T00:00:00 |
| | 1 | 1.0 | 1970-01-01T00:00:01 |
| | 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
scan_check(&engine, region_id, expected, 0, 1).await;
let expected = "++\n++";
scan_check(&follower_engine, region_id, expected, 0, 0).await;
// Sync the region from the leader engine to the follower engine
let manifest_info = RegionManifestInfo::mito(2, 0);
follower_engine
.sync_region(region_id, manifest_info)
.await
.unwrap();
let expected = "\
+-------+-------+---------+---------------------+
| tag_1 | tag_0 | field_0 | ts |
+-------+-------+---------+---------------------+
| | 0 | 0.0 | 1970-01-01T00:00:00 |
| | 1 | 1.0 | 1970-01-01T00:00:01 |
| | 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
scan_check(&follower_engine, region_id, expected, 0, 1).await;
}
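The follower-side behavior exercised above — syncing to a manifest version the leader has not yet published fails with `Error::InstallManifestTo`, while syncing to an available version succeeds — can be sketched as follows. This is a hedged illustration of the guard, not the actual `RegionManifestManager::install_manifest_to` implementation; the types here are hypothetical:

```rust
/// Mirrors `Error::InstallManifestTo`: the requested manifest version is
/// not yet available.
#[derive(Debug, PartialEq)]
enum SyncError {
    InstallManifestTo { target: u64, available: u64 },
}

/// Hypothetical follower-side region state.
struct FollowerRegion {
    installed_manifest_version: u64,
}

impl FollowerRegion {
    /// Installs manifests up to `target`, given `available`, the latest
    /// manifest version actually published by the leader.
    fn install_manifest_to(&mut self, target: u64, available: u64) -> Result<(), SyncError> {
        if target > available {
            return Err(SyncError::InstallManifestTo { target, available });
        }
        // The real code applies the manifest actions in
        // (installed..=target) here before bumping the version.
        self.installed_manifest_version = target;
        Ok(())
    }
}

fn main() {
    let mut follower = FollowerRegion { installed_manifest_version: 0 };
    // Requesting version 2 when only 1 is available fails, as in the test.
    assert!(follower.install_manifest_to(2, 1).is_err());
    // Requesting version 1 succeeds and advances the follower.
    assert!(follower.install_manifest_to(1, 1).is_ok());
    assert_eq!(follower.installed_manifest_version, 1);
}
```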

View File

@@ -42,6 +42,14 @@ use crate::worker::WorkerId;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("External error, context: {}", context))]
External {
source: BoxedError,
context: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to encode sparse primary key, reason: {}", reason))]
EncodeSparsePrimaryKey {
reason: String,
@@ -482,6 +490,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Schema version doesn't match. Expected {} but got {}", expect, actual))]
InvalidRegionRequestSchemaVersion {
expect: u64,
actual: u64,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Region {} is in {:?} state, which does not permit manifest updates.",
region_id,
@@ -765,6 +781,50 @@ pub enum Error {
#[snafu(display("checksum mismatch (actual: {}, expected: {})", actual, expected))]
ChecksumMismatch { actual: u32, expected: u32 },
#[snafu(display(
"No checkpoint found, region: {}, last_version: {}",
region_id,
last_version
))]
NoCheckpoint {
region_id: RegionId,
last_version: ManifestVersion,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"No manifests found in range: [{}..{}), region: {}, last_version: {}",
start_version,
end_version,
region_id,
last_version
))]
NoManifests {
region_id: RegionId,
start_version: ManifestVersion,
end_version: ManifestVersion,
last_version: ManifestVersion,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to install manifest to {}, region: {}, available manifest version: {}, last version: {}",
target_version,
region_id,
available_version,
last_version
))]
InstallManifestTo {
region_id: RegionId,
target_version: ManifestVersion,
available_version: ManifestVersion,
#[snafu(implicit)]
location: Location,
last_version: ManifestVersion,
},
#[snafu(display("Region {} is stopped", region_id))]
RegionStopped {
region_id: RegionId,
@@ -1003,7 +1063,10 @@ impl ErrorExt for Error {
| OperateAbortedIndex { .. }
| UnexpectedReplay { .. }
| IndexEncodeNull { .. }
| UnexpectedImpureDefault { .. } => StatusCode::Unexpected,
| UnexpectedImpureDefault { .. }
| NoCheckpoint { .. }
| NoManifests { .. }
| InstallManifestTo { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
| InvalidScanIndex { .. }
@@ -1018,6 +1081,8 @@ impl ErrorExt for Error {
| PartitionOutOfRange { .. }
| ParseJobId { .. } => StatusCode::InvalidArguments,
InvalidRegionRequestSchemaVersion { .. } => StatusCode::RequestOutdated,
RegionMetadataNotFound { .. }
| Join { .. }
| WorkerStopped { .. }
@@ -1080,6 +1145,8 @@ impl ErrorExt for Error {
InvalidConfig { .. } => StatusCode::InvalidArguments,
StaleLogEntry { .. } => StatusCode::Unexpected,
External { source, .. } => source.status_code(),
FilterRecordBatch { source, .. } => source.status_code(),
Download { .. } | Upload { .. } => StatusCode::StorageUnavailable,

Some files were not shown because too many files have changed in this diff.