mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 12:22:55 +00:00
Compare commits
115 Commits
v0.12.0-ni
...
v0.11.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7fe735009c | ||
|
|
f0298afaf0 | ||
|
|
5175dea6b3 | ||
|
|
7caa88abc7 | ||
|
|
eafb01dfff | ||
|
|
b0de816d3d | ||
|
|
5c6161a95e | ||
|
|
5e3c5945c4 | ||
|
|
f6feac26f5 | ||
|
|
4b2c59e626 | ||
|
|
cf605ecccc | ||
|
|
ab3f9c42f1 | ||
|
|
258fc6f31b | ||
|
|
e2dccc1d1a | ||
|
|
78c5707642 | ||
|
|
204b5e474f | ||
|
|
e9f1fa0b7d | ||
|
|
a988ff5acf | ||
|
|
ef0fca9388 | ||
|
|
b704e7f703 | ||
|
|
3a4c636e29 | ||
|
|
a22e8b421c | ||
|
|
5b42546204 | ||
|
|
0678a31ab1 | ||
|
|
589cc84048 | ||
|
|
ed8c072a5e | ||
|
|
9d172f1cae | ||
|
|
236888313d | ||
|
|
0b97ef0e4f | ||
|
|
316e6a83eb | ||
|
|
6dc57b7a6c | ||
|
|
1f5c2b32e5 | ||
|
|
01e907be40 | ||
|
|
e4dc5ea243 | ||
|
|
3ff5754b5a | ||
|
|
c22ca3ebd5 | ||
|
|
327d165ad9 | ||
|
|
fe63a620ef | ||
|
|
be81f0db5a | ||
|
|
6ca7a305ae | ||
|
|
1111a8bd57 | ||
|
|
66b21b29b5 | ||
|
|
31cfab81ad | ||
|
|
dd3a509607 | ||
|
|
d4cae6af1e | ||
|
|
3fec71b5c0 | ||
|
|
9e31a6478b | ||
|
|
bce291a8e1 | ||
|
|
c788eb67e2 | ||
|
|
0c32dcf46c | ||
|
|
68a05b38bd | ||
|
|
ee72ae8bd0 | ||
|
|
556bd796d8 | ||
|
|
1327e8809f | ||
|
|
17d75c767c | ||
|
|
a1ed450c0c | ||
|
|
ea4ce9d1e3 | ||
|
|
1f7d9666b7 | ||
|
|
9f1a0d78b2 | ||
|
|
ed8e418716 | ||
|
|
9e7121c1bb | ||
|
|
94a49ed4f0 | ||
|
|
f5e743379f | ||
|
|
6735e5867e | ||
|
|
925525726b | ||
|
|
6427682a9a | ||
|
|
55b0022676 | ||
|
|
2d84cc8d87 | ||
|
|
c030705b17 | ||
|
|
443c600bd0 | ||
|
|
39cadfe10b | ||
|
|
9b5e4e80f7 | ||
|
|
041a276b66 | ||
|
|
614a25ddc5 | ||
|
|
4337e20010 | ||
|
|
65c52cc698 | ||
|
|
50f31fd681 | ||
|
|
b5af5aaf8d | ||
|
|
27693c7f1e | ||
|
|
a59fef9ffb | ||
|
|
bcecd8ce52 | ||
|
|
ffdcb8c1ac | ||
|
|
554121ad79 | ||
|
|
43c12b4f2c | ||
|
|
7aa8c28fe4 | ||
|
|
34fbe7739e | ||
|
|
06d7bd99dd | ||
|
|
b71d842615 | ||
|
|
7f71693b8e | ||
|
|
615ea1a171 | ||
|
|
4e725d259d | ||
|
|
dc2252eb6d | ||
|
|
6d4cc2e070 | ||
|
|
6066ce2c4a | ||
|
|
b90d8f7dbd | ||
|
|
fdccf4ff84 | ||
|
|
8b1484c064 | ||
|
|
576e20ac78 | ||
|
|
10b3e3da0f | ||
|
|
4a3ef2d718 | ||
|
|
65eabb2a05 | ||
|
|
bc5a57f51f | ||
|
|
f24b9d8814 | ||
|
|
dd4d0a88ce | ||
|
|
3d2096fe9d | ||
|
|
35715bb710 | ||
|
|
08a3befa67 | ||
|
|
ca1758d4e7 | ||
|
|
42bf818167 | ||
|
|
2c9b117224 | ||
|
|
3edf2317e1 | ||
|
|
85d72a3cd0 | ||
|
|
928172bd82 | ||
|
|
e9f5bddeff | ||
|
|
486755d795 |
2
.github/workflows/dev-build.yml
vendored
2
.github/workflows/dev-build.yml
vendored
@@ -29,7 +29,7 @@ on:
|
||||
linux_arm64_runner:
|
||||
type: choice
|
||||
description: The runner uses to build linux-arm64 artifacts
|
||||
default: ec2-c6g.4xlarge-arm64
|
||||
default: ec2-c6g.8xlarge-arm64
|
||||
options:
|
||||
- ec2-c6g.xlarge-arm64 # 4C8G
|
||||
- ec2-c6g.2xlarge-arm64 # 8C16G
|
||||
|
||||
2
.github/workflows/nightly-build.yml
vendored
2
.github/workflows/nightly-build.yml
vendored
@@ -27,7 +27,7 @@ on:
|
||||
linux_arm64_runner:
|
||||
type: choice
|
||||
description: The runner uses to build linux-arm64 artifacts
|
||||
default: ec2-c6g.4xlarge-arm64
|
||||
default: ec2-c6g.8xlarge-arm64
|
||||
options:
|
||||
- ec2-c6g.xlarge-arm64 # 4C8G
|
||||
- ec2-c6g.2xlarge-arm64 # 8C16G
|
||||
|
||||
2
.github/workflows/release.yml
vendored
2
.github/workflows/release.yml
vendored
@@ -91,7 +91,7 @@ env:
|
||||
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
|
||||
NIGHTLY_RELEASE_PREFIX: nightly
|
||||
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
|
||||
NEXT_RELEASE_VERSION: v0.12.0
|
||||
NEXT_RELEASE_VERSION: v0.11.0
|
||||
|
||||
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
|
||||
permissions:
|
||||
|
||||
181
Cargo.lock
generated
181
Cargo.lock
generated
@@ -188,7 +188,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
|
||||
|
||||
[[package]]
|
||||
name = "api"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-decimal",
|
||||
@@ -773,7 +773,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "auth"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -1314,7 +1314,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cache"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"catalog",
|
||||
"common-error",
|
||||
@@ -1348,7 +1348,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
|
||||
|
||||
[[package]]
|
||||
name = "catalog"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow",
|
||||
@@ -1684,7 +1684,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
|
||||
|
||||
[[package]]
|
||||
name = "cli"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"auth",
|
||||
@@ -1727,7 +1727,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.11.2",
|
||||
"table",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
@@ -1736,7 +1736,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "client"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -1763,7 +1763,7 @@ dependencies = [
|
||||
"rand",
|
||||
"serde_json",
|
||||
"snafu 0.8.5",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.11.2",
|
||||
"substrait 0.37.3",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -1804,7 +1804,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cmd"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"auth",
|
||||
@@ -1864,7 +1864,7 @@ dependencies = [
|
||||
"similar-asserts",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.11.2",
|
||||
"table",
|
||||
"temp-env",
|
||||
"tempfile",
|
||||
@@ -1916,7 +1916,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
|
||||
|
||||
[[package]]
|
||||
name = "common-base"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"anymap2",
|
||||
"async-trait",
|
||||
@@ -1938,11 +1938,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-catalog"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
|
||||
[[package]]
|
||||
name = "common-config"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
@@ -1965,7 +1965,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-datasource"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-schema",
|
||||
@@ -2001,7 +2001,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-decimal"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"bigdecimal 0.4.5",
|
||||
"common-error",
|
||||
@@ -2014,7 +2014,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-error"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"http 0.2.12",
|
||||
"snafu 0.8.5",
|
||||
@@ -2024,7 +2024,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-frontend"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-error",
|
||||
@@ -2034,7 +2034,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-function"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"approx 0.5.1",
|
||||
@@ -2078,7 +2078,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-greptimedb-telemetry"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-runtime",
|
||||
@@ -2095,7 +2095,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -2121,7 +2121,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc-expr"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"common-base",
|
||||
@@ -2140,7 +2140,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-macro"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-query",
|
||||
@@ -2154,7 +2154,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-mem-prof"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"common-error",
|
||||
"common-macro",
|
||||
@@ -2167,7 +2167,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-meta"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"anymap2",
|
||||
"api",
|
||||
@@ -2192,8 +2192,6 @@ dependencies = [
|
||||
"datafusion-common",
|
||||
"datafusion-expr",
|
||||
"datatypes",
|
||||
"deadpool",
|
||||
"deadpool-postgres",
|
||||
"derive_builder 0.12.0",
|
||||
"etcd-client",
|
||||
"futures",
|
||||
@@ -2226,7 +2224,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-options"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"common-grpc",
|
||||
"humantime-serde",
|
||||
@@ -2235,11 +2233,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-plugins"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
|
||||
[[package]]
|
||||
name = "common-pprof"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"common-error",
|
||||
"common-macro",
|
||||
@@ -2251,7 +2249,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -2278,7 +2276,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-procedure-test"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-procedure",
|
||||
@@ -2286,7 +2284,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-query"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -2312,7 +2310,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-recordbatch"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"common-error",
|
||||
@@ -2331,7 +2329,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-runtime"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"clap 4.5.19",
|
||||
@@ -2361,7 +2359,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-telemetry"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"backtrace",
|
||||
@@ -2389,7 +2387,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-test-util"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"client",
|
||||
"common-query",
|
||||
@@ -2401,7 +2399,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-time"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"chrono",
|
||||
@@ -2419,7 +2417,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-version"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"build-data",
|
||||
"const_format",
|
||||
@@ -2429,7 +2427,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "common-wal"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"common-base",
|
||||
"common-error",
|
||||
@@ -3228,7 +3226,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datanode"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -3279,7 +3277,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.11.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"toml 0.8.19",
|
||||
@@ -3288,7 +3286,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "datatypes"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -3315,39 +3313,6 @@ dependencies = [
|
||||
"sqlparser_derive 0.1.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deadpool"
|
||||
version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"deadpool-runtime",
|
||||
"num_cpus",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deadpool-postgres"
|
||||
version = "0.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bda39fa1cfff190d8924d447ad04fd22772c250438ca5ce1dfb3c80621c05aaa"
|
||||
dependencies = [
|
||||
"deadpool",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deadpool-runtime"
|
||||
version = "0.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
|
||||
dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "debugid"
|
||||
version = "0.8.0"
|
||||
@@ -3945,7 +3910,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "file-engine"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -4061,7 +4026,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
|
||||
|
||||
[[package]]
|
||||
name = "flow"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow",
|
||||
@@ -4120,7 +4085,7 @@ dependencies = [
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"strum 0.25.0",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.11.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"tonic 0.11.0",
|
||||
@@ -4158,7 +4123,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
|
||||
|
||||
[[package]]
|
||||
name = "frontend"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -5308,7 +5273,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "index"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"asynchronous-codec",
|
||||
@@ -6158,7 +6123,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
|
||||
|
||||
[[package]]
|
||||
name = "log-query"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"common-error",
|
||||
@@ -6170,7 +6135,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "log-store"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
@@ -6514,7 +6479,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-client"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -6541,7 +6506,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meta-srv"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -6566,8 +6531,6 @@ dependencies = [
|
||||
"common-wal",
|
||||
"dashmap",
|
||||
"datatypes",
|
||||
"deadpool",
|
||||
"deadpool-postgres",
|
||||
"derive_builder 0.12.0",
|
||||
"etcd-client",
|
||||
"futures",
|
||||
@@ -6622,7 +6585,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "metric-engine"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -6716,7 +6679,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "mito2"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -7453,7 +7416,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "object-store"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
@@ -7706,7 +7669,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "operator"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -7754,7 +7717,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
|
||||
"store-api",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.11.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
@@ -8004,7 +7967,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "partition"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -8290,7 +8253,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "pipeline"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -8452,7 +8415,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "plugins"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"auth",
|
||||
"clap 4.5.19",
|
||||
@@ -8740,7 +8703,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "promql"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"async-trait",
|
||||
@@ -8975,7 +8938,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "puffin"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"async-compression 0.4.13",
|
||||
"async-trait",
|
||||
@@ -9100,7 +9063,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "query"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -9165,7 +9128,7 @@ dependencies = [
|
||||
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
|
||||
"statrs",
|
||||
"store-api",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.11.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -10649,7 +10612,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "script"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -10941,7 +10904,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "servers"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"api",
|
||||
@@ -11053,7 +11016,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "session"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arc-swap",
|
||||
@@ -11407,7 +11370,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sql"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"chrono",
|
||||
@@ -11471,7 +11434,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sqlness-runner"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"clap 4.5.19",
|
||||
@@ -11689,7 +11652,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "store-api"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"aquamarine",
|
||||
@@ -11851,7 +11814,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "substrait"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
@@ -12050,7 +12013,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "table"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"async-trait",
|
||||
@@ -12327,7 +12290,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
|
||||
|
||||
[[package]]
|
||||
name = "tests-fuzz"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"arbitrary",
|
||||
"async-trait",
|
||||
@@ -12370,7 +12333,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tests-integration"
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
@@ -12434,7 +12397,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlx",
|
||||
"store-api",
|
||||
"substrait 0.12.0",
|
||||
"substrait 0.11.2",
|
||||
"table",
|
||||
"tempfile",
|
||||
"time",
|
||||
|
||||
@@ -68,7 +68,7 @@ members = [
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "0.12.0"
|
||||
version = "0.11.2"
|
||||
edition = "2021"
|
||||
license = "Apache-2.0"
|
||||
|
||||
@@ -118,8 +118,6 @@ datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion
|
||||
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
|
||||
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
|
||||
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
|
||||
deadpool = "0.10"
|
||||
deadpool-postgres = "0.12"
|
||||
derive_builder = "0.12"
|
||||
dotenv = "0.15"
|
||||
etcd-client = "0.13"
|
||||
|
||||
@@ -94,7 +94,7 @@
|
||||
| `storage` | -- | -- | The data storage options. |
|
||||
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
|
||||
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
|
||||
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
|
||||
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling. |
|
||||
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
|
||||
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
|
||||
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
|
||||
@@ -132,10 +132,10 @@
|
||||
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
|
||||
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
|
||||
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
|
||||
| `region_engine.mito.enable_write_cache` | Bool | `false` | Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
|
||||
| `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
|
||||
| `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
|
||||
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
|
||||
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
|
||||
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/object_cache/write`. |
|
||||
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
|
||||
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
|
||||
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
|
||||
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
|
||||
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
|
||||
@@ -466,10 +466,10 @@
|
||||
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
|
||||
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
|
||||
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
|
||||
| `region_engine.mito.enable_write_cache` | Bool | `false` | Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
|
||||
| `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
|
||||
| `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
|
||||
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
|
||||
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
|
||||
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
|
||||
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
|
||||
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
|
||||
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
|
||||
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
|
||||
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
|
||||
|
||||
@@ -475,18 +475,18 @@ auto_flush_interval = "1h"
|
||||
## @toml2docs:none-default="Auto"
|
||||
#+ selector_result_cache_size = "512MB"
|
||||
|
||||
## Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
|
||||
enable_write_cache = false
|
||||
## Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
|
||||
enable_experimental_write_cache = false
|
||||
|
||||
## File system path for write cache, defaults to `{data_home}`.
|
||||
write_cache_path = ""
|
||||
experimental_write_cache_path = ""
|
||||
|
||||
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
|
||||
write_cache_size = "5GiB"
|
||||
experimental_write_cache_size = "5GiB"
|
||||
|
||||
## TTL for write cache.
|
||||
## @toml2docs:none-default
|
||||
write_cache_ttl = "8h"
|
||||
experimental_write_cache_ttl = "8h"
|
||||
|
||||
## Buffer size for SST writing.
|
||||
sst_write_buffer_size = "8MB"
|
||||
|
||||
@@ -337,7 +337,7 @@ data_home = "/tmp/greptimedb/"
|
||||
type = "File"
|
||||
|
||||
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
|
||||
## A local file directory, defaults to `{data_home}`. An empty string means disabling.
|
||||
## A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling.
|
||||
## @toml2docs:none-default
|
||||
#+ cache_path = ""
|
||||
|
||||
@@ -518,18 +518,18 @@ auto_flush_interval = "1h"
|
||||
## @toml2docs:none-default="Auto"
|
||||
#+ selector_result_cache_size = "512MB"
|
||||
|
||||
## Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
|
||||
enable_write_cache = false
|
||||
## Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
|
||||
enable_experimental_write_cache = false
|
||||
|
||||
## File system path for write cache, defaults to `{data_home}`.
|
||||
write_cache_path = ""
|
||||
## File system path for write cache, defaults to `{data_home}/object_cache/write`.
|
||||
experimental_write_cache_path = ""
|
||||
|
||||
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
|
||||
write_cache_size = "5GiB"
|
||||
experimental_write_cache_size = "5GiB"
|
||||
|
||||
## TTL for write cache.
|
||||
## @toml2docs:none-default
|
||||
write_cache_ttl = "8h"
|
||||
experimental_write_cache_ttl = "8h"
|
||||
|
||||
## Buffer size for SST writing.
|
||||
sst_write_buffer_size = "8MB"
|
||||
|
||||
@@ -62,11 +62,6 @@ impl Instance {
|
||||
pub fn datanode(&self) -> &Datanode {
|
||||
&self.datanode
|
||||
}
|
||||
|
||||
/// allow customizing datanode for downstream projects
|
||||
pub fn datanode_mut(&mut self) -> &mut Datanode {
|
||||
&mut self.datanode
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -66,11 +66,6 @@ impl Instance {
|
||||
pub fn flownode(&self) -> &FlownodeInstance {
|
||||
&self.flownode
|
||||
}
|
||||
|
||||
/// allow customizing flownode for downstream projects
|
||||
pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
|
||||
&mut self.flownode
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
||||
@@ -69,7 +69,7 @@ fn test_load_datanode_example_config() {
|
||||
region_engine: vec![
|
||||
RegionEngineConfig::Mito(MitoConfig {
|
||||
auto_flush_interval: Duration::from_secs(3600),
|
||||
write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
|
||||
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
|
||||
..Default::default()
|
||||
}),
|
||||
RegionEngineConfig::File(EngineConfig {}),
|
||||
@@ -203,7 +203,7 @@ fn test_load_standalone_example_config() {
|
||||
region_engine: vec![
|
||||
RegionEngineConfig::Mito(MitoConfig {
|
||||
auto_flush_interval: Duration::from_secs(3600),
|
||||
write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
|
||||
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
|
||||
..Default::default()
|
||||
}),
|
||||
RegionEngineConfig::File(EngineConfig {}),
|
||||
|
||||
@@ -35,8 +35,6 @@ common-wal.workspace = true
|
||||
datafusion-common.workspace = true
|
||||
datafusion-expr.workspace = true
|
||||
datatypes.workspace = true
|
||||
deadpool.workspace = true
|
||||
deadpool-postgres.workspace = true
|
||||
derive_builder.workspace = true
|
||||
etcd-client.workspace = true
|
||||
futures.workspace = true
|
||||
|
||||
@@ -667,18 +667,10 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[snafu(display("Failed to create connection pool for Postgres"))]
|
||||
CreatePostgresPool {
|
||||
#[snafu(display("Failed to connect to Postgres"))]
|
||||
ConnectPostgres {
|
||||
#[snafu(source)]
|
||||
error: deadpool_postgres::CreatePoolError,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
|
||||
GetPostgresConnection {
|
||||
reason: String,
|
||||
error: tokio_postgres::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
@@ -794,9 +786,9 @@ impl ErrorExt for Error {
|
||||
| EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
PostgresExecution { .. } | CreatePostgresPool { .. } | GetPostgresConnection { .. } => {
|
||||
StatusCode::Internal
|
||||
}
|
||||
PostgresExecution { .. } => StatusCode::Internal,
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
ConnectPostgres { .. } => StatusCode::Internal,
|
||||
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,17 +16,15 @@ use std::any::Any;
|
||||
use std::borrow::Cow;
|
||||
use std::sync::Arc;
|
||||
|
||||
use deadpool_postgres::{Config, Pool, Runtime};
|
||||
use common_telemetry::error;
|
||||
use snafu::ResultExt;
|
||||
use tokio_postgres::types::ToSql;
|
||||
use tokio_postgres::NoTls;
|
||||
use tokio_postgres::{Client, NoTls};
|
||||
|
||||
use crate::error::{
|
||||
CreatePostgresPoolSnafu, Error, GetPostgresConnectionSnafu, PostgresExecutionSnafu, Result,
|
||||
StrFromUtf8Snafu,
|
||||
};
|
||||
use super::{KvBackend, TxnService};
|
||||
use crate::error::{ConnectPostgresSnafu, Error, PostgresExecutionSnafu, Result, StrFromUtf8Snafu};
|
||||
use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
|
||||
use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
@@ -36,7 +34,8 @@ use crate::rpc::KeyValue;
|
||||
|
||||
/// Posgres backend store for metasrv
|
||||
pub struct PgStore {
|
||||
pool: Pool,
|
||||
// TODO: Consider using sqlx crate.
|
||||
client: Client,
|
||||
}
|
||||
|
||||
const EMPTY: &[u8] = &[0];
|
||||
@@ -95,49 +94,33 @@ SELECT k, v FROM prev;"#;
|
||||
impl PgStore {
|
||||
/// Create pgstore impl of KvBackendRef from url.
|
||||
pub async fn with_url(url: &str) -> Result<KvBackendRef> {
|
||||
let mut cfg = Config::new();
|
||||
cfg.url = Some(url.to_string());
|
||||
let pool = cfg
|
||||
.create_pool(Some(Runtime::Tokio1), NoTls)
|
||||
.context(CreatePostgresPoolSnafu)?;
|
||||
Self::with_pg_pool(pool).await
|
||||
// TODO: support tls.
|
||||
let (client, conn) = tokio_postgres::connect(url, NoTls)
|
||||
.await
|
||||
.context(ConnectPostgresSnafu)?;
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = conn.await {
|
||||
error!(e; "connection error");
|
||||
}
|
||||
});
|
||||
Self::with_pg_client(client).await
|
||||
}
|
||||
|
||||
/// Create pgstore impl of KvBackendRef from tokio-postgres client.
|
||||
pub async fn with_pg_pool(pool: Pool) -> Result<KvBackendRef> {
|
||||
pub async fn with_pg_client(client: Client) -> Result<KvBackendRef> {
|
||||
// This step ensures the postgres metadata backend is ready to use.
|
||||
// We check if greptime_metakv table exists, and we will create a new table
|
||||
// if it does not exist.
|
||||
let client = match pool.get().await {
|
||||
Ok(client) => client,
|
||||
Err(e) => {
|
||||
return GetPostgresConnectionSnafu {
|
||||
reason: e.to_string(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
client
|
||||
.execute(METADKV_CREATION, &[])
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
Ok(Arc::new(Self { pool }))
|
||||
}
|
||||
|
||||
async fn get_client(&self) -> Result<deadpool::managed::Object<deadpool_postgres::Manager>> {
|
||||
match self.pool.get().await {
|
||||
Ok(client) => Ok(client),
|
||||
Err(e) => GetPostgresConnectionSnafu {
|
||||
reason: e.to_string(),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
Ok(Arc::new(Self { client }))
|
||||
}
|
||||
|
||||
async fn put_if_not_exists(&self, key: &str, value: &str) -> Result<bool> {
|
||||
let res = self
|
||||
.get_client()
|
||||
.await?
|
||||
.client
|
||||
.query(PUT_IF_NOT_EXISTS, &[&key, &value])
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
@@ -276,8 +259,7 @@ impl KvBackend for PgStore {
|
||||
})
|
||||
.collect();
|
||||
let res = self
|
||||
.get_client()
|
||||
.await?
|
||||
.client
|
||||
.query(&template, ¶ms)
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
@@ -345,10 +327,8 @@ impl KvBackend for PgStore {
|
||||
in_params.iter().map(|x| x as &(dyn ToSql + Sync)).collect();
|
||||
|
||||
let query = generate_batch_upsert_query(req.kvs.len());
|
||||
|
||||
let res = self
|
||||
.get_client()
|
||||
.await?
|
||||
.client
|
||||
.query(&query, ¶ms)
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
@@ -385,10 +365,8 @@ impl KvBackend for PgStore {
|
||||
.iter()
|
||||
.map(|x| x as &(dyn ToSql + Sync))
|
||||
.collect();
|
||||
|
||||
let res = self
|
||||
.get_client()
|
||||
.await?
|
||||
.client
|
||||
.query(&query, ¶ms)
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
@@ -431,8 +409,7 @@ impl KvBackend for PgStore {
|
||||
.collect();
|
||||
|
||||
let res = self
|
||||
.get_client()
|
||||
.await?
|
||||
.client
|
||||
.query(template, ¶ms)
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
@@ -476,10 +453,8 @@ impl KvBackend for PgStore {
|
||||
.iter()
|
||||
.map(|x| x as &(dyn ToSql + Sync))
|
||||
.collect();
|
||||
|
||||
let res = self
|
||||
.get_client()
|
||||
.await?
|
||||
.client
|
||||
.query(&query, ¶ms)
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
@@ -513,8 +488,7 @@ impl KvBackend for PgStore {
|
||||
let expect = process_bytes(&req.expect, "CASExpect")?;
|
||||
|
||||
let res = self
|
||||
.get_client()
|
||||
.await?
|
||||
.client
|
||||
.query(CAS, &[&key, &value, &expect])
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
@@ -586,19 +560,10 @@ mod tests {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut cfg = Config::new();
|
||||
cfg.url = Some(endpoints);
|
||||
let pool = cfg
|
||||
.create_pool(Some(Runtime::Tokio1), NoTls)
|
||||
.context(CreatePostgresPoolSnafu)
|
||||
.unwrap();
|
||||
let client = pool.get().await.unwrap();
|
||||
client
|
||||
.execute(METADKV_CREATION, &[])
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)
|
||||
.unwrap();
|
||||
Some(PgStore { pool })
|
||||
let (client, connection) = tokio_postgres::connect(&endpoints, NoTls).await.unwrap();
|
||||
tokio::spawn(connection);
|
||||
let _ = client.execute(METADKV_CREATION, &[]).await;
|
||||
Some(PgStore { client })
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -433,8 +433,8 @@ impl DatanodeBuilder {
|
||||
) -> Result<MitoEngine> {
|
||||
if opts.storage.is_object_storage() {
|
||||
// Enable the write cache when setting object storage
|
||||
config.enable_write_cache = true;
|
||||
info!("Configured 'enable_write_cache=true' for mito engine.");
|
||||
config.enable_experimental_write_cache = true;
|
||||
info!("Configured 'enable_experimental_write_cache=true' for mito engine.");
|
||||
}
|
||||
|
||||
let mito_engine = match &opts.wal {
|
||||
|
||||
@@ -34,8 +34,6 @@ common-version.workspace = true
|
||||
common-wal.workspace = true
|
||||
dashmap.workspace = true
|
||||
datatypes.workspace = true
|
||||
deadpool.workspace = true
|
||||
deadpool-postgres.workspace = true
|
||||
derive_builder.workspace = true
|
||||
etcd-client.workspace = true
|
||||
futures.workspace = true
|
||||
|
||||
@@ -29,8 +29,6 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use common_telemetry::error;
|
||||
use common_telemetry::info;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use deadpool_postgres::{Config, Runtime};
|
||||
use etcd_client::Client;
|
||||
use futures::future;
|
||||
use servers::configurator::ConfiguratorRef;
|
||||
@@ -50,9 +48,8 @@ use tonic::transport::server::{Router, TcpIncoming};
|
||||
|
||||
use crate::election::etcd::EtcdElection;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use crate::election::postgres::PgElection;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use crate::election::CANDIDATE_LEASE_SECS;
|
||||
use crate::error::InvalidArgumentsSnafu;
|
||||
use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu};
|
||||
use crate::metasrv::builder::MetasrvBuilder;
|
||||
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
|
||||
use crate::selector::lease_based::LeaseBasedSelector;
|
||||
@@ -85,14 +82,14 @@ impl MetasrvInstance {
|
||||
let httpsrv = Arc::new(
|
||||
HttpServerBuilder::new(opts.http.clone())
|
||||
.with_metrics_handler(MetricsHandler)
|
||||
.with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?)
|
||||
.with_greptime_config_options(opts.to_toml().context(TomlFormatSnafu)?)
|
||||
.build(),
|
||||
);
|
||||
let metasrv = Arc::new(metasrv);
|
||||
// put metasrv into plugins for later use
|
||||
plugins.insert::<Arc<Metasrv>>(metasrv.clone());
|
||||
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
|
||||
.context(error::InitExportMetricsTaskSnafu)?;
|
||||
.context(InitExportMetricsTaskSnafu)?;
|
||||
Ok(MetasrvInstance {
|
||||
metasrv,
|
||||
httpsrv,
|
||||
@@ -107,7 +104,7 @@ impl MetasrvInstance {
|
||||
self.metasrv.try_start().await?;
|
||||
|
||||
if let Some(t) = self.export_metrics_task.as_ref() {
|
||||
t.start(None).context(error::InitExportMetricsTaskSnafu)?
|
||||
t.start(None).context(InitExportMetricsTaskSnafu)?
|
||||
}
|
||||
|
||||
let (tx, rx) = mpsc::channel::<()>(1);
|
||||
@@ -228,20 +225,11 @@ pub async fn metasrv_builder(
|
||||
}
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
(None, BackendImpl::PostgresStore) => {
|
||||
let pool = create_postgres_pool(opts).await?;
|
||||
let kv_backend = PgStore::with_pg_pool(pool)
|
||||
let pg_client = create_postgres_client(opts).await?;
|
||||
let kv_backend = PgStore::with_pg_client(pg_client)
|
||||
.await
|
||||
.context(error::KvBackendSnafu)?;
|
||||
// Client for election should be created separately since we need a different session keep-alive idle time.
|
||||
let election_client = create_postgres_client(opts).await?;
|
||||
let election = PgElection::with_pg_client(
|
||||
opts.server_addr.clone(),
|
||||
election_client,
|
||||
opts.store_key_prefix.clone(),
|
||||
CANDIDATE_LEASE_SECS,
|
||||
)
|
||||
.await?;
|
||||
(kv_backend, Some(election))
|
||||
(kv_backend, None)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -287,12 +275,9 @@ async fn create_etcd_client(opts: &MetasrvOptions) -> Result<Client> {
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
|
||||
let postgres_url = opts
|
||||
.store_addrs
|
||||
.first()
|
||||
.context(error::InvalidArgumentsSnafu {
|
||||
err_msg: "empty store addrs",
|
||||
})?;
|
||||
let postgres_url = opts.store_addrs.first().context(InvalidArgumentsSnafu {
|
||||
err_msg: "empty store addrs",
|
||||
})?;
|
||||
let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
|
||||
.await
|
||||
.context(error::ConnectPostgresSnafu)?;
|
||||
@@ -304,19 +289,3 @@ async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres:
|
||||
});
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
async fn create_postgres_pool(opts: &MetasrvOptions) -> Result<deadpool_postgres::Pool> {
|
||||
let postgres_url = opts
|
||||
.store_addrs
|
||||
.first()
|
||||
.context(error::InvalidArgumentsSnafu {
|
||||
err_msg: "empty store addrs",
|
||||
})?;
|
||||
let mut cfg = Config::new();
|
||||
cfg.url = Some(postgres_url.to_string());
|
||||
let pool = cfg
|
||||
.create_pool(Some(Runtime::Tokio1), NoTls)
|
||||
.context(error::CreatePostgresPoolSnafu)?;
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
@@ -19,9 +19,7 @@ pub mod postgres;
|
||||
use std::fmt::{self, Debug};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::{info, warn};
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::sync::broadcast::{self, Receiver, Sender};
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::metasrv::MetasrvNodeInfo;
|
||||
@@ -77,37 +75,6 @@ impl fmt::Display for LeaderChangeMessage {
|
||||
}
|
||||
}
|
||||
|
||||
fn listen_leader_change(leader_value: String) -> Sender<LeaderChangeMessage> {
|
||||
let (tx, mut rx) = broadcast::channel(100);
|
||||
let _handle = common_runtime::spawn_global(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(msg) => match msg {
|
||||
LeaderChangeMessage::Elected(key) => {
|
||||
info!(
|
||||
"[{leader_value}] is elected as leader: {:?}, lease: {}",
|
||||
String::from_utf8_lossy(key.name()),
|
||||
key.lease_id()
|
||||
);
|
||||
}
|
||||
LeaderChangeMessage::StepDown(key) => {
|
||||
warn!(
|
||||
"[{leader_value}] is stepping down: {:?}, lease: {}",
|
||||
String::from_utf8_lossy(key.name()),
|
||||
key.lease_id()
|
||||
);
|
||||
}
|
||||
},
|
||||
Err(RecvError::Lagged(_)) => {
|
||||
warn!("Log printing is too slow or leader changed too fast!");
|
||||
}
|
||||
Err(RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
tx
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Election: Send + Sync {
|
||||
type Leader;
|
||||
|
||||
@@ -23,12 +23,13 @@ use etcd_client::{
|
||||
};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
use tokio::time::{timeout, MissedTickBehavior};
|
||||
|
||||
use crate::election::{
|
||||
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT,
|
||||
CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS,
|
||||
Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY,
|
||||
KEEP_ALIVE_INTERVAL_SECS,
|
||||
};
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
@@ -87,7 +88,36 @@ impl EtcdElection {
|
||||
E: AsRef<str>,
|
||||
{
|
||||
let leader_value: String = leader_value.as_ref().into();
|
||||
let tx = listen_leader_change(leader_value.clone());
|
||||
|
||||
let leader_ident = leader_value.clone();
|
||||
let (tx, mut rx) = broadcast::channel(100);
|
||||
let _handle = common_runtime::spawn_global(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(msg) => match msg {
|
||||
LeaderChangeMessage::Elected(key) => {
|
||||
info!(
|
||||
"[{leader_ident}] is elected as leader: {:?}, lease: {}",
|
||||
String::from_utf8_lossy(key.name()),
|
||||
key.lease_id()
|
||||
);
|
||||
}
|
||||
LeaderChangeMessage::StepDown(key) => {
|
||||
warn!(
|
||||
"[{leader_ident}] is stepping down: {:?}, lease: {}",
|
||||
String::from_utf8_lossy(key.name()),
|
||||
key.lease_id()
|
||||
);
|
||||
}
|
||||
},
|
||||
Err(RecvError::Lagged(_)) => {
|
||||
warn!("Log printing is too slow or leader changed too fast!");
|
||||
}
|
||||
Err(RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Arc::new(Self {
|
||||
leader_value,
|
||||
client,
|
||||
|
||||
@@ -16,32 +16,18 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
|
||||
use common_telemetry::{error, warn};
|
||||
use common_time::Timestamp;
|
||||
use itertools::Itertools;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::time::MissedTickBehavior;
|
||||
use tokio_postgres::Client;
|
||||
|
||||
use crate::election::{
|
||||
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY,
|
||||
};
|
||||
use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
|
||||
use crate::error::{
|
||||
DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu,
|
||||
UnexpectedSnafu,
|
||||
DeserializeFromJsonSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
|
||||
|
||||
// TODO(CookiePie): The lock id should be configurable.
|
||||
const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(28319)";
|
||||
const STEP_DOWN: &str = "SELECT pg_advisory_unlock(28319)";
|
||||
const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_in_transaction_session_timeout = $1";
|
||||
// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive.
|
||||
// Either the leader reconnects and step down or the session expires and the lock is released.
|
||||
const IDLE_SESSION_TIMEOUT: &str = "10s";
|
||||
|
||||
// Separator between value and expire time.
|
||||
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
|
||||
|
||||
@@ -95,33 +81,8 @@ fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
|
||||
Ok((value.to_string(), expire_time))
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct PgLeaderKey {
|
||||
name: Vec<u8>,
|
||||
key: Vec<u8>,
|
||||
rev: i64,
|
||||
lease: i64,
|
||||
}
|
||||
|
||||
impl LeaderKey for PgLeaderKey {
|
||||
fn name(&self) -> &[u8] {
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn key(&self) -> &[u8] {
|
||||
&self.key
|
||||
}
|
||||
|
||||
fn revision(&self) -> i64 {
|
||||
self.rev
|
||||
}
|
||||
|
||||
fn lease_id(&self) -> i64 {
|
||||
self.lease
|
||||
}
|
||||
}
|
||||
|
||||
/// PostgreSql implementation of Election.
|
||||
/// TODO(CookiePie): Currently only support candidate registration. Add election logic.
|
||||
pub struct PgElection {
|
||||
leader_value: String,
|
||||
client: Client,
|
||||
@@ -139,13 +100,7 @@ impl PgElection {
|
||||
store_key_prefix: String,
|
||||
candidate_lease_ttl_secs: u64,
|
||||
) -> Result<ElectionRef> {
|
||||
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
|
||||
client
|
||||
.execute(SET_IDLE_SESSION_TIMEOUT, &[&IDLE_SESSION_TIMEOUT])
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
|
||||
let tx = listen_leader_change(leader_value.clone());
|
||||
let (tx, _) = broadcast::channel(100);
|
||||
Ok(Arc::new(Self {
|
||||
leader_value,
|
||||
client,
|
||||
@@ -157,7 +112,7 @@ impl PgElection {
|
||||
}))
|
||||
}
|
||||
|
||||
fn election_key(&self) -> String {
|
||||
fn _election_key(&self) -> String {
|
||||
format!("{}{}", self.store_key_prefix, ELECTION_KEY)
|
||||
}
|
||||
|
||||
@@ -191,14 +146,11 @@ impl Election for PgElection {
|
||||
serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
|
||||
input: format!("{node_info:?}"),
|
||||
})?;
|
||||
let res = self
|
||||
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
|
||||
.await?;
|
||||
let res = self.put_value_with_lease(&key, &node_info).await?;
|
||||
// May registered before, just update the lease.
|
||||
if !res {
|
||||
self.delete_value(&key).await?;
|
||||
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
|
||||
.await?;
|
||||
self.put_value_with_lease(&key, &node_info).await?;
|
||||
}
|
||||
|
||||
// Check if the current lease has expired and renew the lease.
|
||||
@@ -245,65 +197,12 @@ impl Election for PgElection {
|
||||
Ok(valid_candidates)
|
||||
}
|
||||
|
||||
/// Attempts to acquire leadership by executing a campaign. This function continuously checks
|
||||
/// if the current instance can become the leader by acquiring an advisory lock in the PostgreSQL database.
|
||||
///
|
||||
/// The function operates in a loop, where it:
|
||||
///
|
||||
/// 1. Waits for a predefined interval before attempting to acquire the lock again.
|
||||
/// 2. Executes the `CAMPAIGN` SQL query to try to acquire the advisory lock.
|
||||
/// 3. Checks the result of the query:
|
||||
/// - If the lock is successfully acquired (result is true), it calls the `leader_action` method
|
||||
/// to perform actions as the leader.
|
||||
/// - If the lock is not acquired (result is false), it calls the `follower_action` method
|
||||
/// to perform actions as a follower.
|
||||
async fn campaign(&self) -> Result<()> {
|
||||
let mut keep_alive_interval =
|
||||
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
|
||||
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||
|
||||
loop {
|
||||
let res = self
|
||||
.client
|
||||
.query(CAMPAIGN, &[])
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
if let Some(row) = res.first() {
|
||||
match row.try_get(0) {
|
||||
Ok(true) => self.leader_action().await?,
|
||||
Ok(false) => self.follower_action().await?,
|
||||
Err(_) => {
|
||||
return UnexpectedSnafu {
|
||||
violated: "Failed to get the result of acquiring advisory lock"
|
||||
.to_string(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return UnexpectedSnafu {
|
||||
violated: "Failed to get the result of acquiring advisory lock".to_string(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
let _ = keep_alive_interval.tick().await;
|
||||
}
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn leader(&self) -> Result<Self::Leader> {
|
||||
if self.is_leader.load(Ordering::Relaxed) {
|
||||
Ok(self.leader_value.as_bytes().into())
|
||||
} else {
|
||||
let key = self.election_key();
|
||||
if let Some((leader, expire_time, current, _)) =
|
||||
self.get_value_with_lease(&key, false).await?
|
||||
{
|
||||
ensure!(expire_time > current, NoLeaderSnafu);
|
||||
Ok(leader.as_bytes().into())
|
||||
} else {
|
||||
NoLeaderSnafu.fail()
|
||||
}
|
||||
}
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn resign(&self) -> Result<()> {
|
||||
@@ -416,17 +315,17 @@ impl PgElection {
|
||||
}
|
||||
|
||||
/// Returns `true` if the insertion is successful
|
||||
async fn put_value_with_lease(
|
||||
&self,
|
||||
key: &str,
|
||||
value: &str,
|
||||
lease_ttl_secs: u64,
|
||||
) -> Result<bool> {
|
||||
async fn put_value_with_lease(&self, key: &str, value: &str) -> Result<bool> {
|
||||
let res = self
|
||||
.client
|
||||
.query(
|
||||
PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME,
|
||||
&[&key, &value, &LEASE_SEP, &(lease_ttl_secs as f64)],
|
||||
&[
|
||||
&key,
|
||||
&value,
|
||||
&LEASE_SEP,
|
||||
&(self.candidate_lease_ttl_secs as f64),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
@@ -444,177 +343,6 @@ impl PgElection {
|
||||
|
||||
Ok(res.len() == 1)
|
||||
}
|
||||
|
||||
/// Handles the actions of a leader in the election process.
|
||||
///
|
||||
/// This function performs the following checks and actions:
|
||||
///
|
||||
/// - **Case 1**: If the current instance believes it is the leader from the previous term,
|
||||
/// it attempts to renew the lease. It checks if the lease is still valid and either renews it
|
||||
/// or steps down if it has expired.
|
||||
///
|
||||
/// - **Case 1.1**: If the instance is still the leader and the lease is valid, it renews the lease
|
||||
/// by updating the value associated with the election key.
|
||||
/// - **Case 1.2**: If the instance is still the leader but the lease has expired, it logs a warning
|
||||
/// and steps down, initiating a new campaign for leadership.
|
||||
/// - **Case 1.3**: If the instance is not the leader (which is a rare scenario), it logs a warning
|
||||
/// indicating that it still holds the lock and steps down to re-initiate the campaign. This may
|
||||
/// happen if the leader has failed to renew the lease and the session has expired, and recovery
|
||||
/// after a period of time during which other leaders have been elected and stepped down.
|
||||
/// - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign.
|
||||
///
|
||||
/// - **Case 2**: If the current instance is not leader previously, it calls the
|
||||
/// `elected` method as a newly elected leader.
|
||||
async fn leader_action(&self) -> Result<()> {
|
||||
let key = self.election_key();
|
||||
// Case 1
|
||||
if self.is_leader() {
|
||||
match self.get_value_with_lease(&key, true).await? {
|
||||
Some((prev_leader, expire_time, current, prev)) => {
|
||||
match (prev_leader == self.leader_value, expire_time > current) {
|
||||
// Case 1.1
|
||||
(true, true) => {
|
||||
// Safety: prev is Some since we are using `get_value_with_lease` with `true`.
|
||||
let prev = prev.unwrap();
|
||||
self.update_value_with_lease(&key, &prev, &self.leader_value)
|
||||
.await?;
|
||||
}
|
||||
// Case 1.2
|
||||
(true, false) => {
|
||||
warn!("Leader lease expired, now stepping down.");
|
||||
self.step_down().await?;
|
||||
}
|
||||
// Case 1.3
|
||||
(false, _) => {
|
||||
warn!("Leader lease not found, but still hold the lock. Now stepping down.");
|
||||
self.step_down().await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Case 1.4
|
||||
None => {
|
||||
warn!("Leader lease not found, but still hold the lock. Now stepping down.");
|
||||
self.step_down().await?;
|
||||
}
|
||||
}
|
||||
// Case 2
|
||||
} else {
|
||||
self.elected().await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handles the actions of a follower in the election process.
|
||||
///
|
||||
/// This function performs the following checks and actions:
|
||||
///
|
||||
/// - **Case 1**: If the current instance believes it is the leader from the previous term,
|
||||
/// it steps down without deleting the key.
|
||||
/// - **Case 2**: If the current instance is not the leader but the lease has expired, it raises an error
|
||||
/// to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
|
||||
/// will be released.
|
||||
/// - **Case 3**: If all checks pass, the function returns without performing any actions.
|
||||
async fn follower_action(&self) -> Result<()> {
|
||||
let key = self.election_key();
|
||||
// Case 1
|
||||
if self.is_leader() {
|
||||
self.step_down_without_lock().await?;
|
||||
}
|
||||
let (_, expire_time, current, _) = self
|
||||
.get_value_with_lease(&key, false)
|
||||
.await?
|
||||
.context(NoLeaderSnafu)?;
|
||||
// Case 2
|
||||
ensure!(expire_time > current, NoLeaderSnafu);
|
||||
// Case 3
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Step down the leader. The leader should delete the key and notify the leader watcher.
|
||||
///
|
||||
/// __DO NOT__ check if the deletion is successful, since the key may be deleted by others elected.
|
||||
///
|
||||
/// ## Caution:
|
||||
/// Should only step down while holding the advisory lock.
|
||||
async fn step_down(&self) -> Result<()> {
|
||||
let key = self.election_key();
|
||||
let leader_key = PgLeaderKey {
|
||||
name: self.leader_value.clone().into_bytes(),
|
||||
key: key.clone().into_bytes(),
|
||||
..Default::default()
|
||||
};
|
||||
if self
|
||||
.is_leader
|
||||
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
self.delete_value(&key).await?;
|
||||
self.client
|
||||
.query(STEP_DOWN, &[])
|
||||
.await
|
||||
.context(PostgresExecutionSnafu)?;
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
|
||||
{
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
|
||||
async fn step_down_without_lock(&self) -> Result<()> {
|
||||
let key = self.election_key().into_bytes();
|
||||
let leader_key = PgLeaderKey {
|
||||
name: self.leader_value.clone().into_bytes(),
|
||||
key: key.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
if self
|
||||
.is_leader
|
||||
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
|
||||
{
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Elected as leader. The leader should put the key and notify the leader watcher.
|
||||
/// Caution: Should only elected while holding the advisory lock.
|
||||
async fn elected(&self) -> Result<()> {
|
||||
let key = self.election_key();
|
||||
let leader_key = PgLeaderKey {
|
||||
name: self.leader_value.clone().into_bytes(),
|
||||
key: key.clone().into_bytes(),
|
||||
..Default::default()
|
||||
};
|
||||
self.delete_value(&key).await?;
|
||||
self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS)
|
||||
.await?;
|
||||
|
||||
if self
|
||||
.is_leader
|
||||
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
self.leader_infancy.store(true, Ordering::Relaxed);
|
||||
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
|
||||
{
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -662,7 +390,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let res = pg_election
|
||||
.put_value_with_lease(&key, &value, 10)
|
||||
.put_value_with_lease(&key, &value)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(res);
|
||||
@@ -690,7 +418,7 @@ mod tests {
|
||||
let key = format!("test_key_{}", i);
|
||||
let value = format!("test_value_{}", i);
|
||||
pg_election
|
||||
.put_value_with_lease(&key, &value, 10)
|
||||
.put_value_with_lease(&key, &value)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
@@ -750,7 +478,7 @@ mod tests {
|
||||
handles.push(handle);
|
||||
}
|
||||
// Wait for candidates to registrate themselves and renew their leases at least once.
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
tokio::time::sleep(Duration::from_secs(6)).await;
|
||||
|
||||
let client = create_postgres_client().await.unwrap();
|
||||
|
||||
@@ -788,402 +516,4 @@ mod tests {
|
||||
assert!(res);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_elected_and_step_down() {
|
||||
let leader_value = "test_leader".to_string();
|
||||
let candidate_lease_ttl_secs = 5;
|
||||
let client = create_postgres_client().await.unwrap();
|
||||
|
||||
let (tx, mut rx) = broadcast::channel(100);
|
||||
let leader_pg_election = PgElection {
|
||||
leader_value: leader_value.clone(),
|
||||
client,
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: "test_prefix".to_string(),
|
||||
candidate_lease_ttl_secs,
|
||||
};
|
||||
|
||||
leader_pg_election.elected().await.unwrap();
|
||||
let (leader, expire_time, current, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(expire_time > current);
|
||||
assert!(leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(LeaderChangeMessage::Elected(key)) => {
|
||||
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
|
||||
assert_eq!(
|
||||
String::from_utf8_lossy(key.key()),
|
||||
leader_pg_election.election_key()
|
||||
);
|
||||
assert_eq!(key.lease_id(), i64::default());
|
||||
assert_eq!(key.revision(), i64::default());
|
||||
}
|
||||
_ => panic!("Expected LeaderChangeMessage::Elected"),
|
||||
}
|
||||
|
||||
leader_pg_election.step_down_without_lock().await.unwrap();
|
||||
let (leader, _, _, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(!leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(LeaderChangeMessage::StepDown(key)) => {
|
||||
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
|
||||
assert_eq!(
|
||||
String::from_utf8_lossy(key.key()),
|
||||
leader_pg_election.election_key()
|
||||
);
|
||||
assert_eq!(key.lease_id(), i64::default());
|
||||
assert_eq!(key.revision(), i64::default());
|
||||
}
|
||||
_ => panic!("Expected LeaderChangeMessage::StepDown"),
|
||||
}
|
||||
|
||||
leader_pg_election.elected().await.unwrap();
|
||||
let (leader, expire_time, current, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(expire_time > current);
|
||||
assert!(leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(LeaderChangeMessage::Elected(key)) => {
|
||||
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
|
||||
assert_eq!(
|
||||
String::from_utf8_lossy(key.key()),
|
||||
leader_pg_election.election_key()
|
||||
);
|
||||
assert_eq!(key.lease_id(), i64::default());
|
||||
assert_eq!(key.revision(), i64::default());
|
||||
}
|
||||
_ => panic!("Expected LeaderChangeMessage::Elected"),
|
||||
}
|
||||
|
||||
leader_pg_election.step_down().await.unwrap();
|
||||
let res = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(res.is_none());
|
||||
assert!(!leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(LeaderChangeMessage::StepDown(key)) => {
|
||||
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
|
||||
assert_eq!(
|
||||
String::from_utf8_lossy(key.key()),
|
||||
leader_pg_election.election_key()
|
||||
);
|
||||
assert_eq!(key.lease_id(), i64::default());
|
||||
assert_eq!(key.revision(), i64::default());
|
||||
}
|
||||
_ => panic!("Expected LeaderChangeMessage::StepDown"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_leader_action() {
|
||||
let leader_value = "test_leader".to_string();
|
||||
let candidate_lease_ttl_secs = 5;
|
||||
let client = create_postgres_client().await.unwrap();
|
||||
|
||||
let (tx, mut rx) = broadcast::channel(100);
|
||||
let leader_pg_election = PgElection {
|
||||
leader_value: leader_value.clone(),
|
||||
client,
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: "test_prefix".to_string(),
|
||||
candidate_lease_ttl_secs,
|
||||
};
|
||||
|
||||
// Step 1: No leader exists, campaign and elected.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.query(CAMPAIGN, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
let res: bool = res[0].get(0);
|
||||
assert!(res);
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let (leader, expire_time, current, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(expire_time > current);
|
||||
assert!(leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(LeaderChangeMessage::Elected(key)) => {
|
||||
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
|
||||
assert_eq!(
|
||||
String::from_utf8_lossy(key.key()),
|
||||
leader_pg_election.election_key()
|
||||
);
|
||||
assert_eq!(key.lease_id(), i64::default());
|
||||
assert_eq!(key.revision(), i64::default());
|
||||
}
|
||||
_ => panic!("Expected LeaderChangeMessage::Elected"),
|
||||
}
|
||||
|
||||
// Step 2: As a leader, renew the lease.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.query(CAMPAIGN, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
let res: bool = res[0].get(0);
|
||||
assert!(res);
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let (leader, new_expire_time, current, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(new_expire_time > current && new_expire_time > expire_time);
|
||||
assert!(leader_pg_election.is_leader());
|
||||
|
||||
// Step 3: Something wrong, the leader lease expired.
|
||||
tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await;
|
||||
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.query(CAMPAIGN, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
let res: bool = res[0].get(0);
|
||||
assert!(res);
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let res = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(res.is_none());
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(LeaderChangeMessage::StepDown(key)) => {
|
||||
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
|
||||
assert_eq!(
|
||||
String::from_utf8_lossy(key.key()),
|
||||
leader_pg_election.election_key()
|
||||
);
|
||||
assert_eq!(key.lease_id(), i64::default());
|
||||
assert_eq!(key.revision(), i64::default());
|
||||
}
|
||||
_ => panic!("Expected LeaderChangeMessage::StepDown"),
|
||||
}
|
||||
|
||||
// Step 4: Re-campaign and elected.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.query(CAMPAIGN, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
let res: bool = res[0].get(0);
|
||||
assert!(res);
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let (leader, expire_time, current, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(expire_time > current);
|
||||
assert!(leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(LeaderChangeMessage::Elected(key)) => {
|
||||
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
|
||||
assert_eq!(
|
||||
String::from_utf8_lossy(key.key()),
|
||||
leader_pg_election.election_key()
|
||||
);
|
||||
assert_eq!(key.lease_id(), i64::default());
|
||||
assert_eq!(key.revision(), i64::default());
|
||||
}
|
||||
_ => panic!("Expected LeaderChangeMessage::Elected"),
|
||||
}
|
||||
|
||||
// Step 5: Something wrong, the leader key is deleted by other followers.
|
||||
leader_pg_election
|
||||
.delete_value(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap();
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let res = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(res.is_none());
|
||||
assert!(!leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(LeaderChangeMessage::StepDown(key)) => {
|
||||
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
|
||||
assert_eq!(
|
||||
String::from_utf8_lossy(key.key()),
|
||||
leader_pg_election.election_key()
|
||||
);
|
||||
assert_eq!(key.lease_id(), i64::default());
|
||||
assert_eq!(key.revision(), i64::default());
|
||||
}
|
||||
_ => panic!("Expected LeaderChangeMessage::StepDown"),
|
||||
}
|
||||
|
||||
// Step 6: Re-campaign and elected.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.query(CAMPAIGN, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
let res: bool = res[0].get(0);
|
||||
assert!(res);
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let (leader, expire_time, current, _) = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(leader == leader_value);
|
||||
assert!(expire_time > current);
|
||||
assert!(leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(LeaderChangeMessage::Elected(key)) => {
|
||||
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
|
||||
assert_eq!(
|
||||
String::from_utf8_lossy(key.key()),
|
||||
leader_pg_election.election_key()
|
||||
);
|
||||
assert_eq!(key.lease_id(), i64::default());
|
||||
assert_eq!(key.revision(), i64::default());
|
||||
}
|
||||
_ => panic!("Expected LeaderChangeMessage::Elected"),
|
||||
}
|
||||
|
||||
// Step 7: Something wrong, the leader key changed by others.
|
||||
let res = leader_pg_election
|
||||
.client
|
||||
.query(CAMPAIGN, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
let res: bool = res[0].get(0);
|
||||
assert!(res);
|
||||
leader_pg_election
|
||||
.delete_value(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap();
|
||||
leader_pg_election
|
||||
.put_value_with_lease(&leader_pg_election.election_key(), "test", 10)
|
||||
.await
|
||||
.unwrap();
|
||||
leader_pg_election.leader_action().await.unwrap();
|
||||
let res = leader_pg_election
|
||||
.get_value_with_lease(&leader_pg_election.election_key(), false)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(res.is_none());
|
||||
assert!(!leader_pg_election.is_leader());
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(LeaderChangeMessage::StepDown(key)) => {
|
||||
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
|
||||
assert_eq!(
|
||||
String::from_utf8_lossy(key.key()),
|
||||
leader_pg_election.election_key()
|
||||
);
|
||||
assert_eq!(key.lease_id(), i64::default());
|
||||
assert_eq!(key.revision(), i64::default());
|
||||
}
|
||||
_ => panic!("Expected LeaderChangeMessage::StepDown"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_follower_action() {
|
||||
let candidate_lease_ttl_secs = 5;
|
||||
|
||||
let follower_client = create_postgres_client().await.unwrap();
|
||||
let (tx, mut rx) = broadcast::channel(100);
|
||||
let follower_pg_election = PgElection {
|
||||
leader_value: "test_follower".to_string(),
|
||||
client: follower_client,
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: "test_prefix".to_string(),
|
||||
candidate_lease_ttl_secs,
|
||||
};
|
||||
|
||||
let leader_client = create_postgres_client().await.unwrap();
|
||||
let (tx, _) = broadcast::channel(100);
|
||||
let leader_pg_election = PgElection {
|
||||
leader_value: "test_leader".to_string(),
|
||||
client: leader_client,
|
||||
is_leader: AtomicBool::new(false),
|
||||
leader_infancy: AtomicBool::new(true),
|
||||
leader_watcher: tx,
|
||||
store_key_prefix: "test_prefix".to_string(),
|
||||
candidate_lease_ttl_secs,
|
||||
};
|
||||
|
||||
leader_pg_election
|
||||
.client
|
||||
.query(CAMPAIGN, &[])
|
||||
.await
|
||||
.unwrap();
|
||||
leader_pg_election.elected().await.unwrap();
|
||||
|
||||
// Step 1: As a follower, the leader exists and the lease is not expired.
|
||||
follower_pg_election.follower_action().await.unwrap();
|
||||
|
||||
// Step 2: As a follower, the leader exists but the lease expired.
|
||||
tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await;
|
||||
assert!(follower_pg_election.follower_action().await.is_err());
|
||||
|
||||
// Step 3: As a follower, the leader does not exist.
|
||||
leader_pg_election
|
||||
.delete_value(&leader_pg_election.election_key())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(follower_pg_election.follower_action().await.is_err());
|
||||
|
||||
// Step 4: Follower thinks it's the leader but failed to acquire the lock.
|
||||
follower_pg_election
|
||||
.is_leader
|
||||
.store(true, Ordering::Relaxed);
|
||||
assert!(follower_pg_election.follower_action().await.is_err());
|
||||
|
||||
match rx.recv().await {
|
||||
Ok(LeaderChangeMessage::StepDown(key)) => {
|
||||
assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
|
||||
assert_eq!(
|
||||
String::from_utf8_lossy(key.key()),
|
||||
follower_pg_election.election_key()
|
||||
);
|
||||
assert_eq!(key.lease_id(), i64::default());
|
||||
assert_eq!(key.revision(), i64::default());
|
||||
}
|
||||
_ => panic!("Expected LeaderChangeMessage::StepDown"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -704,7 +704,7 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[snafu(display("Failed to connect to Postgres"))]
|
||||
#[snafu(display("Failed to connect to PostgresSQL"))]
|
||||
ConnectPostgres {
|
||||
#[snafu(source)]
|
||||
error: tokio_postgres::Error,
|
||||
@@ -712,23 +712,6 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[snafu(display("Failed to create connection pool for Postgres"))]
|
||||
CreatePostgresPool {
|
||||
#[snafu(source)]
|
||||
error: deadpool_postgres::CreatePoolError,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
|
||||
GetPostgresConnection {
|
||||
reason: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Handler not found: {}", name))]
|
||||
HandlerNotFound {
|
||||
name: String,
|
||||
@@ -860,10 +843,9 @@ impl ErrorExt for Error {
|
||||
Error::Other { source, .. } => source.status_code(),
|
||||
Error::LookupPeer { source, .. } => source.status_code(),
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
Error::CreatePostgresPool { .. }
|
||||
| Error::GetPostgresConnection { .. }
|
||||
| Error::PostgresExecution { .. }
|
||||
| Error::ConnectPostgres { .. } => StatusCode::Internal,
|
||||
Error::ConnectPostgres { .. } => StatusCode::Internal,
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
Error::PostgresExecution { .. } => StatusCode::Internal,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -93,15 +93,15 @@ pub struct MitoConfig {
|
||||
pub page_cache_size: ReadableSize,
|
||||
/// Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.
|
||||
pub selector_result_cache_size: ReadableSize,
|
||||
/// Whether to enable the write cache.
|
||||
pub enable_write_cache: bool,
|
||||
/// Whether to enable the experimental write cache.
|
||||
pub enable_experimental_write_cache: bool,
|
||||
/// File system path for write cache dir's root, defaults to `{data_home}`.
|
||||
pub write_cache_path: String,
|
||||
pub experimental_write_cache_path: String,
|
||||
/// Capacity for write cache.
|
||||
pub write_cache_size: ReadableSize,
|
||||
pub experimental_write_cache_size: ReadableSize,
|
||||
/// TTL for write cache.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub write_cache_ttl: Option<Duration>,
|
||||
pub experimental_write_cache_ttl: Option<Duration>,
|
||||
|
||||
// Other configs:
|
||||
/// Buffer size for SST writing.
|
||||
@@ -147,10 +147,10 @@ impl Default for MitoConfig {
|
||||
vector_cache_size: ReadableSize::mb(512),
|
||||
page_cache_size: ReadableSize::mb(512),
|
||||
selector_result_cache_size: ReadableSize::mb(512),
|
||||
enable_write_cache: false,
|
||||
write_cache_path: String::new(),
|
||||
write_cache_size: ReadableSize::gb(5),
|
||||
write_cache_ttl: None,
|
||||
enable_experimental_write_cache: false,
|
||||
experimental_write_cache_path: String::new(),
|
||||
experimental_write_cache_size: ReadableSize::gb(5),
|
||||
experimental_write_cache_ttl: None,
|
||||
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
|
||||
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
|
||||
allow_stale_entries: false,
|
||||
@@ -234,8 +234,8 @@ impl MitoConfig {
|
||||
}
|
||||
|
||||
// Sets write cache path if it is empty.
|
||||
if self.write_cache_path.trim().is_empty() {
|
||||
self.write_cache_path = data_home.to_string();
|
||||
if self.experimental_write_cache_path.trim().is_empty() {
|
||||
self.experimental_write_cache_path = data_home.to_string();
|
||||
}
|
||||
|
||||
self.index.sanitize(data_home, &self.inverted_index)?;
|
||||
@@ -268,7 +268,7 @@ impl MitoConfig {
|
||||
self.selector_result_cache_size = mem_cache_size;
|
||||
}
|
||||
|
||||
/// Enable write cache.
|
||||
/// Enable experimental write cache.
|
||||
#[cfg(test)]
|
||||
pub fn enable_write_cache(
|
||||
mut self,
|
||||
@@ -276,10 +276,10 @@ impl MitoConfig {
|
||||
size: ReadableSize,
|
||||
ttl: Option<Duration>,
|
||||
) -> Self {
|
||||
self.enable_write_cache = true;
|
||||
self.write_cache_path = path;
|
||||
self.write_cache_size = size;
|
||||
self.write_cache_ttl = ttl;
|
||||
self.enable_experimental_write_cache = true;
|
||||
self.experimental_write_cache_path = path;
|
||||
self.experimental_write_cache_size = size;
|
||||
self.experimental_write_cache_ttl = ttl;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -140,7 +140,7 @@ async fn test_edit_region_fill_cache() {
|
||||
.create_engine_with(
|
||||
MitoConfig {
|
||||
// Write cache must be enabled to download the ingested SST file.
|
||||
enable_write_cache: true,
|
||||
enable_experimental_write_cache: true,
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
|
||||
@@ -365,20 +365,23 @@ async fn write_cache_from_config(
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
intermediate_manager: IntermediateManager,
|
||||
) -> Result<Option<WriteCacheRef>> {
|
||||
if !config.enable_write_cache {
|
||||
if !config.enable_experimental_write_cache {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
|
||||
// TODO(yingwen): Remove this and document the config once the write cache is ready.
|
||||
warn!("Write cache is an experimental feature");
|
||||
|
||||
tokio::fs::create_dir_all(Path::new(&config.experimental_write_cache_path))
|
||||
.await
|
||||
.context(CreateDirSnafu {
|
||||
dir: &config.write_cache_path,
|
||||
dir: &config.experimental_write_cache_path,
|
||||
})?;
|
||||
|
||||
let cache = WriteCache::new_fs(
|
||||
&config.write_cache_path,
|
||||
config.write_cache_size,
|
||||
config.write_cache_ttl,
|
||||
&config.experimental_write_cache_path,
|
||||
config.experimental_write_cache_size,
|
||||
config.experimental_write_cache_ttl,
|
||||
puffin_manager_factory,
|
||||
intermediate_manager,
|
||||
)
|
||||
|
||||
@@ -928,9 +928,9 @@ worker_request_batch_size = 64
|
||||
manifest_checkpoint_distance = 10
|
||||
compress_manifest = false
|
||||
auto_flush_interval = "30m"
|
||||
enable_write_cache = false
|
||||
write_cache_path = ""
|
||||
write_cache_size = "5GiB"
|
||||
enable_experimental_write_cache = false
|
||||
experimental_write_cache_path = ""
|
||||
experimental_write_cache_size = "5GiB"
|
||||
sst_write_buffer_size = "8MiB"
|
||||
parallel_scan_channel_size = 32
|
||||
allow_stale_entries = false
|
||||
|
||||
Reference in New Issue
Block a user