mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
Add safekeeper information exchange through etcd.
Safekeers now publish to and pull from etcd per-timeline data. Immediate goal is WAL truncation, for which every safekeeper must know remote_consistent_lsn; the next would be callmemaybe replacement. Adds corresponding '--broker' argument to safekeeper and ability to run etcd in tests. Adds test checking remote_consistent_lsn is indeed communicated.
This commit is contained in:
252
Cargo.lock
generated
252
Cargo.lock
generated
@@ -75,6 +75,27 @@ dependencies = [
|
|||||||
"zstd-safe",
|
"zstd-safe",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-stream"
|
||||||
|
version = "0.3.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625"
|
||||||
|
dependencies = [
|
||||||
|
"async-stream-impl",
|
||||||
|
"futures-core",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-stream-impl"
|
||||||
|
version = "0.3.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-trait"
|
name = "async-trait"
|
||||||
version = "0.1.52"
|
version = "0.1.52"
|
||||||
@@ -703,6 +724,21 @@ dependencies = [
|
|||||||
"termcolor",
|
"termcolor",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "etcd-client"
|
||||||
|
version = "0.8.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "585de5039d1ecce74773db49ba4e8107e42be7c2cd0b1a9e7fce27181db7b118"
|
||||||
|
dependencies = [
|
||||||
|
"http",
|
||||||
|
"prost",
|
||||||
|
"tokio",
|
||||||
|
"tokio-stream",
|
||||||
|
"tonic",
|
||||||
|
"tonic-build",
|
||||||
|
"tower-service",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fail"
|
name = "fail"
|
||||||
version = "0.5.0"
|
version = "0.5.0"
|
||||||
@@ -741,6 +777,12 @@ dependencies = [
|
|||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "fixedbitset"
|
||||||
|
version = "0.4.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fnv"
|
name = "fnv"
|
||||||
version = "1.0.7"
|
version = "1.0.7"
|
||||||
@@ -926,7 +968,7 @@ dependencies = [
|
|||||||
"indexmap",
|
"indexmap",
|
||||||
"slab",
|
"slab",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util 0.6.9",
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -954,6 +996,15 @@ dependencies = [
|
|||||||
"ahash 0.7.6",
|
"ahash 0.7.6",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "heck"
|
||||||
|
version = "0.3.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
|
||||||
|
dependencies = [
|
||||||
|
"unicode-segmentation",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hermit-abi"
|
name = "hermit-abi"
|
||||||
version = "0.1.19"
|
version = "0.1.19"
|
||||||
@@ -1075,6 +1126,18 @@ dependencies = [
|
|||||||
"tokio-rustls 0.23.2",
|
"tokio-rustls 0.23.2",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hyper-timeout"
|
||||||
|
version = "0.4.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
|
||||||
|
dependencies = [
|
||||||
|
"hyper",
|
||||||
|
"pin-project-lite",
|
||||||
|
"tokio",
|
||||||
|
"tokio-io-timeout",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ident_case"
|
name = "ident_case"
|
||||||
version = "1.0.1"
|
version = "1.0.1"
|
||||||
@@ -1308,9 +1371,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mio"
|
name = "mio"
|
||||||
version = "0.7.14"
|
version = "0.8.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc"
|
checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
"log",
|
"log",
|
||||||
@@ -1328,6 +1391,12 @@ dependencies = [
|
|||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "multimap"
|
||||||
|
version = "0.8.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nix"
|
name = "nix"
|
||||||
version = "0.23.1"
|
version = "0.23.1"
|
||||||
@@ -1557,6 +1626,16 @@ version = "2.1.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
|
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "petgraph"
|
||||||
|
version = "0.6.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4a13a2fa9d0b63e5f22328828741e523766fff0ee9e779316902290dff3f824f"
|
||||||
|
dependencies = [
|
||||||
|
"fixedbitset",
|
||||||
|
"indexmap",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "phf"
|
name = "phf"
|
||||||
version = "0.8.0"
|
version = "0.8.0"
|
||||||
@@ -1776,6 +1855,59 @@ dependencies = [
|
|||||||
"thiserror",
|
"thiserror",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prost"
|
||||||
|
version = "0.9.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"prost-derive",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prost-build"
|
||||||
|
version = "0.9.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"heck",
|
||||||
|
"itertools",
|
||||||
|
"lazy_static",
|
||||||
|
"log",
|
||||||
|
"multimap",
|
||||||
|
"petgraph",
|
||||||
|
"prost",
|
||||||
|
"prost-types",
|
||||||
|
"regex",
|
||||||
|
"tempfile",
|
||||||
|
"which",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prost-derive"
|
||||||
|
version = "0.9.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"itertools",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prost-types"
|
||||||
|
version = "0.9.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"prost",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "proxy"
|
name = "proxy"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -1979,7 +2111,7 @@ dependencies = [
|
|||||||
"serde_urlencoded",
|
"serde_urlencoded",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-rustls 0.23.2",
|
"tokio-rustls 0.23.2",
|
||||||
"tokio-util",
|
"tokio-util 0.6.9",
|
||||||
"url",
|
"url",
|
||||||
"wasm-bindgen",
|
"wasm-bindgen",
|
||||||
"wasm-bindgen-futures",
|
"wasm-bindgen-futures",
|
||||||
@@ -2508,9 +2640,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio"
|
name = "tokio"
|
||||||
version = "1.16.1"
|
version = "1.17.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a"
|
checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"libc",
|
"libc",
|
||||||
@@ -2520,10 +2652,21 @@ dependencies = [
|
|||||||
"once_cell",
|
"once_cell",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"signal-hook-registry",
|
"signal-hook-registry",
|
||||||
|
"socket2",
|
||||||
"tokio-macros",
|
"tokio-macros",
|
||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-io-timeout"
|
||||||
|
version = "1.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
|
||||||
|
dependencies = [
|
||||||
|
"pin-project-lite",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-macros"
|
name = "tokio-macros"
|
||||||
version = "1.7.0"
|
version = "1.7.0"
|
||||||
@@ -2554,7 +2697,7 @@ dependencies = [
|
|||||||
"postgres-types 0.2.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)",
|
"postgres-types 0.2.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)",
|
||||||
"socket2",
|
"socket2",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util 0.6.9",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -2576,7 +2719,7 @@ dependencies = [
|
|||||||
"postgres-types 0.2.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)",
|
"postgres-types 0.2.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)",
|
||||||
"socket2",
|
"socket2",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util 0.6.9",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -2641,6 +2784,20 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-util"
|
||||||
|
version = "0.7.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"futures-core",
|
||||||
|
"futures-sink",
|
||||||
|
"log",
|
||||||
|
"pin-project-lite",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "toml"
|
name = "toml"
|
||||||
version = "0.5.8"
|
version = "0.5.8"
|
||||||
@@ -2663,6 +2820,75 @@ dependencies = [
|
|||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tonic"
|
||||||
|
version = "0.6.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ff08f4649d10a70ffa3522ca559031285d8e421d727ac85c60825761818f5d0a"
|
||||||
|
dependencies = [
|
||||||
|
"async-stream",
|
||||||
|
"async-trait",
|
||||||
|
"base64 0.13.0",
|
||||||
|
"bytes",
|
||||||
|
"futures-core",
|
||||||
|
"futures-util",
|
||||||
|
"h2",
|
||||||
|
"http",
|
||||||
|
"http-body",
|
||||||
|
"hyper",
|
||||||
|
"hyper-timeout",
|
||||||
|
"percent-encoding",
|
||||||
|
"pin-project",
|
||||||
|
"prost",
|
||||||
|
"prost-derive",
|
||||||
|
"tokio",
|
||||||
|
"tokio-stream",
|
||||||
|
"tokio-util 0.6.9",
|
||||||
|
"tower",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
"tracing",
|
||||||
|
"tracing-futures",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tonic-build"
|
||||||
|
version = "0.6.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9403f1bafde247186684b230dc6f38b5cd514584e8bec1dd32514be4745fa757"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"prost-build",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tower"
|
||||||
|
version = "0.4.12"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e"
|
||||||
|
dependencies = [
|
||||||
|
"futures-core",
|
||||||
|
"futures-util",
|
||||||
|
"indexmap",
|
||||||
|
"pin-project",
|
||||||
|
"pin-project-lite",
|
||||||
|
"rand",
|
||||||
|
"slab",
|
||||||
|
"tokio",
|
||||||
|
"tokio-util 0.7.0",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
"tracing",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tower-layer"
|
||||||
|
version = "0.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-service"
|
name = "tower-service"
|
||||||
version = "0.3.1"
|
version = "0.3.1"
|
||||||
@@ -2676,6 +2902,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "2d8d93354fe2a8e50d5953f5ae2e47a3fc2ef03292e7ea46e3cc38f549525fb9"
|
checksum = "2d8d93354fe2a8e50d5953f5ae2e47a3fc2ef03292e7ea46e3cc38f549525fb9"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
|
"log",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"tracing-attributes",
|
"tracing-attributes",
|
||||||
"tracing-core",
|
"tracing-core",
|
||||||
@@ -2768,6 +2995,12 @@ dependencies = [
|
|||||||
"tinyvec",
|
"tinyvec",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "unicode-segmentation"
|
||||||
|
version = "1.9.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "unicode-width"
|
name = "unicode-width"
|
||||||
version = "0.1.9"
|
version = "0.1.9"
|
||||||
@@ -2838,6 +3071,7 @@ dependencies = [
|
|||||||
"const_format",
|
"const_format",
|
||||||
"crc32c",
|
"crc32c",
|
||||||
"daemonize",
|
"daemonize",
|
||||||
|
"etcd-client",
|
||||||
"fs2",
|
"fs2",
|
||||||
"hex",
|
"hex",
|
||||||
"humantime",
|
"humantime",
|
||||||
@@ -2850,11 +3084,13 @@ dependencies = [
|
|||||||
"rust-s3",
|
"rust-s3",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
"serde_with",
|
||||||
"signal-hook",
|
"signal-hook",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-postgres 0.7.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)",
|
"tokio-postgres 0.7.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
"url",
|
||||||
"walkdir",
|
"walkdir",
|
||||||
"workspace_hack",
|
"workspace_hack",
|
||||||
"zenith_metrics",
|
"zenith_metrics",
|
||||||
|
|||||||
@@ -57,6 +57,10 @@ pub struct LocalEnv {
|
|||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub private_key_path: PathBuf,
|
pub private_key_path: PathBuf,
|
||||||
|
|
||||||
|
// A comma separated broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'.
|
||||||
|
#[serde(default)]
|
||||||
|
pub broker_endpoints: Option<String>,
|
||||||
|
|
||||||
pub pageserver: PageServerConf,
|
pub pageserver: PageServerConf,
|
||||||
|
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
|||||||
@@ -73,6 +73,8 @@ pub struct SafekeeperNode {
|
|||||||
pub http_base_url: String,
|
pub http_base_url: String,
|
||||||
|
|
||||||
pub pageserver: Arc<PageServerNode>,
|
pub pageserver: Arc<PageServerNode>,
|
||||||
|
|
||||||
|
broker_endpoints: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SafekeeperNode {
|
impl SafekeeperNode {
|
||||||
@@ -89,6 +91,7 @@ impl SafekeeperNode {
|
|||||||
http_client: Client::new(),
|
http_client: Client::new(),
|
||||||
http_base_url: format!("http://127.0.0.1:{}/v1", conf.http_port),
|
http_base_url: format!("http://127.0.0.1:{}/v1", conf.http_port),
|
||||||
pageserver,
|
pageserver,
|
||||||
|
broker_endpoints: env.broker_endpoints.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -135,6 +138,9 @@ impl SafekeeperNode {
|
|||||||
if !self.conf.sync {
|
if !self.conf.sync {
|
||||||
cmd.arg("--no-sync");
|
cmd.arg("--no-sync");
|
||||||
}
|
}
|
||||||
|
if let Some(ref ep) = self.broker_endpoints {
|
||||||
|
cmd.args(&["--broker-endpoints", ep]);
|
||||||
|
}
|
||||||
|
|
||||||
if !cmd.status()?.success() {
|
if !cmd.status()?.success() {
|
||||||
bail!(
|
bail!(
|
||||||
|
|||||||
@@ -10,6 +10,8 @@ Prerequisites:
|
|||||||
below to run from other directories.
|
below to run from other directories.
|
||||||
- The zenith git repo, including the postgres submodule
|
- The zenith git repo, including the postgres submodule
|
||||||
(for some tests, e.g. `pg_regress`)
|
(for some tests, e.g. `pg_regress`)
|
||||||
|
- Some tests (involving storage nodes coordination) require etcd installed. Follow
|
||||||
|
[`the guide`](https://etcd.io/docs/v3.5/install/) to obtain it.
|
||||||
|
|
||||||
### Test Organization
|
### Test Organization
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ from dataclasses import dataclass, field
|
|||||||
from multiprocessing import Process, Value
|
from multiprocessing import Process, Value
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
|
from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
|
||||||
from fixtures.utils import lsn_to_hex, mkdir_if_needed, lsn_from_hex
|
from fixtures.utils import etcd_path, lsn_to_hex, mkdir_if_needed, lsn_from_hex
|
||||||
from fixtures.log_helper import log
|
from fixtures.log_helper import log
|
||||||
from typing import List, Optional, Any
|
from typing import List, Optional, Any
|
||||||
|
|
||||||
@@ -22,6 +22,7 @@ from typing import List, Optional, Any
|
|||||||
# succeed and data is written
|
# succeed and data is written
|
||||||
def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
|
def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
|
||||||
zenith_env_builder.num_safekeepers = 3
|
zenith_env_builder.num_safekeepers = 3
|
||||||
|
zenith_env_builder.broker = True
|
||||||
env = zenith_env_builder.init_start()
|
env = zenith_env_builder.init_start()
|
||||||
|
|
||||||
env.zenith_cli.create_branch('test_wal_acceptors_normal_work')
|
env.zenith_cli.create_branch('test_wal_acceptors_normal_work')
|
||||||
@@ -326,6 +327,49 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
|
|||||||
proc.join()
|
proc.join()
|
||||||
|
|
||||||
|
|
||||||
|
# Test that safekeepers push their info to the broker and learn peer status from it
|
||||||
|
@pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH")
|
||||||
|
def test_broker(zenith_env_builder: ZenithEnvBuilder):
|
||||||
|
zenith_env_builder.num_safekeepers = 3
|
||||||
|
zenith_env_builder.broker = True
|
||||||
|
zenith_env_builder.enable_local_fs_remote_storage()
|
||||||
|
env = zenith_env_builder.init_start()
|
||||||
|
|
||||||
|
env.zenith_cli.create_branch("test_broker", "main")
|
||||||
|
pg = env.postgres.create_start('test_broker')
|
||||||
|
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
|
||||||
|
|
||||||
|
# learn zenith timeline from compute
|
||||||
|
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
|
||||||
|
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
|
||||||
|
|
||||||
|
# wait until remote_consistent_lsn gets advanced on all safekeepers
|
||||||
|
clients = [sk.http_client() for sk in env.safekeepers]
|
||||||
|
stat_before = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
|
||||||
|
log.info(f"statuses is {stat_before}")
|
||||||
|
|
||||||
|
pg.safe_psql("INSERT INTO t SELECT generate_series(1,100), 'payload'")
|
||||||
|
# force checkpoint to advance remote_consistent_lsn
|
||||||
|
with closing(env.pageserver.connect()) as psconn:
|
||||||
|
with psconn.cursor() as pscur:
|
||||||
|
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
|
||||||
|
# and wait till remote_consistent_lsn propagates to all safekeepers
|
||||||
|
started_at = time.time()
|
||||||
|
while True:
|
||||||
|
stat_after = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
|
||||||
|
if all(
|
||||||
|
lsn_from_hex(s_after.remote_consistent_lsn) > lsn_from_hex(
|
||||||
|
s_before.remote_consistent_lsn) for s_after,
|
||||||
|
s_before in zip(stat_after, stat_before)):
|
||||||
|
break
|
||||||
|
elapsed = time.time() - started_at
|
||||||
|
if elapsed > 20:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"timed out waiting {elapsed:.0f}s for remote_consistent_lsn propagation: status before {stat_before}, status current {stat_after}"
|
||||||
|
)
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
|
||||||
class ProposerPostgres(PgProtocol):
|
class ProposerPostgres(PgProtocol):
|
||||||
"""Object for running postgres without ZenithEnv"""
|
"""Object for running postgres without ZenithEnv"""
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import os
|
import os
|
||||||
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
from typing import Any, List
|
from typing import Any, List
|
||||||
@@ -76,3 +77,8 @@ def print_gc_result(row):
|
|||||||
log.info(
|
log.info(
|
||||||
" total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
|
" total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
|
||||||
.format_map(row))
|
.format_map(row))
|
||||||
|
|
||||||
|
|
||||||
|
# path to etcd binary or None if not present.
|
||||||
|
def etcd_path():
|
||||||
|
return shutil.which("etcd")
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ from typing_extensions import Literal
|
|||||||
import requests
|
import requests
|
||||||
import backoff # type: ignore
|
import backoff # type: ignore
|
||||||
|
|
||||||
from .utils import (get_self_dir, lsn_from_hex, mkdir_if_needed, subprocess_capture)
|
from .utils import (etcd_path, get_self_dir, mkdir_if_needed, subprocess_capture, lsn_from_hex)
|
||||||
from fixtures.log_helper import log
|
from fixtures.log_helper import log
|
||||||
"""
|
"""
|
||||||
This file contains pytest fixtures. A fixture is a test resource that can be
|
This file contains pytest fixtures. A fixture is a test resource that can be
|
||||||
@@ -433,7 +433,8 @@ class ZenithEnvBuilder:
|
|||||||
num_safekeepers: int = 0,
|
num_safekeepers: int = 0,
|
||||||
pageserver_auth_enabled: bool = False,
|
pageserver_auth_enabled: bool = False,
|
||||||
rust_log_override: Optional[str] = None,
|
rust_log_override: Optional[str] = None,
|
||||||
default_branch_name=DEFAULT_BRANCH_NAME):
|
default_branch_name=DEFAULT_BRANCH_NAME,
|
||||||
|
broker: bool = False):
|
||||||
self.repo_dir = repo_dir
|
self.repo_dir = repo_dir
|
||||||
self.rust_log_override = rust_log_override
|
self.rust_log_override = rust_log_override
|
||||||
self.port_distributor = port_distributor
|
self.port_distributor = port_distributor
|
||||||
@@ -442,6 +443,7 @@ class ZenithEnvBuilder:
|
|||||||
self.num_safekeepers = num_safekeepers
|
self.num_safekeepers = num_safekeepers
|
||||||
self.pageserver_auth_enabled = pageserver_auth_enabled
|
self.pageserver_auth_enabled = pageserver_auth_enabled
|
||||||
self.default_branch_name = default_branch_name
|
self.default_branch_name = default_branch_name
|
||||||
|
self.broker = broker
|
||||||
self.env: Optional[ZenithEnv] = None
|
self.env: Optional[ZenithEnv] = None
|
||||||
|
|
||||||
self.s3_mock_server: Optional[MockS3Server] = None
|
self.s3_mock_server: Optional[MockS3Server] = None
|
||||||
@@ -517,6 +519,8 @@ class ZenithEnvBuilder:
|
|||||||
self.env.pageserver.stop(immediate=True)
|
self.env.pageserver.stop(immediate=True)
|
||||||
if self.s3_mock_server:
|
if self.s3_mock_server:
|
||||||
self.s3_mock_server.kill()
|
self.s3_mock_server.kill()
|
||||||
|
if self.env.broker is not None:
|
||||||
|
self.env.broker.stop()
|
||||||
|
|
||||||
|
|
||||||
class ZenithEnv:
|
class ZenithEnv:
|
||||||
@@ -569,6 +573,16 @@ class ZenithEnv:
|
|||||||
default_tenant_id = '{self.initial_tenant.hex}'
|
default_tenant_id = '{self.initial_tenant.hex}'
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
self.broker = None
|
||||||
|
if config.broker:
|
||||||
|
# keep etcd datadir inside 'repo'
|
||||||
|
self.broker = Etcd(datadir=os.path.join(self.repo_dir, "etcd"),
|
||||||
|
port=self.port_distributor.get_port(),
|
||||||
|
peer_port=self.port_distributor.get_port())
|
||||||
|
toml += textwrap.dedent(f"""
|
||||||
|
broker_endpoints = 'http://127.0.0.1:{self.broker.port}'
|
||||||
|
""")
|
||||||
|
|
||||||
# Create config for pageserver
|
# Create config for pageserver
|
||||||
pageserver_port = PageserverPort(
|
pageserver_port = PageserverPort(
|
||||||
pg=self.port_distributor.get_port(),
|
pg=self.port_distributor.get_port(),
|
||||||
@@ -611,12 +625,15 @@ class ZenithEnv:
|
|||||||
self.zenith_cli.init(toml)
|
self.zenith_cli.init(toml)
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
# Start up the page server and all the safekeepers
|
# Start up the page server, all the safekeepers and the broker
|
||||||
self.pageserver.start()
|
self.pageserver.start()
|
||||||
|
|
||||||
for safekeeper in self.safekeepers:
|
for safekeeper in self.safekeepers:
|
||||||
safekeeper.start()
|
safekeeper.start()
|
||||||
|
|
||||||
|
if self.broker is not None:
|
||||||
|
self.broker.start()
|
||||||
|
|
||||||
def get_safekeeper_connstrs(self) -> str:
|
def get_safekeeper_connstrs(self) -> str:
|
||||||
""" Get list of safekeeper endpoints suitable for wal_acceptors GUC """
|
""" Get list of safekeeper endpoints suitable for wal_acceptors GUC """
|
||||||
return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
|
return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
|
||||||
@@ -1674,6 +1691,7 @@ class Safekeeper:
|
|||||||
class SafekeeperTimelineStatus:
|
class SafekeeperTimelineStatus:
|
||||||
acceptor_epoch: int
|
acceptor_epoch: int
|
||||||
flush_lsn: str
|
flush_lsn: str
|
||||||
|
remote_consistent_lsn: str
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -1697,7 +1715,8 @@ class SafekeeperHttpClient(requests.Session):
|
|||||||
res.raise_for_status()
|
res.raise_for_status()
|
||||||
resj = res.json()
|
resj = res.json()
|
||||||
return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
|
return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
|
||||||
flush_lsn=resj['flush_lsn'])
|
flush_lsn=resj['flush_lsn'],
|
||||||
|
remote_consistent_lsn=resj['remote_consistent_lsn'])
|
||||||
|
|
||||||
def get_metrics(self) -> SafekeeperMetrics:
|
def get_metrics(self) -> SafekeeperMetrics:
|
||||||
request_result = self.get(f"http://localhost:{self.port}/metrics")
|
request_result = self.get(f"http://localhost:{self.port}/metrics")
|
||||||
@@ -1718,6 +1737,54 @@ class SafekeeperHttpClient(requests.Session):
|
|||||||
return metrics
|
return metrics
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Etcd:
|
||||||
|
""" An object managing etcd instance """
|
||||||
|
datadir: str
|
||||||
|
port: int
|
||||||
|
peer_port: int
|
||||||
|
handle: Optional[subprocess.Popen[Any]] = None # handle of running daemon
|
||||||
|
|
||||||
|
def check_status(self):
|
||||||
|
s = requests.Session()
|
||||||
|
s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry
|
||||||
|
s.get(f"http://localhost:{self.port}/health").raise_for_status()
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
pathlib.Path(self.datadir).mkdir(exist_ok=True)
|
||||||
|
etcd_full_path = etcd_path()
|
||||||
|
if etcd_full_path is None:
|
||||||
|
raise Exception('etcd not found')
|
||||||
|
|
||||||
|
with open(os.path.join(self.datadir, "etcd.log"), "wb") as log_file:
|
||||||
|
args = [
|
||||||
|
etcd_full_path,
|
||||||
|
f"--data-dir={self.datadir}",
|
||||||
|
f"--listen-client-urls=http://localhost:{self.port}",
|
||||||
|
f"--advertise-client-urls=http://localhost:{self.port}",
|
||||||
|
f"--listen-peer-urls=http://localhost:{self.peer_port}"
|
||||||
|
]
|
||||||
|
self.handle = subprocess.Popen(args, stdout=log_file, stderr=log_file)
|
||||||
|
|
||||||
|
# wait for start
|
||||||
|
started_at = time.time()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
self.check_status()
|
||||||
|
except Exception as e:
|
||||||
|
elapsed = time.time() - started_at
|
||||||
|
if elapsed > 5:
|
||||||
|
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for etcd start: {e}")
|
||||||
|
time.sleep(0.5)
|
||||||
|
else:
|
||||||
|
break # success
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
if self.handle is not None:
|
||||||
|
self.handle.terminate()
|
||||||
|
self.handle.wait()
|
||||||
|
|
||||||
|
|
||||||
def get_test_output_dir(request: Any) -> str:
|
def get_test_output_dir(request: Any) -> str:
|
||||||
""" Compute the working directory for an individual test. """
|
""" Compute the working directory for an individual test. """
|
||||||
test_name = request.node.name
|
test_name = request.node.name
|
||||||
|
|||||||
@@ -22,11 +22,14 @@ anyhow = "1.0"
|
|||||||
crc32c = "0.6.0"
|
crc32c = "0.6.0"
|
||||||
humantime = "2.1.0"
|
humantime = "2.1.0"
|
||||||
walkdir = "2"
|
walkdir = "2"
|
||||||
|
url = "2.2.2"
|
||||||
signal-hook = "0.3.10"
|
signal-hook = "0.3.10"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
serde_with = {version = "1.12.0"}
|
||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
const_format = "0.2.21"
|
const_format = "0.2.21"
|
||||||
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }
|
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }
|
||||||
|
etcd-client = "0.8.3"
|
||||||
|
|
||||||
postgres_ffi = { path = "../postgres_ffi" }
|
postgres_ffi = { path = "../postgres_ffi" }
|
||||||
zenith_metrics = { path = "../zenith_metrics" }
|
zenith_metrics = { path = "../zenith_metrics" }
|
||||||
|
|||||||
@@ -11,18 +11,19 @@ use std::io::{ErrorKind, Write};
|
|||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
|
use url::{ParseError, Url};
|
||||||
use walkeeper::control_file::{self};
|
use walkeeper::control_file::{self};
|
||||||
use zenith_utils::http::endpoint;
|
use zenith_utils::http::endpoint;
|
||||||
use zenith_utils::zid::ZNodeId;
|
use zenith_utils::zid::ZNodeId;
|
||||||
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
|
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
|
||||||
|
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use walkeeper::callmemaybe;
|
|
||||||
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
|
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
|
||||||
use walkeeper::http;
|
use walkeeper::http;
|
||||||
use walkeeper::s3_offload;
|
use walkeeper::s3_offload;
|
||||||
use walkeeper::wal_service;
|
use walkeeper::wal_service;
|
||||||
use walkeeper::SafeKeeperConf;
|
use walkeeper::SafeKeeperConf;
|
||||||
|
use walkeeper::{broker, callmemaybe};
|
||||||
use zenith_utils::shutdown::exit_now;
|
use zenith_utils::shutdown::exit_now;
|
||||||
use zenith_utils::signals;
|
use zenith_utils::signals;
|
||||||
|
|
||||||
@@ -104,6 +105,11 @@ fn main() -> Result<()> {
|
|||||||
)
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::new("id").long("id").takes_value(true).help("safekeeper node id: integer")
|
Arg::new("id").long("id").takes_value(true).help("safekeeper node id: integer")
|
||||||
|
).arg(
|
||||||
|
Arg::new("broker-endpoints")
|
||||||
|
.long("broker-endpoints")
|
||||||
|
.takes_value(true)
|
||||||
|
.help("a comma separated broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'"),
|
||||||
)
|
)
|
||||||
.get_matches();
|
.get_matches();
|
||||||
|
|
||||||
@@ -154,6 +160,11 @@ fn main() -> Result<()> {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(addr) = arg_matches.value_of("broker-endpoints") {
|
||||||
|
let collected_ep: Result<Vec<Url>, ParseError> = addr.split(',').map(Url::parse).collect();
|
||||||
|
conf.broker_endpoints = Some(collected_ep?);
|
||||||
|
}
|
||||||
|
|
||||||
start_safekeeper(conf, given_id, arg_matches.is_present("init"))
|
start_safekeeper(conf, given_id, arg_matches.is_present("init"))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -259,11 +270,12 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
|
|||||||
|
|
||||||
threads.push(wal_acceptor_thread);
|
threads.push(wal_acceptor_thread);
|
||||||
|
|
||||||
|
let conf_cloned = conf.clone();
|
||||||
let callmemaybe_thread = thread::Builder::new()
|
let callmemaybe_thread = thread::Builder::new()
|
||||||
.name("callmemaybe thread".into())
|
.name("callmemaybe thread".into())
|
||||||
.spawn(|| {
|
.spawn(|| {
|
||||||
// thread code
|
// thread code
|
||||||
let thread_result = callmemaybe::thread_main(conf, rx);
|
let thread_result = callmemaybe::thread_main(conf_cloned, rx);
|
||||||
if let Err(e) = thread_result {
|
if let Err(e) = thread_result {
|
||||||
error!("callmemaybe thread terminated: {}", e);
|
error!("callmemaybe thread terminated: {}", e);
|
||||||
}
|
}
|
||||||
@@ -271,6 +283,17 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
threads.push(callmemaybe_thread);
|
threads.push(callmemaybe_thread);
|
||||||
|
|
||||||
|
if conf.broker_endpoints.is_some() {
|
||||||
|
let conf_ = conf.clone();
|
||||||
|
threads.push(
|
||||||
|
thread::Builder::new()
|
||||||
|
.name("broker thread".into())
|
||||||
|
.spawn(|| {
|
||||||
|
broker::thread_main(conf_);
|
||||||
|
})?,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: put more thoughts into handling of failed threads
|
// TODO: put more thoughts into handling of failed threads
|
||||||
// We probably should restart them.
|
// We probably should restart them.
|
||||||
|
|
||||||
|
|||||||
211
walkeeper/src/broker.rs
Normal file
211
walkeeper/src/broker.rs
Normal file
@@ -0,0 +1,211 @@
|
|||||||
|
//! Communication with etcd, providing safekeeper peers and pageserver coordination.
|
||||||
|
|
||||||
|
use anyhow::bail;
|
||||||
|
use anyhow::Context;
|
||||||
|
use anyhow::Error;
|
||||||
|
use anyhow::Result;
|
||||||
|
use etcd_client::Client;
|
||||||
|
use etcd_client::EventType;
|
||||||
|
use etcd_client::PutOptions;
|
||||||
|
use etcd_client::WatchOptions;
|
||||||
|
use lazy_static::lazy_static;
|
||||||
|
use regex::Regex;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_with::{serde_as, DisplayFromStr};
|
||||||
|
use std::str::FromStr;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
use tokio::{runtime, time::sleep};
|
||||||
|
use tracing::*;
|
||||||
|
use zenith_utils::zid::ZTenantId;
|
||||||
|
use zenith_utils::zid::ZTimelineId;
|
||||||
|
use zenith_utils::{
|
||||||
|
lsn::Lsn,
|
||||||
|
zid::{ZNodeId, ZTenantTimelineId},
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{safekeeper::Term, timeline::GlobalTimelines, SafeKeeperConf};
|
||||||
|
|
||||||
|
const RETRY_INTERVAL_MSEC: u64 = 1000;
|
||||||
|
const PUSH_INTERVAL_MSEC: u64 = 1000;
|
||||||
|
const LEASE_TTL_SEC: i64 = 5;
|
||||||
|
// TODO: add global zenith installation ID.
|
||||||
|
const ZENITH_PREFIX: &str = "zenith";
|
||||||
|
|
||||||
|
/// Published data about safekeeper. Fields made optional for easy migrations.
|
||||||
|
#[serde_as]
|
||||||
|
#[derive(Deserialize, Serialize)]
|
||||||
|
pub struct SafekeeperInfo {
|
||||||
|
/// Term of the last entry.
|
||||||
|
pub last_log_term: Option<Term>,
|
||||||
|
/// LSN of the last record.
|
||||||
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||||
|
pub flush_lsn: Option<Lsn>,
|
||||||
|
/// Up to which LSN safekeeper regards its WAL as committed.
|
||||||
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||||
|
pub commit_lsn: Option<Lsn>,
|
||||||
|
/// LSN up to which safekeeper offloaded WAL to s3.
|
||||||
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||||
|
pub s3_wal_lsn: Option<Lsn>,
|
||||||
|
/// LSN of last checkpoint uploaded by pageserver.
|
||||||
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||||
|
pub remote_consistent_lsn: Option<Lsn>,
|
||||||
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||||
|
pub peer_horizon_lsn: Option<Lsn>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn thread_main(conf: SafeKeeperConf) {
|
||||||
|
let runtime = runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let _enter = info_span!("broker").entered();
|
||||||
|
info!("started, broker endpoints {:?}", conf.broker_endpoints);
|
||||||
|
|
||||||
|
runtime.block_on(async {
|
||||||
|
main_loop(conf).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Prefix to timeline related data.
|
||||||
|
fn timeline_path(zttid: &ZTenantTimelineId) -> String {
|
||||||
|
format!(
|
||||||
|
"{}/{}/{}",
|
||||||
|
ZENITH_PREFIX, zttid.tenant_id, zttid.timeline_id
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Key to per timeline per safekeeper data.
|
||||||
|
fn timeline_safekeeper_path(zttid: &ZTenantTimelineId, sk_id: ZNodeId) -> String {
|
||||||
|
format!("{}/safekeeper/{}", timeline_path(zttid), sk_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Push once in a while data about all active timelines to the broker.
|
||||||
|
async fn push_loop(conf: SafeKeeperConf) -> Result<()> {
|
||||||
|
let mut client = Client::connect(conf.broker_endpoints.as_ref().unwrap(), None).await?;
|
||||||
|
|
||||||
|
// Get and maintain lease to automatically delete obsolete data
|
||||||
|
let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
|
||||||
|
let (mut keeper, mut ka_stream) = client.lease_keep_alive(lease.id()).await?;
|
||||||
|
|
||||||
|
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
|
||||||
|
loop {
|
||||||
|
// Note: we lock runtime here and in timeline methods as GlobalTimelines
|
||||||
|
// is under plain mutex. That's ok, all this code is not performance
|
||||||
|
// sensitive and there is no risk of deadlock as we don't await while
|
||||||
|
// lock is held.
|
||||||
|
let active_tlis = GlobalTimelines::get_active_timelines();
|
||||||
|
for zttid in &active_tlis {
|
||||||
|
if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) {
|
||||||
|
let sk_info = tli.get_public_info();
|
||||||
|
let put_opts = PutOptions::new().with_lease(lease.id());
|
||||||
|
client
|
||||||
|
.put(
|
||||||
|
timeline_safekeeper_path(zttid, conf.my_id),
|
||||||
|
serde_json::to_string(&sk_info)?,
|
||||||
|
Some(put_opts),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.context("failed to push safekeeper info")?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// revive the lease
|
||||||
|
keeper
|
||||||
|
.keep_alive()
|
||||||
|
.await
|
||||||
|
.context("failed to send LeaseKeepAliveRequest")?;
|
||||||
|
ka_stream
|
||||||
|
.message()
|
||||||
|
.await
|
||||||
|
.context("failed to receive LeaseKeepAliveResponse")?;
|
||||||
|
sleep(push_interval).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Subscribe and fetch all the interesting data from the broker.
|
||||||
|
async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
|
||||||
|
lazy_static! {
|
||||||
|
static ref TIMELINE_SAFEKEEPER_RE: Regex =
|
||||||
|
Regex::new(r"^zenith/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]])$")
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
let mut client = Client::connect(conf.broker_endpoints.as_ref().unwrap(), None).await?;
|
||||||
|
loop {
|
||||||
|
let wo = WatchOptions::new().with_prefix();
|
||||||
|
// TODO: subscribe only to my timelines
|
||||||
|
let (_, mut stream) = client.watch(ZENITH_PREFIX, Some(wo)).await?;
|
||||||
|
while let Some(resp) = stream.message().await? {
|
||||||
|
if resp.canceled() {
|
||||||
|
bail!("watch canceled");
|
||||||
|
}
|
||||||
|
|
||||||
|
for event in resp.events() {
|
||||||
|
if EventType::Put == event.event_type() {
|
||||||
|
if let Some(kv) = event.kv() {
|
||||||
|
if let Some(caps) = TIMELINE_SAFEKEEPER_RE.captures(kv.key_str()?) {
|
||||||
|
let tenant_id = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
|
||||||
|
let timeline_id = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
|
||||||
|
let zttid = ZTenantTimelineId::new(tenant_id, timeline_id);
|
||||||
|
let safekeeper_id = ZNodeId(caps.get(3).unwrap().as_str().parse()?);
|
||||||
|
let value_str = kv.value_str()?;
|
||||||
|
match serde_json::from_str::<SafekeeperInfo>(value_str) {
|
||||||
|
Ok(safekeeper_info) => {
|
||||||
|
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
|
||||||
|
tli.record_safekeeper_info(&safekeeper_info, safekeeper_id)?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => warn!(
|
||||||
|
"failed to deserialize safekeeper info {}: {}",
|
||||||
|
value_str, err
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn main_loop(conf: SafeKeeperConf) {
|
||||||
|
let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
|
||||||
|
let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
|
||||||
|
let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
|
||||||
|
// Selecting on JoinHandles requires some squats; is there a better way to
|
||||||
|
// reap tasks individually?
|
||||||
|
|
||||||
|
// Handling failures in task itself won't catch panic and in Tokio, task's
|
||||||
|
// panic doesn't kill the whole executor, so it is better to do reaping
|
||||||
|
// here.
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
res = async { push_handle.as_mut().unwrap().await }, if push_handle.is_some() => {
|
||||||
|
// was it panic or normal error?
|
||||||
|
let err = match res {
|
||||||
|
Ok(res_internal) => res_internal.unwrap_err(),
|
||||||
|
Err(err_outer) => err_outer.into(),
|
||||||
|
};
|
||||||
|
warn!("push task failed: {:?}", err);
|
||||||
|
push_handle = None;
|
||||||
|
},
|
||||||
|
res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
|
||||||
|
// was it panic or normal error?
|
||||||
|
let err = match res {
|
||||||
|
Ok(res_internal) => res_internal.unwrap_err(),
|
||||||
|
Err(err_outer) => err_outer.into(),
|
||||||
|
};
|
||||||
|
warn!("pull task failed: {:?}", err);
|
||||||
|
pull_handle = None;
|
||||||
|
},
|
||||||
|
_ = ticker.tick() => {
|
||||||
|
if push_handle.is_none() {
|
||||||
|
push_handle = Some(tokio::spawn(push_loop(conf.clone())));
|
||||||
|
}
|
||||||
|
if pull_handle.is_none() {
|
||||||
|
pull_handle = Some(tokio::spawn(pull_loop(conf.clone())));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -168,7 +168,14 @@ impl SafekeeperPostgresHandler {
|
|||||||
fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
|
fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
|
||||||
let start_pos = self.timeline.get().get_end_of_wal();
|
let start_pos = self.timeline.get().get_end_of_wal();
|
||||||
let lsn = start_pos.to_string();
|
let lsn = start_pos.to_string();
|
||||||
let sysid = self.timeline.get().get_info().server.system_id.to_string();
|
let sysid = self
|
||||||
|
.timeline
|
||||||
|
.get()
|
||||||
|
.get_state()
|
||||||
|
.1
|
||||||
|
.server
|
||||||
|
.system_id
|
||||||
|
.to_string();
|
||||||
let lsn_bytes = lsn.as_bytes();
|
let lsn_bytes = lsn.as_bytes();
|
||||||
let tli = PG_TLI.to_string();
|
let tli = PG_TLI.to_string();
|
||||||
let tli_bytes = tli.as_bytes();
|
let tli_bytes = tli.as_bytes();
|
||||||
|
|||||||
@@ -86,23 +86,24 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
|||||||
);
|
);
|
||||||
|
|
||||||
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
|
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
|
||||||
let sk_state = tli.get_info();
|
let (inmem, state) = tli.get_state();
|
||||||
let flush_lsn = tli.get_end_of_wal();
|
let flush_lsn = tli.get_end_of_wal();
|
||||||
|
|
||||||
let acc_state = AcceptorStateStatus {
|
let acc_state = AcceptorStateStatus {
|
||||||
term: sk_state.acceptor_state.term,
|
term: state.acceptor_state.term,
|
||||||
epoch: sk_state.acceptor_state.get_epoch(flush_lsn),
|
epoch: state.acceptor_state.get_epoch(flush_lsn),
|
||||||
term_history: sk_state.acceptor_state.term_history,
|
term_history: state.acceptor_state.term_history,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Note: we report in memory values which can be lost.
|
||||||
let status = TimelineStatus {
|
let status = TimelineStatus {
|
||||||
tenant_id: zttid.tenant_id,
|
tenant_id: zttid.tenant_id,
|
||||||
timeline_id: zttid.timeline_id,
|
timeline_id: zttid.timeline_id,
|
||||||
acceptor_state: acc_state,
|
acceptor_state: acc_state,
|
||||||
commit_lsn: sk_state.commit_lsn,
|
commit_lsn: inmem.commit_lsn,
|
||||||
s3_wal_lsn: sk_state.s3_wal_lsn,
|
s3_wal_lsn: inmem.s3_wal_lsn,
|
||||||
peer_horizon_lsn: sk_state.peer_horizon_lsn,
|
peer_horizon_lsn: inmem.peer_horizon_lsn,
|
||||||
remote_consistent_lsn: sk_state.remote_consistent_lsn,
|
remote_consistent_lsn: inmem.remote_consistent_lsn,
|
||||||
flush_lsn,
|
flush_lsn,
|
||||||
};
|
};
|
||||||
Ok(json_response(StatusCode::OK, status)?)
|
Ok(json_response(StatusCode::OK, status)?)
|
||||||
|
|||||||
@@ -73,7 +73,7 @@ pub fn handle_json_ctrl(
|
|||||||
|
|
||||||
let inserted_wal = append_logical_message(spg, append_request)?;
|
let inserted_wal = append_logical_message(spg, append_request)?;
|
||||||
let response = AppendResult {
|
let response = AppendResult {
|
||||||
state: spg.timeline.get().get_info(),
|
state: spg.timeline.get().get_state().1,
|
||||||
inserted_wal,
|
inserted_wal,
|
||||||
};
|
};
|
||||||
let response_data = serde_json::to_vec(&response)?;
|
let response_data = serde_json::to_vec(&response)?;
|
||||||
@@ -112,7 +112,7 @@ fn prepare_safekeeper(spg: &mut SafekeeperPostgresHandler) -> Result<()> {
|
|||||||
|
|
||||||
fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: Lsn) -> Result<()> {
|
fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: Lsn) -> Result<()> {
|
||||||
// add new term to existing history
|
// add new term to existing history
|
||||||
let history = spg.timeline.get().get_info().acceptor_state.term_history;
|
let history = spg.timeline.get().get_state().1.acceptor_state.term_history;
|
||||||
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
|
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
|
||||||
let mut history_entries = history.0;
|
let mut history_entries = history.0;
|
||||||
history_entries.push(TermSwitchEntry { term, lsn });
|
history_entries.push(TermSwitchEntry { term, lsn });
|
||||||
@@ -142,7 +142,7 @@ fn append_logical_message(
|
|||||||
msg: &AppendLogicalMessage,
|
msg: &AppendLogicalMessage,
|
||||||
) -> Result<InsertedWAL> {
|
) -> Result<InsertedWAL> {
|
||||||
let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
|
let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
|
||||||
let sk_state = spg.timeline.get().get_info();
|
let sk_state = spg.timeline.get().get_state().1;
|
||||||
|
|
||||||
let begin_lsn = msg.begin_lsn;
|
let begin_lsn = msg.begin_lsn;
|
||||||
let end_lsn = begin_lsn + wal_data.len() as u64;
|
let end_lsn = begin_lsn + wal_data.len() as u64;
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
//
|
//
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
use zenith_utils::zid::{ZNodeId, ZTenantTimelineId};
|
use zenith_utils::zid::{ZNodeId, ZTenantTimelineId};
|
||||||
|
|
||||||
|
pub mod broker;
|
||||||
pub mod callmemaybe;
|
pub mod callmemaybe;
|
||||||
pub mod control_file;
|
pub mod control_file;
|
||||||
pub mod control_file_upgrade;
|
pub mod control_file_upgrade;
|
||||||
@@ -47,6 +49,7 @@ pub struct SafeKeeperConf {
|
|||||||
pub ttl: Option<Duration>,
|
pub ttl: Option<Duration>,
|
||||||
pub recall_period: Duration,
|
pub recall_period: Duration,
|
||||||
pub my_id: ZNodeId,
|
pub my_id: ZNodeId,
|
||||||
|
pub broker_endpoints: Option<Vec<Url>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SafeKeeperConf {
|
impl SafeKeeperConf {
|
||||||
@@ -71,6 +74,7 @@ impl Default for SafeKeeperConf {
|
|||||||
ttl: None,
|
ttl: None,
|
||||||
recall_period: defaults::DEFAULT_RECALL_PERIOD,
|
recall_period: defaults::DEFAULT_RECALL_PERIOD,
|
||||||
my_id: ZNodeId(0),
|
my_id: ZNodeId(0),
|
||||||
|
broker_endpoints: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -193,7 +193,7 @@ pub struct SafeKeeperState {
|
|||||||
pub peer_horizon_lsn: Lsn,
|
pub peer_horizon_lsn: Lsn,
|
||||||
/// LSN of the oldest known checkpoint made by pageserver and successfully
|
/// LSN of the oldest known checkpoint made by pageserver and successfully
|
||||||
/// pushed to s3. We don't remove WAL beyond it. Persisted only for
|
/// pushed to s3. We don't remove WAL beyond it. Persisted only for
|
||||||
/// informational purposes, we receive it from pageserver.
|
/// informational purposes, we receive it from pageserver (or broker).
|
||||||
pub remote_consistent_lsn: Lsn,
|
pub remote_consistent_lsn: Lsn,
|
||||||
// Peers and their state as we remember it. Knowing peers themselves is
|
// Peers and their state as we remember it. Knowing peers themselves is
|
||||||
// fundamental; but state is saved here only for informational purposes and
|
// fundamental; but state is saved here only for informational purposes and
|
||||||
@@ -203,11 +203,13 @@ pub struct SafeKeeperState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; they are
|
// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values
|
||||||
// not flushed yet.
|
// are not flushed yet.
|
||||||
pub struct SafekeeperMemState {
|
pub struct SafekeeperMemState {
|
||||||
pub commit_lsn: Lsn,
|
pub commit_lsn: Lsn,
|
||||||
|
pub s3_wal_lsn: Lsn, // TODO: keep only persistent version
|
||||||
pub peer_horizon_lsn: Lsn,
|
pub peer_horizon_lsn: Lsn,
|
||||||
|
pub remote_consistent_lsn: Lsn,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SafeKeeperState {
|
impl SafeKeeperState {
|
||||||
@@ -494,14 +496,13 @@ pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
|
|||||||
metrics: SafeKeeperMetrics,
|
metrics: SafeKeeperMetrics,
|
||||||
|
|
||||||
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
|
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
|
||||||
global_commit_lsn: Lsn,
|
pub global_commit_lsn: Lsn,
|
||||||
/// LSN since the proposer safekeeper currently talking to appends WAL;
|
/// LSN since the proposer safekeeper currently talking to appends WAL;
|
||||||
/// determines epoch switch point.
|
/// determines epoch switch point.
|
||||||
epoch_start_lsn: Lsn,
|
epoch_start_lsn: Lsn,
|
||||||
|
|
||||||
pub inmem: SafekeeperMemState, // in memory part
|
pub inmem: SafekeeperMemState, // in memory part
|
||||||
|
pub s: SafeKeeperState, // persistent part
|
||||||
pub s: SafeKeeperState, // persistent part
|
|
||||||
|
|
||||||
pub control_store: CTRL,
|
pub control_store: CTRL,
|
||||||
pub wal_store: WAL,
|
pub wal_store: WAL,
|
||||||
@@ -529,7 +530,9 @@ where
|
|||||||
epoch_start_lsn: Lsn(0),
|
epoch_start_lsn: Lsn(0),
|
||||||
inmem: SafekeeperMemState {
|
inmem: SafekeeperMemState {
|
||||||
commit_lsn: state.commit_lsn,
|
commit_lsn: state.commit_lsn,
|
||||||
|
s3_wal_lsn: state.s3_wal_lsn,
|
||||||
peer_horizon_lsn: state.peer_horizon_lsn,
|
peer_horizon_lsn: state.peer_horizon_lsn,
|
||||||
|
remote_consistent_lsn: state.remote_consistent_lsn,
|
||||||
},
|
},
|
||||||
s: state,
|
s: state,
|
||||||
control_store,
|
control_store,
|
||||||
@@ -545,8 +548,7 @@ where
|
|||||||
.up_to(self.wal_store.flush_lsn())
|
.up_to(self.wal_store.flush_lsn())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
pub fn get_epoch(&self) -> Term {
|
||||||
fn get_epoch(&self) -> Term {
|
|
||||||
self.s.acceptor_state.get_epoch(self.wal_store.flush_lsn())
|
self.s.acceptor_state.get_epoch(self.wal_store.flush_lsn())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -697,7 +699,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Advance commit_lsn taking into account what we have locally
|
/// Advance commit_lsn taking into account what we have locally
|
||||||
fn update_commit_lsn(&mut self) -> Result<()> {
|
pub fn update_commit_lsn(&mut self) -> Result<()> {
|
||||||
let commit_lsn = min(self.global_commit_lsn, self.wal_store.flush_lsn());
|
let commit_lsn = min(self.global_commit_lsn, self.wal_store.flush_lsn());
|
||||||
assert!(commit_lsn >= self.inmem.commit_lsn);
|
assert!(commit_lsn >= self.inmem.commit_lsn);
|
||||||
|
|
||||||
|
|||||||
@@ -230,7 +230,7 @@ impl ReplicationConn {
|
|||||||
|
|
||||||
let mut wal_seg_size: usize;
|
let mut wal_seg_size: usize;
|
||||||
loop {
|
loop {
|
||||||
wal_seg_size = spg.timeline.get().get_info().server.wal_seg_size as usize;
|
wal_seg_size = spg.timeline.get().get_state().1.server.wal_seg_size as usize;
|
||||||
if wal_seg_size == 0 {
|
if wal_seg_size == 0 {
|
||||||
error!("Cannot start replication before connecting to wal_proposer");
|
error!("Cannot start replication before connecting to wal_proposer");
|
||||||
sleep(Duration::from_secs(1));
|
sleep(Duration::from_secs(1));
|
||||||
|
|||||||
@@ -17,12 +17,14 @@ use tracing::*;
|
|||||||
use zenith_utils::lsn::Lsn;
|
use zenith_utils::lsn::Lsn;
|
||||||
use zenith_utils::zid::{ZNodeId, ZTenantTimelineId};
|
use zenith_utils::zid::{ZNodeId, ZTenantTimelineId};
|
||||||
|
|
||||||
|
use crate::broker::SafekeeperInfo;
|
||||||
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
|
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
|
||||||
|
|
||||||
use crate::control_file;
|
use crate::control_file;
|
||||||
use crate::control_file::Storage as cf_storage;
|
use crate::control_file::Storage as cf_storage;
|
||||||
use crate::safekeeper::{
|
use crate::safekeeper::{
|
||||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
|
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
|
||||||
|
SafekeeperMemState,
|
||||||
};
|
};
|
||||||
use crate::send_wal::HotStandbyFeedback;
|
use crate::send_wal::HotStandbyFeedback;
|
||||||
use crate::wal_storage;
|
use crate::wal_storage;
|
||||||
@@ -349,6 +351,11 @@ impl Timeline {
|
|||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn is_active(&self) -> bool {
|
||||||
|
let shared_state = self.mutex.lock().unwrap();
|
||||||
|
shared_state.active
|
||||||
|
}
|
||||||
|
|
||||||
/// Timed wait for an LSN to be committed.
|
/// Timed wait for an LSN to be committed.
|
||||||
///
|
///
|
||||||
/// Returns the last committed LSN, which will be at least
|
/// Returns the last committed LSN, which will be at least
|
||||||
@@ -410,8 +417,61 @@ impl Timeline {
|
|||||||
Ok(rmsg)
|
Ok(rmsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_info(&self) -> SafeKeeperState {
|
pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
|
||||||
self.mutex.lock().unwrap().sk.s.clone()
|
let shared_state = self.mutex.lock().unwrap();
|
||||||
|
(shared_state.sk.inmem.clone(), shared_state.sk.s.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Prepare public safekeeper info for reporting.
|
||||||
|
pub fn get_public_info(&self) -> SafekeeperInfo {
|
||||||
|
let shared_state = self.mutex.lock().unwrap();
|
||||||
|
SafekeeperInfo {
|
||||||
|
last_log_term: Some(shared_state.sk.get_epoch()),
|
||||||
|
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
|
||||||
|
// note: this value is not flushed to control file yet and can be lost
|
||||||
|
commit_lsn: Some(shared_state.sk.inmem.commit_lsn),
|
||||||
|
s3_wal_lsn: Some(shared_state.sk.inmem.s3_wal_lsn),
|
||||||
|
// TODO: rework feedbacks to avoid max here
|
||||||
|
remote_consistent_lsn: Some(max(
|
||||||
|
shared_state.get_replicas_state().remote_consistent_lsn,
|
||||||
|
shared_state.sk.inmem.remote_consistent_lsn,
|
||||||
|
)),
|
||||||
|
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update timeline state with peer safekeeper data.
|
||||||
|
pub fn record_safekeeper_info(&self, sk_info: &SafekeeperInfo, _sk_id: ZNodeId) -> Result<()> {
|
||||||
|
let mut shared_state = self.mutex.lock().unwrap();
|
||||||
|
// Note: the check is too restrictive, generally we can update local
|
||||||
|
// commit_lsn if our history matches (is part of) history of advanced
|
||||||
|
// commit_lsn provider.
|
||||||
|
if let (Some(commit_lsn), Some(last_log_term)) = (sk_info.commit_lsn, sk_info.last_log_term)
|
||||||
|
{
|
||||||
|
if last_log_term == shared_state.sk.get_epoch() {
|
||||||
|
shared_state.sk.global_commit_lsn =
|
||||||
|
max(commit_lsn, shared_state.sk.global_commit_lsn);
|
||||||
|
shared_state.sk.update_commit_lsn()?;
|
||||||
|
let local_commit_lsn = min(commit_lsn, shared_state.sk.wal_store.flush_lsn());
|
||||||
|
shared_state.sk.inmem.commit_lsn =
|
||||||
|
max(local_commit_lsn, shared_state.sk.inmem.commit_lsn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(s3_wal_lsn) = sk_info.s3_wal_lsn {
|
||||||
|
shared_state.sk.inmem.s3_wal_lsn = max(s3_wal_lsn, shared_state.sk.inmem.s3_wal_lsn);
|
||||||
|
}
|
||||||
|
if let Some(remote_consistent_lsn) = sk_info.remote_consistent_lsn {
|
||||||
|
shared_state.sk.inmem.remote_consistent_lsn = max(
|
||||||
|
remote_consistent_lsn,
|
||||||
|
shared_state.sk.inmem.remote_consistent_lsn,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if let Some(peer_horizon_lsn) = sk_info.peer_horizon_lsn {
|
||||||
|
shared_state.sk.inmem.peer_horizon_lsn =
|
||||||
|
max(peer_horizon_lsn, shared_state.sk.inmem.peer_horizon_lsn);
|
||||||
|
}
|
||||||
|
// TODO: sync control file
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_replica(&self, state: ReplicaState) -> usize {
|
pub fn add_replica(&self, state: ReplicaState) -> usize {
|
||||||
@@ -495,7 +555,7 @@ impl GlobalTimelines {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get a timeline with control file loaded from the global TIMELINES map.
|
/// Get a timeline with control file loaded from the global TIMELINES map.
|
||||||
/// If control file doesn't exist, bails out.
|
/// If control file doesn't exist and create=false, bails out.
|
||||||
pub fn get(
|
pub fn get(
|
||||||
conf: &SafeKeeperConf,
|
conf: &SafeKeeperConf,
|
||||||
zttid: ZTenantTimelineId,
|
zttid: ZTenantTimelineId,
|
||||||
@@ -537,4 +597,14 @@ impl GlobalTimelines {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get ZTenantTimelineIDs of all active timelines.
|
||||||
|
pub fn get_active_timelines() -> Vec<ZTenantTimelineId> {
|
||||||
|
let timelines = TIMELINES.lock().unwrap();
|
||||||
|
timelines
|
||||||
|
.iter()
|
||||||
|
.filter(|&(_, tli)| tli.is_active())
|
||||||
|
.map(|(zttid, _)| *zttid)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user