mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 06:00:38 +00:00
Compare commits
17 Commits
tristan957
...
diko/safek
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c53b4545c8 | ||
|
|
e48ac9ed76 | ||
|
|
94cb9a79d9 | ||
|
|
961835add6 | ||
|
|
fc242afcc2 | ||
|
|
e275221aef | ||
|
|
f859354466 | ||
|
|
b00a0096bf | ||
|
|
b3844903e5 | ||
|
|
5b0972151c | ||
|
|
51ffeef93f | ||
|
|
0fe07dec32 | ||
|
|
8de320ab9b | ||
|
|
108f7ec544 | ||
|
|
63d2b1844d | ||
|
|
133f16e9b5 | ||
|
|
88391ce069 |
171
Cargo.lock
generated
171
Cargo.lock
generated
@@ -1097,7 +1097,7 @@ checksum = "975982cdb7ad6a142be15bdf84aea7ec6a9e5d4d797c004d43185b24cfe4e684"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"heck 0.5.0",
|
||||
"indexmap 2.9.0",
|
||||
"indexmap 2.10.0",
|
||||
"log",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -1313,7 +1313,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"indexmap 2.9.0",
|
||||
"indexmap 2.10.0",
|
||||
"jsonwebtoken",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
@@ -1350,7 +1350,7 @@ dependencies = [
|
||||
"http-body-util",
|
||||
"hyper 1.4.1",
|
||||
"hyper-util",
|
||||
"indexmap 2.9.0",
|
||||
"indexmap 2.10.0",
|
||||
"itertools 0.10.5",
|
||||
"jsonwebtoken",
|
||||
"metrics",
|
||||
@@ -1383,7 +1383,7 @@ dependencies = [
|
||||
"tokio-postgres",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tonic 0.13.1",
|
||||
"tonic",
|
||||
"tower 0.5.2",
|
||||
"tower-http",
|
||||
"tower-otel",
|
||||
@@ -2659,7 +2659,7 @@ dependencies = [
|
||||
"futures-sink",
|
||||
"futures-util",
|
||||
"http 0.2.9",
|
||||
"indexmap 2.9.0",
|
||||
"indexmap 2.10.0",
|
||||
"slab",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
@@ -2678,7 +2678,7 @@ dependencies = [
|
||||
"futures-sink",
|
||||
"futures-util",
|
||||
"http 1.3.1",
|
||||
"indexmap 2.9.0",
|
||||
"indexmap 2.10.0",
|
||||
"slab",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
@@ -2937,7 +2937,7 @@ dependencies = [
|
||||
"pprof",
|
||||
"regex",
|
||||
"routerify",
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -3274,9 +3274,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "2.9.0"
|
||||
version = "2.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e"
|
||||
checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661"
|
||||
dependencies = [
|
||||
"equivalent",
|
||||
"hashbrown 0.15.2",
|
||||
@@ -3302,7 +3302,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"indexmap 2.9.0",
|
||||
"indexmap 2.10.0",
|
||||
"is-terminal",
|
||||
"itoa",
|
||||
"log",
|
||||
@@ -3325,7 +3325,7 @@ dependencies = [
|
||||
"crossbeam-utils",
|
||||
"dashmap 6.1.0",
|
||||
"env_logger",
|
||||
"indexmap 2.9.0",
|
||||
"indexmap 2.10.0",
|
||||
"itoa",
|
||||
"log",
|
||||
"num-format",
|
||||
@@ -4162,23 +4162,23 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.27.1"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7"
|
||||
checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"js-sys",
|
||||
"pin-project-lite",
|
||||
"thiserror 1.0.69",
|
||||
"thiserror 2.0.11",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-http"
|
||||
version = "0.27.0"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "10a8a7f5f6ba7c1b286c2fbca0454eaba116f63bbe69ed250b642d36fbb04d80"
|
||||
checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
@@ -4189,12 +4189,10 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-otlp"
|
||||
version = "0.27.0"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76"
|
||||
checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"futures-core",
|
||||
"http 1.3.1",
|
||||
"opentelemetry",
|
||||
"opentelemetry-http",
|
||||
@@ -4202,46 +4200,43 @@ dependencies = [
|
||||
"opentelemetry_sdk",
|
||||
"prost 0.13.5",
|
||||
"reqwest",
|
||||
"thiserror 1.0.69",
|
||||
"thiserror 2.0.11",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-proto"
|
||||
version = "0.27.0"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6"
|
||||
checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc"
|
||||
dependencies = [
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"prost 0.13.5",
|
||||
"tonic 0.12.3",
|
||||
"tonic",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-semantic-conventions"
|
||||
version = "0.27.0"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bc1b6902ff63b32ef6c489e8048c5e253e2e4a803ea3ea7e783914536eb15c52"
|
||||
checksum = "83d059a296a47436748557a353c5e6c5705b9470ef6c95cfc52c21a8814ddac2"
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry_sdk"
|
||||
version = "0.27.1"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8"
|
||||
checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"futures-channel",
|
||||
"futures-executor",
|
||||
"futures-util",
|
||||
"glob",
|
||||
"opentelemetry",
|
||||
"percent-encoding",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"serde_json",
|
||||
"thiserror 1.0.69",
|
||||
"thiserror 2.0.11",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4368,7 +4363,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tonic 0.13.1",
|
||||
"tonic",
|
||||
"tracing",
|
||||
"url",
|
||||
"utils",
|
||||
@@ -4465,7 +4460,7 @@ dependencies = [
|
||||
"reqwest",
|
||||
"rpds",
|
||||
"rstest",
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"scopeguard",
|
||||
"send-future",
|
||||
"serde",
|
||||
@@ -4489,7 +4484,7 @@ dependencies = [
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
"tonic 0.13.1",
|
||||
"tonic",
|
||||
"tonic-reflection",
|
||||
"tower 0.5.2",
|
||||
"tracing",
|
||||
@@ -4575,7 +4570,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tonic 0.13.1",
|
||||
"tonic",
|
||||
"tracing",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
@@ -4620,7 +4615,7 @@ dependencies = [
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tonic 0.13.1",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
@@ -5002,7 +4997,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"once_cell",
|
||||
"pq_proto",
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"serde",
|
||||
"thiserror 1.0.69",
|
||||
@@ -5401,7 +5396,7 @@ dependencies = [
|
||||
"hyper 0.14.30",
|
||||
"hyper 1.4.1",
|
||||
"hyper-util",
|
||||
"indexmap 2.9.0",
|
||||
"indexmap 2.10.0",
|
||||
"ipnet",
|
||||
"itertools 0.10.5",
|
||||
"itoa",
|
||||
@@ -5438,7 +5433,7 @@ dependencies = [
|
||||
"rsa",
|
||||
"rstest",
|
||||
"rustc-hash 2.1.1",
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"rustls-native-certs 0.8.0",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"scopeguard",
|
||||
@@ -5717,7 +5712,7 @@ dependencies = [
|
||||
"num-bigint",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"rustls-native-certs 0.8.0",
|
||||
"ryu",
|
||||
"sha1_smol",
|
||||
@@ -5946,9 +5941,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "reqwest-tracing"
|
||||
version = "0.5.5"
|
||||
version = "0.5.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "73e6153390585f6961341b50e5a1931d6be6dee4292283635903c26ef9d980d2"
|
||||
checksum = "d70ea85f131b2ee9874f0b160ac5976f8af75f3c9badfe0d955880257d10bd83"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -6173,15 +6168,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.23.27"
|
||||
version = "0.23.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321"
|
||||
checksum = "2491382039b29b9b11ff08b76ff6c97cf287671dbb74f0be44bda389fffe9bd1"
|
||||
dependencies = [
|
||||
"log",
|
||||
"once_cell",
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki 0.103.3",
|
||||
"rustls-webpki 0.103.4",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
@@ -6245,9 +6240,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls-pki-types"
|
||||
version = "1.11.0"
|
||||
version = "1.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c"
|
||||
checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79"
|
||||
dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
@@ -6272,9 +6270,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
version = "0.103.3"
|
||||
version = "0.103.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435"
|
||||
checksum = "0a17884ae0c1b773f1ccd2bd4a8c72f16da897310a98b0e84bf349ad5ead92fc"
|
||||
dependencies = [
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
@@ -6335,7 +6333,7 @@ dependencies = [
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"safekeeper_api",
|
||||
"safekeeper_client",
|
||||
"scopeguard",
|
||||
@@ -6525,7 +6523,7 @@ checksum = "255914a8e53822abd946e2ce8baa41d4cded6b8e938913b7f7b9da5b7ab44335"
|
||||
dependencies = [
|
||||
"httpdate",
|
||||
"reqwest",
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"sentry-backtrace",
|
||||
"sentry-contexts",
|
||||
"sentry-core",
|
||||
@@ -6657,7 +6655,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9d2de91cf02bbc07cde38891769ccd5d4f073d22a40683aa4bc7a95781aaa2c4"
|
||||
dependencies = [
|
||||
"form_urlencoded",
|
||||
"indexmap 2.9.0",
|
||||
"indexmap 2.10.0",
|
||||
"itoa",
|
||||
"ryu",
|
||||
"serde",
|
||||
@@ -6738,7 +6736,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"hex",
|
||||
"indexmap 1.9.3",
|
||||
"indexmap 2.9.0",
|
||||
"indexmap 2.10.0",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
@@ -6981,10 +6979,10 @@ dependencies = [
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"prost 0.13.5",
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"tokio",
|
||||
"tokio-rustls 0.26.2",
|
||||
"tonic 0.13.1",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
"tracing",
|
||||
"utils",
|
||||
@@ -7029,7 +7027,7 @@ dependencies = [
|
||||
"regex",
|
||||
"reqwest",
|
||||
"routerify",
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"rustls-native-certs 0.8.0",
|
||||
"safekeeper_api",
|
||||
"safekeeper_client",
|
||||
@@ -7083,7 +7081,7 @@ dependencies = [
|
||||
"postgres_ffi",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"rustls-native-certs 0.8.0",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -7622,7 +7620,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "04fb792ccd6bbcd4bba408eb8a292f70fc4a3589e5d793626f45190e6454b6ab"
|
||||
dependencies = [
|
||||
"ring",
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-rustls 0.26.2",
|
||||
@@ -7673,7 +7671,7 @@ version = "0.26.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
|
||||
dependencies = [
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
@@ -7772,34 +7770,13 @@ version = "0.22.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38"
|
||||
dependencies = [
|
||||
"indexmap 2.9.0",
|
||||
"indexmap 2.10.0",
|
||||
"serde",
|
||||
"serde_spanned",
|
||||
"toml_datetime",
|
||||
"winnow",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"http 1.3.1",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
"prost 0.13.5",
|
||||
"tokio-stream",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.13.1"
|
||||
@@ -7857,7 +7834,7 @@ dependencies = [
|
||||
"prost-types 0.13.5",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic 0.13.1",
|
||||
"tonic",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7883,7 +7860,7 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"indexmap 2.9.0",
|
||||
"indexmap 2.10.0",
|
||||
"pin-project-lite",
|
||||
"slab",
|
||||
"sync_wrapper 1.0.1",
|
||||
@@ -7921,10 +7898,14 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
|
||||
|
||||
[[package]]
|
||||
name = "tower-otel"
|
||||
version = "0.2.0"
|
||||
source = "git+https://github.com/mattiapenati/tower-otel?rev=56a7321053bcb72443888257b622ba0d43a11fcd#56a7321053bcb72443888257b622ba0d43a11fcd"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "345000ea5ae33222624a8ccfdd88892c30db4d413a39c2d4bd714b77e0a4b23c"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"cfg-if",
|
||||
"http 1.3.1",
|
||||
"http-body 1.0.0",
|
||||
"opentelemetry",
|
||||
"pin-project",
|
||||
"tower-layer",
|
||||
@@ -8006,9 +7987,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-opentelemetry"
|
||||
version = "0.28.0"
|
||||
version = "0.31.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053"
|
||||
checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c"
|
||||
dependencies = [
|
||||
"js-sys",
|
||||
"once_cell",
|
||||
@@ -8216,7 +8197,7 @@ dependencies = [
|
||||
"base64 0.22.1",
|
||||
"log",
|
||||
"once_cell",
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"rustls-pki-types",
|
||||
"url",
|
||||
"webpki-roots",
|
||||
@@ -8888,7 +8869,7 @@ dependencies = [
|
||||
"hyper 0.14.30",
|
||||
"hyper 1.4.1",
|
||||
"hyper-util",
|
||||
"indexmap 2.9.0",
|
||||
"indexmap 2.10.0",
|
||||
"itertools 0.12.1",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
@@ -8911,14 +8892,14 @@ dependencies = [
|
||||
"proc-macro2",
|
||||
"prost 0.13.5",
|
||||
"quote",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.1",
|
||||
"regex",
|
||||
"regex-automata 0.4.9",
|
||||
"regex-syntax 0.8.5",
|
||||
"reqwest",
|
||||
"rustls 0.23.27",
|
||||
"rustls 0.23.29",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki 0.103.3",
|
||||
"rustls-webpki 0.103.4",
|
||||
"scopeguard",
|
||||
"sec1 0.7.3",
|
||||
"serde",
|
||||
@@ -8931,6 +8912,7 @@ dependencies = [
|
||||
"subtle",
|
||||
"syn 2.0.100",
|
||||
"sync_wrapper 0.1.2",
|
||||
"thiserror 2.0.11",
|
||||
"tikv-jemalloc-ctl",
|
||||
"tikv-jemalloc-sys",
|
||||
"time",
|
||||
@@ -8940,6 +8922,7 @@ dependencies = [
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
"tonic",
|
||||
"tower 0.5.2",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
|
||||
17
Cargo.toml
17
Cargo.toml
@@ -143,10 +143,10 @@ notify = "6.0.0"
|
||||
num_cpus = "1.15"
|
||||
num-traits = "0.2.19"
|
||||
once_cell = "1.13"
|
||||
opentelemetry = "0.27"
|
||||
opentelemetry_sdk = "0.27"
|
||||
opentelemetry-otlp = { version = "0.27", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions = "0.27"
|
||||
opentelemetry = "0.30"
|
||||
opentelemetry_sdk = "0.30"
|
||||
opentelemetry-otlp = { version = "0.30", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions = "0.30"
|
||||
parking_lot = "0.12"
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
parquet_derive = "53"
|
||||
@@ -164,7 +164,7 @@ rand_core = "=0.6"
|
||||
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
|
||||
regex = "1.10.2"
|
||||
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
|
||||
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_27"] }
|
||||
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_30"] }
|
||||
reqwest-middleware = "0.4"
|
||||
reqwest-retry = "0.7"
|
||||
routerify = "3"
|
||||
@@ -214,15 +214,12 @@ tonic = { version = "0.13.1", default-features = false, features = ["channel", "
|
||||
tonic-reflection = { version = "0.13.1", features = ["server"] }
|
||||
tower = { version = "0.5.2", default-features = false }
|
||||
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
|
||||
|
||||
# This revision uses opentelemetry 0.27. There's no tag for it.
|
||||
tower-otel = { git = "https://github.com/mattiapenati/tower-otel", rev = "56a7321053bcb72443888257b622ba0d43a11fcd" }
|
||||
|
||||
tower-otel = { version = "0.6", features = ["axum"] }
|
||||
tower-service = "0.3.3"
|
||||
tracing = "0.1"
|
||||
tracing-error = "0.2"
|
||||
tracing-log = "0.2"
|
||||
tracing-opentelemetry = "0.28"
|
||||
tracing-opentelemetry = "0.31"
|
||||
tracing-serde = "0.2.0"
|
||||
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
|
||||
try-lock = "0.2.5"
|
||||
|
||||
@@ -138,6 +138,12 @@ struct Cli {
|
||||
/// Run in development mode, skipping VM-specific operations like process termination
|
||||
#[arg(long, action = clap::ArgAction::SetTrue)]
|
||||
pub dev: bool,
|
||||
|
||||
#[arg(long)]
|
||||
pub pg_init_timeout: Option<u64>,
|
||||
|
||||
#[arg(long, default_value_t = false, action = clap::ArgAction::Set)]
|
||||
pub lakebase_mode: bool,
|
||||
}
|
||||
|
||||
impl Cli {
|
||||
@@ -188,7 +194,7 @@ fn main() -> Result<()> {
|
||||
.build()?;
|
||||
let _rt_guard = runtime.enter();
|
||||
|
||||
runtime.block_on(init(cli.dev))?;
|
||||
let tracing_provider = init(cli.dev)?;
|
||||
|
||||
// enable core dumping for all child processes
|
||||
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
|
||||
@@ -219,6 +225,8 @@ fn main() -> Result<()> {
|
||||
installed_extensions_collection_interval: Arc::new(AtomicU64::new(
|
||||
cli.installed_extensions_collection_interval,
|
||||
)),
|
||||
pg_init_timeout: cli.pg_init_timeout.map(Duration::from_secs),
|
||||
lakebase_mode: cli.lakebase_mode,
|
||||
},
|
||||
config,
|
||||
)?;
|
||||
@@ -227,11 +235,11 @@ fn main() -> Result<()> {
|
||||
|
||||
scenario.teardown();
|
||||
|
||||
deinit_and_exit(exit_code);
|
||||
deinit_and_exit(tracing_provider, exit_code);
|
||||
}
|
||||
|
||||
async fn init(dev_mode: bool) -> Result<()> {
|
||||
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
|
||||
fn init(dev_mode: bool) -> Result<Option<tracing_utils::Provider>> {
|
||||
let provider = init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
|
||||
|
||||
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
|
||||
thread::spawn(move || {
|
||||
@@ -242,7 +250,7 @@ async fn init(dev_mode: bool) -> Result<()> {
|
||||
|
||||
info!("compute build_tag: {}", &BUILD_TAG.to_string());
|
||||
|
||||
Ok(())
|
||||
Ok(provider)
|
||||
}
|
||||
|
||||
fn get_config(cli: &Cli) -> Result<ComputeConfig> {
|
||||
@@ -267,25 +275,27 @@ fn get_config(cli: &Cli) -> Result<ComputeConfig> {
|
||||
}
|
||||
}
|
||||
|
||||
fn deinit_and_exit(exit_code: Option<i32>) -> ! {
|
||||
// Shutdown trace pipeline gracefully, so that it has a chance to send any
|
||||
// pending traces before we exit. Shutting down OTEL tracing provider may
|
||||
// hang for quite some time, see, for example:
|
||||
// - https://github.com/open-telemetry/opentelemetry-rust/issues/868
|
||||
// - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
|
||||
//
|
||||
// Yet, we want computes to shut down fast enough, as we may need a new one
|
||||
// for the same timeline ASAP. So wait no longer than 2s for the shutdown to
|
||||
// complete, then just error out and exit the main thread.
|
||||
info!("shutting down tracing");
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
let _ = thread::spawn(move || {
|
||||
tracing_utils::shutdown_tracing();
|
||||
sender.send(()).ok()
|
||||
});
|
||||
let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
|
||||
if shutdown_res.is_err() {
|
||||
error!("timed out while shutting down tracing, exiting anyway");
|
||||
fn deinit_and_exit(tracing_provider: Option<tracing_utils::Provider>, exit_code: Option<i32>) -> ! {
|
||||
if let Some(p) = tracing_provider {
|
||||
// Shutdown trace pipeline gracefully, so that it has a chance to send any
|
||||
// pending traces before we exit. Shutting down OTEL tracing provider may
|
||||
// hang for quite some time, see, for example:
|
||||
// - https://github.com/open-telemetry/opentelemetry-rust/issues/868
|
||||
// - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
|
||||
//
|
||||
// Yet, we want computes to shut down fast enough, as we may need a new one
|
||||
// for the same timeline ASAP. So wait no longer than 2s for the shutdown to
|
||||
// complete, then just error out and exit the main thread.
|
||||
info!("shutting down tracing");
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
let _ = thread::spawn(move || {
|
||||
_ = p.shutdown();
|
||||
sender.send(()).ok()
|
||||
});
|
||||
let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
|
||||
if shutdown_res.is_err() {
|
||||
error!("timed out while shutting down tracing, exiting anyway");
|
||||
}
|
||||
}
|
||||
|
||||
info!("shutting down");
|
||||
|
||||
@@ -113,6 +113,11 @@ pub struct ComputeNodeParams {
|
||||
|
||||
/// Interval for installed extensions collection
|
||||
pub installed_extensions_collection_interval: Arc<AtomicU64>,
|
||||
|
||||
/// Timeout of PG compute startup in the Init state.
|
||||
pub pg_init_timeout: Option<Duration>,
|
||||
|
||||
pub lakebase_mode: bool,
|
||||
}
|
||||
|
||||
type TaskHandle = Mutex<Option<JoinHandle<()>>>;
|
||||
@@ -154,6 +159,7 @@ pub struct RemoteExtensionMetrics {
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ComputeState {
|
||||
pub start_time: DateTime<Utc>,
|
||||
pub pg_start_time: Option<DateTime<Utc>>,
|
||||
pub status: ComputeStatus,
|
||||
/// Timestamp of the last Postgres activity. It could be `None` if
|
||||
/// compute wasn't used since start.
|
||||
@@ -191,6 +197,7 @@ impl ComputeState {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
start_time: Utc::now(),
|
||||
pg_start_time: None,
|
||||
status: ComputeStatus::Empty,
|
||||
last_active: None,
|
||||
error: None,
|
||||
@@ -648,6 +655,9 @@ impl ComputeNode {
|
||||
};
|
||||
_this_entered = start_compute_span.enter();
|
||||
|
||||
// Hadron: Record postgres start time (used to enforce pg_init_timeout).
|
||||
state_guard.pg_start_time.replace(Utc::now());
|
||||
|
||||
state_guard.set_status(ComputeStatus::Init, &self.state_changed);
|
||||
compute_state = state_guard.clone()
|
||||
}
|
||||
@@ -1441,7 +1451,7 @@ impl ComputeNode {
|
||||
})?;
|
||||
|
||||
// Update pg_hba.conf received with basebackup.
|
||||
update_pg_hba(pgdata_path)?;
|
||||
update_pg_hba(pgdata_path, None)?;
|
||||
|
||||
// Place pg_dynshmem under /dev/shm. This allows us to use
|
||||
// 'dynamic_shared_memory_type = mmap' so that the files are placed in
|
||||
@@ -1746,6 +1756,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
// Run migrations separately to not hold up cold starts
|
||||
let lakebase_mode = self.params.lakebase_mode;
|
||||
let params = self.params.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut conf = conf.as_ref().clone();
|
||||
@@ -1758,7 +1769,7 @@ impl ComputeNode {
|
||||
eprintln!("connection error: {e}");
|
||||
}
|
||||
});
|
||||
if let Err(e) = handle_migrations(params, &mut client).await {
|
||||
if let Err(e) = handle_migrations(params, &mut client, lakebase_mode).await {
|
||||
error!("Failed to run migrations: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
60
compute_tools/src/hadron_metrics.rs
Normal file
60
compute_tools/src/hadron_metrics.rs
Normal file
@@ -0,0 +1,60 @@
|
||||
use metrics::{
|
||||
IntCounter, IntGaugeVec, core::Collector, proto::MetricFamily, register_int_counter,
|
||||
register_int_gauge_vec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
// Counter keeping track of the number of PageStream request errors reported by Postgres.
|
||||
// An error is registered every time Postgres calls compute_ctl's /refresh_configuration API.
|
||||
// Postgres will invoke this API if it detects trouble with PageStream requests (get_page@lsn,
|
||||
// get_base_backup, etc.) it sends to any pageserver. An increase in this counter value typically
|
||||
// indicates Postgres downtime, as PageStream requests are critical for Postgres to function.
|
||||
pub static POSTGRES_PAGESTREAM_REQUEST_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pg_cctl_pagestream_request_errors_total",
|
||||
"Number of PageStream request errors reported by the postgres process"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
// Counter keeping track of the number of compute configuration errors due to Postgres statement
|
||||
// timeouts. An error is registered every time `ComputeNode::reconfigure()` fails due to Postgres
|
||||
// error code 57014 (query cancelled). This statement timeout typically occurs when postgres is
|
||||
// stuck in a problematic retry loop when the PS is rejecting its connection requests (usually due
|
||||
// to PG pointing at the wrong PS). We should investigate the root cause when this counter value
|
||||
// increases by checking PG and PS logs.
|
||||
pub static COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pg_cctl_configure_statement_timeout_errors_total",
|
||||
"Number of compute configuration errors due to Postgres statement timeouts."
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static COMPUTE_ATTACHED: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pg_cctl_attached",
|
||||
"Compute node attached status (1 if attached)",
|
||||
&[
|
||||
"pg_compute_id",
|
||||
"pg_instance_id",
|
||||
"tenant_id",
|
||||
"timeline_id"
|
||||
]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub fn collect() -> Vec<MetricFamily> {
|
||||
let mut metrics = Vec::new();
|
||||
metrics.extend(POSTGRES_PAGESTREAM_REQUEST_ERRORS.collect());
|
||||
metrics.extend(COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.collect());
|
||||
metrics.extend(COMPUTE_ATTACHED.collect());
|
||||
metrics
|
||||
}
|
||||
|
||||
pub fn initialize_metrics() {
|
||||
Lazy::force(&POSTGRES_PAGESTREAM_REQUEST_ERRORS);
|
||||
Lazy::force(&COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS);
|
||||
Lazy::force(&COMPUTE_ATTACHED);
|
||||
}
|
||||
@@ -16,6 +16,7 @@ pub mod compute_prewarm;
|
||||
pub mod compute_promote;
|
||||
pub mod disk_quota;
|
||||
pub mod extension_server;
|
||||
pub mod hadron_metrics;
|
||||
pub mod installed_extensions;
|
||||
pub mod local_proxy;
|
||||
pub mod lsn_lease;
|
||||
|
||||
@@ -13,7 +13,9 @@ use tracing_subscriber::prelude::*;
|
||||
/// set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`. See
|
||||
/// `tracing-utils` package description.
|
||||
///
|
||||
pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
|
||||
pub fn init_tracing_and_logging(
|
||||
default_log_level: &str,
|
||||
) -> anyhow::Result<Option<tracing_utils::Provider>> {
|
||||
// Initialize Logging
|
||||
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level));
|
||||
@@ -24,8 +26,9 @@ pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result
|
||||
.with_writer(std::io::stderr);
|
||||
|
||||
// Initialize OpenTelemetry
|
||||
let otlp_layer =
|
||||
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default()).await;
|
||||
let provider =
|
||||
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default());
|
||||
let otlp_layer = provider.as_ref().map(tracing_utils::layer);
|
||||
|
||||
// Put it all together
|
||||
tracing_subscriber::registry()
|
||||
@@ -37,7 +40,7 @@ pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result
|
||||
|
||||
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
|
||||
|
||||
Ok(())
|
||||
Ok(provider)
|
||||
}
|
||||
|
||||
/// Replace all newline characters with a special character to make it
|
||||
|
||||
@@ -9,15 +9,20 @@ use crate::metrics::DB_MIGRATION_FAILED;
|
||||
pub(crate) struct MigrationRunner<'m> {
|
||||
client: &'m mut Client,
|
||||
migrations: &'m [&'m str],
|
||||
lakebase_mode: bool,
|
||||
}
|
||||
|
||||
impl<'m> MigrationRunner<'m> {
|
||||
/// Create a new migration runner
|
||||
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
|
||||
pub fn new(client: &'m mut Client, migrations: &'m [&'m str], lakebase_mode: bool) -> Self {
|
||||
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
|
||||
assert!(migrations.len() + 1 < i64::MAX as usize);
|
||||
|
||||
Self { client, migrations }
|
||||
Self {
|
||||
client,
|
||||
migrations,
|
||||
lakebase_mode,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the current value of neon_migration.migration_id
|
||||
@@ -130,8 +135,13 @@ impl<'m> MigrationRunner<'m> {
|
||||
// ID is also the next index
|
||||
let migration_id = (current_migration + 1) as i64;
|
||||
let migration = self.migrations[current_migration];
|
||||
let migration = if self.lakebase_mode {
|
||||
migration.replace("neon_superuser", "databricks_superuser")
|
||||
} else {
|
||||
migration.to_string()
|
||||
};
|
||||
|
||||
match Self::run_migration(self.client, migration_id, migration).await {
|
||||
match Self::run_migration(self.client, migration_id, &migration).await {
|
||||
Ok(_) => {
|
||||
info!("Finished migration id={}", migration_id);
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use tracing::{Level, error, info, instrument, span};
|
||||
use crate::compute::ComputeNode;
|
||||
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
|
||||
|
||||
const PG_DEFAULT_INIT_TIMEOUIT: Duration = Duration::from_secs(60);
|
||||
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
|
||||
|
||||
/// Struct to store runtime state of the compute monitor thread.
|
||||
@@ -352,13 +353,47 @@ impl ComputeMonitor {
|
||||
// Hang on condition variable waiting until the compute status is `Running`.
|
||||
fn wait_for_postgres_start(compute: &ComputeNode) {
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
let pg_init_timeout = compute
|
||||
.params
|
||||
.pg_init_timeout
|
||||
.unwrap_or(PG_DEFAULT_INIT_TIMEOUIT);
|
||||
|
||||
while state.status != ComputeStatus::Running {
|
||||
info!("compute is not running, waiting before monitoring activity");
|
||||
state = compute.state_changed.wait(state).unwrap();
|
||||
if !compute.params.lakebase_mode {
|
||||
state = compute.state_changed.wait(state).unwrap();
|
||||
|
||||
if state.status == ComputeStatus::Running {
|
||||
break;
|
||||
if state.status == ComputeStatus::Running {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if state.pg_start_time.is_some()
|
||||
&& Utc::now()
|
||||
.signed_duration_since(state.pg_start_time.unwrap())
|
||||
.to_std()
|
||||
.unwrap_or_default()
|
||||
> pg_init_timeout
|
||||
{
|
||||
// If Postgres isn't up and running with working PS/SK connections within `pg_init_timeout`, it is
|
||||
// possible that we started Postgres with a wrong spec (so it is talking to the wrong PS/SK nodes). To prevent
|
||||
// deadends we simply exit (panic) the compute node so it can restart with the latest spec.
|
||||
//
|
||||
// NB: We skip this check if we have not attempted to start PG yet (indicated by state.pg_start_time == None).
|
||||
// This is to make sure the more appropriate errors are surfaced if we encounter issues before we even attempt
|
||||
// to start PG (e.g., if we can't pull the spec, can't sync safekeepers, or can't get the basebackup).
|
||||
error!(
|
||||
"compute did not enter Running state in {} seconds, exiting",
|
||||
pg_init_timeout.as_secs()
|
||||
);
|
||||
std::process::exit(1);
|
||||
}
|
||||
state = compute
|
||||
.state_changed
|
||||
.wait_timeout(state, Duration::from_secs(5))
|
||||
.unwrap()
|
||||
.0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,9 @@ use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::{Result, bail};
|
||||
use compute_api::responses::TlsConfig;
|
||||
use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
|
||||
use compute_api::spec::{
|
||||
Database, DatabricksSettings, GenericOption, GenericOptions, PgIdent, Role,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use indexmap::IndexMap;
|
||||
use ini::Ini;
|
||||
@@ -184,6 +186,42 @@ impl DatabaseExt for Database {
|
||||
}
|
||||
}
|
||||
|
||||
pub trait DatabricksSettingsExt {
|
||||
fn as_pg_settings(&self) -> String;
|
||||
}
|
||||
|
||||
impl DatabricksSettingsExt for DatabricksSettings {
|
||||
fn as_pg_settings(&self) -> String {
|
||||
// Postgres GUCs rendered from DatabricksSettings
|
||||
vec![
|
||||
// ssl_ca_file
|
||||
Some(format!(
|
||||
"ssl_ca_file = '{}'",
|
||||
self.pg_compute_tls_settings.ca_file
|
||||
)),
|
||||
// [Optional] databricks.workspace_url
|
||||
Some(format!(
|
||||
"databricks.workspace_url = '{}'",
|
||||
&self.databricks_workspace_host
|
||||
)),
|
||||
// todo(vikas.jain): these are not required anymore as they are moved to static
|
||||
// conf but keeping these to avoid image mismatch between hcc and pg.
|
||||
// Once hcc and pg are in sync, we can remove these.
|
||||
//
|
||||
// databricks.enable_databricks_identity_login
|
||||
Some("databricks.enable_databricks_identity_login = true".to_string()),
|
||||
// databricks.enable_sql_restrictions
|
||||
Some("databricks.enable_sql_restrictions = true".to_string()),
|
||||
]
|
||||
.into_iter()
|
||||
// Removes `None`s
|
||||
.flatten()
|
||||
.collect::<Vec<String>>()
|
||||
.join("\n")
|
||||
+ "\n"
|
||||
}
|
||||
}
|
||||
|
||||
/// Generic trait used to provide quoting / encoding for strings used in the
|
||||
/// Postgres SQL queries and DATABASE_URL.
|
||||
pub trait Escaping {
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use std::fs::File;
|
||||
use std::fs::{self, Permissions};
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::{Result, anyhow, bail};
|
||||
@@ -133,10 +135,25 @@ pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result
|
||||
}
|
||||
|
||||
/// Check `pg_hba.conf` and update if needed to allow external connections.
|
||||
pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
|
||||
pub fn update_pg_hba(pgdata_path: &Path, databricks_pg_hba: Option<&String>) -> Result<()> {
|
||||
// XXX: consider making it a part of config.json
|
||||
let pghba_path = pgdata_path.join("pg_hba.conf");
|
||||
|
||||
// Update pg_hba to contain Databricks-specific settings before adding neon settings.
|
||||
// PG uses the first record that matches to perform authentication, so we need to have
|
||||
// our rules before the default ones from neon.
|
||||
// See https://www.postgresql.org/docs/16/auth-pg-hba-conf.html
|
||||
if let Some(databricks_pg_hba) = databricks_pg_hba {
|
||||
if config::line_in_file(
|
||||
&pghba_path,
|
||||
&format!("include_if_exists {}\n", *databricks_pg_hba),
|
||||
)? {
|
||||
info!("updated pg_hba.conf to include databricks_pg_hba.conf");
|
||||
} else {
|
||||
info!("pg_hba.conf already included databricks_pg_hba.conf");
|
||||
}
|
||||
}
|
||||
|
||||
if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
|
||||
info!("updated pg_hba.conf to allow external connections");
|
||||
} else {
|
||||
@@ -146,6 +163,59 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check `pg_ident.conf` and update if needed to allow databricks config.
|
||||
pub fn update_pg_ident(pgdata_path: &Path, databricks_pg_ident: Option<&String>) -> Result<()> {
|
||||
info!("checking pg_ident.conf");
|
||||
let pghba_path = pgdata_path.join("pg_ident.conf");
|
||||
|
||||
// Update pg_ident to contains databricks specfic settings
|
||||
if let Some(databricks_pg_ident) = databricks_pg_ident {
|
||||
if config::line_in_file(
|
||||
&pghba_path,
|
||||
&format!("include_if_exists {}\n", *databricks_pg_ident),
|
||||
)? {
|
||||
info!("updated pg_ident.conf to include databricks_pg_ident.conf");
|
||||
} else {
|
||||
info!("pg_ident.conf already included databricks_pg_ident.conf");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Copy tls key_file and cert_file from k8s secret mount directory
|
||||
/// to pgdata and set private key file permissions as expected by Postgres.
|
||||
/// See this doc for expected permission <https://www.postgresql.org/docs/current/ssl-tcp.html>
|
||||
/// K8s secrets mount on dblet does not honor permission and ownership
|
||||
/// specified in the Volume or VolumeMount. So we need to explicitly copy the file and set the permissions.
|
||||
pub fn copy_tls_certificates(
|
||||
key_file: &String,
|
||||
cert_file: &String,
|
||||
pgdata_path: &Path,
|
||||
) -> Result<()> {
|
||||
let files = [cert_file, key_file];
|
||||
for file in files.iter() {
|
||||
let source = Path::new(file);
|
||||
let dest = pgdata_path.join(source.file_name().unwrap());
|
||||
if !dest.exists() {
|
||||
std::fs::copy(source, &dest)?;
|
||||
info!(
|
||||
"Copying tls file: {} to {}",
|
||||
&source.display(),
|
||||
&dest.display()
|
||||
);
|
||||
}
|
||||
if *file == key_file {
|
||||
// Postgres requires private key to be readable only by the owner by having
|
||||
// chmod 600 permissions.
|
||||
let permissions = Permissions::from_mode(0o600);
|
||||
fs::set_permissions(&dest, permissions)?;
|
||||
info!("Setting permission on {}.", &dest.display());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create a standby.signal file
|
||||
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
|
||||
// XXX: consider making it a part of config.json
|
||||
@@ -170,7 +240,11 @@ pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn handle_migrations(params: ComputeNodeParams, client: &mut Client) -> Result<()> {
|
||||
pub async fn handle_migrations(
|
||||
params: ComputeNodeParams,
|
||||
client: &mut Client,
|
||||
lakebase_mode: bool,
|
||||
) -> Result<()> {
|
||||
info!("handle migrations");
|
||||
|
||||
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
@@ -234,7 +308,7 @@ pub async fn handle_migrations(params: ComputeNodeParams, client: &mut Client) -
|
||||
),
|
||||
];
|
||||
|
||||
MigrationRunner::new(client, &migrations)
|
||||
MigrationRunner::new(client, &migrations, lakebase_mode)
|
||||
.run_migrations()
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -416,6 +416,32 @@ pub struct GenericOption {
|
||||
pub vartype: String,
|
||||
}
|
||||
|
||||
/// Postgres compute TLS settings.
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
|
||||
pub struct PgComputeTlsSettings {
|
||||
// Absolute path to the certificate file for server-side TLS.
|
||||
pub cert_file: String,
|
||||
// Absolute path to the private key file for server-side TLS.
|
||||
pub key_file: String,
|
||||
// Absolute path to the certificate authority file for verifying client certificates.
|
||||
pub ca_file: String,
|
||||
}
|
||||
|
||||
/// Databricks specific options for compute instance.
|
||||
/// This is used to store any other settings that need to be propagated to Compute
|
||||
/// but should not be persisted to ComputeSpec in the database.
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
|
||||
pub struct DatabricksSettings {
|
||||
pub pg_compute_tls_settings: PgComputeTlsSettings,
|
||||
// Absolute file path to databricks_pg_hba.conf file.
|
||||
pub databricks_pg_hba: String,
|
||||
// Absolute file path to databricks_pg_ident.conf file.
|
||||
pub databricks_pg_ident: String,
|
||||
// Hostname portion of the Databricks workspace URL of the endpoint, or empty string if not known.
|
||||
// A valid hostname is required for the compute instance to support PAT logins.
|
||||
pub databricks_workspace_host: String,
|
||||
}
|
||||
|
||||
/// Optional collection of `GenericOption`'s. Type alias allows us to
|
||||
/// declare a `trait` on it.
|
||||
pub type GenericOptions = Option<Vec<GenericOption>>;
|
||||
|
||||
@@ -394,7 +394,7 @@ impl From<&OtelExporterConfig> for tracing_utils::ExportConfig {
|
||||
tracing_utils::ExportConfig {
|
||||
endpoint: Some(val.endpoint.clone()),
|
||||
protocol: val.protocol.into(),
|
||||
timeout: val.timeout,
|
||||
timeout: Some(val.timeout),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -301,7 +301,12 @@ pub struct PullTimelineRequest {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub http_hosts: Vec<String>,
|
||||
pub ignore_tombstone: Option<bool>,
|
||||
/// Membership configuration to switch to after pull.
|
||||
/// It guarantees that if pull_timeline returns successfully, the timeline will
|
||||
/// not be deleted by request with an older generation.
|
||||
/// Storage controller always sets this field.
|
||||
/// None is only allowed for manual pull_timeline requests.
|
||||
pub mconf: Option<Configuration>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
||||
@@ -1,11 +1,5 @@
|
||||
//! Helper functions to set up OpenTelemetry tracing.
|
||||
//!
|
||||
//! This comes in two variants, depending on whether you have a Tokio runtime available.
|
||||
//! If you do, call `init_tracing()`. It sets up the trace processor and exporter to use
|
||||
//! the current tokio runtime. If you don't have a runtime available, or you don't want
|
||||
//! to share the runtime with the tracing tasks, call `init_tracing_without_runtime()`
|
||||
//! instead. It sets up a dedicated single-threaded Tokio runtime for the tracing tasks.
|
||||
//!
|
||||
//! Example:
|
||||
//!
|
||||
//! ```rust,no_run
|
||||
@@ -21,7 +15,8 @@
|
||||
//! .with_writer(std::io::stderr);
|
||||
//!
|
||||
//! // Initialize OpenTelemetry. Exports tracing spans as OpenTelemetry traces
|
||||
//! let otlp_layer = tracing_utils::init_tracing("my_application", tracing_utils::ExportConfig::default()).await;
|
||||
//! let provider = tracing_utils::init_tracing("my_application", tracing_utils::ExportConfig::default());
|
||||
//! let otlp_layer = provider.as_ref().map(tracing_utils::layer);
|
||||
//!
|
||||
//! // Put it all together
|
||||
//! tracing_subscriber::registry()
|
||||
@@ -36,16 +31,18 @@
|
||||
pub mod http;
|
||||
pub mod perf_span;
|
||||
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
pub use opentelemetry_otlp::{ExportConfig, Protocol};
|
||||
use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use tracing::level_filters::LevelFilter;
|
||||
use tracing::{Dispatch, Subscriber};
|
||||
use tracing_subscriber::Layer;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
|
||||
pub type Provider = SdkTracerProvider;
|
||||
|
||||
/// Set up OpenTelemetry exporter, using configuration from environment variables.
|
||||
///
|
||||
/// `service_name` is set as the OpenTelemetry 'service.name' resource (see
|
||||
@@ -70,16 +67,7 @@ use tracing_subscriber::registry::LookupSpan;
|
||||
/// If you need some other setting, please test if it works first. And perhaps
|
||||
/// add a comment in the list above to save the effort of testing for the next
|
||||
/// person.
|
||||
///
|
||||
/// This doesn't block, but is marked as 'async' to hint that this must be called in
|
||||
/// asynchronous execution context.
|
||||
pub async fn init_tracing<S>(
|
||||
service_name: &str,
|
||||
export_config: ExportConfig,
|
||||
) -> Option<impl Layer<S>>
|
||||
where
|
||||
S: Subscriber + for<'span> LookupSpan<'span>,
|
||||
{
|
||||
pub fn init_tracing(service_name: &str, export_config: ExportConfig) -> Option<Provider> {
|
||||
if std::env::var("OTEL_SDK_DISABLED") == Ok("true".to_string()) {
|
||||
return None;
|
||||
};
|
||||
@@ -89,52 +77,14 @@ where
|
||||
))
|
||||
}
|
||||
|
||||
/// Like `init_tracing`, but creates a separate tokio Runtime for the tracing
|
||||
/// tasks.
|
||||
pub fn init_tracing_without_runtime<S>(
|
||||
service_name: &str,
|
||||
export_config: ExportConfig,
|
||||
) -> Option<impl Layer<S>>
|
||||
pub fn layer<S>(p: &Provider) -> impl Layer<S>
|
||||
where
|
||||
S: Subscriber + for<'span> LookupSpan<'span>,
|
||||
{
|
||||
if std::env::var("OTEL_SDK_DISABLED") == Ok("true".to_string()) {
|
||||
return None;
|
||||
};
|
||||
|
||||
// The opentelemetry batch processor and the OTLP exporter needs a Tokio
|
||||
// runtime. Create a dedicated runtime for them. One thread should be
|
||||
// enough.
|
||||
//
|
||||
// (Alternatively, instead of batching, we could use the "simple
|
||||
// processor", which doesn't need Tokio, and use "reqwest-blocking"
|
||||
// feature for the OTLP exporter, which also doesn't need Tokio. However,
|
||||
// batching is considered best practice, and also I have the feeling that
|
||||
// the non-Tokio codepaths in the opentelemetry crate are less used and
|
||||
// might be more buggy, so better to stay on the well-beaten path.)
|
||||
//
|
||||
// We leak the runtime so that it keeps running after we exit the
|
||||
// function.
|
||||
let runtime = Box::leak(Box::new(
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.thread_name("otlp runtime thread")
|
||||
.worker_threads(1)
|
||||
.build()
|
||||
.unwrap(),
|
||||
));
|
||||
let _guard = runtime.enter();
|
||||
|
||||
Some(init_tracing_internal(
|
||||
service_name.to_string(),
|
||||
export_config,
|
||||
))
|
||||
tracing_opentelemetry::layer().with_tracer(p.tracer("global"))
|
||||
}
|
||||
|
||||
fn init_tracing_internal<S>(service_name: String, export_config: ExportConfig) -> impl Layer<S>
|
||||
where
|
||||
S: Subscriber + for<'span> LookupSpan<'span>,
|
||||
{
|
||||
fn init_tracing_internal(service_name: String, export_config: ExportConfig) -> Provider {
|
||||
// Sets up exporter from the provided [`ExportConfig`] parameter.
|
||||
// If the endpoint is not specified, it is loaded from the
|
||||
// OTEL_EXPORTER_OTLP_ENDPOINT environment variable.
|
||||
@@ -153,22 +103,14 @@ where
|
||||
opentelemetry_sdk::propagation::TraceContextPropagator::new(),
|
||||
);
|
||||
|
||||
let tracer = opentelemetry_sdk::trace::TracerProvider::builder()
|
||||
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
|
||||
.with_resource(opentelemetry_sdk::Resource::new(vec![KeyValue::new(
|
||||
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
|
||||
service_name,
|
||||
)]))
|
||||
Provider::builder()
|
||||
.with_batch_exporter(exporter)
|
||||
.with_resource(
|
||||
opentelemetry_sdk::Resource::builder()
|
||||
.with_service_name(service_name)
|
||||
.build(),
|
||||
)
|
||||
.build()
|
||||
.tracer("global");
|
||||
|
||||
tracing_opentelemetry::layer().with_tracer(tracer)
|
||||
}
|
||||
|
||||
// Shutdown trace pipeline gracefully, so that it has a chance to send any
|
||||
// pending traces before we exit.
|
||||
pub fn shutdown_tracing() {
|
||||
opentelemetry::global::shutdown_tracer_provider();
|
||||
}
|
||||
|
||||
pub enum OtelEnablement {
|
||||
@@ -176,17 +118,17 @@ pub enum OtelEnablement {
|
||||
Enabled {
|
||||
service_name: String,
|
||||
export_config: ExportConfig,
|
||||
runtime: &'static tokio::runtime::Runtime,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct OtelGuard {
|
||||
provider: Provider,
|
||||
pub dispatch: Dispatch,
|
||||
}
|
||||
|
||||
impl Drop for OtelGuard {
|
||||
fn drop(&mut self) {
|
||||
shutdown_tracing();
|
||||
_ = self.provider.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -199,22 +141,19 @@ impl Drop for OtelGuard {
|
||||
/// The lifetime of the guard should match that of the application. On drop, it tears down the
|
||||
/// OTEL infra.
|
||||
pub fn init_performance_tracing(otel_enablement: OtelEnablement) -> Option<OtelGuard> {
|
||||
let otel_subscriber = match otel_enablement {
|
||||
match otel_enablement {
|
||||
OtelEnablement::Disabled => None,
|
||||
OtelEnablement::Enabled {
|
||||
service_name,
|
||||
export_config,
|
||||
runtime,
|
||||
} => {
|
||||
let otel_layer = runtime
|
||||
.block_on(init_tracing(&service_name, export_config))
|
||||
.with_filter(LevelFilter::INFO);
|
||||
let provider = init_tracing(&service_name, export_config)?;
|
||||
|
||||
let otel_layer = layer(&provider).with_filter(LevelFilter::INFO);
|
||||
let otel_subscriber = tracing_subscriber::registry().with(otel_layer);
|
||||
let otel_dispatch = Dispatch::new(otel_subscriber);
|
||||
let dispatch = Dispatch::new(otel_subscriber);
|
||||
|
||||
Some(otel_dispatch)
|
||||
Some(OtelGuard { dispatch, provider })
|
||||
}
|
||||
};
|
||||
|
||||
otel_subscriber.map(|dispatch| OtelGuard { dispatch })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
//! from data stored in object storage.
|
||||
//!
|
||||
use std::fmt::Write as FmtWrite;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Instant, SystemTime};
|
||||
|
||||
use anyhow::{Context, anyhow};
|
||||
@@ -420,12 +421,16 @@ where
|
||||
}
|
||||
|
||||
let mut min_restart_lsn: Lsn = Lsn::MAX;
|
||||
|
||||
let mut dbdir_cnt = 0;
|
||||
let mut rel_cnt = 0;
|
||||
|
||||
// Create tablespace directories
|
||||
for ((spcnode, dbnode), has_relmap_file) in
|
||||
self.timeline.list_dbdirs(self.lsn, self.ctx).await?
|
||||
{
|
||||
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
|
||||
|
||||
dbdir_cnt += 1;
|
||||
// If full backup is requested, include all relation files.
|
||||
// Otherwise only include init forks of unlogged relations.
|
||||
let rels = self
|
||||
@@ -433,6 +438,7 @@ where
|
||||
.list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
|
||||
.await?;
|
||||
for &rel in rels.iter() {
|
||||
rel_cnt += 1;
|
||||
// Send init fork as main fork to provide well formed empty
|
||||
// contents of UNLOGGED relations. Postgres copies it in
|
||||
// `reinit.c` during recovery.
|
||||
@@ -455,6 +461,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
self.timeline
|
||||
.db_rel_count
|
||||
.store(Some(Arc::new((dbdir_cnt, rel_cnt))));
|
||||
|
||||
let start_time = Instant::now();
|
||||
let aux_files = self
|
||||
.timeline
|
||||
|
||||
@@ -126,7 +126,6 @@ fn main() -> anyhow::Result<()> {
|
||||
Some(cfg) => tracing_utils::OtelEnablement::Enabled {
|
||||
service_name: "pageserver".to_string(),
|
||||
export_config: (&cfg.export_config).into(),
|
||||
runtime: *COMPUTE_REQUEST_RUNTIME,
|
||||
},
|
||||
None => tracing_utils::OtelEnablement::Disabled,
|
||||
};
|
||||
|
||||
@@ -156,6 +156,8 @@ impl FeatureResolver {
|
||||
|
||||
let tenant_properties = PerTenantProperties {
|
||||
remote_size_mb: Some(rand::rng().random_range(100.0..1000000.00)),
|
||||
db_count_max: Some(rand::rng().random_range(1..1000)),
|
||||
rel_count_max: Some(rand::rng().random_range(1..1000)),
|
||||
}
|
||||
.into_posthog_properties();
|
||||
|
||||
@@ -344,6 +346,8 @@ impl FeatureResolver {
|
||||
|
||||
struct PerTenantProperties {
|
||||
pub remote_size_mb: Option<f64>,
|
||||
pub db_count_max: Option<usize>,
|
||||
pub rel_count_max: Option<usize>,
|
||||
}
|
||||
|
||||
impl PerTenantProperties {
|
||||
@@ -355,6 +359,18 @@ impl PerTenantProperties {
|
||||
PostHogFlagFilterPropertyValue::Number(remote_size_mb),
|
||||
);
|
||||
}
|
||||
if let Some(db_count) = self.db_count_max {
|
||||
properties.insert(
|
||||
"tenant_db_count_max".to_string(),
|
||||
PostHogFlagFilterPropertyValue::Number(db_count as f64),
|
||||
);
|
||||
}
|
||||
if let Some(rel_count) = self.rel_count_max {
|
||||
properties.insert(
|
||||
"tenant_rel_count_max".to_string(),
|
||||
PostHogFlagFilterPropertyValue::Number(rel_count as f64),
|
||||
);
|
||||
}
|
||||
properties
|
||||
}
|
||||
}
|
||||
@@ -409,7 +425,11 @@ impl TenantFeatureResolver {
|
||||
|
||||
/// Refresh the cached properties and flags on the critical path.
|
||||
pub fn refresh_properties_and_flags(&self, tenant_shard: &TenantShard) {
|
||||
// Any of the remote size is none => this property is none.
|
||||
let mut remote_size_mb = Some(0.0);
|
||||
// Any of the db or rel count is available => this property is available.
|
||||
let mut db_count_max = None;
|
||||
let mut rel_count_max = None;
|
||||
for timeline in tenant_shard.list_timelines() {
|
||||
let size = timeline.metrics.resident_physical_size_get();
|
||||
if size == 0 {
|
||||
@@ -419,9 +439,25 @@ impl TenantFeatureResolver {
|
||||
if let Some(ref mut remote_size_mb) = remote_size_mb {
|
||||
*remote_size_mb += size as f64 / 1024.0 / 1024.0;
|
||||
}
|
||||
if let Some(data) = timeline.db_rel_count.load_full() {
|
||||
let (db_count, rel_count) = *data.as_ref();
|
||||
if db_count_max.is_none() {
|
||||
db_count_max = Some(db_count);
|
||||
}
|
||||
if rel_count_max.is_none() {
|
||||
rel_count_max = Some(rel_count);
|
||||
}
|
||||
db_count_max = db_count_max.map(|max| max.max(db_count));
|
||||
rel_count_max = rel_count_max.map(|max| max.max(rel_count));
|
||||
}
|
||||
}
|
||||
self.cached_tenant_properties.store(Arc::new(
|
||||
PerTenantProperties { remote_size_mb }.into_posthog_properties(),
|
||||
PerTenantProperties {
|
||||
remote_size_mb,
|
||||
db_count_max,
|
||||
rel_count_max,
|
||||
}
|
||||
.into_posthog_properties(),
|
||||
));
|
||||
|
||||
// BEGIN: Update the feature flag on the critical path.
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
//!
|
||||
use std::collections::{HashMap, HashSet, hash_map};
|
||||
use std::ops::{ControlFlow, Range};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::walingest::{WalIngestError, WalIngestErrorKind};
|
||||
use crate::{PERF_TRACE_TARGET, ensure_walingest};
|
||||
@@ -1254,11 +1255,16 @@ impl Timeline {
|
||||
let dbdir = DbDirectory::des(&buf)?;
|
||||
|
||||
let mut total_size: u64 = 0;
|
||||
let mut dbdir_cnt = 0;
|
||||
let mut rel_cnt = 0;
|
||||
|
||||
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
|
||||
dbdir_cnt += 1;
|
||||
for rel in self
|
||||
.list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
|
||||
.await?
|
||||
{
|
||||
rel_cnt += 1;
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CalculateLogicalSizeError::Cancelled);
|
||||
}
|
||||
@@ -1269,6 +1275,10 @@ impl Timeline {
|
||||
total_size += relsize as u64;
|
||||
}
|
||||
}
|
||||
|
||||
self.db_rel_count
|
||||
.store(Some(Arc::new((dbdir_cnt, rel_cnt))));
|
||||
|
||||
Ok(total_size * BLCKSZ as u64)
|
||||
}
|
||||
|
||||
|
||||
@@ -287,7 +287,7 @@ pub struct Timeline {
|
||||
ancestor_lsn: Lsn,
|
||||
|
||||
// The LSN of gc-compaction that was last applied to this timeline.
|
||||
gc_compaction_state: ArcSwap<Option<GcCompactionState>>,
|
||||
gc_compaction_state: ArcSwapOption<GcCompactionState>,
|
||||
|
||||
pub(crate) metrics: Arc<TimelineMetrics>,
|
||||
|
||||
@@ -448,7 +448,11 @@ pub struct Timeline {
|
||||
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
|
||||
basebackup_cache: Arc<BasebackupCache>,
|
||||
|
||||
#[expect(dead_code)]
|
||||
feature_resolver: Arc<TenantFeatureResolver>,
|
||||
|
||||
/// Basebackup will collect the count and store it here. Used for reldirv2 rollout.
|
||||
pub(crate) db_rel_count: ArcSwapOption<(usize, usize)>,
|
||||
}
|
||||
|
||||
pub(crate) enum PreviousHeatmap {
|
||||
@@ -3236,7 +3240,7 @@ impl Timeline {
|
||||
}),
|
||||
disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
|
||||
|
||||
gc_compaction_state: ArcSwap::new(Arc::new(gc_compaction_state)),
|
||||
gc_compaction_state: ArcSwapOption::from_pointee(gc_compaction_state),
|
||||
|
||||
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
|
||||
last_freeze_ts: RwLock::new(Instant::now()),
|
||||
@@ -3341,6 +3345,8 @@ impl Timeline {
|
||||
basebackup_cache: resources.basebackup_cache,
|
||||
|
||||
feature_resolver: resources.feature_resolver.clone(),
|
||||
|
||||
db_rel_count: ArcSwapOption::from_pointee(None),
|
||||
};
|
||||
|
||||
result.repartition_threshold =
|
||||
@@ -3412,7 +3418,7 @@ impl Timeline {
|
||||
gc_compaction_state: GcCompactionState,
|
||||
) -> anyhow::Result<()> {
|
||||
self.gc_compaction_state
|
||||
.store(Arc::new(Some(gc_compaction_state.clone())));
|
||||
.store(Some(Arc::new(gc_compaction_state.clone())));
|
||||
self.remote_client
|
||||
.schedule_index_upload_for_gc_compaction_state_update(gc_compaction_state)
|
||||
}
|
||||
@@ -3428,7 +3434,10 @@ impl Timeline {
|
||||
}
|
||||
|
||||
pub(crate) fn get_gc_compaction_state(&self) -> Option<GcCompactionState> {
|
||||
self.gc_compaction_state.load_full().as_ref().clone()
|
||||
self.gc_compaction_state
|
||||
.load()
|
||||
.as_ref()
|
||||
.map(|x| x.as_ref().clone())
|
||||
}
|
||||
|
||||
/// Creates and starts the wal receiver.
|
||||
|
||||
@@ -1326,13 +1326,7 @@ impl Timeline {
|
||||
.max()
|
||||
};
|
||||
|
||||
let (partition_mode, partition_lsn) = if cfg!(test)
|
||||
|| cfg!(feature = "testing")
|
||||
|| self
|
||||
.feature_resolver
|
||||
.evaluate_boolean("image-compaction-boundary")
|
||||
.is_ok()
|
||||
{
|
||||
let (partition_mode, partition_lsn) = {
|
||||
let last_repartition_lsn = self.partitioning.read().1;
|
||||
let lsn = match l0_l1_boundary_lsn {
|
||||
Some(boundary) => gc_cutoff
|
||||
@@ -1348,8 +1342,6 @@ impl Timeline {
|
||||
} else {
|
||||
("l0_l1_boundary", lsn)
|
||||
}
|
||||
} else {
|
||||
("latest_record", self.get_last_record_lsn())
|
||||
};
|
||||
|
||||
// 2. Repartition and create image layers if necessary
|
||||
|
||||
@@ -362,7 +362,7 @@ impl<T: Types> Cache<T> {
|
||||
tokio::time::sleep(RETRY_BACKOFF).await;
|
||||
continue;
|
||||
} else {
|
||||
tracing::warn!(
|
||||
tracing::info!(
|
||||
"Failed to resolve tenant shard after {} attempts: {:?}",
|
||||
GET_MAX_RETRIES,
|
||||
e
|
||||
|
||||
@@ -178,6 +178,8 @@ static PageServer page_servers[MAX_SHARDS];
|
||||
static bool pageserver_flush(shardno_t shard_no);
|
||||
static void pageserver_disconnect(shardno_t shard_no);
|
||||
static void pageserver_disconnect_shard(shardno_t shard_no);
|
||||
// HADRON
|
||||
shardno_t get_num_shards(void);
|
||||
|
||||
static bool
|
||||
PagestoreShmemIsValid(void)
|
||||
@@ -286,6 +288,22 @@ AssignPageserverConnstring(const char *newval, void *extra)
|
||||
}
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
/**
|
||||
* Return the total number of shards seen in the shard map.
|
||||
*/
|
||||
shardno_t get_num_shards(void)
|
||||
{
|
||||
const ShardMap *shard_map;
|
||||
|
||||
Assert(pagestore_shared);
|
||||
shard_map = &pagestore_shared->shard_map;
|
||||
|
||||
Assert(shard_map != NULL);
|
||||
return shard_map->num_shards;
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
/*
|
||||
* Get the current number of shards, and/or the connection string for a
|
||||
* particular shard from the shard map in shared memory.
|
||||
|
||||
@@ -72,22 +72,21 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
|
||||
(tag).rnode = (rinfo); \
|
||||
} while (false)
|
||||
|
||||
#define BufTagGetNRelFileInfo(tag) tag.rnode
|
||||
#define BufTagGetNRelFileInfo(tag) (tag).rnode
|
||||
|
||||
#define BufTagGetRelNumber(tagp) ((tagp)->rnode.relNode)
|
||||
|
||||
#define BufTagInit(tag, relNumber, forknum, blkno, spcOid, dbOid) \
|
||||
#define BufTagInit(tag, rel_number, fork_number, block_number, spc_oid, db_oid) \
|
||||
do { \
|
||||
RelFileNode rnode = { .spcNode = spcOid, .dbNode = dbOid, .relNode = relNumber}; \
|
||||
(tag).forkNum = forknum; \
|
||||
(tag).blockNum = blkno; \
|
||||
(tag).rnode = rnode; \
|
||||
RelFileNode rnode = { .spcNode = (spc_oid), .dbNode = (db_oid), .relNode = (rel_number)}; \
|
||||
(tag).forkNum = (fork_number); \
|
||||
(tag).blockNum = (block_number); \
|
||||
(tag).rnode = rnode; \
|
||||
} while (false)
|
||||
|
||||
#define InvalidRelFileNumber InvalidOid
|
||||
|
||||
#define SMgrRelGetRelInfo(reln) \
|
||||
(reln->smgr_rnode.node)
|
||||
#define SMgrRelGetRelInfo(reln) ((reln)->smgr_rnode.node)
|
||||
|
||||
#define DropRelationAllLocalBuffers DropRelFileNodeAllLocalBuffers
|
||||
|
||||
@@ -133,17 +132,16 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
|
||||
.relNumber = (tag).relNumber, \
|
||||
})
|
||||
|
||||
#define BufTagInit(tag, relNumber, forknum, blkno, spcOid, dbOid) \
|
||||
#define BufTagInit(tag, rel_number, fork_number, block_number, spc_oid, db_oid) \
|
||||
do { \
|
||||
(tag).forkNum = forknum; \
|
||||
(tag).blockNum = blkno; \
|
||||
(tag).spcOid = spcOid; \
|
||||
(tag).dbOid = dbOid; \
|
||||
(tag).relNumber = relNumber; \
|
||||
(tag).forkNum = (fork_number); \
|
||||
(tag).blockNum = (block_number); \
|
||||
(tag).spcOid = (spc_oid); \
|
||||
(tag).dbOid = (db_oid); \
|
||||
(tag).relNumber = (rel_number); \
|
||||
} while (false)
|
||||
|
||||
#define SMgrRelGetRelInfo(reln) \
|
||||
((reln)->smgr_rlocator)
|
||||
#define SMgrRelGetRelInfo(reln) ((reln)->smgr_rlocator)
|
||||
|
||||
#define DropRelationAllLocalBuffers DropRelationAllLocalBuffers
|
||||
#endif
|
||||
|
||||
@@ -110,6 +110,9 @@ static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);
|
||||
|
||||
static void CheckGracefulShutdown(WalProposer *wp);
|
||||
|
||||
// HADRON
|
||||
shardno_t get_num_shards(void);
|
||||
|
||||
static void
|
||||
init_walprop_config(bool syncSafekeepers)
|
||||
{
|
||||
@@ -646,18 +649,19 @@ walprop_pg_get_shmem_state(WalProposer *wp)
|
||||
* Record new ps_feedback in the array with shards and update min_feedback.
|
||||
*/
|
||||
static PageserverFeedback
|
||||
record_pageserver_feedback(PageserverFeedback *ps_feedback)
|
||||
record_pageserver_feedback(PageserverFeedback *ps_feedback, shardno_t num_shards)
|
||||
{
|
||||
PageserverFeedback min_feedback;
|
||||
|
||||
Assert(ps_feedback->present);
|
||||
Assert(ps_feedback->shard_number < MAX_SHARDS);
|
||||
Assert(ps_feedback->shard_number < num_shards);
|
||||
|
||||
SpinLockAcquire(&walprop_shared->mutex);
|
||||
|
||||
/* Update the number of shards */
|
||||
if (ps_feedback->shard_number + 1 > walprop_shared->num_shards)
|
||||
walprop_shared->num_shards = ps_feedback->shard_number + 1;
|
||||
// Hadron: Update the num_shards from the source-of-truth (shard map) lazily when we receive
|
||||
// a new pageserver feedback.
|
||||
walprop_shared->num_shards = Max(walprop_shared->num_shards, num_shards);
|
||||
|
||||
/* Update the feedback */
|
||||
memcpy(&walprop_shared->shard_ps_feedback[ps_feedback->shard_number], ps_feedback, sizeof(PageserverFeedback));
|
||||
@@ -2023,19 +2027,43 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
|
||||
if (wp->config->syncSafekeepers)
|
||||
return;
|
||||
|
||||
|
||||
/* handle fresh ps_feedback */
|
||||
if (sk->appendResponse.ps_feedback.present)
|
||||
{
|
||||
PageserverFeedback min_feedback = record_pageserver_feedback(&sk->appendResponse.ps_feedback);
|
||||
shardno_t num_shards = get_num_shards();
|
||||
|
||||
/* Only one main shard sends non-zero currentClusterSize */
|
||||
if (sk->appendResponse.ps_feedback.currentClusterSize > 0)
|
||||
SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
|
||||
|
||||
if (min_feedback.disk_consistent_lsn != standby_apply_lsn)
|
||||
// During shard split, we receive ps_feedback from child shards before
|
||||
// the split commits and our shard map GUC has been updated. We must
|
||||
// filter out such feedback here because record_pageserver_feedback()
|
||||
// doesn't do it.
|
||||
//
|
||||
// NB: what we would actually want to happen is that we only receive
|
||||
// ps_feedback from the parent shards when the split is committed, then
|
||||
// apply the split to our set of tracked feedback and from here on only
|
||||
// receive ps_feedback from child shards. This filter condition doesn't
|
||||
// do that: if we split from N parent to 2N child shards, the first N
|
||||
// child shards' feedback messages will pass this condition, even before
|
||||
// the split is committed. That's a bit sloppy, but OK for now.
|
||||
if (sk->appendResponse.ps_feedback.shard_number < num_shards)
|
||||
{
|
||||
standby_apply_lsn = min_feedback.disk_consistent_lsn;
|
||||
needToAdvanceSlot = true;
|
||||
PageserverFeedback min_feedback = record_pageserver_feedback(&sk->appendResponse.ps_feedback, num_shards);
|
||||
|
||||
/* Only one main shard sends non-zero currentClusterSize */
|
||||
if (sk->appendResponse.ps_feedback.currentClusterSize > 0)
|
||||
SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
|
||||
|
||||
if (min_feedback.disk_consistent_lsn != standby_apply_lsn)
|
||||
{
|
||||
standby_apply_lsn = min_feedback.disk_consistent_lsn;
|
||||
needToAdvanceSlot = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// HADRON
|
||||
elog(DEBUG2, "Ignoring pageserver feedback for unknown shard %d (current shard number %d)",
|
||||
sk->appendResponse.ps_feedback.shard_number, num_shards);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -76,7 +76,7 @@ fn cli() -> clap::Command {
|
||||
}
|
||||
|
||||
pub async fn run() -> anyhow::Result<()> {
|
||||
let _logging_guard = crate::logging::init().await?;
|
||||
let _logging_guard = crate::logging::init()?;
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
|
||||
@@ -334,7 +334,7 @@ struct PgSniRouterArgs {
|
||||
}
|
||||
|
||||
pub async fn run() -> anyhow::Result<()> {
|
||||
let _logging_guard = crate::logging::init().await?;
|
||||
let _logging_guard = crate::logging::init()?;
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ use crate::metrics::Metrics;
|
||||
/// configuration from environment variables. For example, to change the
|
||||
/// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
|
||||
/// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
|
||||
pub async fn init() -> anyhow::Result<LoggingGuard> {
|
||||
pub fn init() -> anyhow::Result<LoggingGuard> {
|
||||
let logfmt = LogFormat::from_env()?;
|
||||
|
||||
let env_filter = EnvFilter::builder()
|
||||
@@ -43,8 +43,8 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
|
||||
.expect("this should be a valid filter directive"),
|
||||
);
|
||||
|
||||
let otlp_layer =
|
||||
tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default()).await;
|
||||
let provider = tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default());
|
||||
let otlp_layer = provider.as_ref().map(tracing_utils::layer);
|
||||
|
||||
let json_log_layer = if logfmt == LogFormat::Json {
|
||||
Some(JsonLoggingLayer::new(
|
||||
@@ -76,7 +76,7 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
|
||||
.with(text_log_layer)
|
||||
.try_init()?;
|
||||
|
||||
Ok(LoggingGuard)
|
||||
Ok(LoggingGuard(provider))
|
||||
}
|
||||
|
||||
/// Initialize logging for local_proxy with log prefix and no opentelemetry.
|
||||
@@ -97,7 +97,7 @@ pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
|
||||
.with(fmt_layer)
|
||||
.try_init()?;
|
||||
|
||||
Ok(LoggingGuard)
|
||||
Ok(LoggingGuard(None))
|
||||
}
|
||||
|
||||
pub struct LocalProxyFormatter(Format<Full, SystemTime>);
|
||||
@@ -118,14 +118,16 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LoggingGuard;
|
||||
pub struct LoggingGuard(Option<tracing_utils::Provider>);
|
||||
|
||||
impl Drop for LoggingGuard {
|
||||
fn drop(&mut self) {
|
||||
// Shutdown trace pipeline gracefully, so that it has a chance to send any
|
||||
// pending traces before we exit.
|
||||
tracing::info!("shutting down the tracing machinery");
|
||||
tracing_utils::shutdown_tracing();
|
||||
if let Some(p) = &self.0 {
|
||||
// Shutdown trace pipeline gracefully, so that it has a chance to send any
|
||||
// pending traces before we exit.
|
||||
tracing::info!("shutting down the tracing machinery");
|
||||
drop(p.shutdown());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -161,9 +161,9 @@ pub async fn handle_request(
|
||||
FileStorage::create_new(&tli_dir_path, new_state.clone(), conf.no_sync).await?;
|
||||
|
||||
// now we have a ready timeline in a temp directory
|
||||
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
|
||||
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path, None).await?;
|
||||
global_timelines
|
||||
.load_temp_timeline(request.destination_ttid, &tli_dir_path, true)
|
||||
.load_temp_timeline(request.destination_ttid, &tli_dir_path, None)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -193,7 +193,7 @@ pub async fn hcc_pull_timeline(
|
||||
tenant_id: timeline.tenant_id,
|
||||
timeline_id: timeline.timeline_id,
|
||||
http_hosts: Vec::new(),
|
||||
ignore_tombstone: None,
|
||||
mconf: None,
|
||||
};
|
||||
for host in timeline.peers {
|
||||
if host.0 == conf.my_id.0 {
|
||||
|
||||
@@ -352,7 +352,7 @@ async fn timeline_exclude_handler(mut request: Request<Body>) -> Result<Response
|
||||
// instead.
|
||||
if data.mconf.contains(my_id) {
|
||||
return Err(ApiError::Forbidden(format!(
|
||||
"refused to switch into {}, node {} is member of it",
|
||||
"refused to exclude timeline with {}, node {} is member of it",
|
||||
data.mconf, my_id
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -13,8 +13,8 @@ use http_utils::error::ApiError;
|
||||
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use reqwest::Certificate;
|
||||
use safekeeper_api::Term;
|
||||
use safekeeper_api::models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus};
|
||||
use safekeeper_api::{Term, membership};
|
||||
use safekeeper_client::mgmt_api;
|
||||
use safekeeper_client::mgmt_api::Client;
|
||||
use serde::Deserialize;
|
||||
@@ -453,12 +453,40 @@ pub async fn handle_request(
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
wait_for_peer_timeline_status: bool,
|
||||
) -> Result<PullTimelineResponse, ApiError> {
|
||||
if let Some(mconf) = &request.mconf {
|
||||
let sk_id = global_timelines.get_sk_id();
|
||||
if !mconf.contains(sk_id) {
|
||||
return Err(ApiError::BadRequest(anyhow!(
|
||||
"refused to pull timeline with {mconf}, node {sk_id} is not member of it",
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
let existing_tli = global_timelines.get(TenantTimelineId::new(
|
||||
request.tenant_id,
|
||||
request.timeline_id,
|
||||
));
|
||||
if existing_tli.is_ok() {
|
||||
info!("Timeline {} already exists", request.timeline_id);
|
||||
if let Ok(timeline) = existing_tli {
|
||||
let cur_generation = timeline
|
||||
.read_shared_state()
|
||||
.await
|
||||
.sk
|
||||
.state()
|
||||
.mconf
|
||||
.generation;
|
||||
|
||||
info!(
|
||||
"Timeline {} already exists with generation {cur_generation}",
|
||||
request.timeline_id,
|
||||
);
|
||||
|
||||
if let Some(mconf) = request.mconf {
|
||||
timeline
|
||||
.membership_switch(mconf)
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
|
||||
}
|
||||
|
||||
return Ok(PullTimelineResponse {
|
||||
safekeeper_host: None,
|
||||
});
|
||||
@@ -495,6 +523,19 @@ pub async fn handle_request(
|
||||
for (i, response) in responses.into_iter().enumerate() {
|
||||
match response {
|
||||
Ok(status) => {
|
||||
if let Some(mconf) = &request.mconf {
|
||||
if status.mconf.generation > mconf.generation {
|
||||
// We probably raced with another timeline membership change with higher generation.
|
||||
// Ignore this request.
|
||||
return Err(ApiError::Conflict(format!(
|
||||
"cannot pull timeline with generation {}: timeline {} already exists with generation {} on {}",
|
||||
mconf.generation,
|
||||
request.timeline_id,
|
||||
status.mconf.generation,
|
||||
http_hosts[i],
|
||||
)));
|
||||
}
|
||||
}
|
||||
statuses.push((status, i));
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -593,15 +634,13 @@ pub async fn handle_request(
|
||||
assert!(status.tenant_id == request.tenant_id);
|
||||
assert!(status.timeline_id == request.timeline_id);
|
||||
|
||||
let check_tombstone = !request.ignore_tombstone.unwrap_or_default();
|
||||
|
||||
match pull_timeline(
|
||||
status,
|
||||
safekeeper_host,
|
||||
sk_auth_token,
|
||||
http_client,
|
||||
global_timelines,
|
||||
check_tombstone,
|
||||
request.mconf,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -611,6 +650,10 @@ pub async fn handle_request(
|
||||
Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
|
||||
safekeeper_host: None,
|
||||
}),
|
||||
Some(TimelineError::Deleted(_)) => Err(ApiError::Conflict(format!(
|
||||
"Timeline {}/{} deleted",
|
||||
request.tenant_id, request.timeline_id
|
||||
))),
|
||||
Some(TimelineError::CreationInProgress(_)) => {
|
||||
// We don't return success here because creation might still fail.
|
||||
Err(ApiError::Conflict("Creation in progress".to_owned()))
|
||||
@@ -627,7 +670,7 @@ async fn pull_timeline(
|
||||
sk_auth_token: Option<SecretString>,
|
||||
http_client: reqwest::Client,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
check_tombstone: bool,
|
||||
mconf: Option<membership::Configuration>,
|
||||
) -> Result<PullTimelineResponse> {
|
||||
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
|
||||
info!(
|
||||
@@ -689,8 +732,11 @@ async fn pull_timeline(
|
||||
// fsync temp timeline directory to remember its contents.
|
||||
fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
|
||||
|
||||
let generation = mconf.as_ref().map(|c| c.generation);
|
||||
|
||||
// Let's create timeline from temp directory and verify that it's correct
|
||||
let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
|
||||
let (commit_lsn, flush_lsn) =
|
||||
validate_temp_timeline(conf, ttid, &tli_dir_path, generation).await?;
|
||||
info!(
|
||||
"finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
|
||||
ttid, commit_lsn, flush_lsn
|
||||
@@ -698,10 +744,20 @@ async fn pull_timeline(
|
||||
assert!(status.commit_lsn <= status.flush_lsn);
|
||||
|
||||
// Finally, load the timeline.
|
||||
let _tli = global_timelines
|
||||
.load_temp_timeline(ttid, &tli_dir_path, check_tombstone)
|
||||
let timeline = global_timelines
|
||||
.load_temp_timeline(ttid, &tli_dir_path, generation)
|
||||
.await?;
|
||||
|
||||
if let Some(mconf) = mconf {
|
||||
// Switch to provided mconf to guarantee that the timeline will not
|
||||
// be deleted by request with older generation.
|
||||
// The generation might already be higher than the one in mconf, e.g.
|
||||
// if another membership_switch request was executed between `load_temp_timeline`
|
||||
// and `membership_switch`, but that's totally fine. `membership_switch` will
|
||||
// ignore switch to older generation.
|
||||
timeline.membership_switch(mconf).await?;
|
||||
}
|
||||
|
||||
Ok(PullTimelineResponse {
|
||||
safekeeper_host: Some(host),
|
||||
})
|
||||
|
||||
@@ -1026,6 +1026,13 @@ where
|
||||
self.state.finish_change(&state).await?;
|
||||
}
|
||||
|
||||
if msg.mconf.generation > self.state.mconf.generation && !msg.mconf.contains(self.node_id) {
|
||||
bail!(
|
||||
"refused to switch into {}, node {} is not a member of it",
|
||||
msg.mconf,
|
||||
self.node_id,
|
||||
);
|
||||
}
|
||||
// Switch into conf given by proposer conf if it is higher.
|
||||
self.state.membership_switch(msg.mconf.clone()).await?;
|
||||
|
||||
|
||||
@@ -427,6 +427,9 @@ impl From<TimelineError> for ApiError {
|
||||
TimelineError::NotFound(ttid) => {
|
||||
ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
|
||||
}
|
||||
TimelineError::Deleted(ttid) => {
|
||||
ApiError::NotFound(anyhow!("timeline {} deleted", ttid).into())
|
||||
}
|
||||
_ => ApiError::InternalServerError(anyhow!("{}", te)),
|
||||
}
|
||||
}
|
||||
@@ -591,7 +594,7 @@ impl Timeline {
|
||||
|
||||
/// Cancel the timeline, requesting background activity to stop. Closing
|
||||
/// the `self.gate` waits for that.
|
||||
pub async fn cancel(&self) {
|
||||
pub fn cancel(&self) {
|
||||
info!("timeline {} shutting down", self.ttid);
|
||||
self.cancel.cancel();
|
||||
}
|
||||
@@ -911,6 +914,13 @@ impl Timeline {
|
||||
to: Configuration,
|
||||
) -> Result<TimelineMembershipSwitchResponse> {
|
||||
let mut state = self.write_shared_state().await;
|
||||
// Ensure we don't race with exclude/delete requests by checking the cancellation
|
||||
// token under the write_shared_state lock.
|
||||
// Exclude/delete cancel the timeline under the shared state lock,
|
||||
// so the timeline cannot be deleted in the middle of the membership switch.
|
||||
if self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
state.sk.membership_switch(to).await
|
||||
}
|
||||
|
||||
|
||||
@@ -10,13 +10,13 @@ use std::time::{Duration, Instant};
|
||||
use anyhow::{Context, Result, bail};
|
||||
use camino::Utf8PathBuf;
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use safekeeper_api::membership::Configuration;
|
||||
use safekeeper_api::membership::{Configuration, SafekeeperGeneration};
|
||||
use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult};
|
||||
use safekeeper_api::{ServerInfo, membership};
|
||||
use tokio::fs;
|
||||
use tracing::*;
|
||||
use utils::crashsafe::{durable_rename, fsync_async_opt};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
|
||||
@@ -40,10 +40,17 @@ enum GlobalMapTimeline {
|
||||
struct GlobalTimelinesState {
|
||||
timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
|
||||
|
||||
// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
|
||||
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
|
||||
// this map is dropped on restart.
|
||||
tombstones: HashMap<TenantTimelineId, Instant>,
|
||||
/// A tombstone indicates this timeline used to exist but has been deleted. These are used to prevent
|
||||
/// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
|
||||
/// this map is dropped on restart.
|
||||
/// The timeline might also be locally deleted (excluded) via safekeeper migration algorithm. In that case,
|
||||
/// the tombstone contains the corresponding safekeeper generation. The pull_timeline requests with
|
||||
/// higher generation ignore such tombstones and can recreate the timeline.
|
||||
timeline_tombstones: HashMap<TenantTimelineId, TimelineTombstone>,
|
||||
/// A tombstone indicates that the tenant used to exist has been deleted.
|
||||
/// These are created only by tenant_delete requests. They are always valid regardless of the
|
||||
/// request generation.
|
||||
/// This is only soft-enforced, as this map is dropped on restart.
|
||||
tenant_tombstones: HashMap<TenantId, Instant>,
|
||||
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
@@ -79,7 +86,7 @@ impl GlobalTimelinesState {
|
||||
Err(TimelineError::CreationInProgress(*ttid))
|
||||
}
|
||||
None => {
|
||||
if self.has_tombstone(ttid) {
|
||||
if self.has_tombstone(ttid, None) {
|
||||
Err(TimelineError::Deleted(*ttid))
|
||||
} else {
|
||||
Err(TimelineError::NotFound(*ttid))
|
||||
@@ -88,20 +95,46 @@ impl GlobalTimelinesState {
|
||||
}
|
||||
}
|
||||
|
||||
fn has_tombstone(&self, ttid: &TenantTimelineId) -> bool {
|
||||
self.tombstones.contains_key(ttid) || self.tenant_tombstones.contains_key(&ttid.tenant_id)
|
||||
fn has_timeline_tombstone(
|
||||
&self,
|
||||
ttid: &TenantTimelineId,
|
||||
generation: Option<SafekeeperGeneration>,
|
||||
) -> bool {
|
||||
if let Some(generation) = generation {
|
||||
self.timeline_tombstones
|
||||
.get(ttid)
|
||||
.is_some_and(|t| t.is_valid(generation))
|
||||
} else {
|
||||
self.timeline_tombstones.contains_key(ttid)
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes all blocking tombstones for the given timeline ID.
|
||||
fn has_tenant_tombstone(&self, tenant_id: &TenantId) -> bool {
|
||||
self.tenant_tombstones.contains_key(tenant_id)
|
||||
}
|
||||
|
||||
/// Check if the state has a tenant or a timeline tombstone.
|
||||
/// If `generation` is provided, check only for timeline tombstones with the same or higher generation.
|
||||
/// If `generation` is `None`, check for any timeline tombstone.
|
||||
/// Tenant tombstones are checked regardless of the generation.
|
||||
fn has_tombstone(
|
||||
&self,
|
||||
ttid: &TenantTimelineId,
|
||||
generation: Option<SafekeeperGeneration>,
|
||||
) -> bool {
|
||||
self.has_timeline_tombstone(ttid, generation) || self.has_tenant_tombstone(&ttid.tenant_id)
|
||||
}
|
||||
|
||||
/// Removes timeline tombstone for the given timeline ID.
|
||||
/// Returns `true` if there have been actual changes.
|
||||
fn remove_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
|
||||
self.tombstones.remove(ttid).is_some()
|
||||
|| self.tenant_tombstones.remove(&ttid.tenant_id).is_some()
|
||||
fn remove_timeline_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
|
||||
self.timeline_tombstones.remove(ttid).is_some()
|
||||
}
|
||||
|
||||
fn delete(&mut self, ttid: TenantTimelineId) {
|
||||
fn delete(&mut self, ttid: TenantTimelineId, generation: Option<SafekeeperGeneration>) {
|
||||
self.timelines.remove(&ttid);
|
||||
self.tombstones.insert(ttid, Instant::now());
|
||||
self.timeline_tombstones
|
||||
.insert(ttid, TimelineTombstone::new(generation));
|
||||
}
|
||||
|
||||
fn add_tenant_tombstone(&mut self, tenant_id: TenantId) {
|
||||
@@ -120,7 +153,7 @@ impl GlobalTimelines {
|
||||
Self {
|
||||
state: Mutex::new(GlobalTimelinesState {
|
||||
timelines: HashMap::new(),
|
||||
tombstones: HashMap::new(),
|
||||
timeline_tombstones: HashMap::new(),
|
||||
tenant_tombstones: HashMap::new(),
|
||||
conf,
|
||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||
@@ -261,6 +294,8 @@ impl GlobalTimelines {
|
||||
start_lsn: Lsn,
|
||||
commit_lsn: Lsn,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let generation = Some(mconf.generation);
|
||||
|
||||
let (conf, _, _, _) = {
|
||||
let state = self.state.lock().unwrap();
|
||||
if let Ok(timeline) = state.get(&ttid) {
|
||||
@@ -268,8 +303,8 @@ impl GlobalTimelines {
|
||||
return Ok(timeline);
|
||||
}
|
||||
|
||||
if state.has_tombstone(&ttid) {
|
||||
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
|
||||
if state.has_tombstone(&ttid, generation) {
|
||||
anyhow::bail!(TimelineError::Deleted(ttid));
|
||||
}
|
||||
|
||||
state.get_dependencies()
|
||||
@@ -284,7 +319,9 @@ impl GlobalTimelines {
|
||||
// immediately initialize first WAL segment as well.
|
||||
let state = TimelinePersistentState::new(&ttid, mconf, server_info, start_lsn, commit_lsn)?;
|
||||
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
|
||||
let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?;
|
||||
let timeline = self
|
||||
.load_temp_timeline(ttid, &tmp_dir_path, generation)
|
||||
.await?;
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -303,7 +340,7 @@ impl GlobalTimelines {
|
||||
&self,
|
||||
ttid: TenantTimelineId,
|
||||
tmp_path: &Utf8PathBuf,
|
||||
check_tombstone: bool,
|
||||
generation: Option<SafekeeperGeneration>,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
// Check for existence and mark that we're creating it.
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
|
||||
@@ -317,18 +354,18 @@ impl GlobalTimelines {
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
if check_tombstone {
|
||||
if state.has_tombstone(&ttid) {
|
||||
anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
|
||||
}
|
||||
} else {
|
||||
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
|
||||
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
|
||||
// It's also possible that we enter this when the tenant has been deleted, even if the timeline itself has never existed.
|
||||
if state.remove_tombstone(&ttid) {
|
||||
warn!("un-deleted timeline {ttid}");
|
||||
}
|
||||
|
||||
if state.has_tombstone(&ttid, generation) {
|
||||
// If the timeline is deleted, we refuse to recreate it.
|
||||
// This is a safeguard against accidentally overwriting a timeline that was deleted
|
||||
// by concurrent request.
|
||||
anyhow::bail!(TimelineError::Deleted(ttid));
|
||||
}
|
||||
|
||||
// We might have an outdated tombstone with the older generation.
|
||||
// Remove it unconditionally.
|
||||
state.remove_timeline_tombstone(&ttid);
|
||||
|
||||
state
|
||||
.timelines
|
||||
.insert(ttid, GlobalMapTimeline::CreationInProgress);
|
||||
@@ -503,11 +540,16 @@ impl GlobalTimelines {
|
||||
ttid: &TenantTimelineId,
|
||||
action: DeleteOrExclude,
|
||||
) -> Result<TimelineDeleteResult, DeleteOrExcludeError> {
|
||||
let generation = match &action {
|
||||
DeleteOrExclude::Delete | DeleteOrExclude::DeleteLocal => None,
|
||||
DeleteOrExclude::Exclude(mconf) => Some(mconf.generation),
|
||||
};
|
||||
|
||||
let tli_res = {
|
||||
let state = self.state.lock().unwrap();
|
||||
|
||||
// Do NOT check tenant tombstones here: those were set earlier
|
||||
if state.tombstones.contains_key(ttid) {
|
||||
if state.has_timeline_tombstone(ttid, generation) {
|
||||
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
|
||||
info!("Timeline {ttid} was already deleted");
|
||||
return Ok(TimelineDeleteResult { dir_existed: false });
|
||||
@@ -528,6 +570,11 @@ impl GlobalTimelines {
|
||||
// We would like to avoid holding the lock while waiting for the
|
||||
// gate to finish as this is deadlock prone, so for actual
|
||||
// deletion will take it second time.
|
||||
//
|
||||
// Canceling the timeline will block membership switch requests,
|
||||
// ensuring that the timeline generation will not increase
|
||||
// after this point, and we will not remove a timeline with a generation
|
||||
// higher than the requested one.
|
||||
if let DeleteOrExclude::Exclude(ref mconf) = action {
|
||||
let shared_state = timeline.read_shared_state().await;
|
||||
if shared_state.sk.state().mconf.generation > mconf.generation {
|
||||
@@ -536,9 +583,9 @@ impl GlobalTimelines {
|
||||
current: shared_state.sk.state().mconf.clone(),
|
||||
});
|
||||
}
|
||||
timeline.cancel().await;
|
||||
timeline.cancel();
|
||||
} else {
|
||||
timeline.cancel().await;
|
||||
timeline.cancel();
|
||||
}
|
||||
|
||||
timeline.close().await;
|
||||
@@ -565,7 +612,7 @@ impl GlobalTimelines {
|
||||
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
|
||||
// are used to prevent still-running computes from re-creating the same timeline when they send data,
|
||||
// and to speed up repeated deletion calls by avoiding re-listing objects.
|
||||
self.state.lock().unwrap().delete(*ttid);
|
||||
self.state.lock().unwrap().delete(*ttid, generation);
|
||||
|
||||
result
|
||||
}
|
||||
@@ -627,12 +674,16 @@ impl GlobalTimelines {
|
||||
// may recreate a deleted timeline.
|
||||
let now = Instant::now();
|
||||
state
|
||||
.tombstones
|
||||
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
|
||||
.timeline_tombstones
|
||||
.retain(|_, v| now.duration_since(v.timestamp) < *tombstone_ttl);
|
||||
state
|
||||
.tenant_tombstones
|
||||
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
|
||||
}
|
||||
|
||||
pub fn get_sk_id(&self) -> NodeId {
|
||||
self.state.lock().unwrap().conf.my_id
|
||||
}
|
||||
}
|
||||
|
||||
/// Action for delete_or_exclude.
|
||||
@@ -673,6 +724,7 @@ pub async fn validate_temp_timeline(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
path: &Utf8PathBuf,
|
||||
generation: Option<SafekeeperGeneration>,
|
||||
) -> Result<(Lsn, Lsn)> {
|
||||
let control_path = path.join("safekeeper.control");
|
||||
|
||||
@@ -681,6 +733,15 @@ pub async fn validate_temp_timeline(
|
||||
bail!("wal_seg_size is not set");
|
||||
}
|
||||
|
||||
if let Some(generation) = generation {
|
||||
if control_store.mconf.generation > generation {
|
||||
bail!(
|
||||
"tmp timeline generation {} is higher than expected {generation}",
|
||||
control_store.mconf.generation
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
|
||||
|
||||
let commit_lsn = control_store.commit_lsn;
|
||||
@@ -688,3 +749,28 @@ pub async fn validate_temp_timeline(
|
||||
|
||||
Ok((commit_lsn, flush_lsn))
|
||||
}
|
||||
|
||||
/// A tombstone for a deleted timeline.
|
||||
/// The generation is passed with "exclude" request and stored in the tombstone.
|
||||
/// We ignore the tombstone if the request generation is higher than
|
||||
/// the tombstone generation.
|
||||
/// If the tombstone doesn't have a generation, it's considered permanent,
|
||||
/// e.g. after "delete" request.
|
||||
struct TimelineTombstone {
|
||||
timestamp: Instant,
|
||||
generation: Option<SafekeeperGeneration>,
|
||||
}
|
||||
|
||||
impl TimelineTombstone {
|
||||
fn new(generation: Option<SafekeeperGeneration>) -> Self {
|
||||
TimelineTombstone {
|
||||
timestamp: Instant::now(),
|
||||
generation,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if the tombstone is still valid for the given generation.
|
||||
fn is_valid(&self, generation: SafekeeperGeneration) -> bool {
|
||||
self.generation.is_none_or(|g| g >= generation)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE timelines DROP sk_set_notified_generation;
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE timelines ADD sk_set_notified_generation INTEGER NOT NULL DEFAULT 1;
|
||||
@@ -351,7 +351,7 @@ impl Node {
|
||||
warn_threshold: u32,
|
||||
max_retries: u32,
|
||||
timeout: Duration,
|
||||
cancel: &CancellationToken,
|
||||
cancel_new_retries: &CancellationToken,
|
||||
) -> Option<mgmt_api::Result<T>>
|
||||
where
|
||||
O: FnMut(PageserverClient) -> F,
|
||||
@@ -402,7 +402,7 @@ impl Node {
|
||||
self.id,
|
||||
self.base_url(),
|
||||
),
|
||||
cancel,
|
||||
cancel_new_retries,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -131,6 +131,8 @@ pub(crate) enum DatabaseOperation {
|
||||
InsertTimeline,
|
||||
UpdateTimeline,
|
||||
UpdateTimelineMembership,
|
||||
UpdateCplaneNotifiedGeneration,
|
||||
UpdateSkSetNotifiedGeneration,
|
||||
GetTimeline,
|
||||
InsertTimelineReconcile,
|
||||
RemoveTimelineReconcile,
|
||||
@@ -1497,6 +1499,8 @@ impl Persistence {
|
||||
/// Update timeline membership configuration in the database.
|
||||
/// Perform a compare-and-swap (CAS) operation on the timeline's generation.
|
||||
/// The `new_generation` must be the next (+1) generation after the one in the database.
|
||||
/// Also inserts reconcile_requests to safekeeper_timeline_pending_ops table in the same
|
||||
/// transaction.
|
||||
pub(crate) async fn update_timeline_membership(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
@@ -1504,8 +1508,11 @@ impl Persistence {
|
||||
new_generation: SafekeeperGeneration,
|
||||
sk_set: &[NodeId],
|
||||
new_sk_set: Option<&[NodeId]>,
|
||||
reconcile_requests: &[TimelinePendingOpPersistence],
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::timelines::dsl;
|
||||
use crate::schema::safekeeper_timeline_pending_ops as stpo;
|
||||
use crate::schema::timelines;
|
||||
use diesel::query_dsl::methods::FilterDsl;
|
||||
|
||||
let prev_generation = new_generation.previous().unwrap();
|
||||
|
||||
@@ -1513,14 +1520,15 @@ impl Persistence {
|
||||
let timeline_id = &timeline_id;
|
||||
self.with_measured_conn(DatabaseOperation::UpdateTimelineMembership, move |conn| {
|
||||
Box::pin(async move {
|
||||
let updated = diesel::update(dsl::timelines)
|
||||
.filter(dsl::tenant_id.eq(&tenant_id.to_string()))
|
||||
.filter(dsl::timeline_id.eq(&timeline_id.to_string()))
|
||||
.filter(dsl::generation.eq(prev_generation.into_inner() as i32))
|
||||
let updated = diesel::update(timelines::table)
|
||||
.filter(timelines::tenant_id.eq(&tenant_id.to_string()))
|
||||
.filter(timelines::timeline_id.eq(&timeline_id.to_string()))
|
||||
.filter(timelines::generation.eq(prev_generation.into_inner() as i32))
|
||||
.set((
|
||||
dsl::generation.eq(new_generation.into_inner() as i32),
|
||||
dsl::sk_set.eq(sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>()),
|
||||
dsl::new_sk_set.eq(new_sk_set
|
||||
timelines::generation.eq(new_generation.into_inner() as i32),
|
||||
timelines::sk_set
|
||||
.eq(sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>()),
|
||||
timelines::new_sk_set.eq(new_sk_set
|
||||
.map(|set| set.iter().map(|id| id.0 as i64).collect::<Vec<_>>())),
|
||||
))
|
||||
.execute(conn)
|
||||
@@ -1530,20 +1538,123 @@ impl Persistence {
|
||||
0 => {
|
||||
// TODO(diko): It makes sense to select the current generation
|
||||
// and include it in the error message for better debuggability.
|
||||
Err(DatabaseError::Cas(
|
||||
return Err(DatabaseError::Cas(
|
||||
"Failed to update membership configuration".to_string(),
|
||||
))
|
||||
));
|
||||
}
|
||||
1 => {}
|
||||
_ => {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of rows ({updated})"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
for req in reconcile_requests {
|
||||
let inserted_updated = diesel::insert_into(stpo::table)
|
||||
.values(req)
|
||||
.on_conflict((stpo::tenant_id, stpo::timeline_id, stpo::sk_id))
|
||||
.do_update()
|
||||
.set(req)
|
||||
.filter(stpo::generation.lt(req.generation))
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
if inserted_updated > 1 {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of rows ({inserted_updated})"
|
||||
)));
|
||||
}
|
||||
1 => Ok(()),
|
||||
_ => Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of rows ({updated})"
|
||||
))),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Update the cplane notified generation for a timeline.
|
||||
/// Perform a compare-and-swap (CAS) operation on the timeline's cplane notified generation.
|
||||
/// The update will fail if the specified generation is less than the cplane notified generation
|
||||
/// in the database.
|
||||
pub(crate) async fn update_cplane_notified_generation(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
generation: SafekeeperGeneration,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::timelines::dsl;
|
||||
|
||||
let tenant_id = &tenant_id;
|
||||
let timeline_id = &timeline_id;
|
||||
self.with_measured_conn(
|
||||
DatabaseOperation::UpdateCplaneNotifiedGeneration,
|
||||
move |conn| {
|
||||
Box::pin(async move {
|
||||
let updated = diesel::update(dsl::timelines)
|
||||
.filter(dsl::tenant_id.eq(&tenant_id.to_string()))
|
||||
.filter(dsl::timeline_id.eq(&timeline_id.to_string()))
|
||||
.filter(dsl::cplane_notified_generation.le(generation.into_inner() as i32))
|
||||
.set(dsl::cplane_notified_generation.eq(generation.into_inner() as i32))
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
match updated {
|
||||
0 => Err(DatabaseError::Cas(
|
||||
"Failed to update cplane notified generation".to_string(),
|
||||
)),
|
||||
1 => Ok(()),
|
||||
_ => Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of rows ({updated})"
|
||||
))),
|
||||
}
|
||||
})
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Update the sk set notified generation for a timeline.
|
||||
/// Perform a compare-and-swap (CAS) operation on the timeline's sk set notified generation.
|
||||
/// The update will fail if the specified generation is less than the sk set notified generation
|
||||
/// in the database.
|
||||
pub(crate) async fn update_sk_set_notified_generation(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
generation: SafekeeperGeneration,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::timelines::dsl;
|
||||
|
||||
let tenant_id = &tenant_id;
|
||||
let timeline_id = &timeline_id;
|
||||
self.with_measured_conn(
|
||||
DatabaseOperation::UpdateSkSetNotifiedGeneration,
|
||||
move |conn| {
|
||||
Box::pin(async move {
|
||||
let updated = diesel::update(dsl::timelines)
|
||||
.filter(dsl::tenant_id.eq(&tenant_id.to_string()))
|
||||
.filter(dsl::timeline_id.eq(&timeline_id.to_string()))
|
||||
.filter(dsl::sk_set_notified_generation.le(generation.into_inner() as i32))
|
||||
.set(dsl::sk_set_notified_generation.eq(generation.into_inner() as i32))
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
match updated {
|
||||
0 => Err(DatabaseError::Cas(
|
||||
"Failed to update sk set notified generation".to_string(),
|
||||
)),
|
||||
1 => Ok(()),
|
||||
_ => Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of rows ({updated})"
|
||||
))),
|
||||
}
|
||||
})
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Load timeline from db. Returns `None` if not present.
|
||||
pub(crate) async fn get_timeline(
|
||||
&self,
|
||||
@@ -2493,6 +2604,7 @@ pub(crate) struct TimelinePersistence {
|
||||
pub(crate) new_sk_set: Option<Vec<i64>>,
|
||||
pub(crate) cplane_notified_generation: i32,
|
||||
pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||
pub(crate) sk_set_notified_generation: i32,
|
||||
}
|
||||
|
||||
/// This is separate from [TimelinePersistence] only because postgres allows NULLs
|
||||
@@ -2511,6 +2623,7 @@ pub(crate) struct TimelineFromDb {
|
||||
pub(crate) new_sk_set: Option<Vec<Option<i64>>>,
|
||||
pub(crate) cplane_notified_generation: i32,
|
||||
pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||
pub(crate) sk_set_notified_generation: i32,
|
||||
}
|
||||
|
||||
impl TimelineFromDb {
|
||||
@@ -2530,6 +2643,7 @@ impl TimelineFromDb {
|
||||
new_sk_set,
|
||||
cplane_notified_generation: self.cplane_notified_generation,
|
||||
deleted_at: self.deleted_at,
|
||||
sk_set_notified_generation: self.sk_set_notified_generation,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,7 +110,7 @@ impl Safekeeper {
|
||||
warn_threshold: u32,
|
||||
max_retries: u32,
|
||||
timeout: Duration,
|
||||
cancel: &CancellationToken,
|
||||
cancel_new_retries: &CancellationToken,
|
||||
) -> mgmt_api::Result<T>
|
||||
where
|
||||
O: FnMut(SafekeeperClient) -> F,
|
||||
@@ -161,7 +161,7 @@ impl Safekeeper {
|
||||
self.id,
|
||||
self.base_url(),
|
||||
),
|
||||
cancel,
|
||||
cancel_new_retries,
|
||||
)
|
||||
.await
|
||||
.unwrap_or(Err(mgmt_api::Error::Cancelled))
|
||||
|
||||
@@ -118,6 +118,7 @@ diesel::table! {
|
||||
new_sk_set -> Nullable<Array<Nullable<Int8>>>,
|
||||
cplane_notified_generation -> Int4,
|
||||
deleted_at -> Nullable<Timestamptz>,
|
||||
sk_set_notified_generation -> Int4,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -364,7 +364,12 @@ impl SafekeeperReconcilerInner {
|
||||
http_hosts,
|
||||
tenant_id: req.tenant_id,
|
||||
timeline_id,
|
||||
ignore_tombstone: Some(false),
|
||||
// TODO(diko): get mconf from "timelines" table and pass it here.
|
||||
// Now we use pull_timeline reconciliation only for the timeline creation,
|
||||
// so it's not critical right now.
|
||||
// It could be fixed together with other reconciliation issues:
|
||||
// https://github.com/neondatabase/neon/issues/12189
|
||||
mconf: None,
|
||||
};
|
||||
success = self
|
||||
.reconcile_inner(
|
||||
|
||||
@@ -123,12 +123,17 @@ impl Service {
|
||||
|
||||
/// Perform an operation on a list of safekeepers in parallel with retries.
|
||||
///
|
||||
/// If desired_success_count is set, the remaining operations will be cancelled
|
||||
/// when the desired number of successful responses is reached.
|
||||
///
|
||||
/// Return the results of the operation on each safekeeper in the input order.
|
||||
async fn tenant_timeline_safekeeper_op<T, O, F>(
|
||||
&self,
|
||||
safekeepers: &[Safekeeper],
|
||||
op: O,
|
||||
max_retries: u32,
|
||||
timeout: Duration,
|
||||
desired_success_count: Option<usize>,
|
||||
) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
|
||||
where
|
||||
O: FnMut(SafekeeperClient) -> F + Send + 'static,
|
||||
@@ -136,6 +141,7 @@ impl Service {
|
||||
F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
|
||||
T: Sync + Send + 'static,
|
||||
{
|
||||
let warn_threshold = std::cmp::min(3, max_retries);
|
||||
let jwt = self
|
||||
.config
|
||||
.safekeeper_jwt_token
|
||||
@@ -143,23 +149,26 @@ impl Service {
|
||||
.map(SecretString::from);
|
||||
let mut joinset = JoinSet::new();
|
||||
|
||||
let cancel_new_retries = CancellationToken::new();
|
||||
|
||||
for (idx, sk) in safekeepers.iter().enumerate() {
|
||||
let sk = sk.clone();
|
||||
let http_client = self.http_client.clone();
|
||||
let jwt = jwt.clone();
|
||||
let op = op.clone();
|
||||
let cancel_new_retries = cancel_new_retries.clone();
|
||||
joinset.spawn(async move {
|
||||
let res = sk
|
||||
.with_client_retries(
|
||||
op,
|
||||
&http_client,
|
||||
&jwt,
|
||||
3,
|
||||
3,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
// TODO(diko): This is a wrong timeout.
|
||||
// It should be scaled to the retry count.
|
||||
timeout,
|
||||
&CancellationToken::new(),
|
||||
&cancel_new_retries,
|
||||
)
|
||||
.await;
|
||||
(idx, res)
|
||||
@@ -184,6 +193,7 @@ impl Service {
|
||||
// Wait until all tasks finish or timeout is hit, whichever occurs
|
||||
// first.
|
||||
let mut result_count = 0;
|
||||
let mut success_count = 0;
|
||||
loop {
|
||||
if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
|
||||
{
|
||||
@@ -198,6 +208,15 @@ impl Service {
|
||||
// Only print errors, as there is no Debug trait for T.
|
||||
res.as_ref().map(|_| ()),
|
||||
);
|
||||
if res.is_ok() {
|
||||
success_count += 1;
|
||||
if desired_success_count == Some(success_count) {
|
||||
// We reached the desired number of successful responses, cancel new retries for
|
||||
// the remaining safekeepers.
|
||||
// It does not cancel already started requests, we will still wait for them.
|
||||
cancel_new_retries.cancel();
|
||||
}
|
||||
}
|
||||
results[idx] = res;
|
||||
result_count += 1;
|
||||
}
|
||||
@@ -247,14 +266,14 @@ impl Service {
|
||||
);
|
||||
}
|
||||
|
||||
let quorum_size = target_sk_count / 2 + 1;
|
||||
let max_retries = 3;
|
||||
|
||||
let results = self
|
||||
.tenant_timeline_safekeeper_op(safekeepers, op, timeout)
|
||||
.tenant_timeline_safekeeper_op(safekeepers, op, max_retries, timeout, Some(quorum_size))
|
||||
.await?;
|
||||
|
||||
// Now check if quorum was reached in results.
|
||||
|
||||
let quorum_size = target_sk_count / 2 + 1;
|
||||
|
||||
let success_count = results.iter().filter(|res| res.is_ok()).count();
|
||||
if success_count < quorum_size {
|
||||
// Failure
|
||||
@@ -312,6 +331,7 @@ impl Service {
|
||||
new_sk_set: None,
|
||||
cplane_notified_generation: 0,
|
||||
deleted_at: None,
|
||||
sk_set_notified_generation: 0,
|
||||
};
|
||||
let inserted = self
|
||||
.persistence
|
||||
@@ -461,6 +481,7 @@ impl Service {
|
||||
new_sk_set: None,
|
||||
cplane_notified_generation: 1,
|
||||
deleted_at: None,
|
||||
sk_set_notified_generation: 1,
|
||||
};
|
||||
let inserted = self
|
||||
.persistence
|
||||
@@ -894,17 +915,21 @@ impl Service {
|
||||
/// If min_position is not None, validates that majority of safekeepers
|
||||
/// reached at least min_position.
|
||||
///
|
||||
/// If update_notified_generation is set, also updates sk_set_notified_generation
|
||||
/// in the timelines table.
|
||||
///
|
||||
/// Return responses from safekeepers in the input order.
|
||||
async fn tenant_timeline_set_membership_quorum(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
safekeepers: &[Safekeeper],
|
||||
config: &membership::Configuration,
|
||||
mconf: &membership::Configuration,
|
||||
min_position: Option<(Term, Lsn)>,
|
||||
update_notified_generation: bool,
|
||||
) -> Result<Vec<mgmt_api::Result<TimelineMembershipSwitchResponse>>, ApiError> {
|
||||
let req = TimelineMembershipSwitchRequest {
|
||||
mconf: config.clone(),
|
||||
mconf: mconf.clone(),
|
||||
};
|
||||
|
||||
const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
@@ -945,28 +970,34 @@ impl Service {
|
||||
.await?;
|
||||
|
||||
for res in results.iter().flatten() {
|
||||
if res.current_conf.generation > config.generation {
|
||||
if res.current_conf.generation > mconf.generation {
|
||||
// Another switch_membership raced us.
|
||||
return Err(ApiError::Conflict(format!(
|
||||
"received configuration with generation {} from safekeeper, but expected {}",
|
||||
res.current_conf.generation, config.generation
|
||||
res.current_conf.generation, mconf.generation
|
||||
)));
|
||||
} else if res.current_conf.generation < config.generation {
|
||||
} else if res.current_conf.generation < mconf.generation {
|
||||
// Note: should never happen.
|
||||
// If we get a response, it should be at least the sent generation.
|
||||
tracing::error!(
|
||||
"received configuration with generation {} from safekeeper, but expected {}",
|
||||
res.current_conf.generation,
|
||||
config.generation
|
||||
mconf.generation
|
||||
);
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"received configuration with generation {} from safekeeper, but expected {}",
|
||||
res.current_conf.generation,
|
||||
config.generation
|
||||
mconf.generation
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
if update_notified_generation {
|
||||
self.persistence
|
||||
.update_sk_set_notified_generation(tenant_id, timeline_id, mconf.generation)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
@@ -979,6 +1010,7 @@ impl Service {
|
||||
timeline_id: TimelineId,
|
||||
to_safekeepers: &[Safekeeper],
|
||||
from_safekeepers: &[Safekeeper],
|
||||
mconf: membership::Configuration,
|
||||
) -> Result<(), ApiError> {
|
||||
let http_hosts = from_safekeepers
|
||||
.iter()
|
||||
@@ -997,17 +1029,15 @@ impl Service {
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
|
||||
// TODO(diko): need to pass mconf/generation with the request
|
||||
// to properly handle tombstones. Ignore tombstones for now.
|
||||
// Worst case: we leave a timeline on a safekeeper which is not in the current set.
|
||||
let req = PullTimelineRequest {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
http_hosts,
|
||||
ignore_tombstone: Some(true),
|
||||
mconf: Some(mconf),
|
||||
};
|
||||
|
||||
const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
let max_retries = 3;
|
||||
|
||||
let responses = self
|
||||
.tenant_timeline_safekeeper_op(
|
||||
@@ -1016,7 +1046,9 @@ impl Service {
|
||||
let req = req.clone();
|
||||
async move { client.pull_timeline(&req).await }
|
||||
},
|
||||
max_retries,
|
||||
SK_PULL_TIMELINE_RECONCILE_TIMEOUT,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1035,20 +1067,28 @@ impl Service {
|
||||
}
|
||||
|
||||
/// Exclude a timeline from safekeepers in parallel with retries.
|
||||
/// If an exclude request is unsuccessful, it will be added to
|
||||
/// the reconciler, and after that the function will succeed.
|
||||
async fn tenant_timeline_safekeeper_exclude(
|
||||
///
|
||||
/// Assumes that the exclude requests are already persistent in the database.
|
||||
///
|
||||
/// The function does best effort: if an exclude request is unsuccessful,
|
||||
/// it will be added to the in-memory reconciler, and the function will succeed anyway.
|
||||
///
|
||||
/// Might fail if there is an error accessing the database.
|
||||
async fn tenant_timeline_safekeeper_exclude_reconcile(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
safekeepers: &[Safekeeper],
|
||||
config: &membership::Configuration,
|
||||
mconf: &membership::Configuration,
|
||||
) -> Result<(), ApiError> {
|
||||
let req = TimelineMembershipSwitchRequest {
|
||||
mconf: config.clone(),
|
||||
mconf: mconf.clone(),
|
||||
};
|
||||
|
||||
const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
// Do not retry failed requests to speed up the finishing phase.
|
||||
// They will be retried in the reconciler.
|
||||
let max_retries = 0;
|
||||
|
||||
let results = self
|
||||
.tenant_timeline_safekeeper_op(
|
||||
@@ -1057,31 +1097,40 @@ impl Service {
|
||||
let req = req.clone();
|
||||
async move { client.exclude_timeline(tenant_id, timeline_id, &req).await }
|
||||
},
|
||||
max_retries,
|
||||
SK_EXCLUDE_TIMELINE_TIMEOUT,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut reconcile_requests = Vec::new();
|
||||
|
||||
for (idx, res) in results.iter().enumerate() {
|
||||
if res.is_err() {
|
||||
let sk_id = safekeepers[idx].skp.id;
|
||||
let pending_op = TimelinePendingOpPersistence {
|
||||
tenant_id: tenant_id.to_string(),
|
||||
timeline_id: timeline_id.to_string(),
|
||||
generation: config.generation.into_inner() as i32,
|
||||
op_kind: SafekeeperTimelineOpKind::Exclude,
|
||||
sk_id,
|
||||
};
|
||||
tracing::info!("writing pending exclude op for sk id {sk_id}");
|
||||
self.persistence.insert_pending_op(pending_op).await?;
|
||||
fail::fail_point!("sk-migration-step-9-mid-exclude", |_| {
|
||||
Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"failpoint sk-migration-step-9-mid-exclude"
|
||||
)))
|
||||
});
|
||||
|
||||
for (idx, res) in results.iter().enumerate() {
|
||||
let sk_id = safekeepers[idx].skp.id;
|
||||
let generation = mconf.generation.into_inner();
|
||||
|
||||
if res.is_ok() {
|
||||
self.persistence
|
||||
.remove_pending_op(
|
||||
tenant_id,
|
||||
Some(timeline_id),
|
||||
NodeId(sk_id as u64),
|
||||
generation,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
let req = ScheduleRequest {
|
||||
safekeeper: Box::new(safekeepers[idx].clone()),
|
||||
host_list: Vec::new(),
|
||||
tenant_id,
|
||||
timeline_id: Some(timeline_id),
|
||||
generation: config.generation.into_inner(),
|
||||
generation,
|
||||
kind: SafekeeperTimelineOpKind::Exclude,
|
||||
};
|
||||
reconcile_requests.push(req);
|
||||
@@ -1208,6 +1257,22 @@ impl Service {
|
||||
}
|
||||
// If it is the same new_sk_set, we can continue the migration (retry).
|
||||
} else {
|
||||
let prev_finished = timeline.cplane_notified_generation == timeline.generation
|
||||
&& timeline.sk_set_notified_generation == timeline.generation;
|
||||
|
||||
if !prev_finished {
|
||||
// The previous migration is committed, but the finish step failed.
|
||||
// Safekeepers/cplane might not know about the last membership configuration.
|
||||
// Retry the finish step to ensure smooth migration.
|
||||
self.finish_safekeeper_migration_retry(tenant_id, timeline_id, &timeline)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if cur_sk_set == new_sk_set {
|
||||
tracing::info!("timeline is already at the desired safekeeper set");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// 3. No active migration yet.
|
||||
// Increment current generation and put desired_set to new_sk_set.
|
||||
generation = generation.next();
|
||||
@@ -1219,8 +1284,15 @@ impl Service {
|
||||
generation,
|
||||
&cur_sk_set,
|
||||
Some(&new_sk_set),
|
||||
&[],
|
||||
)
|
||||
.await?;
|
||||
|
||||
fail::fail_point!("sk-migration-after-step-3", |_| {
|
||||
Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"failpoint sk-migration-after-step-3"
|
||||
)))
|
||||
});
|
||||
}
|
||||
|
||||
let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
|
||||
@@ -1249,6 +1321,7 @@ impl Service {
|
||||
&cur_safekeepers,
|
||||
&joint_config,
|
||||
None, // no min position
|
||||
true, // update notified generation
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1266,6 +1339,12 @@ impl Service {
|
||||
"safekeepers set membership updated",
|
||||
);
|
||||
|
||||
fail::fail_point!("sk-migration-after-step-4", |_| {
|
||||
Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"failpoint sk-migration-after-step-4"
|
||||
)))
|
||||
});
|
||||
|
||||
// 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet
|
||||
// by doing pull_timeline from the majority of the current set.
|
||||
|
||||
@@ -1282,9 +1361,16 @@ impl Service {
|
||||
timeline_id,
|
||||
&pull_to_safekeepers,
|
||||
&cur_safekeepers,
|
||||
joint_config.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
fail::fail_point!("sk-migration-after-step-5", |_| {
|
||||
Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"failpoint sk-migration-after-step-5"
|
||||
)))
|
||||
});
|
||||
|
||||
// 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough.
|
||||
|
||||
// TODO(diko): do we need to bump timeline term?
|
||||
@@ -1300,9 +1386,16 @@ impl Service {
|
||||
&new_safekeepers,
|
||||
&joint_config,
|
||||
Some(sync_position),
|
||||
false, // we're just waiting for sync position, don't update notified generation
|
||||
)
|
||||
.await?;
|
||||
|
||||
fail::fail_point!("sk-migration-after-step-7", |_| {
|
||||
Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"failpoint sk-migration-after-step-7"
|
||||
)))
|
||||
});
|
||||
|
||||
// 8. Create new_conf: Configuration incrementing joint_conf generation and
|
||||
// having new safekeeper set as sk_set and None new_sk_set.
|
||||
|
||||
@@ -1314,45 +1407,55 @@ impl Service {
|
||||
new_members: None,
|
||||
};
|
||||
|
||||
self.persistence
|
||||
.update_timeline_membership(tenant_id, timeline_id, generation, &new_sk_set, None)
|
||||
.await?;
|
||||
|
||||
// TODO(diko): at this point we have already updated the timeline in the database,
|
||||
// but we still need to notify safekeepers and cplane about the new configuration,
|
||||
// and put deletion of the timeline from the old safekeepers into the reconciler.
|
||||
// Ideally it should be done atomically, but now it's not.
|
||||
// Worst case: the timeline is not deleted from old safekeepers,
|
||||
// the compute may require both quorums till the migration is retried and completed.
|
||||
|
||||
self.tenant_timeline_set_membership_quorum(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&new_safekeepers,
|
||||
&new_conf,
|
||||
None, // no min position
|
||||
)
|
||||
.await?;
|
||||
|
||||
let new_ids: HashSet<NodeId> = new_safekeepers.iter().map(|sk| sk.get_id()).collect();
|
||||
let exclude_safekeepers = cur_safekeepers
|
||||
.into_iter()
|
||||
.filter(|sk| !new_ids.contains(&sk.get_id()))
|
||||
.collect::<Vec<_>>();
|
||||
self.tenant_timeline_safekeeper_exclude(
|
||||
let exclude_requests = exclude_safekeepers
|
||||
.iter()
|
||||
.map(|sk| TimelinePendingOpPersistence {
|
||||
sk_id: sk.skp.id,
|
||||
tenant_id: tenant_id.to_string(),
|
||||
timeline_id: timeline_id.to_string(),
|
||||
generation: generation.into_inner() as i32,
|
||||
op_kind: SafekeeperTimelineOpKind::Exclude,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
self.persistence
|
||||
.update_timeline_membership(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
generation,
|
||||
&new_sk_set,
|
||||
None,
|
||||
&exclude_requests,
|
||||
)
|
||||
.await?;
|
||||
|
||||
fail::fail_point!("sk-migration-after-step-8", |_| {
|
||||
Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"failpoint sk-migration-after-step-8"
|
||||
)))
|
||||
});
|
||||
|
||||
// At this point we have already updated the timeline in the database, so the final
|
||||
// membership configuration is committed and the migration is not abortable anymore.
|
||||
// But safekeepers and cplane/compute still need to be notified about the new configuration.
|
||||
// The [`Self::finish_safekeeper_migration`] does exactly that: notifies everyone about
|
||||
// the new configuration and reconciles excluded safekeepers.
|
||||
// If it fails, the safekeeper migration call should be retried.
|
||||
|
||||
self.finish_safekeeper_migration(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&exclude_safekeepers,
|
||||
&new_safekeepers,
|
||||
&new_conf,
|
||||
&exclude_safekeepers,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
|
||||
// This way the compute will stop talking to excluded safekeepers only after we stop requiring to
|
||||
// collect a quorum from them.
|
||||
self.cplane_notify_safekeepers(tenant_id, timeline_id, &new_conf)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1396,6 +1499,130 @@ impl Service {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"failed to notify cplane about safekeeper membership change: {err}"
|
||||
))
|
||||
})
|
||||
})?;
|
||||
|
||||
self.persistence
|
||||
.update_cplane_notified_generation(tenant_id, timeline_id, mconf.generation)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Finish safekeeper migration.
|
||||
///
|
||||
/// It is the last step of the safekeeper migration.
|
||||
///
|
||||
/// Notifies safekeepers and cplane about the final membership configuration,
|
||||
/// reconciles excluded safekeepers and updates *_notified_generation in the database.
|
||||
async fn finish_safekeeper_migration(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
new_safekeepers: &[Safekeeper],
|
||||
new_conf: &membership::Configuration,
|
||||
exclude_safekeepers: &[Safekeeper],
|
||||
) -> Result<(), ApiError> {
|
||||
// 9. Call PUT configuration on safekeepers from the new set, delivering them new_conf.
|
||||
// Also try to exclude safekeepers and notify cplane about the membership change.
|
||||
|
||||
self.tenant_timeline_set_membership_quorum(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
new_safekeepers,
|
||||
new_conf,
|
||||
None, // no min position
|
||||
true, // update notified generation
|
||||
)
|
||||
.await?;
|
||||
|
||||
fail::fail_point!("sk-migration-step-9-after-set-membership", |_| {
|
||||
Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"failpoint sk-migration-step-9-after-set-membership"
|
||||
)))
|
||||
});
|
||||
|
||||
self.tenant_timeline_safekeeper_exclude_reconcile(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
exclude_safekeepers,
|
||||
new_conf,
|
||||
)
|
||||
.await?;
|
||||
|
||||
fail::fail_point!("sk-migration-step-9-after-exclude", |_| {
|
||||
Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"failpoint sk-migration-step-9-after-exclude"
|
||||
)))
|
||||
});
|
||||
|
||||
// Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
|
||||
// This way the compute will stop talking to excluded safekeepers only after we stop requiring to
|
||||
// collect a quorum from them.
|
||||
self.cplane_notify_safekeepers(tenant_id, timeline_id, new_conf)
|
||||
.await?;
|
||||
|
||||
fail::fail_point!("sk-migration-after-step-9", |_| {
|
||||
Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"failpoint sk-migration-after-step-9"
|
||||
)))
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Same as [`Self::finish_safekeeper_migration`], but restores the migration state from the database.
|
||||
/// It's used when the migration failed during the finish step and we need to retry it.
|
||||
async fn finish_safekeeper_migration_retry(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
timeline: &TimelinePersistence,
|
||||
) -> Result<(), ApiError> {
|
||||
if timeline.new_sk_set.is_some() {
|
||||
// Logical error, should never happen.
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"can't finish timeline migration for {tenant_id}/{timeline_id}: new_sk_set is not None"
|
||||
)));
|
||||
}
|
||||
|
||||
let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
|
||||
let cur_sk_member_set =
|
||||
Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
|
||||
|
||||
let mconf = membership::Configuration {
|
||||
generation: SafekeeperGeneration::new(timeline.generation as u32),
|
||||
members: cur_sk_member_set,
|
||||
new_members: None,
|
||||
};
|
||||
|
||||
// We might have failed between committing reconciliation requests and adding them to the in-memory reconciler.
|
||||
// Reload them from the database.
|
||||
let pending_ops = self
|
||||
.persistence
|
||||
.list_pending_ops_for_timeline(tenant_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
let mut exclude_sk_ids = Vec::new();
|
||||
|
||||
for op in pending_ops {
|
||||
if op.op_kind == SafekeeperTimelineOpKind::Exclude
|
||||
&& op.generation == timeline.generation
|
||||
{
|
||||
exclude_sk_ids.push(op.sk_id);
|
||||
}
|
||||
}
|
||||
|
||||
let exclude_safekeepers = self.get_safekeepers(&exclude_sk_ids)?;
|
||||
|
||||
self.finish_safekeeper_migration(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&cur_safekeepers,
|
||||
&mconf,
|
||||
&exclude_safekeepers,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ def connection_parameters_to_env(params: dict[str, str]) -> dict[str, str]:
|
||||
|
||||
# Some API calls not yet implemented.
|
||||
# You may want to copy not-yet-implemented methods from the PR https://github.com/neondatabase/neon/pull/11305
|
||||
@final
|
||||
class NeonAPI:
|
||||
def __init__(self, neon_api_key: str, neon_api_base_url: str):
|
||||
self.__neon_api_key = neon_api_key
|
||||
@@ -170,7 +171,7 @@ class NeonAPI:
|
||||
protected: bool | None = None,
|
||||
archived: bool | None = None,
|
||||
init_source: str | None = None,
|
||||
add_endpoint=True,
|
||||
add_endpoint: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
data: dict[str, Any] = {}
|
||||
if add_endpoint:
|
||||
@@ -226,6 +227,16 @@ class NeonAPI:
|
||||
)
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def reset_to_parent(self, project_id: str, branch_id: str) -> dict[str, Any]:
|
||||
resp = self.__request(
|
||||
"POST",
|
||||
f"/projects/{project_id}/branches/{branch_id}/reset_to_parent",
|
||||
headers={
|
||||
"Accept": "application/json",
|
||||
},
|
||||
)
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def restore_branch(
|
||||
self,
|
||||
project_id: str,
|
||||
|
||||
@@ -1540,6 +1540,17 @@ class NeonEnv:
|
||||
|
||||
raise RuntimeError(f"Pageserver with ID {id} not found")
|
||||
|
||||
def get_safekeeper(self, id: int) -> Safekeeper:
|
||||
"""
|
||||
Look up a safekeeper by its ID.
|
||||
"""
|
||||
|
||||
for sk in self.safekeepers:
|
||||
if sk.id == id:
|
||||
return sk
|
||||
|
||||
raise RuntimeError(f"Safekeeper with ID {id} not found")
|
||||
|
||||
def get_tenant_pageserver(self, tenant_id: TenantId | TenantShardId):
|
||||
"""
|
||||
Get the NeonPageserver where this tenant shard is currently attached, according
|
||||
@@ -5391,15 +5402,24 @@ class Safekeeper(LogUtils):
|
||||
return timeline_status.commit_lsn
|
||||
|
||||
def pull_timeline(
|
||||
self, srcs: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId
|
||||
self,
|
||||
srcs: list[Safekeeper],
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
mconf: MembershipConfiguration | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
pull_timeline from srcs to self.
|
||||
"""
|
||||
src_https = [f"http://localhost:{sk.port.http}" for sk in srcs]
|
||||
res = self.http_client().pull_timeline(
|
||||
{"tenant_id": str(tenant_id), "timeline_id": str(timeline_id), "http_hosts": src_https}
|
||||
)
|
||||
body: dict[str, Any] = {
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
"http_hosts": src_https,
|
||||
}
|
||||
if mconf is not None:
|
||||
body["mconf"] = mconf.__dict__
|
||||
res = self.http_client().pull_timeline(body)
|
||||
src_ids = [sk.id for sk in srcs]
|
||||
log.info(f"finished pulling timeline from {src_ids} to {self.id}")
|
||||
return res
|
||||
|
||||
@@ -152,6 +152,8 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
|
||||
".*reconciler.*neon_local error.*",
|
||||
# Tenant rate limits may fire in tests that submit lots of API requests.
|
||||
".*tenant \\S+ is rate limited.*",
|
||||
# Reconciliations may get stuck/delayed e.g. in chaos tests.
|
||||
".*background_reconcile: Shard reconciliation is stuck.*",
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -96,6 +96,11 @@ class NeonBranch:
|
||||
)
|
||||
self.benchmark: subprocess.Popen[Any] | None = None
|
||||
self.updated_at: datetime = datetime.fromisoformat(branch["branch"]["updated_at"])
|
||||
self.parent_timestamp: datetime = (
|
||||
datetime.fromisoformat(branch["branch"]["parent_timestamp"])
|
||||
if "parent_timestamp" in branch["branch"]
|
||||
else datetime.fromtimestamp(0, tz=UTC)
|
||||
)
|
||||
self.connect_env: dict[str, str] | None = None
|
||||
if self.connection_parameters:
|
||||
self.connect_env = {
|
||||
@@ -113,8 +118,18 @@ class NeonBranch:
|
||||
"""
|
||||
return f"{self.id}{'(r)' if self.id in self.project.reset_branches else ''}, parent: {self.parent}"
|
||||
|
||||
def create_child_branch(self) -> NeonBranch | None:
|
||||
return self.project.create_branch(self.id)
|
||||
def random_time(self) -> datetime:
|
||||
min_time = max(
|
||||
self.updated_at + timedelta(seconds=1),
|
||||
self.project.min_time,
|
||||
self.parent_timestamp + timedelta(seconds=1),
|
||||
)
|
||||
max_time = datetime.now(UTC) - timedelta(seconds=1)
|
||||
log.info("min_time: %s, max_time: %s", min_time, max_time)
|
||||
return (min_time + (max_time - min_time) * random.random()).replace(microsecond=0)
|
||||
|
||||
def create_child_branch(self, parent_timestamp: datetime | None = None) -> NeonBranch | None:
|
||||
return self.project.create_branch(self.id, parent_timestamp)
|
||||
|
||||
def create_ro_endpoint(self) -> NeonEndpoint | None:
|
||||
if not self.project.check_limit_endpoints():
|
||||
@@ -136,21 +151,33 @@ class NeonBranch:
|
||||
def terminate_benchmark(self) -> None:
|
||||
self.project.terminate_benchmark(self.id)
|
||||
|
||||
def reset_to_parent(self) -> None:
|
||||
for ep in self.project.endpoints.values():
|
||||
if ep.type == "read_only":
|
||||
ep.terminate_benchmark()
|
||||
self.terminate_benchmark()
|
||||
res = self.neon_api.reset_to_parent(self.project_id, self.id)
|
||||
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
|
||||
self.parent_timestamp = datetime.fromisoformat(res["branch"]["parent_timestamp"])
|
||||
self.project.wait()
|
||||
self.start_benchmark()
|
||||
for ep in self.project.endpoints.values():
|
||||
if ep.type == "read_only":
|
||||
ep.start_benchmark()
|
||||
|
||||
def restore_random_time(self) -> None:
|
||||
"""
|
||||
Does PITR, i.e. calls the reset API call on the same branch to the random time in the past
|
||||
"""
|
||||
min_time = self.updated_at + timedelta(seconds=1)
|
||||
max_time = datetime.now(UTC) - timedelta(seconds=1)
|
||||
target_time = (min_time + (max_time - min_time) * random.random()).replace(microsecond=0)
|
||||
res = self.restore(
|
||||
self.id,
|
||||
source_timestamp=target_time.isoformat().replace("+00:00", "Z"),
|
||||
source_timestamp=self.random_time().isoformat().replace("+00:00", "Z"),
|
||||
preserve_under_name=self.project.gen_restore_name(),
|
||||
)
|
||||
if res is None:
|
||||
return
|
||||
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
|
||||
self.parent_timestamp = datetime.fromisoformat(res["branch"]["parent_timestamp"])
|
||||
parent_id: str = res["branch"]["parent_id"]
|
||||
# Creates an object for the parent branch
|
||||
# After the reset operation a new parent branch is created
|
||||
@@ -225,6 +252,7 @@ class NeonProject:
|
||||
self.restart_pgbench_on_console_errors: bool = False
|
||||
self.limits: dict[str, Any] = self.get_limits()["limits"]
|
||||
self.read_only_endpoints_total: int = 0
|
||||
self.min_time: datetime = datetime.now(UTC)
|
||||
|
||||
def get_limits(self) -> dict[str, Any]:
|
||||
return self.neon_api.get_project_limits(self.id)
|
||||
@@ -251,11 +279,20 @@ class NeonProject:
|
||||
)
|
||||
return False
|
||||
|
||||
def create_branch(self, parent_id: str | None = None) -> NeonBranch | None:
|
||||
def create_branch(
|
||||
self, parent_id: str | None = None, parent_timestamp: datetime | None = None
|
||||
) -> NeonBranch | None:
|
||||
self.wait()
|
||||
if not self.check_limit_branches():
|
||||
return None
|
||||
branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id)
|
||||
if parent_timestamp:
|
||||
log.info("Timestamp: %s", parent_timestamp)
|
||||
parent_timestamp_str: str | None = None
|
||||
if parent_timestamp:
|
||||
parent_timestamp_str = parent_timestamp.isoformat().replace("+00:00", "Z")
|
||||
branch_def = self.neon_api.create_branch(
|
||||
self.id, parent_id=parent_id, parent_timestamp=parent_timestamp_str
|
||||
)
|
||||
new_branch = NeonBranch(self, branch_def)
|
||||
self.wait()
|
||||
return new_branch
|
||||
@@ -288,6 +325,14 @@ class NeonProject:
|
||||
if parent.id in self.reset_branches:
|
||||
parent.delete()
|
||||
|
||||
def get_random_leaf_branch(self) -> NeonBranch | None:
|
||||
target: NeonBranch | None = None
|
||||
if self.leaf_branches:
|
||||
target = random.choice(list(self.leaf_branches.values()))
|
||||
else:
|
||||
log.info("No leaf branches found")
|
||||
return target
|
||||
|
||||
def delete_endpoint(self, endpoint_id: str) -> None:
|
||||
self.terminate_benchmark(endpoint_id)
|
||||
self.neon_api.delete_endpoint(self.id, endpoint_id)
|
||||
@@ -390,24 +435,22 @@ def do_action(project: NeonProject, action: str) -> bool:
|
||||
Runs the action
|
||||
"""
|
||||
log.info("Action: %s", action)
|
||||
if action == "new_branch":
|
||||
log.info("Trying to create a new branch")
|
||||
if action == "new_branch" or action == "new_branch_random_time":
|
||||
use_random_time: bool = action == "new_branch_random_time"
|
||||
log.info("Trying to create a new branch %s", "random time" if use_random_time else "")
|
||||
parent = project.branches[
|
||||
random.choice(list(set(project.branches.keys()) - project.reset_branches))
|
||||
]
|
||||
child = parent.create_child_branch()
|
||||
child = parent.create_child_branch(parent.random_time() if use_random_time else None)
|
||||
if child is None:
|
||||
return False
|
||||
log.info("Created branch %s", child)
|
||||
child.start_benchmark()
|
||||
elif action == "delete_branch":
|
||||
if project.leaf_branches:
|
||||
target: NeonBranch = random.choice(list(project.leaf_branches.values()))
|
||||
log.info("Trying to delete branch %s", target)
|
||||
target.delete()
|
||||
else:
|
||||
log.info("Leaf branches not found, skipping")
|
||||
if (target := project.get_random_leaf_branch()) is None:
|
||||
return False
|
||||
log.info("Trying to delete branch %s", target)
|
||||
target.delete()
|
||||
elif action == "new_ro_endpoint":
|
||||
ep = random.choice(
|
||||
[br for br in project.branches.values() if br.id not in project.reset_branches]
|
||||
@@ -427,13 +470,15 @@ def do_action(project: NeonProject, action: str) -> bool:
|
||||
target_ep.delete()
|
||||
log.info("endpoint %s deleted", target_ep.id)
|
||||
elif action == "restore_random_time":
|
||||
if project.leaf_branches:
|
||||
br: NeonBranch = random.choice(list(project.leaf_branches.values()))
|
||||
log.info("Restore %s", br)
|
||||
br.restore_random_time()
|
||||
else:
|
||||
log.info("No leaf branches found")
|
||||
if (target := project.get_random_leaf_branch()) is None:
|
||||
return False
|
||||
log.info("Restore %s", target)
|
||||
target.restore_random_time()
|
||||
elif action == "reset_to_parent":
|
||||
if (target := project.get_random_leaf_branch()) is None:
|
||||
return False
|
||||
log.info("Reset to parent %s", target)
|
||||
target.reset_to_parent()
|
||||
else:
|
||||
raise ValueError(f"The action {action} is unknown")
|
||||
return True
|
||||
@@ -460,17 +505,22 @@ def test_api_random(
|
||||
pg_bin, project = setup_class
|
||||
# Here we can assign weights
|
||||
ACTIONS = (
|
||||
("new_branch", 1.5),
|
||||
("new_branch", 1.2),
|
||||
("new_branch_random_time", 0.5),
|
||||
("new_ro_endpoint", 1.4),
|
||||
("delete_ro_endpoint", 0.8),
|
||||
("delete_branch", 1.0),
|
||||
("restore_random_time", 1.2),
|
||||
("delete_branch", 1.2),
|
||||
("restore_random_time", 0.9),
|
||||
("reset_to_parent", 0.3),
|
||||
)
|
||||
if num_ops_env := os.getenv("NUM_OPERATIONS"):
|
||||
num_operations = int(num_ops_env)
|
||||
else:
|
||||
num_operations = 250
|
||||
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
|
||||
# To not go to the past where pgbench tables do not exist
|
||||
time.sleep(1)
|
||||
project.min_time = datetime.now(UTC)
|
||||
for _ in range(num_operations):
|
||||
log.info("Starting action #%s", _ + 1)
|
||||
while not do_action(
|
||||
|
||||
@@ -50,11 +50,15 @@ def test_feature_flag(neon_env_builder: NeonEnvBuilder):
|
||||
)["result"]
|
||||
)
|
||||
|
||||
env.endpoints.create_start("main") # trigger basebackup
|
||||
env.pageserver.http_client().force_refresh_feature_flag(env.initial_tenant)
|
||||
|
||||
# Check if the properties exist
|
||||
result = env.pageserver.http_client().evaluate_feature_flag_multivariate(
|
||||
env.initial_tenant, "test-feature-flag"
|
||||
)
|
||||
|
||||
assert "tenant_remote_size_mb" in result["properties"]
|
||||
assert "tenant_db_count_max" in result["properties"]
|
||||
assert "tenant_rel_count_max" in result["properties"]
|
||||
assert "tenant_id" in result["properties"]
|
||||
|
||||
@@ -1,13 +1,25 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import StorageControllerApiException
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
# TODO(diko): pageserver spams with various errors during safekeeper migration.
|
||||
# Fix the code so it handles the migration better.
|
||||
PAGESERVER_ALLOWED_ERRORS = [
|
||||
".*Timeline .* was cancelled and cannot be used anymore.*",
|
||||
".*Timeline .* has been deleted.*",
|
||||
".*Timeline .* was not found in global map.*",
|
||||
".*wal receiver task finished with an error.*",
|
||||
]
|
||||
|
||||
|
||||
def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
@@ -24,16 +36,7 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
|
||||
"timeline_safekeeper_count": 1,
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
# TODO(diko): pageserver spams with various errors during safekeeper migration.
|
||||
# Fix the code so it handles the migration better.
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*Timeline .* was cancelled and cannot be used anymore.*",
|
||||
".*Timeline .* has been deleted.*",
|
||||
".*Timeline .* was not found in global map.*",
|
||||
".*wal receiver task finished with an error.*",
|
||||
]
|
||||
)
|
||||
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
|
||||
|
||||
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
|
||||
|
||||
@@ -42,15 +45,23 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
|
||||
assert len(mconf["sk_set"]) == 1
|
||||
assert mconf["generation"] == 1
|
||||
|
||||
current_sk = mconf["sk_set"][0]
|
||||
|
||||
ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
|
||||
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
|
||||
ep.safe_psql("CREATE TABLE t(a int)")
|
||||
|
||||
expected_gen = 1
|
||||
|
||||
for active_sk in range(1, 4):
|
||||
env.storage_controller.migrate_safekeepers(
|
||||
env.initial_tenant, env.initial_timeline, [active_sk]
|
||||
)
|
||||
|
||||
if active_sk != current_sk:
|
||||
expected_gen += 2
|
||||
current_sk = active_sk
|
||||
|
||||
other_sks = [sk for sk in range(1, 4) if sk != active_sk]
|
||||
|
||||
for sk in other_sks:
|
||||
@@ -65,9 +76,6 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]
|
||||
|
||||
# 1 initial generation + 2 migrations on each loop iteration.
|
||||
expected_gen = 1 + 2 * 3
|
||||
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
assert mconf["generation"] == expected_gen
|
||||
|
||||
@@ -113,3 +121,198 @@ def test_new_sk_set_validation(neon_env_builder: NeonEnvBuilder):
|
||||
env.storage_controller.safekeeper_scheduling_policy(decom_sk, "Decomissioned")
|
||||
|
||||
expect_fail([sk_set[0], decom_sk], "decomissioned")
|
||||
|
||||
|
||||
def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test that safekeeper migration handles failures well.
|
||||
|
||||
Two main conditions are checked:
|
||||
1. safekeeper migration handler can be retried on different failures.
|
||||
2. writes do not stuck if sk_set and new_sk_set have a quorum in common.
|
||||
"""
|
||||
neon_env_builder.num_safekeepers = 4
|
||||
neon_env_builder.storage_controller_config = {
|
||||
"timelines_onto_safekeepers": True,
|
||||
"timeline_safekeeper_count": 3,
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
|
||||
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
assert len(mconf["sk_set"]) == 3
|
||||
assert mconf["generation"] == 1
|
||||
|
||||
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
|
||||
ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
|
||||
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
|
||||
ep.safe_psql("CREATE TABLE t(a int)")
|
||||
|
||||
excluded_sk = mconf["sk_set"][-1]
|
||||
added_sk = [sk.id for sk in env.safekeepers if sk.id not in mconf["sk_set"]][0]
|
||||
new_sk_set = mconf["sk_set"][:-1] + [added_sk]
|
||||
log.info(f"migrating sk set from {mconf['sk_set']} to {new_sk_set}")
|
||||
|
||||
failpoints = [
|
||||
"sk-migration-after-step-3",
|
||||
"sk-migration-after-step-4",
|
||||
"sk-migration-after-step-5",
|
||||
"sk-migration-after-step-7",
|
||||
"sk-migration-after-step-8",
|
||||
"sk-migration-step-9-after-set-membership",
|
||||
"sk-migration-step-9-mid-exclude",
|
||||
"sk-migration-step-9-after-exclude",
|
||||
"sk-migration-after-step-9",
|
||||
]
|
||||
|
||||
for i, fp in enumerate(failpoints):
|
||||
env.storage_controller.configure_failpoints((fp, "return(1)"))
|
||||
|
||||
with pytest.raises(StorageControllerApiException, match=f"failpoint {fp}"):
|
||||
env.storage_controller.migrate_safekeepers(
|
||||
env.initial_tenant, env.initial_timeline, new_sk_set
|
||||
)
|
||||
ep.safe_psql(f"INSERT INTO t VALUES ({i})")
|
||||
|
||||
env.storage_controller.configure_failpoints((fp, "off"))
|
||||
|
||||
# No failpoints, migration should succeed.
|
||||
env.storage_controller.migrate_safekeepers(env.initial_tenant, env.initial_timeline, new_sk_set)
|
||||
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
assert mconf["new_sk_set"] is None
|
||||
assert mconf["sk_set"] == new_sk_set
|
||||
assert mconf["generation"] == 3
|
||||
|
||||
ep.clear_buffers()
|
||||
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(len(failpoints))]
|
||||
assert ep.safe_psql("SHOW neon.safekeepers")[0][0].startswith("g#3:")
|
||||
|
||||
# Check that we didn't forget to remove the timeline on the excluded safekeeper.
|
||||
with pytest.raises(requests.exceptions.HTTPError) as exc:
|
||||
env.safekeepers[excluded_sk - 1].http_client().timeline_status(
|
||||
env.initial_tenant, env.initial_timeline
|
||||
)
|
||||
assert exc.value.response.status_code == 404
|
||||
assert (
|
||||
f"timeline {env.initial_tenant}/{env.initial_timeline} deleted" in exc.value.response.text
|
||||
)
|
||||
|
||||
|
||||
def test_sk_generation_aware_tombstones(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test that safekeeper respects generations:
|
||||
1. Check that migration back and forth between two safekeepers works.
|
||||
2. Check that sk refuses to execute requests with stale generation.
|
||||
"""
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.storage_controller_config = {
|
||||
"timelines_onto_safekeepers": True,
|
||||
"timeline_safekeeper_count": 1,
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
|
||||
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
assert mconf["new_sk_set"] is None
|
||||
assert len(mconf["sk_set"]) == 1
|
||||
cur_sk = mconf["sk_set"][0]
|
||||
|
||||
second_sk, third_sk = [sk.id for sk in env.safekeepers if sk.id != cur_sk]
|
||||
cur_gen = 1
|
||||
|
||||
# Pull the timeline manually to third_sk, so the timeline exists there with stale generation.
|
||||
# This is needed for the test later.
|
||||
env.get_safekeeper(third_sk).pull_timeline(
|
||||
[env.get_safekeeper(cur_sk)], env.initial_tenant, env.initial_timeline
|
||||
)
|
||||
|
||||
def expect_deleted(sk_id: int):
|
||||
with pytest.raises(requests.exceptions.HTTPError, match="Not Found") as exc:
|
||||
env.get_safekeeper(sk_id).http_client().timeline_status(
|
||||
env.initial_tenant, env.initial_timeline
|
||||
)
|
||||
assert exc.value.response.status_code == 404
|
||||
assert re.match(r".*timeline .* deleted.*", exc.value.response.text)
|
||||
|
||||
def get_mconf(sk_id: int):
|
||||
status = (
|
||||
env.get_safekeeper(sk_id)
|
||||
.http_client()
|
||||
.timeline_status(env.initial_tenant, env.initial_timeline)
|
||||
)
|
||||
assert status.mconf is not None
|
||||
return status.mconf
|
||||
|
||||
def migrate():
|
||||
nonlocal cur_sk, second_sk, cur_gen
|
||||
env.storage_controller.migrate_safekeepers(
|
||||
env.initial_tenant, env.initial_timeline, [second_sk]
|
||||
)
|
||||
cur_sk, second_sk = second_sk, cur_sk
|
||||
cur_gen += 2
|
||||
|
||||
# Migrate the timeline back and forth between cur_sk and second_sk.
|
||||
for _i in range(3):
|
||||
migrate()
|
||||
# Timeline should exist on cur_sk.
|
||||
assert get_mconf(cur_sk).generation == cur_gen
|
||||
# Timeline should be deleted on second_sk.
|
||||
expect_deleted(second_sk)
|
||||
|
||||
# Remember current mconf.
|
||||
mconf = get_mconf(cur_sk)
|
||||
|
||||
# Migrate the timeline one more time.
|
||||
# It increases the generation by 2.
|
||||
migrate()
|
||||
|
||||
# Check that sk refuses to execute the exclude request with the old mconf.
|
||||
with pytest.raises(requests.exceptions.HTTPError, match="Conflict") as exc:
|
||||
env.get_safekeeper(cur_sk).http_client().timeline_exclude(
|
||||
env.initial_tenant, env.initial_timeline, mconf
|
||||
)
|
||||
assert re.match(r".*refused to switch into excluding mconf.*", exc.value.response.text)
|
||||
# We shouldn't have deleted the timeline.
|
||||
assert get_mconf(cur_sk).generation == cur_gen
|
||||
|
||||
# Check that sk refuses to execute the pull_timeline request with the old mconf.
|
||||
# Note: we try to pull from third_sk, which has a timeline with stale generation.
|
||||
# Thus, we bypass some preliminary generation checks and actually test tombstones.
|
||||
with pytest.raises(requests.exceptions.HTTPError, match="Conflict") as exc:
|
||||
env.get_safekeeper(second_sk).pull_timeline(
|
||||
[env.get_safekeeper(third_sk)], env.initial_tenant, env.initial_timeline, mconf
|
||||
)
|
||||
assert re.match(r".*Timeline .* deleted.*", exc.value.response.text)
|
||||
# The timeline should remain deleted.
|
||||
expect_deleted(second_sk)
|
||||
|
||||
|
||||
def test_migrate_from_unavailable_sk(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test that we can migrate from an unavailable safekeeper
|
||||
if the quorum is still alive.
|
||||
"""
|
||||
neon_env_builder.num_safekeepers = 4
|
||||
neon_env_builder.storage_controller_config = {
|
||||
"timelines_onto_safekeepers": True,
|
||||
"timeline_safekeeper_count": 3,
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
|
||||
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
assert len(mconf["sk_set"]) == 3
|
||||
|
||||
another_sk = [sk.id for sk in env.safekeepers if sk.id not in mconf["sk_set"]][0]
|
||||
|
||||
unavailable_sk = mconf["sk_set"][0]
|
||||
env.get_safekeeper(unavailable_sk).stop()
|
||||
|
||||
new_sk_set = mconf["sk_set"][1:] + [another_sk]
|
||||
|
||||
env.storage_controller.migrate_safekeepers(env.initial_tenant, env.initial_timeline, new_sk_set)
|
||||
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
assert mconf["sk_set"] == new_sk_set
|
||||
assert mconf["generation"] == 3
|
||||
|
||||
9
test_runner/sql_regress/expected/neon-spgist.out
Normal file
9
test_runner/sql_regress/expected/neon-spgist.out
Normal file
@@ -0,0 +1,9 @@
|
||||
-- Test unlogged build of SPGIST index (no "Page evicted with zero LSN" error)
|
||||
create table spgist_point_tbl(id int4, p point);
|
||||
create index spgist_point_idx on spgist_point_tbl using spgist(p) with (fillfactor = 25);
|
||||
insert into spgist_point_tbl (id, p) select g, point(g*10, g*10) from generate_series(1, 10000) g;
|
||||
insert into spgist_point_tbl (id, p) select g, point(g*10, g*10) from generate_series(1, 10000) g;
|
||||
insert into spgist_point_tbl (id, p) select g+100000, point(g*10+1, g*10+1) from generate_series(1, 10000) g;
|
||||
vacuum spgist_point_tbl;
|
||||
insert into spgist_point_tbl (id, p) select g+100000, point(g*10+1, g*10+1) from generate_series(1, 10000) g;
|
||||
checkpoint;
|
||||
@@ -9,5 +9,6 @@ test: neon-rel-truncate
|
||||
test: neon-clog
|
||||
test: neon-test-utils
|
||||
test: neon-vacuum-full
|
||||
test: neon-event-triggers
|
||||
test: neon-subxacts
|
||||
test: neon-spgist
|
||||
test: neon-event-triggers
|
||||
|
||||
10
test_runner/sql_regress/sql/neon-spgist.sql
Normal file
10
test_runner/sql_regress/sql/neon-spgist.sql
Normal file
@@ -0,0 +1,10 @@
|
||||
-- Test unlogged build of SPGIST index (no "Page evicted with zero LSN" error)
|
||||
create table spgist_point_tbl(id int4, p point);
|
||||
create index spgist_point_idx on spgist_point_tbl using spgist(p) with (fillfactor = 25);
|
||||
insert into spgist_point_tbl (id, p) select g, point(g*10, g*10) from generate_series(1, 10000) g;
|
||||
insert into spgist_point_tbl (id, p) select g, point(g*10, g*10) from generate_series(1, 10000) g;
|
||||
insert into spgist_point_tbl (id, p) select g+100000, point(g*10+1, g*10+1) from generate_series(1, 10000) g;
|
||||
|
||||
vacuum spgist_point_tbl;
|
||||
insert into spgist_point_tbl (id, p) select g+100000, point(g*10+1, g*10+1) from generate_series(1, 10000) g;
|
||||
checkpoint;
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 4cacada8bd...c9f9fdd011
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: e5ee23d998...aaaeff2550
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: ad2b69b582...9b9cb4b3e3
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: ba750903a9...fa1788475e
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.5",
|
||||
"ba750903a90dded8098f2f56d0b2a9012e6166af"
|
||||
"fa1788475e3146cc9c7c6a1b74f48fd296898fcd"
|
||||
],
|
||||
"v16": [
|
||||
"16.9",
|
||||
"ad2b69b58230290fc44c08fbe0c97981c64f6c7d"
|
||||
"9b9cb4b3e33347aea8f61e606bb6569979516de5"
|
||||
],
|
||||
"v15": [
|
||||
"15.13",
|
||||
"e5ee23d99874ea9f5b62f8acc7d076162ae95d6c"
|
||||
"aaaeff2550d5deba58847f112af9b98fa3a58b00"
|
||||
],
|
||||
"v14": [
|
||||
"14.18",
|
||||
"4cacada8bde7f6424751a0727a657783c6a1d20b"
|
||||
"c9f9fdd0113b52c0bd535afdb09d3a543aeee25f"
|
||||
]
|
||||
}
|
||||
|
||||
@@ -74,7 +74,7 @@ once_cell = { version = "1" }
|
||||
p256 = { version = "0.13", features = ["jwk"] }
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
|
||||
rand = { version = "0.8", features = ["small_rng"] }
|
||||
rand = { version = "0.9" }
|
||||
regex = { version = "1" }
|
||||
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
|
||||
regex-syntax = { version = "0.8" }
|
||||
@@ -93,6 +93,7 @@ spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
|
||||
stable_deref_trait = { version = "1" }
|
||||
subtle = { version = "2" }
|
||||
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
|
||||
thiserror = { version = "2" }
|
||||
tikv-jemalloc-ctl = { version = "0.6", features = ["stats", "use_std"] }
|
||||
tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
|
||||
time = { version = "0.3", features = ["macros", "serde-well-known"] }
|
||||
@@ -101,6 +102,7 @@ tokio-rustls = { version = "0.26", default-features = false, features = ["loggin
|
||||
tokio-stream = { version = "0.1", features = ["net", "sync"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] }
|
||||
toml_edit = { version = "0.22", features = ["serde"] }
|
||||
tonic = { version = "0.13", default-features = false, features = ["codegen", "gzip", "prost", "router", "server", "tls-native-roots", "tls-ring", "zstd"] }
|
||||
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }
|
||||
tracing = { version = "0.1", features = ["log"] }
|
||||
tracing-core = { version = "0.1" }
|
||||
|
||||
Reference in New Issue
Block a user