mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 16:10:37 +00:00
Compare commits
5 Commits
problame/b
...
problame/f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cabf452fa7 | ||
|
|
7c9f4c270e | ||
|
|
2404106586 | ||
|
|
b45c1b5965 | ||
|
|
82e97e0c59 |
188
Cargo.lock
generated
188
Cargo.lock
generated
@@ -10,9 +10,9 @@ checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5"
|
||||
|
||||
[[package]]
|
||||
name = "addr2line"
|
||||
version = "0.21.0"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
|
||||
checksum = "a76fd60b23679b7d19bd066031410fb7e458ccc5e958eb5c325888ce4baedc97"
|
||||
dependencies = [
|
||||
"gimli",
|
||||
]
|
||||
@@ -286,7 +286,6 @@ dependencies = [
|
||||
"pageserver_client",
|
||||
"postgres_backend",
|
||||
"postgres_connection",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
@@ -841,15 +840,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.69"
|
||||
version = "0.3.67"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837"
|
||||
checksum = "233d376d6d185f2a3093e58f283f60f880315b6c60075b01f36b3b85154564ca"
|
||||
dependencies = [
|
||||
"addr2line",
|
||||
"cc",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"miniz_oxide",
|
||||
"miniz_oxide 0.6.2",
|
||||
"object",
|
||||
"rustc-demangle",
|
||||
]
|
||||
@@ -1216,7 +1215,7 @@ dependencies = [
|
||||
"flate2",
|
||||
"futures",
|
||||
"hyper",
|
||||
"nix 0.27.1",
|
||||
"nix 0.26.2",
|
||||
"notify",
|
||||
"num_cpus",
|
||||
"opentelemetry",
|
||||
@@ -1332,7 +1331,7 @@ dependencies = [
|
||||
"git-version",
|
||||
"hex",
|
||||
"hyper",
|
||||
"nix 0.27.1",
|
||||
"nix 0.26.2",
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
@@ -1873,13 +1872,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "filetime"
|
||||
version = "0.2.22"
|
||||
version = "0.2.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d4029edd3e734da6fe05b6cd7bd2960760a616bd2ddd0d59a0124746d6272af0"
|
||||
checksum = "5cbc844cecaee9d4443931972e1289c8ff485cb4cc2767cb03ca139ed6885153"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall 0.3.5",
|
||||
"redox_syscall 0.2.16",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
@@ -1896,7 +1895,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743"
|
||||
dependencies = [
|
||||
"crc32fast",
|
||||
"miniz_oxide",
|
||||
"miniz_oxide 0.7.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2094,9 +2093,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "gimli"
|
||||
version = "0.28.1"
|
||||
version = "0.27.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
|
||||
checksum = "ad0a93d233ebf96623465aad4046a8d3aa4da22d4f4beba5388838c8a434bbb4"
|
||||
|
||||
[[package]]
|
||||
name = "git-version"
|
||||
@@ -2749,18 +2748,18 @@ checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.8.0"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d61c719bcfbcf5d62b3a09efa6088de8c54bc0bfcd3ea7ae39fcc186108b8de1"
|
||||
checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.9.0"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c"
|
||||
checksum = "d61c719bcfbcf5d62b3a09efa6088de8c54bc0bfcd3ea7ae39fcc186108b8de1"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
@@ -2798,6 +2797,15 @@ version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa"
|
||||
dependencies = [
|
||||
"adler",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.7.1"
|
||||
@@ -2857,14 +2865,16 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.27.1"
|
||||
version = "0.26.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
|
||||
checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a"
|
||||
dependencies = [
|
||||
"bitflags 2.4.1",
|
||||
"bitflags 1.3.2",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"memoffset 0.9.0",
|
||||
"memoffset 0.7.1",
|
||||
"pin-utils",
|
||||
"static_assertions",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2879,21 +2889,20 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "notify"
|
||||
version = "6.1.1"
|
||||
version = "5.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
|
||||
checksum = "729f63e1ca555a43fe3efa4f3efdf4801c479da85b432242a7b726f353c88486"
|
||||
dependencies = [
|
||||
"bitflags 2.4.1",
|
||||
"bitflags 1.3.2",
|
||||
"crossbeam-channel",
|
||||
"filetime",
|
||||
"fsevent-sys",
|
||||
"inotify 0.9.6",
|
||||
"kqueue",
|
||||
"libc",
|
||||
"log",
|
||||
"mio",
|
||||
"walkdir",
|
||||
"windows-sys 0.48.0",
|
||||
"windows-sys 0.45.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3019,9 +3028,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.32.2"
|
||||
version = "0.30.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441"
|
||||
checksum = "ea86265d3d3dcb6a27fc51bd29a4bf387fae9d2986b823079d4986af253eb439"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
@@ -3093,9 +3102,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.20.0"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9591d937bc0e6d2feb6f71a559540ab300ea49955229c347a517a28d27784c54"
|
||||
checksum = "5f4b8347cc26099d3aeee044065ecc3ae11469796b4d65d065a23a584ed92a6f"
|
||||
dependencies = [
|
||||
"opentelemetry_api",
|
||||
"opentelemetry_sdk",
|
||||
@@ -3103,9 +3112,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-http"
|
||||
version = "0.9.0"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c7594ec0e11d8e33faf03530a4c49af7064ebba81c1480e01be67d90b356508b"
|
||||
checksum = "a819b71d6530c4297b49b3cae2939ab3a8cc1b9f382826a1bc29dd0ca3864906"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
@@ -3116,56 +3125,54 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-otlp"
|
||||
version = "0.13.0"
|
||||
version = "0.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e5e5a5c4135864099f3faafbe939eb4d7f9b80ebf68a8448da961b32a7c1275"
|
||||
checksum = "8af72d59a4484654ea8eb183fea5ae4eb6a41d7ac3e3bae5f4d2a282a3a7d3ca"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"futures-core",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"http",
|
||||
"opentelemetry",
|
||||
"opentelemetry-http",
|
||||
"opentelemetry-proto",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"opentelemetry_api",
|
||||
"opentelemetry_sdk",
|
||||
"prost",
|
||||
"reqwest",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tonic",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-proto"
|
||||
version = "0.3.0"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b1e3f814aa9f8c905d0ee4bde026afd3b2577a97c10e1699912e3e44f0c4cbeb"
|
||||
checksum = "045f8eea8c0fa19f7d48e7bc3128a39c2e5c533d5c61298c548dfefc1064474c"
|
||||
dependencies = [
|
||||
"opentelemetry_api",
|
||||
"opentelemetry_sdk",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"opentelemetry",
|
||||
"prost",
|
||||
"tonic",
|
||||
"tonic 0.8.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-semantic-conventions"
|
||||
version = "0.12.0"
|
||||
version = "0.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "73c9f9340ad135068800e7f1b24e9e09ed9e7143f5bf8518ded3d3ec69789269"
|
||||
checksum = "24e33428e6bf08c6f7fcea4ddb8e358fab0fe48ab877a87c70c6ebe20f673ce5"
|
||||
dependencies = [
|
||||
"opentelemetry",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry_api"
|
||||
version = "0.20.0"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a81f725323db1b1206ca3da8bb19874bbd3f57c3bcd59471bfb04525b265b9b"
|
||||
checksum = "ed41783a5bf567688eb38372f2b7a8530f5a607a4b49d38dd7573236c23ca7e2"
|
||||
dependencies = [
|
||||
"fnv",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"indexmap 1.9.3",
|
||||
"js-sys",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"thiserror",
|
||||
@@ -3174,22 +3181,21 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry_sdk"
|
||||
version = "0.20.0"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fa8e705a0612d48139799fcbaba0d4a90f06277153e43dd2bdc16c6f0edd8026"
|
||||
checksum = "8b3a2a91fdbfdd4d212c0dcc2ab540de2c2bcbbd90be17de7a7daf8822d010c1"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"crossbeam-channel",
|
||||
"dashmap",
|
||||
"fnv",
|
||||
"futures-channel",
|
||||
"futures-executor",
|
||||
"futures-util",
|
||||
"once_cell",
|
||||
"opentelemetry_api",
|
||||
"ordered-float 3.9.2",
|
||||
"percent-encoding",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
@@ -3204,15 +3210,6 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ordered-float"
|
||||
version = "3.9.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f1e1c390732d15f1d48471625cd92d154e66db2c56645e29a9cd26f4699f72dc"
|
||||
dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ordered-multimap"
|
||||
version = "0.7.1"
|
||||
@@ -3328,7 +3325,7 @@ dependencies = [
|
||||
"itertools",
|
||||
"md5",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
"nix 0.26.2",
|
||||
"num-traits",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
@@ -4342,9 +4339,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "reqwest-tracing"
|
||||
version = "0.4.7"
|
||||
version = "0.4.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5a0152176687dd5cfe7f507ac1cb1a491c679cfe483afd133a7db7aaea818bb3"
|
||||
checksum = "1b97ad83c2fc18113346b7158d79732242002427c30f620fa817c1f32901e0a8"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -5198,7 +5195,7 @@ dependencies = [
|
||||
"prost",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic",
|
||||
"tonic 0.9.2",
|
||||
"tonic-build",
|
||||
"tracing",
|
||||
"utils",
|
||||
@@ -5418,7 +5415,7 @@ checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"integer-encoding",
|
||||
"ordered-float 2.10.1",
|
||||
"ordered-float",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5684,6 +5681,38 @@ dependencies = [
|
||||
"winnow",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64 0.13.1",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"h2",
|
||||
"http",
|
||||
"http-body",
|
||||
"hyper",
|
||||
"hyper-timeout",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
"prost",
|
||||
"prost-derive",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
"tracing-futures",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.9.2"
|
||||
@@ -5827,6 +5856,16 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-futures"
|
||||
version = "0.2.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
|
||||
dependencies = [
|
||||
"pin-project",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-log"
|
||||
version = "0.1.3"
|
||||
@@ -5840,9 +5879,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-opentelemetry"
|
||||
version = "0.20.0"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fc09e402904a5261e42cf27aea09ccb7d5318c6717a9eec3d8e2e65c56b18f19"
|
||||
checksum = "00a39dcf9bfc1742fa4d6215253b33a6e474be78275884c216fc2a06267b3600"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"opentelemetry",
|
||||
@@ -6079,7 +6118,7 @@ dependencies = [
|
||||
"hyper",
|
||||
"jsonwebtoken",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
"nix 0.26.2",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"postgres_connection",
|
||||
@@ -6587,8 +6626,10 @@ dependencies = [
|
||||
"clap",
|
||||
"clap_builder",
|
||||
"crossbeam-utils",
|
||||
"dashmap",
|
||||
"either",
|
||||
"fail",
|
||||
"futures",
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-executor",
|
||||
@@ -6633,7 +6674,6 @@ dependencies = [
|
||||
"tokio-util",
|
||||
"toml_datetime",
|
||||
"toml_edit",
|
||||
"tonic",
|
||||
"tower",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
|
||||
14
Cargo.toml
14
Cargo.toml
@@ -99,14 +99,14 @@ libc = "0.2"
|
||||
md5 = "0.7.0"
|
||||
memoffset = "0.8"
|
||||
native-tls = "0.2"
|
||||
nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] }
|
||||
notify = "6.0.0"
|
||||
nix = "0.26"
|
||||
notify = "5.0.0"
|
||||
num_cpus = "1.15"
|
||||
num-traits = "0.2.15"
|
||||
once_cell = "1.13"
|
||||
opentelemetry = "0.20.0"
|
||||
opentelemetry-otlp = { version = "0.13.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions = "0.12.0"
|
||||
opentelemetry = "0.19.0"
|
||||
opentelemetry-otlp = { version = "0.12.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions = "0.11.0"
|
||||
parking_lot = "0.12"
|
||||
parquet = { version = "49.0.0", default-features = false, features = ["zstd"] }
|
||||
parquet_derive = "49.0.0"
|
||||
@@ -118,7 +118,7 @@ rand = "0.8"
|
||||
redis = { version = "0.24.0", features = ["tokio-rustls-comp", "keep-alive"] }
|
||||
regex = "1.10.2"
|
||||
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
|
||||
reqwest-tracing = { version = "0.4.7", features = ["opentelemetry_0_20"] }
|
||||
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_19"] }
|
||||
reqwest-middleware = "0.2.0"
|
||||
reqwest-retry = "0.2.2"
|
||||
routerify = "3"
|
||||
@@ -162,7 +162,7 @@ toml_edit = "0.19"
|
||||
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
|
||||
tracing = "0.1"
|
||||
tracing-error = "0.2.0"
|
||||
tracing-opentelemetry = "0.20.0"
|
||||
tracing-opentelemetry = "0.19.0"
|
||||
tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
|
||||
url = "2.2"
|
||||
uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
|
||||
|
||||
@@ -14,7 +14,6 @@ hyper.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
scopeguard.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
thiserror.workspace = true
|
||||
|
||||
@@ -66,7 +66,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
jwt_token: args.jwt_token,
|
||||
};
|
||||
|
||||
let persistence = Arc::new(Persistence::spawn(&args.path).await);
|
||||
let persistence = Arc::new(Persistence::new(&args.path).await);
|
||||
|
||||
let service = Service::spawn(config, persistence).await?;
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ use pageserver_api::{
|
||||
};
|
||||
use postgres_connection::parse_host_port;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::info;
|
||||
use utils::{
|
||||
generation::Generation,
|
||||
id::{NodeId, TenantId},
|
||||
@@ -21,28 +20,46 @@ use crate::{node::Node, PlacementPolicy};
|
||||
|
||||
/// Placeholder for storage. This will be replaced with a database client.
|
||||
pub struct Persistence {
|
||||
inner: std::sync::Mutex<Inner>,
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
state: PersistentState,
|
||||
write_queue_tx: tokio::sync::mpsc::UnboundedSender<PendingWrite>,
|
||||
state: std::sync::Mutex<PersistentState>,
|
||||
}
|
||||
|
||||
// Top level state available to all HTTP handlers
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct PersistentState {
|
||||
tenants: HashMap<TenantShardId, TenantShardPersistence>,
|
||||
|
||||
#[serde(skip)]
|
||||
path: Utf8PathBuf,
|
||||
}
|
||||
|
||||
/// A convenience for serializing the state inside a sync lock, and then
|
||||
/// writing it to disk outside of the lock. This will go away when switching
|
||||
/// to a database backend.
|
||||
struct PendingWrite {
|
||||
bytes: Vec<u8>,
|
||||
done_tx: tokio::sync::oneshot::Sender<()>,
|
||||
path: Utf8PathBuf,
|
||||
}
|
||||
|
||||
impl PendingWrite {
|
||||
async fn commit(&self) -> anyhow::Result<()> {
|
||||
tokio::fs::write(&self.path, &self.bytes).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentState {
|
||||
fn save(&self) -> PendingWrite {
|
||||
PendingWrite {
|
||||
bytes: serde_json::to_vec(self).expect("Serialization error"),
|
||||
path: self.path.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn load(path: &Utf8Path) -> anyhow::Result<Self> {
|
||||
let bytes = tokio::fs::read(path).await?;
|
||||
let mut decoded = serde_json::from_slice::<Self>(&bytes)?;
|
||||
decoded.path = path.to_owned();
|
||||
|
||||
for (tenant_id, tenant) in &mut decoded.tenants {
|
||||
// Backward compat: an old attachments.json from before PR #6251, replace
|
||||
@@ -71,6 +88,7 @@ impl PersistentState {
|
||||
tracing::info!("Will create state file at {}", path);
|
||||
Self {
|
||||
tenants: HashMap::new(),
|
||||
path: path.to_owned(),
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -81,74 +99,13 @@ impl PersistentState {
|
||||
}
|
||||
|
||||
impl Persistence {
|
||||
pub async fn spawn(path: &Utf8Path) -> Self {
|
||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
pub async fn new(path: &Utf8Path) -> Self {
|
||||
let state = PersistentState::load_or_new(path).await;
|
||||
tokio::spawn(Self::writer_task(rx, path.to_owned()));
|
||||
Self {
|
||||
inner: std::sync::Mutex::new(Inner {
|
||||
state,
|
||||
write_queue_tx: tx,
|
||||
}),
|
||||
state: std::sync::Mutex::new(state),
|
||||
}
|
||||
}
|
||||
|
||||
async fn writer_task(
|
||||
mut rx: tokio::sync::mpsc::UnboundedReceiver<PendingWrite>,
|
||||
path: Utf8PathBuf,
|
||||
) {
|
||||
scopeguard::defer! {
|
||||
info!("persistence writer task exiting");
|
||||
};
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Some(write) => {
|
||||
tokio::task::spawn_blocking({
|
||||
let path = path.clone();
|
||||
move || {
|
||||
let tmp_path =
|
||||
utils::crashsafe::path_with_suffix_extension(&path, "___new");
|
||||
utils::crashsafe::overwrite(&path, &tmp_path, &write.bytes)
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("spawn_blocking")
|
||||
.expect("write file");
|
||||
let _ = write.done_tx.send(()); // receiver may lose interest any time
|
||||
}
|
||||
None => {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Perform a modification on our [`PersistentState`].
|
||||
/// Return a future that completes once our modification has been persisted.
|
||||
/// The output of the future is the return value of the `txn`` closure.
|
||||
async fn mutating_transaction<F, R>(&self, txn: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut PersistentState) -> R,
|
||||
{
|
||||
let (ret, done_rx) = {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let ret = txn(&mut inner.state);
|
||||
let (done_tx, done_rx) = tokio::sync::oneshot::channel();
|
||||
let write = PendingWrite {
|
||||
bytes: serde_json::to_vec(&inner.state).expect("Serialization error"),
|
||||
done_tx,
|
||||
};
|
||||
inner
|
||||
.write_queue_tx
|
||||
.send(write)
|
||||
.expect("writer task always outlives self");
|
||||
(ret, done_rx)
|
||||
};
|
||||
// the write task can go away once we start .await'ing
|
||||
let _: () = done_rx.await.expect("writer task dead, check logs");
|
||||
ret
|
||||
}
|
||||
|
||||
/// When registering a node, persist it so that on next start we will be able to
|
||||
/// iterate over known nodes to synchronize their tenant shard states with our observed state.
|
||||
pub(crate) async fn insert_node(&self, _node: &Node) -> anyhow::Result<()> {
|
||||
@@ -192,8 +149,8 @@ impl Persistence {
|
||||
|
||||
/// At startup, we populate our map of tenant shards from persistent storage.
|
||||
pub(crate) async fn list_tenant_shards(&self) -> anyhow::Result<Vec<TenantShardPersistence>> {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
Ok(inner.state.tenants.values().cloned().collect())
|
||||
let locked = self.state.lock().unwrap();
|
||||
Ok(locked.tenants.values().cloned().collect())
|
||||
}
|
||||
|
||||
/// Tenants must be persisted before we schedule them for the first time. This enables us
|
||||
@@ -202,7 +159,8 @@ impl Persistence {
|
||||
&self,
|
||||
shards: Vec<TenantShardPersistence>,
|
||||
) -> anyhow::Result<()> {
|
||||
self.mutating_transaction(|locked| {
|
||||
let write = {
|
||||
let mut locked = self.state.lock().unwrap();
|
||||
for shard in shards {
|
||||
let tenant_shard_id = TenantShardId {
|
||||
tenant_id: TenantId::from_str(shard.tenant_id.as_str())?,
|
||||
@@ -212,9 +170,12 @@ impl Persistence {
|
||||
|
||||
locked.tenants.insert(tenant_shard_id, shard);
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
locked.save()
|
||||
};
|
||||
|
||||
write.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
|
||||
@@ -225,7 +186,8 @@ impl Persistence {
|
||||
tenant_shard_id: TenantShardId,
|
||||
node_id: NodeId,
|
||||
) -> anyhow::Result<Generation> {
|
||||
self.mutating_transaction(|locked| {
|
||||
let (write, gen) = {
|
||||
let mut locked = self.state.lock().unwrap();
|
||||
let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
|
||||
anyhow::bail!("Tried to increment generation of unknown shard");
|
||||
};
|
||||
@@ -234,34 +196,45 @@ impl Persistence {
|
||||
shard.generation_pageserver = Some(node_id);
|
||||
|
||||
let gen = Generation::new(shard.generation);
|
||||
Ok(gen)
|
||||
})
|
||||
.await
|
||||
(locked.save(), gen)
|
||||
};
|
||||
|
||||
write.commit().await?;
|
||||
Ok(gen)
|
||||
}
|
||||
|
||||
pub(crate) async fn detach(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> Option<TenantShardPersistence> {
|
||||
self.mutating_transaction(|locked| locked.tenants.remove(&tenant_shard_id))
|
||||
.await
|
||||
pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
|
||||
let write = {
|
||||
let mut locked = self.state.lock().unwrap();
|
||||
let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
|
||||
anyhow::bail!("Tried to increment generation of unknown shard");
|
||||
};
|
||||
shard.generation_pageserver = None;
|
||||
locked.save()
|
||||
};
|
||||
write.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn re_attach(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
) -> anyhow::Result<HashMap<TenantShardId, Generation>> {
|
||||
self.mutating_transaction(|locked| {
|
||||
let (write, result) = {
|
||||
let mut result = HashMap::new();
|
||||
let mut locked = self.state.lock().unwrap();
|
||||
for (tenant_shard_id, shard) in locked.tenants.iter_mut() {
|
||||
if shard.generation_pageserver == Some(node_id) {
|
||||
shard.generation += 1;
|
||||
result.insert(*tenant_shard_id, Generation::new(shard.generation));
|
||||
}
|
||||
}
|
||||
Ok(result)
|
||||
})
|
||||
.await
|
||||
|
||||
(locked.save(), result)
|
||||
};
|
||||
|
||||
write.commit().await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
// TODO: when we start shard splitting, we must durably mark the tenant so that
|
||||
|
||||
@@ -389,7 +389,7 @@ impl Reconciler {
|
||||
// Nothing to do
|
||||
tracing::info!("Observed configuration already correct.")
|
||||
}
|
||||
observed_conf => {
|
||||
_ => {
|
||||
// In all cases other than a matching observed configuration, we will
|
||||
// reconcile this location. This includes locations with different configurations, as well
|
||||
// as locations with unknown (None) observed state.
|
||||
@@ -399,7 +399,6 @@ impl Reconciler {
|
||||
.await?;
|
||||
wanted_conf.generation = self.generation.into();
|
||||
tracing::info!("Observed configuration requires update.");
|
||||
tracing::debug!(?wanted_conf, ?observed_conf, "observed configuration");
|
||||
self.location_config(node_id, wanted_conf, None).await?;
|
||||
if let Err(e) = self
|
||||
.compute_hook
|
||||
|
||||
@@ -329,110 +329,94 @@ impl Service {
|
||||
&self,
|
||||
attach_req: AttachHookRequest,
|
||||
) -> anyhow::Result<AttachHookResponse> {
|
||||
#[derive(Debug)]
|
||||
enum Mode {
|
||||
Insert { new: bool, node_id: NodeId },
|
||||
Detach,
|
||||
}
|
||||
|
||||
// This is a test hook. To enable using it on tenants that were created directly with
|
||||
// the pageserver API (not via this service), we will auto-create any missing tenant
|
||||
// shards with default state.
|
||||
let tenant_shard_id = attach_req.tenant_shard_id;
|
||||
let mode = {
|
||||
let insert = {
|
||||
let locked = self.inner.write().unwrap();
|
||||
if let Some(node_id) = attach_req.node_id {
|
||||
Mode::Insert {
|
||||
new: !locked.tenants.contains_key(&attach_req.tenant_shard_id),
|
||||
node_id,
|
||||
}
|
||||
} else {
|
||||
Mode::Detach
|
||||
}
|
||||
!locked.tenants.contains_key(&attach_req.tenant_shard_id)
|
||||
};
|
||||
|
||||
tracing::info!(?mode, "attach-hook start");
|
||||
match mode {
|
||||
Mode::Insert { new, node_id } => {
|
||||
if new {
|
||||
let tsp = TenantShardPersistence {
|
||||
tenant_id: tenant_shard_id.tenant_id.to_string(),
|
||||
shard_number: tenant_shard_id.shard_number.0 as i32,
|
||||
shard_count: tenant_shard_id.shard_count.0 as i32,
|
||||
shard_stripe_size: 0,
|
||||
generation: 0,
|
||||
generation_pageserver: None,
|
||||
placement_policy: serde_json::to_string(&PlacementPolicy::default())
|
||||
.unwrap(),
|
||||
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
|
||||
};
|
||||
if insert {
|
||||
let tsp = TenantShardPersistence {
|
||||
tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
|
||||
shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
|
||||
shard_count: attach_req.tenant_shard_id.shard_count.0 as i32,
|
||||
shard_stripe_size: 0,
|
||||
generation: 0,
|
||||
generation_pageserver: None,
|
||||
placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
|
||||
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
|
||||
};
|
||||
|
||||
self.persistence.insert_tenant_shards(vec![tsp]).await?;
|
||||
self.persistence.insert_tenant_shards(vec![tsp]).await?;
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
locked.tenants.insert(
|
||||
tenant_shard_id,
|
||||
TenantState::new(
|
||||
tenant_shard_id,
|
||||
ShardIdentity::unsharded(),
|
||||
PlacementPolicy::Single,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
let new_generation = self
|
||||
.persistence
|
||||
.increment_generation(tenant_shard_id, node_id)
|
||||
.await?;
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let tenant_state = locked
|
||||
.tenants
|
||||
.get_mut(&tenant_shard_id)
|
||||
.expect("Checked for existence above");
|
||||
tenant_state.generation = new_generation;
|
||||
tenant_state.intent.attached = Some(node_id);
|
||||
|
||||
tracing::info!(
|
||||
"attach_hook: tenant {} set generation {:?}, pageserver {}",
|
||||
tenant_shard_id,
|
||||
tenant_state.generation,
|
||||
node_id,
|
||||
);
|
||||
|
||||
Ok(AttachHookResponse {
|
||||
gen: tenant_state.generation.into(),
|
||||
})
|
||||
}
|
||||
Mode::Detach => {
|
||||
let res = { self.persistence.detach(tenant_shard_id).await };
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let tenant_state = locked.tenants.remove(&tenant_shard_id);
|
||||
match res {
|
||||
Some(detached) => {
|
||||
tracing::info!(
|
||||
tenant_id = %tenant_shard_id,
|
||||
ps_id = ?detached.generation_pageserver,
|
||||
generation = ?detached.generation,
|
||||
"dropping",
|
||||
);
|
||||
assert!(tenant_state.is_some(), "persistence state said it existed");
|
||||
}
|
||||
None => {
|
||||
tracing::info!(
|
||||
tenant_id = %tenant_shard_id,
|
||||
"no-op: tenant already has no pageserver");
|
||||
assert!(
|
||||
tenant_state.is_none(),
|
||||
"persistence state said it already doesn't exist"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(AttachHookResponse { gen: None })
|
||||
}
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
locked.tenants.insert(
|
||||
attach_req.tenant_shard_id,
|
||||
TenantState::new(
|
||||
attach_req.tenant_shard_id,
|
||||
ShardIdentity::unsharded(),
|
||||
PlacementPolicy::Single,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
let new_generation = if let Some(req_node_id) = attach_req.node_id {
|
||||
Some(
|
||||
self.persistence
|
||||
.increment_generation(attach_req.tenant_shard_id, req_node_id)
|
||||
.await?,
|
||||
)
|
||||
} else {
|
||||
self.persistence.detach(attach_req.tenant_shard_id).await?;
|
||||
None
|
||||
};
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let tenant_state = locked
|
||||
.tenants
|
||||
.get_mut(&attach_req.tenant_shard_id)
|
||||
.expect("Checked for existence above");
|
||||
|
||||
if let Some(new_generation) = new_generation {
|
||||
tenant_state.generation = new_generation;
|
||||
}
|
||||
|
||||
if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
|
||||
tracing::info!(
|
||||
tenant_id = %attach_req.tenant_shard_id,
|
||||
ps_id = %attaching_pageserver,
|
||||
generation = ?tenant_state.generation,
|
||||
"issuing",
|
||||
);
|
||||
} else if let Some(ps_id) = tenant_state.intent.attached {
|
||||
tracing::info!(
|
||||
tenant_id = %attach_req.tenant_shard_id,
|
||||
%ps_id,
|
||||
generation = ?tenant_state.generation,
|
||||
"dropping",
|
||||
);
|
||||
} else {
|
||||
tracing::info!(
|
||||
tenant_id = %attach_req.tenant_shard_id,
|
||||
"no-op: tenant already has no pageserver");
|
||||
}
|
||||
tenant_state.intent.attached = attach_req.node_id;
|
||||
|
||||
tracing::info!(
|
||||
"attach_hook: tenant {} set generation {:?}, pageserver {}",
|
||||
attach_req.tenant_shard_id,
|
||||
tenant_state.generation,
|
||||
// TODO: this is an odd number of 0xf's
|
||||
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
|
||||
);
|
||||
|
||||
Ok(AttachHookResponse {
|
||||
gen: attach_req
|
||||
.node_id
|
||||
.map(|_| tenant_state.generation.into().unwrap()),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn inspect(&self, inspect_req: InspectRequest) -> InspectResponse {
|
||||
|
||||
@@ -79,7 +79,7 @@ pub(crate) struct IntentState {
|
||||
pub(crate) secondary: Vec<NodeId>,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Debug)]
|
||||
#[derive(Default, Clone)]
|
||||
pub(crate) struct ObservedState {
|
||||
pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
|
||||
}
|
||||
@@ -93,7 +93,7 @@ pub(crate) struct ObservedState {
|
||||
/// what it is (e.g. we failed partway through configuring it)
|
||||
/// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
|
||||
/// and that configuration will still be present unless something external interfered.
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ObservedStateLocation {
|
||||
/// If None, it means we do not know the status of this shard's location on this node, but
|
||||
/// we know that we might have some state on this node.
|
||||
|
||||
@@ -104,7 +104,6 @@ pub struct KeySpaceAccum {
|
||||
accum: Option<Range<Key>>,
|
||||
|
||||
ranges: Vec<Range<Key>>,
|
||||
size: u64,
|
||||
}
|
||||
|
||||
impl KeySpaceAccum {
|
||||
@@ -112,7 +111,6 @@ impl KeySpaceAccum {
|
||||
Self {
|
||||
accum: None,
|
||||
ranges: Vec::new(),
|
||||
size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,8 +121,6 @@ impl KeySpaceAccum {
|
||||
|
||||
#[inline(always)]
|
||||
pub fn add_range(&mut self, range: Range<Key>) {
|
||||
self.size += key_range_size(&range) as u64;
|
||||
|
||||
match self.accum.as_mut() {
|
||||
Some(accum) => {
|
||||
if range.start == accum.end {
|
||||
@@ -150,23 +146,6 @@ impl KeySpaceAccum {
|
||||
ranges: self.ranges,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn consume_keyspace(&mut self) -> KeySpace {
|
||||
if let Some(accum) = self.accum.take() {
|
||||
self.ranges.push(accum);
|
||||
}
|
||||
|
||||
let mut prev_accum = KeySpaceAccum::new();
|
||||
std::mem::swap(self, &mut prev_accum);
|
||||
|
||||
KeySpace {
|
||||
ranges: prev_accum.ranges,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
self.size
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
@@ -275,30 +254,6 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn keyspace_consume() {
|
||||
let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];
|
||||
|
||||
let mut accum = KeySpaceAccum::new();
|
||||
for range in &ranges {
|
||||
accum.add_range(range.clone());
|
||||
}
|
||||
|
||||
let expected_size: u64 = ranges.iter().map(|r| key_range_size(r) as u64).sum();
|
||||
assert_eq!(accum.size(), expected_size);
|
||||
|
||||
assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
|
||||
assert_eq!(accum.size(), 0);
|
||||
|
||||
assert_ks_eq(&accum.consume_keyspace(), vec![]);
|
||||
assert_eq!(accum.size(), 0);
|
||||
|
||||
for range in &ranges {
|
||||
accum.add_range(range.clone());
|
||||
}
|
||||
assert_ks_eq(&accum.to_keyspace(), ranges);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn keyspace_add_range() {
|
||||
// two separate ranges
|
||||
|
||||
@@ -111,19 +111,7 @@ impl RelTag {
|
||||
/// These files are divided into segments, which are divided into
|
||||
/// pages of the same BLCKSZ as used for relation files.
|
||||
///
|
||||
#[derive(
|
||||
Debug,
|
||||
Clone,
|
||||
Copy,
|
||||
Hash,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
PartialEq,
|
||||
Eq,
|
||||
PartialOrd,
|
||||
Ord,
|
||||
strum_macros::EnumIter,
|
||||
)]
|
||||
#[derive(Debug, Clone, Copy, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub enum SlruKind {
|
||||
Clog,
|
||||
MultiXactMembers,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
fs::{self, File},
|
||||
io::{self, Write},
|
||||
io,
|
||||
};
|
||||
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -112,48 +112,6 @@ pub async fn fsync_async(path: impl AsRef<Utf8Path>) -> Result<(), std::io::Erro
|
||||
tokio::fs::File::open(path.as_ref()).await?.sync_all().await
|
||||
}
|
||||
|
||||
/// Writes a file to the specified `final_path` in a crash safe fasion
|
||||
///
|
||||
/// The file is first written to the specified tmp_path, and in a second
|
||||
/// step, the tmp path is renamed to the final path. As renames are
|
||||
/// atomic, a crash during the write operation will never leave behind a
|
||||
/// partially written file.
|
||||
///
|
||||
/// NB: an async variant of this code exists in Pageserver's VirtualFile.
|
||||
pub fn overwrite(
|
||||
final_path: &Utf8Path,
|
||||
tmp_path: &Utf8Path,
|
||||
content: &[u8],
|
||||
) -> std::io::Result<()> {
|
||||
let Some(final_path_parent) = final_path.parent() else {
|
||||
return Err(std::io::Error::from_raw_os_error(
|
||||
nix::errno::Errno::EINVAL as i32,
|
||||
));
|
||||
};
|
||||
std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
// Use `create_new` so that, if we race with ourselves or something else,
|
||||
// we bail out instead of causing damage.
|
||||
.create_new(true)
|
||||
.open(tmp_path)?;
|
||||
file.write_all(content)?;
|
||||
file.sync_all()?;
|
||||
drop(file); // before the rename, that's important!
|
||||
// renames are atomic
|
||||
std::fs::rename(tmp_path, final_path)?;
|
||||
// Only open final path parent dirfd now, so that this operation only
|
||||
// ever holds one VirtualFile fd at a time. That's important because
|
||||
// the current `find_victim_slot` impl might pick the same slot for both
|
||||
// VirtualFile., and it eventually does a blocking write lock instead of
|
||||
// try_lock.
|
||||
let final_parent_dirfd = std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(final_path_parent)?;
|
||||
final_parent_dirfd.sync_all()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
|
||||
@@ -5,10 +5,10 @@ use std::os::unix::io::RawFd;
|
||||
pub fn set_nonblock(fd: RawFd) -> Result<(), std::io::Error> {
|
||||
let bits = fcntl(fd, F_GETFL)?;
|
||||
|
||||
// If F_GETFL returns some unknown bits, they should be valid
|
||||
// Safety: If F_GETFL returns some unknown bits, they should be valid
|
||||
// for passing back to F_SETFL, too. If we left them out, the F_SETFL
|
||||
// would effectively clear them, which is not what we want.
|
||||
let mut flags = OFlag::from_bits_retain(bits);
|
||||
let mut flags = unsafe { OFlag::from_bits_unchecked(bits) };
|
||||
flags |= OFlag::O_NONBLOCK;
|
||||
|
||||
fcntl(fd, F_SETFL(flags))?;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::{
|
||||
io,
|
||||
net::{TcpListener, ToSocketAddrs},
|
||||
os::unix::prelude::AsRawFd,
|
||||
};
|
||||
|
||||
use nix::sys::socket::{setsockopt, sockopt::ReuseAddr};
|
||||
@@ -9,7 +10,7 @@ use nix::sys::socket::{setsockopt, sockopt::ReuseAddr};
|
||||
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
|
||||
let listener = TcpListener::bind(addr)?;
|
||||
|
||||
setsockopt(&listener, ReuseAddr, &true)?;
|
||||
setsockopt(listener.as_raw_fd(), ReuseAddr, &true)?;
|
||||
|
||||
Ok(listener)
|
||||
}
|
||||
|
||||
@@ -423,8 +423,8 @@ async fn client(
|
||||
tokio::select! {
|
||||
res = do_requests => { res },
|
||||
_ = cancel.cancelled() => {
|
||||
// fallthrough to shutdown
|
||||
client.shutdown().await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
client.shutdown().await;
|
||||
}
|
||||
|
||||
@@ -11,9 +11,8 @@
|
||||
//! from data stored in object storage.
|
||||
//!
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use fail::fail_point;
|
||||
use pageserver_api::key::{key_to_slru_block, Key};
|
||||
use postgres_ffi::pg_constants;
|
||||
use std::fmt::Write as FmtWrite;
|
||||
use std::time::SystemTime;
|
||||
@@ -134,87 +133,6 @@ where
|
||||
ctx: &'a RequestContext,
|
||||
}
|
||||
|
||||
/// A sink that accepts SLRU blocks ordered by key and forwards
|
||||
/// full segments to the archive.
|
||||
struct SlruSegmentsBuilder<'a, 'b, W>
|
||||
where
|
||||
W: AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
ar: &'a mut Builder<&'b mut W>,
|
||||
buf: Vec<u8>,
|
||||
current_segment: Option<(SlruKind, u32)>,
|
||||
}
|
||||
|
||||
impl<'a, 'b, W> SlruSegmentsBuilder<'a, 'b, W>
|
||||
where
|
||||
W: AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
fn new(ar: &'a mut Builder<&'b mut W>) -> Self {
|
||||
Self {
|
||||
ar,
|
||||
buf: Vec::new(),
|
||||
current_segment: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn add_block(&mut self, key: &Key, block: Bytes) -> anyhow::Result<()> {
|
||||
let (kind, segno, _) = key_to_slru_block(*key)?;
|
||||
|
||||
match kind {
|
||||
SlruKind::Clog => {
|
||||
ensure!(block.len() == BLCKSZ as usize || block.len() == BLCKSZ as usize + 8);
|
||||
}
|
||||
SlruKind::MultiXactMembers | SlruKind::MultiXactOffsets => {
|
||||
ensure!(block.len() == BLCKSZ as usize);
|
||||
}
|
||||
}
|
||||
|
||||
let segment = (kind, segno);
|
||||
match self.current_segment {
|
||||
None => {
|
||||
self.current_segment = Some(segment);
|
||||
self.buf
|
||||
.extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
|
||||
}
|
||||
Some(current_seg) if current_seg == segment => {
|
||||
self.buf
|
||||
.extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
|
||||
}
|
||||
Some(_) => {
|
||||
self.flush().await?;
|
||||
|
||||
self.current_segment = Some(segment);
|
||||
self.buf
|
||||
.extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn flush(&mut self) -> anyhow::Result<()> {
|
||||
let nblocks = self.buf.len() / BLCKSZ as usize;
|
||||
let (kind, segno) = self.current_segment.take().unwrap();
|
||||
let segname = format!("{}/{:>04X}", kind.to_str(), segno);
|
||||
let header = new_tar_header(&segname, self.buf.len() as u64)?;
|
||||
self.ar.append(&header, self.buf.as_slice()).await?;
|
||||
|
||||
trace!("Added to basebackup slru {} relsize {}", segname, nblocks);
|
||||
|
||||
self.buf.clear();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn finish(mut self) -> anyhow::Result<()> {
|
||||
if self.current_segment.is_none() || self.buf.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.flush().await
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, W> Basebackup<'a, W>
|
||||
where
|
||||
W: AsyncWrite + Send + Sync + Unpin,
|
||||
@@ -250,27 +168,20 @@ where
|
||||
}
|
||||
|
||||
// Gather non-relational files from object storage pages.
|
||||
let slru_partitions = self
|
||||
.timeline
|
||||
.get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
|
||||
.await?
|
||||
.partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64);
|
||||
|
||||
let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
|
||||
|
||||
for part in slru_partitions.parts {
|
||||
let blocks = self
|
||||
for kind in [
|
||||
SlruKind::Clog,
|
||||
SlruKind::MultiXactOffsets,
|
||||
SlruKind::MultiXactMembers,
|
||||
] {
|
||||
for segno in self
|
||||
.timeline
|
||||
.get_vectored(&part.ranges, self.lsn, self.ctx)
|
||||
.await?;
|
||||
|
||||
for (key, block) in blocks {
|
||||
slru_builder.add_block(&key, block?).await?;
|
||||
.list_slru_segments(kind, Version::Lsn(self.lsn), self.ctx)
|
||||
.await?
|
||||
{
|
||||
self.add_slru_segment(kind, segno).await?;
|
||||
}
|
||||
}
|
||||
|
||||
slru_builder.finish().await?;
|
||||
|
||||
let mut min_restart_lsn: Lsn = Lsn::MAX;
|
||||
// Create tablespace directories
|
||||
for ((spcnode, dbnode), has_relmap_file) in
|
||||
@@ -394,6 +305,39 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//
|
||||
// Generate SLRU segment files from repository.
|
||||
//
|
||||
async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
|
||||
let nblocks = self
|
||||
.timeline
|
||||
.get_slru_segment_size(slru, segno, Version::Lsn(self.lsn), self.ctx)
|
||||
.await?;
|
||||
|
||||
let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
|
||||
for blknum in 0..nblocks {
|
||||
let img = self
|
||||
.timeline
|
||||
.get_slru_page_at_lsn(slru, segno, blknum, self.lsn, self.ctx)
|
||||
.await?;
|
||||
|
||||
if slru == SlruKind::Clog {
|
||||
ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8);
|
||||
} else {
|
||||
ensure!(img.len() == BLCKSZ as usize);
|
||||
}
|
||||
|
||||
slru_buf.extend_from_slice(&img[..BLCKSZ as usize]);
|
||||
}
|
||||
|
||||
let segname = format!("{}/{:>04X}", slru.to_str(), segno);
|
||||
let header = new_tar_header(&segname, slru_buf.len() as u64)?;
|
||||
self.ar.append(&header, slru_buf.as_slice()).await?;
|
||||
|
||||
trace!("Added to basebackup slru {} relsize {}", segname, nblocks);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//
|
||||
// Include database/tablespace directories.
|
||||
//
|
||||
|
||||
@@ -877,56 +877,6 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
/v1/tenant/{tenant_id}/{timeline_id}/preserve_initdb_archive:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: timeline_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
post:
|
||||
description: |
|
||||
Marks the initdb archive for preservation upon deletion of the timeline or tenant.
|
||||
This is meant to be part of the disaster recovery process.
|
||||
responses:
|
||||
"202":
|
||||
description: Tenant scheduled to load successfully
|
||||
"404":
|
||||
description: No tenant or timeline found for the specified ids
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/synthetic_size:
|
||||
parameters:
|
||||
|
||||
@@ -561,43 +561,6 @@ async fn timeline_list_handler(
|
||||
json_response(StatusCode::OK, response_data)
|
||||
}
|
||||
|
||||
async fn timeline_preserve_initdb_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
// Part of the process for disaster recovery from safekeeper-stored WAL:
|
||||
// If we don't recover into a new timeline but want to keep the timeline ID,
|
||||
// then the initdb archive is deleted. This endpoint copies it to a different
|
||||
// location where timeline recreation cand find it.
|
||||
|
||||
async {
|
||||
let tenant = mgr::get_tenant(tenant_shard_id, true)?;
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, false)
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
||||
|
||||
timeline
|
||||
.preserve_initdb_archive()
|
||||
.await
|
||||
.context("preserving initdb archive")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
Ok::<_, ApiError>(())
|
||||
}
|
||||
.instrument(info_span!("timeline_preserve_initdb_archive",
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug(),
|
||||
%timeline_id))
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn timeline_detail_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -1980,10 +1943,6 @@ pub fn make_router(
|
||||
.post("/v1/tenant/:tenant_id/ignore", |r| {
|
||||
api_handler(r, tenant_ignore_handler)
|
||||
})
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/preserve_initdb_archive",
|
||||
|r| api_handler(r, timeline_preserve_initdb_handler),
|
||||
)
|
||||
.get("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
|
||||
api_handler(r, timeline_detail_handler)
|
||||
})
|
||||
|
||||
@@ -27,7 +27,6 @@ use serde::{Deserialize, Serialize};
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, trace, warn};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
@@ -534,33 +533,6 @@ impl Timeline {
|
||||
Ok(Default::default())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_slru_keyspace(
|
||||
&self,
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<KeySpace, PageReconstructError> {
|
||||
let mut accum = KeySpaceAccum::new();
|
||||
|
||||
for kind in SlruKind::iter() {
|
||||
let mut segments: Vec<u32> = self
|
||||
.list_slru_segments(kind, version, ctx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect();
|
||||
segments.sort_unstable();
|
||||
|
||||
for seg in segments {
|
||||
let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
|
||||
|
||||
accum.add_range(
|
||||
slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(accum.to_keyspace())
|
||||
}
|
||||
|
||||
/// Get a list of SLRU segments
|
||||
pub(crate) async fn list_slru_segments(
|
||||
&self,
|
||||
|
||||
@@ -1881,7 +1881,7 @@ impl Tenant {
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(), timeline::CompactionError> {
|
||||
) -> Result<(), timeline::CompactionError> {
|
||||
// Don't start doing work during shutdown, or when broken, we do not need those in the logs
|
||||
if !self.is_active() {
|
||||
return Ok(());
|
||||
|
||||
@@ -283,15 +283,15 @@ impl LayerMap {
|
||||
///
|
||||
/// This is used for garbage collection, to determine if an old layer can
|
||||
/// be deleted.
|
||||
pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
|
||||
pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> Result<bool> {
|
||||
if key.is_empty() {
|
||||
// Vacuously true. There's a newer image for all 0 of the kerys in the range.
|
||||
return true;
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
|
||||
Some(v) => v,
|
||||
None => return false,
|
||||
None => return Ok(false),
|
||||
};
|
||||
|
||||
let start = key.start.to_i128();
|
||||
@@ -304,17 +304,17 @@ impl LayerMap {
|
||||
|
||||
// Check the start is covered
|
||||
if !layer_covers(version.image_coverage.query(start)) {
|
||||
return false;
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// Check after all changes of coverage
|
||||
for (_, change_val) in version.image_coverage.range(start..end) {
|
||||
if !layer_covers(change_val) {
|
||||
return false;
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
|
||||
@@ -325,14 +325,18 @@ impl LayerMap {
|
||||
/// Divide the whole given range of keys into sub-ranges based on the latest
|
||||
/// image layer that covers each range at the specified lsn (inclusive).
|
||||
/// This is used when creating new image layers.
|
||||
///
|
||||
// FIXME: clippy complains that the result type is very complex. She's probably
|
||||
// right...
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn image_coverage(
|
||||
&self,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
|
||||
) -> Result<Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)>> {
|
||||
let version = match self.historic.get().unwrap().get_version(lsn.0) {
|
||||
Some(v) => v,
|
||||
None => return vec![],
|
||||
None => return Ok(vec![]),
|
||||
};
|
||||
|
||||
let start = key_range.start.to_i128();
|
||||
@@ -355,7 +359,7 @@ impl LayerMap {
|
||||
let kr = Key::from_i128(current_key)..Key::from_i128(end);
|
||||
coverage.push((kr, current_val.take()));
|
||||
|
||||
coverage
|
||||
Ok(coverage)
|
||||
}
|
||||
|
||||
pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
|
||||
@@ -406,19 +410,24 @@ impl LayerMap {
|
||||
/// This number is used to compute the largest number of deltas that
|
||||
/// we'll need to visit for any page reconstruction in this region.
|
||||
/// We use this heuristic to decide whether to create an image layer.
|
||||
pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
|
||||
pub fn count_deltas(
|
||||
&self,
|
||||
key: &Range<Key>,
|
||||
lsn: &Range<Lsn>,
|
||||
limit: Option<usize>,
|
||||
) -> Result<usize> {
|
||||
// We get the delta coverage of the region, and for each part of the coverage
|
||||
// we recurse right underneath the delta. The recursion depth is limited by
|
||||
// the largest result this function could return, which is in practice between
|
||||
// 3 and 10 (since we usually try to create an image when the number gets larger).
|
||||
|
||||
if lsn.is_empty() || key.is_empty() || limit == Some(0) {
|
||||
return 0;
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
|
||||
Some(v) => v,
|
||||
None => return 0,
|
||||
None => return Ok(0),
|
||||
};
|
||||
|
||||
let start = key.start.to_i128();
|
||||
@@ -439,7 +448,8 @@ impl LayerMap {
|
||||
if !kr.is_empty() {
|
||||
let base_count = Self::is_reimage_worthy(&val, key) as usize;
|
||||
let new_limit = limit.map(|l| l - base_count);
|
||||
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
|
||||
let max_stacked_deltas_underneath =
|
||||
self.count_deltas(&kr, &lr, new_limit)?;
|
||||
max_stacked_deltas = std::cmp::max(
|
||||
max_stacked_deltas,
|
||||
base_count + max_stacked_deltas_underneath,
|
||||
@@ -461,7 +471,7 @@ impl LayerMap {
|
||||
if !kr.is_empty() {
|
||||
let base_count = Self::is_reimage_worthy(&val, key) as usize;
|
||||
let new_limit = limit.map(|l| l - base_count);
|
||||
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
|
||||
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
|
||||
max_stacked_deltas = std::cmp::max(
|
||||
max_stacked_deltas,
|
||||
base_count + max_stacked_deltas_underneath,
|
||||
@@ -470,7 +480,7 @@ impl LayerMap {
|
||||
}
|
||||
}
|
||||
|
||||
max_stacked_deltas
|
||||
Ok(max_stacked_deltas)
|
||||
}
|
||||
|
||||
/// Count how many reimage-worthy layers we need to visit for given key-lsn pair.
|
||||
@@ -582,7 +592,10 @@ impl LayerMap {
|
||||
if limit == Some(difficulty) {
|
||||
break;
|
||||
}
|
||||
for (img_range, last_img) in self.image_coverage(range, lsn) {
|
||||
for (img_range, last_img) in self
|
||||
.image_coverage(range, lsn)
|
||||
.expect("why would this err?")
|
||||
{
|
||||
if limit == Some(difficulty) {
|
||||
break;
|
||||
}
|
||||
@@ -593,7 +606,9 @@ impl LayerMap {
|
||||
};
|
||||
|
||||
if img_lsn < lsn {
|
||||
let num_deltas = self.count_deltas(&img_range, &(img_lsn..lsn), limit);
|
||||
let num_deltas = self
|
||||
.count_deltas(&img_range, &(img_lsn..lsn), limit)
|
||||
.expect("why would this err lol?");
|
||||
difficulty = std::cmp::max(difficulty, num_deltas);
|
||||
}
|
||||
}
|
||||
@@ -604,8 +619,8 @@ impl LayerMap {
|
||||
}
|
||||
|
||||
/// Return all L0 delta layers
|
||||
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
|
||||
Ok(self.l0_delta_layers.to_vec())
|
||||
pub fn get_level0_deltas(&self) -> Vec<Arc<PersistentLayerDesc>> {
|
||||
self.l0_delta_layers.to_vec()
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer map
|
||||
|
||||
@@ -237,7 +237,7 @@ use utils::id::{TenantId, TimelineId};
|
||||
use self::index::IndexPart;
|
||||
|
||||
use super::storage_layer::{Layer, LayerFileName, ResidentLayer};
|
||||
use super::upload_queue::SetDeletedFlagProgress;
|
||||
use super::upload_queue::{self, SetDeletedFlagProgress};
|
||||
use super::Generation;
|
||||
|
||||
pub(crate) use download::{is_temp_download_file, list_remote_timelines};
|
||||
@@ -257,8 +257,6 @@ pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
|
||||
|
||||
pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";
|
||||
|
||||
pub(crate) const INITDB_PRESERVED_PATH: &str = "initdb-preserved.tar.zst";
|
||||
|
||||
/// Default buffer size when interfacing with [`tokio::fs::File`].
|
||||
pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
|
||||
|
||||
@@ -623,7 +621,9 @@ impl RemoteTimelineClient {
|
||||
///
|
||||
/// Like schedule_index_upload_for_metadata_update(), this merely adds
|
||||
/// the upload to the upload queue and returns quickly.
|
||||
pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
|
||||
pub(crate) fn schedule_index_upload_for_file_changes(
|
||||
self: &Arc<Self>,
|
||||
) -> Result<(), upload_queue::NotInitialized> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
@@ -668,7 +668,7 @@ impl RemoteTimelineClient {
|
||||
pub(crate) fn schedule_layer_file_upload(
|
||||
self: &Arc<Self>,
|
||||
layer: ResidentLayer,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), upload_queue::NotInitialized> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
@@ -877,7 +877,7 @@ impl RemoteTimelineClient {
|
||||
self: &Arc<Self>,
|
||||
compacted_from: &[Layer],
|
||||
compacted_to: &[ResidentLayer],
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), upload_queue::NotInitialized> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
@@ -1068,28 +1068,6 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn preserve_initdb_archive(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
backoff::retry(
|
||||
|| async {
|
||||
upload::preserve_initdb_archive(&self.storage_impl, tenant_id, timeline_id, cancel)
|
||||
.await
|
||||
},
|
||||
|_e| false,
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES,
|
||||
"preserve_initdb_tar_zst",
|
||||
backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled!")),
|
||||
)
|
||||
.await
|
||||
.context("backing up initdb archive")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfuly set.
|
||||
/// The function deletes layer files one by one, then lists the prefix to see if we leaked something
|
||||
/// deletes leaked files if any and proceeds with deletion of index file at the end.
|
||||
@@ -1125,14 +1103,6 @@ impl RemoteTimelineClient {
|
||||
let layer_deletion_count = layers.len();
|
||||
self.deletion_queue_client.push_immediate(layers).await?;
|
||||
|
||||
// Delete the initdb.tar.zst, which is not always present, but deletion attempts of
|
||||
// inexistant objects are not considered errors.
|
||||
let initdb_path =
|
||||
remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id);
|
||||
self.deletion_queue_client
|
||||
.push_immediate(vec![initdb_path])
|
||||
.await?;
|
||||
|
||||
// Do not delete index part yet, it is needed for possible retry. If we remove it first
|
||||
// and retry will arrive to different pageserver there wont be any traces of it on remote storage
|
||||
let timeline_storage_path = remote_timeline_path(&self.tenant_shard_id, &self.timeline_id);
|
||||
@@ -1180,8 +1150,10 @@ impl RemoteTimelineClient {
|
||||
if p == &latest_index {
|
||||
return false;
|
||||
}
|
||||
if p.object_name() == Some(INITDB_PRESERVED_PATH) {
|
||||
return false;
|
||||
if let Some(name) = p.object_name() {
|
||||
if name == INITDB_PATH {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
})
|
||||
@@ -1754,16 +1726,6 @@ pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId
|
||||
.expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_initdb_preserved_archive_path(
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> RemotePath {
|
||||
RemotePath::from_string(&format!(
|
||||
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PRESERVED_PATH}"
|
||||
))
|
||||
.expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_index_path(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
|
||||
@@ -32,8 +32,7 @@ use utils::id::TimelineId;
|
||||
use super::index::{IndexPart, LayerFileMetadata};
|
||||
use super::{
|
||||
parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
|
||||
remote_initdb_preserved_archive_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
|
||||
INITDB_PATH,
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
|
||||
};
|
||||
|
||||
///
|
||||
@@ -431,9 +430,6 @@ pub(crate) async fn download_initdb_tar_zst(
|
||||
|
||||
let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
|
||||
|
||||
let remote_preserved_path =
|
||||
remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
|
||||
|
||||
let timeline_path = conf.timelines_path(tenant_shard_id);
|
||||
|
||||
if !timeline_path.exists() {
|
||||
@@ -460,16 +456,8 @@ pub(crate) async fn download_initdb_tar_zst(
|
||||
.with_context(|| format!("tempfile creation {temp_path}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let download = match download_cancellable(&cancel_inner, storage.download(&remote_path))
|
||||
.await
|
||||
{
|
||||
Ok(dl) => dl,
|
||||
Err(DownloadError::NotFound) => {
|
||||
download_cancellable(&cancel_inner, storage.download(&remote_preserved_path))
|
||||
.await?
|
||||
}
|
||||
Err(other) => Err(other)?,
|
||||
};
|
||||
let download =
|
||||
download_cancellable(&cancel_inner, storage.download(&remote_path)).await?;
|
||||
let mut download = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
let mut writer = tokio::io::BufWriter::with_capacity(8 * 1024, file);
|
||||
|
||||
|
||||
@@ -13,8 +13,8 @@ use super::Generation;
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
tenant::remote_timeline_client::{
|
||||
index::IndexPart, remote_index_path, remote_initdb_archive_path,
|
||||
remote_initdb_preserved_archive_path, remote_path, upload_cancellable,
|
||||
index::IndexPart, remote_index_path, remote_initdb_archive_path, remote_path,
|
||||
upload_cancellable,
|
||||
},
|
||||
};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
@@ -144,16 +144,3 @@ pub(crate) async fn upload_initdb_dir(
|
||||
.await
|
||||
.with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
|
||||
}
|
||||
|
||||
pub(crate) async fn preserve_initdb_archive(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let source_path = remote_initdb_archive_path(tenant_id, timeline_id);
|
||||
let dest_path = remote_initdb_preserved_archive_path(tenant_id, timeline_id);
|
||||
upload_cancellable(cancel, storage.copy_object(&source_path, &dest_path))
|
||||
.await
|
||||
.with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
|
||||
}
|
||||
|
||||
@@ -290,7 +290,7 @@ impl Layer {
|
||||
}
|
||||
|
||||
/// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
|
||||
pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result<ResidentLayer> {
|
||||
pub(crate) async fn download_and_keep_resident(&self) -> Result<ResidentLayer, DownloadError> {
|
||||
let downloaded = self.0.get_or_maybe_download(true, None).await?;
|
||||
|
||||
Ok(ResidentLayer {
|
||||
@@ -1174,7 +1174,7 @@ pub(crate) enum EvictionError {
|
||||
|
||||
/// Error internal to the [`LayerInner::get_or_maybe_download`]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum DownloadError {
|
||||
pub(crate) enum DownloadError {
|
||||
#[error("timeline has already shutdown")]
|
||||
TimelineShutdown,
|
||||
#[error("no remote storage configured")]
|
||||
@@ -1197,6 +1197,15 @@ enum DownloadError {
|
||||
PostStatFailed(#[source] std::io::Error),
|
||||
}
|
||||
|
||||
impl DownloadError {
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
match self {
|
||||
Self::TimelineShutdown | Self::DownloadCancelled => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub(crate) enum NeedsDownload {
|
||||
NotFound,
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::TENANT_TASK_EVENTS;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::{Tenant, TenantState};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
@@ -181,8 +182,11 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
);
|
||||
error_run_count += 1;
|
||||
let wait_duration = Duration::from_secs_f64(wait_duration);
|
||||
error!(
|
||||
"Compaction failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
|
||||
log_compaction_error(
|
||||
&e,
|
||||
error_run_count,
|
||||
&wait_duration,
|
||||
cancel.is_cancelled(),
|
||||
);
|
||||
wait_duration
|
||||
} else {
|
||||
@@ -210,6 +214,38 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
|
||||
}
|
||||
|
||||
fn log_compaction_error(
|
||||
e: &CompactionError,
|
||||
error_run_count: u32,
|
||||
sleep_duration: &std::time::Duration,
|
||||
task_cancelled: bool,
|
||||
) {
|
||||
use crate::tenant::upload_queue::NotInitialized;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use CompactionError::*;
|
||||
|
||||
enum LooksLike {
|
||||
Info,
|
||||
Error,
|
||||
}
|
||||
|
||||
let decision = match e {
|
||||
ShuttingDown => None,
|
||||
_ if task_cancelled => Some(LooksLike::Info),
|
||||
Other(e) => Some(LooksLike::Error),
|
||||
};
|
||||
|
||||
match decision {
|
||||
Some(LooksLike::Info) => info!(
|
||||
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
|
||||
),
|
||||
Some(LooksLike::Error) => error!(
|
||||
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
|
||||
),
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// GC task's main loop
|
||||
///
|
||||
|
||||
@@ -14,7 +14,6 @@ use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::{
|
||||
keyspace::{key_range_size, KeySpaceAccum},
|
||||
models::{
|
||||
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
|
||||
LayerMapInfo, TimelineState,
|
||||
@@ -33,7 +32,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::sync::gate::Gate;
|
||||
|
||||
use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
@@ -104,11 +103,14 @@ use self::layer_manager::LayerManager;
|
||||
use self::logical_size::LogicalSize;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::config::TenantConf;
|
||||
use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
|
||||
use super::{config::TenantConf, upload_queue::NotInitialized};
|
||||
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
|
||||
use super::{
|
||||
remote_timeline_client::index::{IndexLayerMetadata, IndexPart},
|
||||
storage_layer::layer,
|
||||
};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub(super) enum FlushLoopState {
|
||||
@@ -392,8 +394,7 @@ pub(crate) enum PageReconstructError {
|
||||
#[error("Ancestor LSN wait error: {0}")]
|
||||
AncestorLsnTimeout(#[from] WaitLsnError),
|
||||
|
||||
/// The operation was cancelled
|
||||
#[error("Cancelled")]
|
||||
#[error("timeline shutting down")]
|
||||
Cancelled,
|
||||
|
||||
/// The ancestor of this is being stopped
|
||||
@@ -405,19 +406,17 @@ pub(crate) enum PageReconstructError {
|
||||
WalRedo(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum CreateImageLayersError {
|
||||
#[error("timeline shutting down")]
|
||||
Cancelled,
|
||||
|
||||
#[error(transparent)]
|
||||
GetVectoredError(GetVectoredError),
|
||||
|
||||
#[error(transparent)]
|
||||
PageReconstructError(PageReconstructError),
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
impl PageReconstructError {
|
||||
/// Returns true if this error indicates a tenant/timeline shutdown alike situation
|
||||
pub(crate) fn is_stopping(&self) -> bool {
|
||||
use PageReconstructError::*;
|
||||
match self {
|
||||
Other(_) => false,
|
||||
AncestorLsnTimeout(_) => false,
|
||||
Cancelled | AncestorStopping(_) => true,
|
||||
WalRedo(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
@@ -427,24 +426,12 @@ enum FlushLayerError {
|
||||
Cancelled,
|
||||
|
||||
#[error(transparent)]
|
||||
CreateImageLayersError(CreateImageLayersError),
|
||||
PageReconstructError(#[from] PageReconstructError),
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum GetVectoredError {
|
||||
#[error("timeline shutting down")]
|
||||
Cancelled,
|
||||
|
||||
#[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
|
||||
Oversized(u64),
|
||||
|
||||
#[error("Requested at invalid LSN: {0}")]
|
||||
InvalidLsn(Lsn),
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum LogicalSizeCalculationCause {
|
||||
Initial,
|
||||
@@ -484,45 +471,6 @@ pub(crate) enum WaitLsnError {
|
||||
Timeout(String),
|
||||
}
|
||||
|
||||
// The impls below achieve cancellation mapping for errors.
|
||||
// Perhaps there's a way of achieving this with less cruft.
|
||||
|
||||
impl From<CreateImageLayersError> for CompactionError {
|
||||
fn from(e: CreateImageLayersError) -> Self {
|
||||
match e {
|
||||
CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
|
||||
_ => CompactionError::Other(e.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CreateImageLayersError> for FlushLayerError {
|
||||
fn from(e: CreateImageLayersError) -> Self {
|
||||
match e {
|
||||
CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
|
||||
any => FlushLayerError::CreateImageLayersError(any),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PageReconstructError> for CreateImageLayersError {
|
||||
fn from(e: PageReconstructError) -> Self {
|
||||
match e {
|
||||
PageReconstructError::Cancelled => CreateImageLayersError::Cancelled,
|
||||
_ => CreateImageLayersError::PageReconstructError(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetVectoredError> for CreateImageLayersError {
|
||||
fn from(e: GetVectoredError) -> Self {
|
||||
match e {
|
||||
GetVectoredError::Cancelled => CreateImageLayersError::Cancelled,
|
||||
_ => CreateImageLayersError::GetVectoredError(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Public interface functions
|
||||
impl Timeline {
|
||||
/// Get the LSN where this branch was created
|
||||
@@ -642,53 +590,6 @@ impl Timeline {
|
||||
res
|
||||
}
|
||||
|
||||
pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
|
||||
|
||||
/// Look up multiple page versions at a given LSN
|
||||
///
|
||||
/// This naive implementation will be replaced with a more efficient one
|
||||
/// which actually vectorizes the read path.
|
||||
pub(crate) async fn get_vectored(
|
||||
&self,
|
||||
key_ranges: &[Range<Key>],
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
if !lsn.is_valid() {
|
||||
return Err(GetVectoredError::InvalidLsn(lsn));
|
||||
}
|
||||
|
||||
let key_count = key_ranges
|
||||
.iter()
|
||||
.map(|range| key_range_size(range) as u64)
|
||||
.sum();
|
||||
if key_count > Timeline::MAX_GET_VECTORED_KEYS {
|
||||
return Err(GetVectoredError::Oversized(key_count));
|
||||
}
|
||||
|
||||
let mut values = BTreeMap::new();
|
||||
for range in key_ranges {
|
||||
let mut key = range.start;
|
||||
while key != range.end {
|
||||
assert!(!self.shard_identity.is_key_disposable(&key));
|
||||
|
||||
let block = self.get(key, lsn, ctx).await;
|
||||
|
||||
if matches!(
|
||||
block,
|
||||
Err(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
|
||||
) {
|
||||
return Err(GetVectoredError::Cancelled);
|
||||
}
|
||||
|
||||
values.insert(key, block);
|
||||
key = key.next();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
|
||||
pub fn get_last_record_lsn(&self) -> Lsn {
|
||||
self.last_record_lsn.load().last
|
||||
@@ -946,8 +847,7 @@ impl Timeline {
|
||||
// "enough".
|
||||
let layers = self
|
||||
.create_image_layers(&partitioning, lsn, false, &image_ctx)
|
||||
.await
|
||||
.map_err(anyhow::Error::from)?;
|
||||
.await?;
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
for layer in layers {
|
||||
remote_client.schedule_layer_file_upload(layer)?;
|
||||
@@ -2696,7 +2596,7 @@ impl Timeline {
|
||||
return;
|
||||
}
|
||||
err @ Err(
|
||||
FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
|
||||
FlushLayerError::Other(_) | FlushLayerError::PageReconstructError(_),
|
||||
) => {
|
||||
error!("could not flush frozen layer: {err:?}");
|
||||
break err;
|
||||
@@ -2973,21 +2873,6 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client
|
||||
.preserve_initdb_archive(
|
||||
&self.tenant_shard_id.tenant_id,
|
||||
&self.timeline_id,
|
||||
&self.cancel,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
|
||||
// in layer map immediately. The caller is responsible to put it into the layer map.
|
||||
async fn create_delta_layer(
|
||||
@@ -3079,7 +2964,11 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
|
||||
async fn time_for_new_image_layer(
|
||||
&self,
|
||||
partition: &KeySpace,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<bool> {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
let guard = self.layers.read().await;
|
||||
@@ -3099,20 +2988,20 @@ impl Timeline {
|
||||
// but the range is already covered by image layers at more recent LSNs. Before we
|
||||
// create a new image layer, check if the range is already covered at more recent LSNs.
|
||||
if !layers
|
||||
.image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))
|
||||
.image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))?
|
||||
{
|
||||
debug!(
|
||||
"Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
|
||||
img_range.start, img_range.end, cutoff_lsn, lsn
|
||||
);
|
||||
return true;
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for part_range in &partition.ranges {
|
||||
let image_coverage = layers.image_coverage(part_range, lsn);
|
||||
let image_coverage = layers.image_coverage(part_range, lsn)?;
|
||||
for (img_range, last_img) in image_coverage {
|
||||
let img_lsn = if let Some(last_img) = last_img {
|
||||
last_img.get_lsn_range().end
|
||||
@@ -3133,7 +3022,7 @@ impl Timeline {
|
||||
// after we read last_record_lsn, which is passed here in the 'lsn' argument.
|
||||
if img_lsn < lsn {
|
||||
let num_deltas =
|
||||
layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold));
|
||||
layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
|
||||
|
||||
max_deltas = max_deltas.max(num_deltas);
|
||||
if num_deltas >= threshold {
|
||||
@@ -3141,7 +3030,7 @@ impl Timeline {
|
||||
"key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
|
||||
img_range.start, img_range.end, num_deltas, img_lsn, lsn
|
||||
);
|
||||
return true;
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3151,7 +3040,7 @@ impl Timeline {
|
||||
max_deltas,
|
||||
"none of the partitioned ranges had >= {threshold} deltas"
|
||||
);
|
||||
false
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(%lsn, %force))]
|
||||
@@ -3161,7 +3050,7 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
force: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
|
||||
) -> Result<Vec<ResidentLayer>, PageReconstructError> {
|
||||
let timer = self.metrics.create_images_time_histo.start_timer();
|
||||
let mut image_layers = Vec::new();
|
||||
|
||||
@@ -3179,7 +3068,7 @@ impl Timeline {
|
||||
for partition in partitioning.parts.iter() {
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
start = img_range.end;
|
||||
if force || self.time_for_new_image_layer(partition, lsn).await {
|
||||
if force || self.time_for_new_image_layer(partition, lsn).await? {
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
@@ -3190,12 +3079,10 @@ impl Timeline {
|
||||
.await?;
|
||||
|
||||
fail_point!("image-layer-writer-fail-before-finish", |_| {
|
||||
Err(CreateImageLayersError::Other(anyhow::anyhow!(
|
||||
Err(PageReconstructError::Other(anyhow::anyhow!(
|
||||
"failpoint image-layer-writer-fail-before-finish"
|
||||
)))
|
||||
});
|
||||
|
||||
let mut key_request_accum = KeySpaceAccum::new();
|
||||
for range in &partition.ranges {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
@@ -3208,55 +3095,34 @@ impl Timeline {
|
||||
key = key.next();
|
||||
continue;
|
||||
}
|
||||
|
||||
key_request_accum.add_key(key);
|
||||
if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
|
||||
|| key.next() == range.end
|
||||
{
|
||||
let results = self
|
||||
.get_vectored(
|
||||
&key_request_accum.consume_keyspace().ranges,
|
||||
lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
for (img_key, img) in results {
|
||||
let img = match img {
|
||||
Ok(img) => img,
|
||||
Err(err) => {
|
||||
// If we fail to reconstruct a VM or FSM page, we can zero the
|
||||
// page without losing any actual user data. That seems better
|
||||
// than failing repeatedly and getting stuck.
|
||||
//
|
||||
// We had a bug at one point, where we truncated the FSM and VM
|
||||
// in the pageserver, but the Postgres didn't know about that
|
||||
// and continued to generate incremental WAL records for pages
|
||||
// that didn't exist in the pageserver. Trying to replay those
|
||||
// WAL records failed to find the previous image of the page.
|
||||
// This special case allows us to recover from that situation.
|
||||
// See https://github.com/neondatabase/neon/issues/2601.
|
||||
//
|
||||
// Unfortunately we cannot do this for the main fork, or for
|
||||
// any metadata keys, keys, as that would lead to actual data
|
||||
// loss.
|
||||
if is_rel_fsm_block_key(img_key)
|
||||
|| is_rel_vm_block_key(img_key)
|
||||
{
|
||||
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
|
||||
ZERO_PAGE.clone()
|
||||
} else {
|
||||
return Err(
|
||||
CreateImageLayersError::PageReconstructError(err),
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
image_layer_writer.put_image(img_key, &img).await?;
|
||||
let img = match self.get(key, lsn, ctx).await {
|
||||
Ok(img) => img,
|
||||
Err(err) => {
|
||||
// If we fail to reconstruct a VM or FSM page, we can zero the
|
||||
// page without losing any actual user data. That seems better
|
||||
// than failing repeatedly and getting stuck.
|
||||
//
|
||||
// We had a bug at one point, where we truncated the FSM and VM
|
||||
// in the pageserver, but the Postgres didn't know about that
|
||||
// and continued to generate incremental WAL records for pages
|
||||
// that didn't exist in the pageserver. Trying to replay those
|
||||
// WAL records failed to find the previous image of the page.
|
||||
// This special case allows us to recover from that situation.
|
||||
// See https://github.com/neondatabase/neon/issues/2601.
|
||||
//
|
||||
// Unfortunately we cannot do this for the main fork, or for
|
||||
// any metadata keys, keys, as that would lead to actual data
|
||||
// loss.
|
||||
if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
|
||||
warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
|
||||
ZERO_PAGE.clone()
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
image_layer_writer.put_image(key, &img).await?;
|
||||
key = key.next();
|
||||
}
|
||||
}
|
||||
@@ -3348,7 +3214,46 @@ pub(crate) enum CompactionError {
|
||||
ShuttingDown,
|
||||
/// Compaction cannot be done right now; page reconstruction and so on.
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
fn other<E>(err: E) -> Self
|
||||
where
|
||||
E: std::error::Error + Send + Sync + 'static,
|
||||
{
|
||||
CompactionError::Other(anyhow::Error::new(err))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PageReconstructError> for CompactionError {
|
||||
fn from(value: PageReconstructError) -> Self {
|
||||
if value.is_stopping() {
|
||||
CompactionError::ShuttingDown
|
||||
} else {
|
||||
CompactionError::other(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NotInitialized> for CompactionError {
|
||||
fn from(value: NotInitialized) -> Self {
|
||||
if value.is_stopping() {
|
||||
CompactionError::ShuttingDown
|
||||
} else {
|
||||
CompactionError::other(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<layer::DownloadError> for CompactionError {
|
||||
fn from(value: layer::DownloadError) -> Self {
|
||||
if value.is_cancelled() {
|
||||
CompactionError::ShuttingDown
|
||||
} else {
|
||||
CompactionError::other(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
@@ -3481,7 +3386,7 @@ impl Timeline {
|
||||
stats.read_lock_held_spawn_blocking_startup_micros =
|
||||
stats.read_lock_acquisition_micros.till_now(); // set by caller
|
||||
let layers = guard.layer_map();
|
||||
let level0_deltas = layers.get_level0_deltas()?;
|
||||
let level0_deltas = layers.get_level0_deltas();
|
||||
let mut level0_deltas = level0_deltas
|
||||
.into_iter()
|
||||
.map(|x| guard.get_from_desc(&x))
|
||||
@@ -3528,7 +3433,8 @@ impl Timeline {
|
||||
delta
|
||||
.download_and_keep_resident()
|
||||
.await
|
||||
.context("download layer for failpoint")?,
|
||||
.context("download layer for failpoint")
|
||||
.map_err(CompactionError::Other)?,
|
||||
);
|
||||
}
|
||||
tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
|
||||
@@ -3612,7 +3518,7 @@ impl Timeline {
|
||||
let mut all_keys = Vec::new();
|
||||
|
||||
for l in deltas_to_compact.iter() {
|
||||
all_keys.extend(l.load_keys(ctx).await?);
|
||||
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
|
||||
}
|
||||
|
||||
// FIXME: should spawn_blocking the rest of this function
|
||||
@@ -3632,7 +3538,10 @@ impl Timeline {
|
||||
// has not so much sense, because largest holes will corresponds field1/field2 changes.
|
||||
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
|
||||
// That is why it is better to measure size of hole as number of covering image layers.
|
||||
let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
|
||||
let coverage_size = layers
|
||||
.image_coverage(&key_range, last_record_lsn)
|
||||
.map_err(CompactionError::Other)?
|
||||
.len();
|
||||
if coverage_size >= min_hole_coverage_size {
|
||||
heap.push(Hole {
|
||||
key_range,
|
||||
@@ -3731,7 +3640,7 @@ impl Timeline {
|
||||
key, lsn, ref val, ..
|
||||
} in all_values_iter
|
||||
{
|
||||
let value = val.load(ctx).await?;
|
||||
let value = val.load(ctx).await.map_err(CompactionError::Other)?;
|
||||
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
|
||||
// We need to check key boundaries once we reach next key or end of layer with the same key
|
||||
if !same_key || lsn == dup_end_lsn {
|
||||
@@ -3788,7 +3697,8 @@ impl Timeline {
|
||||
.take()
|
||||
.unwrap()
|
||||
.finish(prev_key.unwrap().next(), self)
|
||||
.await?,
|
||||
.await
|
||||
.map_err(CompactionError::Other)?,
|
||||
);
|
||||
writer = None;
|
||||
|
||||
@@ -3818,7 +3728,8 @@ impl Timeline {
|
||||
lsn_range.clone()
|
||||
},
|
||||
)
|
||||
.await?,
|
||||
.await
|
||||
.map_err(CompactionError::Other)?,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -3829,7 +3740,12 @@ impl Timeline {
|
||||
});
|
||||
|
||||
if !self.shard_identity.is_key_disposable(&key) {
|
||||
writer.as_mut().unwrap().put_value(key, lsn, value).await?;
|
||||
writer
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.put_value(key, lsn, value)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
} else {
|
||||
debug!(
|
||||
"Dropping key {} during compaction (it belongs on shard {:?})",
|
||||
@@ -3845,7 +3761,12 @@ impl Timeline {
|
||||
prev_key = Some(key);
|
||||
}
|
||||
if let Some(writer) = writer {
|
||||
new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
|
||||
new_layers.push(
|
||||
writer
|
||||
.finish(prev_key.unwrap().next(), self)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?,
|
||||
);
|
||||
}
|
||||
|
||||
// Sync layers
|
||||
@@ -3874,7 +3795,8 @@ impl Timeline {
|
||||
// minimize latency.
|
||||
par_fsync::par_fsync_async(&layer_paths)
|
||||
.await
|
||||
.context("fsync all new layers")?;
|
||||
.context("fsync all new layers")
|
||||
.map_err(CompactionError::Other)?;
|
||||
|
||||
let timeline_dir = self
|
||||
.conf
|
||||
@@ -3882,7 +3804,8 @@ impl Timeline {
|
||||
|
||||
par_fsync::par_fsync_async(&[timeline_dir])
|
||||
.await
|
||||
.context("fsync of timeline dir")?;
|
||||
.context("fsync of timeline dir")
|
||||
.map_err(CompactionError::Other)?;
|
||||
}
|
||||
|
||||
stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
|
||||
@@ -4258,7 +4181,7 @@ impl Timeline {
|
||||
// we cannot remove C, even though it's older than 2500, because
|
||||
// the delta layer 2000-3000 depends on it.
|
||||
if !layers
|
||||
.image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
|
||||
.image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))?
|
||||
{
|
||||
debug!("keeping {} because it is the latest layer", l.filename());
|
||||
// Collect delta key ranges that need image layers to allow garbage
|
||||
|
||||
@@ -126,6 +126,27 @@ pub(super) struct UploadQueueStopped {
|
||||
pub(super) deleted_at: SetDeletedFlagProgress,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum NotInitialized {
|
||||
#[error("queue is in state Uninitialized")]
|
||||
Uninitialized,
|
||||
#[error("queue is in state Stopping")]
|
||||
Stopped,
|
||||
#[error("queue is shutting down")]
|
||||
ShuttingDown,
|
||||
}
|
||||
|
||||
impl NotInitialized {
|
||||
pub(crate) fn is_stopping(&self) -> bool {
|
||||
use NotInitialized::*;
|
||||
match self {
|
||||
Uninitialized => false,
|
||||
Stopped => true,
|
||||
ShuttingDown => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UploadQueue {
|
||||
pub(crate) fn initialize_empty_remote(
|
||||
&mut self,
|
||||
@@ -213,18 +234,20 @@ impl UploadQueue {
|
||||
Ok(self.initialized_mut().expect("we just set it"))
|
||||
}
|
||||
|
||||
pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
|
||||
pub(crate) fn initialized_mut(
|
||||
&mut self,
|
||||
) -> Result<&mut UploadQueueInitialized, NotInitialized> {
|
||||
use UploadQueue::*;
|
||||
match self {
|
||||
UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
|
||||
anyhow::bail!("queue is in state {}", self.as_str())
|
||||
}
|
||||
UploadQueue::Initialized(x) => {
|
||||
if !x.shutting_down {
|
||||
Ok(x)
|
||||
Uninitialized => Err(NotInitialized::Uninitialized.into()),
|
||||
Initialized(x) => {
|
||||
if x.shutting_down {
|
||||
Err(NotInitialized::ShuttingDown.into())
|
||||
} else {
|
||||
anyhow::bail!("queue is shutting down")
|
||||
Ok(x)
|
||||
}
|
||||
}
|
||||
Stopped(_) => Err(NotInitialized::Stopped.into()),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -356,7 +356,12 @@ impl VirtualFile {
|
||||
Ok(vfile)
|
||||
}
|
||||
|
||||
/// Async & [`VirtualFile`]-enabled version of [`::utils::crashsafe::overwrite`].
|
||||
/// Writes a file to the specified `final_path` in a crash safe fasion
|
||||
///
|
||||
/// The file is first written to the specified tmp_path, and in a second
|
||||
/// step, the tmp path is renamed to the final path. As renames are
|
||||
/// atomic, a crash during the write operation will never leave behind a
|
||||
/// partially written file.
|
||||
pub async fn crashsafe_overwrite(
|
||||
final_path: &Utf8Path,
|
||||
tmp_path: &Utf8Path,
|
||||
|
||||
@@ -836,8 +836,9 @@ impl WalRedoProcess {
|
||||
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
|
||||
let mut nwrite = 0usize;
|
||||
|
||||
let mut stdin_pollfds = [PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT)];
|
||||
|
||||
while nwrite < writebuf.len() {
|
||||
let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
|
||||
let n = loop {
|
||||
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
|
||||
Err(nix::errno::Errno::EINTR) => continue,
|
||||
@@ -876,6 +877,7 @@ impl WalRedoProcess {
|
||||
// advancing processed responses number.
|
||||
|
||||
let mut output = self.stdout.lock().unwrap();
|
||||
let mut stdout_pollfds = [PollFd::new(output.stdout.as_raw_fd(), PollFlags::POLLIN)];
|
||||
let n_processed_responses = output.n_processed_responses;
|
||||
while n_processed_responses + output.pending_responses.len() <= request_no {
|
||||
// We expect the WAL redo process to respond with an 8k page image. We read it
|
||||
@@ -883,7 +885,6 @@ impl WalRedoProcess {
|
||||
let mut resultbuf = vec![0; BLCKSZ.into()];
|
||||
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
|
||||
while nresult < BLCKSZ.into() {
|
||||
let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
|
||||
// We do two things simultaneously: reading response from stdout
|
||||
// and forward any logging information that the child writes to its stderr to the page server's log.
|
||||
let n = loop {
|
||||
|
||||
@@ -4,9 +4,7 @@ pub mod backend;
|
||||
pub use backend::BackendType;
|
||||
|
||||
mod credentials;
|
||||
pub use credentials::{
|
||||
check_peer_addr_is_in_list, endpoint_sni, ComputeUserInfoMaybeEndpoint, IpPattern,
|
||||
};
|
||||
pub use credentials::{check_peer_addr_is_in_list, endpoint_sni, ComputeUserInfoMaybeEndpoint};
|
||||
|
||||
mod password_hack;
|
||||
pub use password_hack::parse_endpoint_param;
|
||||
|
||||
@@ -35,8 +35,6 @@ use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use super::IpPattern;
|
||||
|
||||
/// This type serves two purposes:
|
||||
///
|
||||
/// * When `T` is `()`, it's just a regular auth backend selector
|
||||
@@ -57,7 +55,7 @@ pub enum BackendType<'a, T> {
|
||||
|
||||
pub trait TestBackend: Send + Sync + 'static {
|
||||
fn wake_compute(&self) -> Result<CachedNodeInfo, console::errors::WakeComputeError>;
|
||||
fn get_allowed_ips(&self) -> Result<Vec<IpPattern>, console::errors::GetAuthInfoError>;
|
||||
fn get_allowed_ips(&self) -> Result<Vec<SmolStr>, console::errors::GetAuthInfoError>;
|
||||
}
|
||||
|
||||
impl std::fmt::Display for BackendType<'_, ()> {
|
||||
@@ -204,18 +202,21 @@ async fn auth_quirks(
|
||||
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
|
||||
return Err(auth::AuthError::ip_address_not_allowed());
|
||||
}
|
||||
let cached_secret = api.get_role_secret(ctx, &info).await?;
|
||||
let maybe_secret = api.get_role_secret(ctx, &info).await?;
|
||||
|
||||
let secret = cached_secret.value.clone().unwrap_or_else(|| {
|
||||
let cached_secret = maybe_secret.unwrap_or_else(|| {
|
||||
// If we don't have an authentication secret, we mock one to
|
||||
// prevent malicious probing (possible due to missing protocol steps).
|
||||
// This mocked secret will never lead to successful authentication.
|
||||
info!("authentication info not found, mocking it");
|
||||
AuthSecret::Scram(scram::ServerSecret::mock(&info.user, rand::random()))
|
||||
Cached::new_uncached(AuthSecret::Scram(scram::ServerSecret::mock(
|
||||
&info.user,
|
||||
rand::random(),
|
||||
)))
|
||||
});
|
||||
match authenticate_with_secret(
|
||||
ctx,
|
||||
secret,
|
||||
cached_secret.value.clone(),
|
||||
info,
|
||||
client,
|
||||
unauthenticated_password,
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::{
|
||||
use itertools::Itertools;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use smol_str::SmolStr;
|
||||
use std::{collections::HashSet, net::IpAddr, str::FromStr};
|
||||
use std::{collections::HashSet, net::IpAddr};
|
||||
use thiserror::Error;
|
||||
use tracing::{info, warn};
|
||||
|
||||
@@ -151,51 +151,30 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn check_peer_addr_is_in_list(peer_addr: &IpAddr, ip_list: &[IpPattern]) -> bool {
|
||||
ip_list.is_empty() || ip_list.iter().any(|pattern| check_ip(peer_addr, pattern))
|
||||
pub fn check_peer_addr_is_in_list(peer_addr: &IpAddr, ip_list: &Vec<SmolStr>) -> bool {
|
||||
if ip_list.is_empty() {
|
||||
return true;
|
||||
}
|
||||
for ip in ip_list {
|
||||
// We expect that all ip addresses from control plane are correct.
|
||||
// However, if some of them are broken, we still can check the others.
|
||||
match parse_ip_pattern(ip) {
|
||||
Ok(pattern) => {
|
||||
if check_ip(peer_addr, &pattern) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
Err(err) => warn!("Cannot parse ip: {}; err: {}", ip, err),
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub enum IpPattern {
|
||||
enum IpPattern {
|
||||
Subnet(ipnet::IpNet),
|
||||
Range(IpAddr, IpAddr),
|
||||
Single(IpAddr),
|
||||
None,
|
||||
}
|
||||
|
||||
impl<'de> serde::de::Deserialize<'de> for IpPattern {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
struct StrVisitor;
|
||||
impl<'de> serde::de::Visitor<'de> for StrVisitor {
|
||||
type Value = IpPattern;
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(formatter, "comma separated list with ip address, ip address range, or ip address subnet mask")
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
Ok(parse_ip_pattern(v).unwrap_or_else(|e| {
|
||||
warn!("Cannot parse ip pattern {v}: {e}");
|
||||
IpPattern::None
|
||||
}))
|
||||
}
|
||||
}
|
||||
deserializer.deserialize_str(StrVisitor)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for IpPattern {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
parse_ip_pattern(s)
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_ip_pattern(pattern: &str) -> anyhow::Result<IpPattern> {
|
||||
@@ -217,7 +196,6 @@ fn check_ip(ip: &IpAddr, pattern: &IpPattern) -> bool {
|
||||
IpPattern::Subnet(subnet) => subnet.contains(ip),
|
||||
IpPattern::Range(start, end) => start <= ip && ip <= end,
|
||||
IpPattern::Single(addr) => addr == ip,
|
||||
IpPattern::None => false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -228,7 +206,6 @@ fn project_name_valid(name: &str) -> bool {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serde_json::json;
|
||||
use ComputeUserInfoParseError::*;
|
||||
|
||||
#[test]
|
||||
@@ -438,17 +415,21 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_check_peer_addr_is_in_list() {
|
||||
fn check(v: serde_json::Value) -> bool {
|
||||
let peer_addr = IpAddr::from([127, 0, 0, 1]);
|
||||
let ip_list: Vec<IpPattern> = serde_json::from_value(v).unwrap();
|
||||
check_peer_addr_is_in_list(&peer_addr, &ip_list)
|
||||
}
|
||||
|
||||
assert!(check(json!([])));
|
||||
assert!(check(json!(["127.0.0.1"])));
|
||||
assert!(!check(json!(["8.8.8.8"])));
|
||||
let peer_addr = IpAddr::from([127, 0, 0, 1]);
|
||||
assert!(check_peer_addr_is_in_list(&peer_addr, &vec![]));
|
||||
assert!(check_peer_addr_is_in_list(
|
||||
&peer_addr,
|
||||
&vec!["127.0.0.1".into()]
|
||||
));
|
||||
assert!(!check_peer_addr_is_in_list(
|
||||
&peer_addr,
|
||||
&vec!["8.8.8.8".into()]
|
||||
));
|
||||
// If there is an incorrect address, it will be skipped.
|
||||
assert!(check(json!(["88.8.8", "127.0.0.1"])));
|
||||
assert!(check_peer_addr_is_in_list(
|
||||
&peer_addr,
|
||||
&vec!["88.8.8".into(), "127.0.0.1".into()]
|
||||
));
|
||||
}
|
||||
#[test]
|
||||
fn test_parse_ip_v4() -> anyhow::Result<()> {
|
||||
|
||||
65
proxy/src/cache/project_info.rs
vendored
65
proxy/src/cache/project_info.rs
vendored
@@ -11,7 +11,7 @@ use smol_str::SmolStr;
|
||||
use tokio::time::Instant;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::{auth::IpPattern, config::ProjectInfoCacheOptions, console::AuthSecret};
|
||||
use crate::{config::ProjectInfoCacheOptions, console::AuthSecret};
|
||||
|
||||
use super::{Cache, Cached};
|
||||
|
||||
@@ -44,8 +44,8 @@ impl<T> From<T> for Entry<T> {
|
||||
|
||||
#[derive(Default)]
|
||||
struct EndpointInfo {
|
||||
secret: std::collections::HashMap<SmolStr, Entry<Option<AuthSecret>>>,
|
||||
allowed_ips: Option<Entry<Arc<Vec<IpPattern>>>>,
|
||||
secret: std::collections::HashMap<SmolStr, Entry<AuthSecret>>,
|
||||
allowed_ips: Option<Entry<Arc<Vec<SmolStr>>>>,
|
||||
}
|
||||
|
||||
impl EndpointInfo {
|
||||
@@ -60,7 +60,7 @@ impl EndpointInfo {
|
||||
role_name: &SmolStr,
|
||||
valid_since: Instant,
|
||||
ignore_cache_since: Option<Instant>,
|
||||
) -> Option<(Option<AuthSecret>, bool)> {
|
||||
) -> Option<(AuthSecret, bool)> {
|
||||
if let Some(secret) = self.secret.get(role_name) {
|
||||
if valid_since < secret.created_at {
|
||||
return Some((
|
||||
@@ -76,7 +76,7 @@ impl EndpointInfo {
|
||||
&self,
|
||||
valid_since: Instant,
|
||||
ignore_cache_since: Option<Instant>,
|
||||
) -> Option<(Arc<Vec<IpPattern>>, bool)> {
|
||||
) -> Option<(Arc<Vec<SmolStr>>, bool)> {
|
||||
if let Some(allowed_ips) = &self.allowed_ips {
|
||||
if valid_since < allowed_ips.created_at {
|
||||
return Some((
|
||||
@@ -169,7 +169,7 @@ impl ProjectInfoCacheImpl {
|
||||
&self,
|
||||
endpoint_id: &SmolStr,
|
||||
role_name: &SmolStr,
|
||||
) -> Option<Cached<&Self, Option<AuthSecret>>> {
|
||||
) -> Option<Cached<&Self, AuthSecret>> {
|
||||
let (valid_since, ignore_cache_since) = self.get_cache_times();
|
||||
let endpoint_info = self.cache.get(endpoint_id)?;
|
||||
let (value, ignore_cache) =
|
||||
@@ -189,7 +189,7 @@ impl ProjectInfoCacheImpl {
|
||||
pub fn get_allowed_ips(
|
||||
&self,
|
||||
endpoint_id: &SmolStr,
|
||||
) -> Option<Cached<&Self, Arc<Vec<IpPattern>>>> {
|
||||
) -> Option<Cached<&Self, Arc<Vec<SmolStr>>>> {
|
||||
let (valid_since, ignore_cache_since) = self.get_cache_times();
|
||||
let endpoint_info = self.cache.get(endpoint_id)?;
|
||||
let value = endpoint_info.get_allowed_ips(valid_since, ignore_cache_since);
|
||||
@@ -208,7 +208,7 @@ impl ProjectInfoCacheImpl {
|
||||
project_id: &SmolStr,
|
||||
endpoint_id: &SmolStr,
|
||||
role_name: &SmolStr,
|
||||
secret: Option<AuthSecret>,
|
||||
secret: AuthSecret,
|
||||
) {
|
||||
if self.cache.len() >= self.config.size {
|
||||
// If there are too many entries, wait until the next gc cycle.
|
||||
@@ -224,7 +224,7 @@ impl ProjectInfoCacheImpl {
|
||||
&self,
|
||||
project_id: &SmolStr,
|
||||
endpoint_id: &SmolStr,
|
||||
allowed_ips: Arc<Vec<IpPattern>>,
|
||||
allowed_ips: Arc<Vec<SmolStr>>,
|
||||
) {
|
||||
if self.cache.len() >= self.config.size {
|
||||
// If there are too many entries, wait until the next gc cycle.
|
||||
@@ -364,15 +364,9 @@ mod tests {
|
||||
let endpoint_id = "endpoint".into();
|
||||
let user1: SmolStr = "user1".into();
|
||||
let user2: SmolStr = "user2".into();
|
||||
let secret1 = Some(AuthSecret::Scram(ServerSecret::mock(
|
||||
user1.as_str(),
|
||||
[1; 32],
|
||||
)));
|
||||
let secret2 = None;
|
||||
let allowed_ips = Arc::new(vec![
|
||||
"127.0.0.1".parse().unwrap(),
|
||||
"127.0.0.2".parse().unwrap(),
|
||||
]);
|
||||
let secret1 = AuthSecret::Scram(ServerSecret::mock(user1.as_str(), [1; 32]));
|
||||
let secret2 = AuthSecret::Scram(ServerSecret::mock(user2.as_str(), [2; 32]));
|
||||
let allowed_ips = Arc::new(vec!["allowed_ip1".into(), "allowed_ip2".into()]);
|
||||
cache.insert_role_secret(&project_id, &endpoint_id, &user1, secret1.clone());
|
||||
cache.insert_role_secret(&project_id, &endpoint_id, &user2, secret2.clone());
|
||||
cache.insert_allowed_ips(&project_id, &endpoint_id, allowed_ips.clone());
|
||||
@@ -386,10 +380,7 @@ mod tests {
|
||||
|
||||
// Shouldn't add more than 2 roles.
|
||||
let user3: SmolStr = "user3".into();
|
||||
let secret3 = Some(AuthSecret::Scram(ServerSecret::mock(
|
||||
user3.as_str(),
|
||||
[3; 32],
|
||||
)));
|
||||
let secret3 = AuthSecret::Scram(ServerSecret::mock(user3.as_str(), [3; 32]));
|
||||
cache.insert_role_secret(&project_id, &endpoint_id, &user3, secret3.clone());
|
||||
assert!(cache.get_role_secret(&endpoint_id, &user3).is_none());
|
||||
|
||||
@@ -422,18 +413,9 @@ mod tests {
|
||||
let endpoint_id = "endpoint".into();
|
||||
let user1: SmolStr = "user1".into();
|
||||
let user2: SmolStr = "user2".into();
|
||||
let secret1 = Some(AuthSecret::Scram(ServerSecret::mock(
|
||||
user1.as_str(),
|
||||
[1; 32],
|
||||
)));
|
||||
let secret2 = Some(AuthSecret::Scram(ServerSecret::mock(
|
||||
user2.as_str(),
|
||||
[2; 32],
|
||||
)));
|
||||
let allowed_ips = Arc::new(vec![
|
||||
"127.0.0.1".parse().unwrap(),
|
||||
"127.0.0.2".parse().unwrap(),
|
||||
]);
|
||||
let secret1 = AuthSecret::Scram(ServerSecret::mock(user1.as_str(), [1; 32]));
|
||||
let secret2 = AuthSecret::Scram(ServerSecret::mock(user2.as_str(), [2; 32]));
|
||||
let allowed_ips = Arc::new(vec!["allowed_ip1".into(), "allowed_ip2".into()]);
|
||||
cache.insert_role_secret(&project_id, &endpoint_id, &user1, secret1.clone());
|
||||
cache.insert_role_secret(&project_id, &endpoint_id, &user2, secret2.clone());
|
||||
cache.insert_allowed_ips(&project_id, &endpoint_id, allowed_ips.clone());
|
||||
@@ -477,18 +459,9 @@ mod tests {
|
||||
let endpoint_id = "endpoint".into();
|
||||
let user1: SmolStr = "user1".into();
|
||||
let user2: SmolStr = "user2".into();
|
||||
let secret1 = Some(AuthSecret::Scram(ServerSecret::mock(
|
||||
user1.as_str(),
|
||||
[1; 32],
|
||||
)));
|
||||
let secret2 = Some(AuthSecret::Scram(ServerSecret::mock(
|
||||
user2.as_str(),
|
||||
[2; 32],
|
||||
)));
|
||||
let allowed_ips = Arc::new(vec![
|
||||
"127.0.0.1".parse().unwrap(),
|
||||
"127.0.0.2".parse().unwrap(),
|
||||
]);
|
||||
let secret1 = AuthSecret::Scram(ServerSecret::mock(user1.as_str(), [1; 32]));
|
||||
let secret2 = AuthSecret::Scram(ServerSecret::mock(user2.as_str(), [2; 32]));
|
||||
let allowed_ips = Arc::new(vec!["allowed_ip1".into(), "allowed_ip2".into()]);
|
||||
cache.insert_role_secret(&project_id, &endpoint_id, &user1, secret1.clone());
|
||||
cache.clone().disable_ttl();
|
||||
tokio::time::advance(Duration::from_millis(100)).await;
|
||||
|
||||
@@ -2,8 +2,6 @@ use serde::Deserialize;
|
||||
use smol_str::SmolStr;
|
||||
use std::fmt;
|
||||
|
||||
use crate::auth::IpPattern;
|
||||
|
||||
/// Generic error response with human-readable description.
|
||||
/// Note that we can't always present it to user as is.
|
||||
#[derive(Debug, Deserialize)]
|
||||
@@ -16,7 +14,7 @@ pub struct ConsoleError {
|
||||
#[derive(Deserialize)]
|
||||
pub struct GetRoleSecret {
|
||||
pub role_secret: Box<str>,
|
||||
pub allowed_ips: Option<Vec<IpPattern>>,
|
||||
pub allowed_ips: Option<Vec<Box<str>>>,
|
||||
pub project_id: Option<Box<str>>,
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ pub mod neon;
|
||||
|
||||
use super::messages::MetricsAuxInfo;
|
||||
use crate::{
|
||||
auth::{backend::ComputeUserInfo, IpPattern},
|
||||
auth::backend::ComputeUserInfo,
|
||||
cache::{project_info::ProjectInfoCacheImpl, Cached, TimedLru},
|
||||
compute,
|
||||
config::{CacheOptions, ProjectInfoCacheOptions},
|
||||
@@ -212,7 +212,7 @@ pub enum AuthSecret {
|
||||
pub struct AuthInfo {
|
||||
pub secret: Option<AuthSecret>,
|
||||
/// List of IP addresses allowed for the autorization.
|
||||
pub allowed_ips: Vec<IpPattern>,
|
||||
pub allowed_ips: Vec<SmolStr>,
|
||||
/// Project ID. This is used for cache invalidation.
|
||||
pub project_id: Option<SmolStr>,
|
||||
}
|
||||
@@ -235,8 +235,8 @@ pub struct NodeInfo {
|
||||
|
||||
pub type NodeInfoCache = TimedLru<SmolStr, NodeInfo>;
|
||||
pub type CachedNodeInfo = Cached<&'static NodeInfoCache>;
|
||||
pub type CachedRoleSecret = Cached<&'static ProjectInfoCacheImpl, Option<AuthSecret>>;
|
||||
pub type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc<Vec<IpPattern>>>;
|
||||
pub type CachedRoleSecret = Cached<&'static ProjectInfoCacheImpl, AuthSecret>;
|
||||
pub type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc<Vec<SmolStr>>>;
|
||||
|
||||
/// This will allocate per each call, but the http requests alone
|
||||
/// already require a few allocations, so it should be fine.
|
||||
@@ -249,7 +249,7 @@ pub trait Api {
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, errors::GetAuthInfoError>;
|
||||
) -> Result<Option<CachedRoleSecret>, errors::GetAuthInfoError>;
|
||||
|
||||
async fn get_allowed_ips(
|
||||
&self,
|
||||
@@ -280,7 +280,7 @@ impl Api for ConsoleBackend {
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, errors::GetAuthInfoError> {
|
||||
) -> Result<Option<CachedRoleSecret>, errors::GetAuthInfoError> {
|
||||
use ConsoleBackend::*;
|
||||
match self {
|
||||
Console(api) => api.get_role_secret(ctx, user_info).await,
|
||||
|
||||
@@ -4,13 +4,14 @@ use super::{
|
||||
errors::{ApiError, GetAuthInfoError, WakeComputeError},
|
||||
AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo,
|
||||
};
|
||||
use crate::cache::Cached;
|
||||
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl};
|
||||
use crate::{auth::IpPattern, cache::Cached};
|
||||
use async_trait::async_trait;
|
||||
use futures::TryFutureExt;
|
||||
use std::{str::FromStr, sync::Arc};
|
||||
use smol_str::SmolStr;
|
||||
use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
use tokio_postgres::{config::SslMode, Client};
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
@@ -87,9 +88,7 @@ impl Api {
|
||||
{
|
||||
Some(s) => {
|
||||
info!("got allowed_ips: {s}");
|
||||
s.split(',')
|
||||
.map(|s| IpPattern::from_str(s).unwrap())
|
||||
.collect()
|
||||
s.split(',').map(String::from).collect()
|
||||
}
|
||||
None => vec![],
|
||||
};
|
||||
@@ -101,7 +100,7 @@ impl Api {
|
||||
.await?;
|
||||
Ok(AuthInfo {
|
||||
secret,
|
||||
allowed_ips,
|
||||
allowed_ips: allowed_ips.iter().map(SmolStr::from).collect(),
|
||||
project_id: None,
|
||||
})
|
||||
}
|
||||
@@ -151,10 +150,12 @@ impl super::Api for Api {
|
||||
&self,
|
||||
_ctx: &mut RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, GetAuthInfoError> {
|
||||
Ok(CachedRoleSecret::new_uncached(
|
||||
self.do_get_auth_info(user_info).await?.secret,
|
||||
))
|
||||
) -> Result<Option<CachedRoleSecret>, GetAuthInfoError> {
|
||||
Ok(self
|
||||
.do_get_auth_info(user_info)
|
||||
.await?
|
||||
.secret
|
||||
.map(CachedRoleSecret::new_uncached))
|
||||
}
|
||||
|
||||
async fn get_allowed_ips(
|
||||
|
||||
@@ -14,6 +14,7 @@ use crate::{
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use futures::TryFutureExt;
|
||||
use itertools::Itertools;
|
||||
use smol_str::SmolStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::Instant;
|
||||
@@ -85,18 +86,18 @@ impl Api {
|
||||
},
|
||||
};
|
||||
|
||||
let secret = if body.role_secret.is_empty() {
|
||||
None
|
||||
} else {
|
||||
let secret = scram::ServerSecret::parse(&body.role_secret)
|
||||
.map(AuthSecret::Scram)
|
||||
.ok_or(GetAuthInfoError::BadSecret)?;
|
||||
Some(secret)
|
||||
};
|
||||
let allowed_ips = body.allowed_ips.unwrap_or_default();
|
||||
let secret = scram::ServerSecret::parse(&body.role_secret)
|
||||
.map(AuthSecret::Scram)
|
||||
.ok_or(GetAuthInfoError::BadSecret)?;
|
||||
let allowed_ips = body
|
||||
.allowed_ips
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.map(SmolStr::from)
|
||||
.collect_vec();
|
||||
ALLOWED_IPS_NUMBER.observe(allowed_ips.len() as f64);
|
||||
Ok(AuthInfo {
|
||||
secret,
|
||||
secret: Some(secret),
|
||||
allowed_ips,
|
||||
project_id: body.project_id.map(SmolStr::from),
|
||||
})
|
||||
@@ -171,20 +172,19 @@ impl super::Api for Api {
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, GetAuthInfoError> {
|
||||
) -> Result<Option<CachedRoleSecret>, GetAuthInfoError> {
|
||||
let ep = &user_info.endpoint;
|
||||
let user = &user_info.user;
|
||||
if let Some(role_secret) = self.caches.project_info.get_role_secret(ep, user) {
|
||||
return Ok(role_secret);
|
||||
return Ok(Some(role_secret));
|
||||
}
|
||||
let auth_info = self.do_get_auth_info(ctx, user_info).await?;
|
||||
if let Some(project_id) = auth_info.project_id {
|
||||
self.caches.project_info.insert_role_secret(
|
||||
&project_id,
|
||||
ep,
|
||||
user,
|
||||
auth_info.secret.clone(),
|
||||
);
|
||||
if let Some(secret) = &auth_info.secret {
|
||||
self.caches
|
||||
.project_info
|
||||
.insert_role_secret(&project_id, ep, user, secret.clone())
|
||||
}
|
||||
self.caches.project_info.insert_allowed_ips(
|
||||
&project_id,
|
||||
ep,
|
||||
@@ -192,7 +192,7 @@ impl super::Api for Api {
|
||||
);
|
||||
}
|
||||
// When we just got a secret, we don't need to invalidate it.
|
||||
Ok(Cached::new_uncached(auth_info.secret))
|
||||
Ok(auth_info.secret.map(Cached::new_uncached))
|
||||
}
|
||||
|
||||
async fn get_allowed_ips(
|
||||
@@ -214,12 +214,11 @@ impl super::Api for Api {
|
||||
let allowed_ips = Arc::new(auth_info.allowed_ips);
|
||||
let user = &user_info.user;
|
||||
if let Some(project_id) = auth_info.project_id {
|
||||
self.caches.project_info.insert_role_secret(
|
||||
&project_id,
|
||||
ep,
|
||||
user,
|
||||
auth_info.secret.clone(),
|
||||
);
|
||||
if let Some(secret) = &auth_info.secret {
|
||||
self.caches
|
||||
.project_info
|
||||
.insert_role_secret(&project_id, ep, user, secret.clone())
|
||||
}
|
||||
self.caches
|
||||
.project_info
|
||||
.insert_allowed_ips(&project_id, ep, allowed_ips.clone());
|
||||
|
||||
@@ -6,13 +6,13 @@ use super::connect_compute::ConnectMechanism;
|
||||
use super::retry::ShouldRetry;
|
||||
use super::*;
|
||||
use crate::auth::backend::{ComputeUserInfo, TestBackend};
|
||||
use crate::auth::IpPattern;
|
||||
use crate::config::CertResolver;
|
||||
use crate::console::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT};
|
||||
use crate::{auth, http, sasl, scram};
|
||||
use async_trait::async_trait;
|
||||
use rstest::rstest;
|
||||
use smol_str::SmolStr;
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tokio_postgres::tls::{MakeTlsConnect, NoTls};
|
||||
use tokio_postgres_rustls::{MakeRustlsConnect, RustlsStream};
|
||||
@@ -471,7 +471,7 @@ impl TestBackend for TestConnectMechanism {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_allowed_ips(&self) -> Result<Vec<IpPattern>, console::errors::GetAuthInfoError> {
|
||||
fn get_allowed_ips(&self) -> Result<Vec<SmolStr>, console::errors::GetAuthInfoError> {
|
||||
unimplemented!("not used in tests")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -526,17 +526,6 @@ class PageserverHttpClient(requests.Session):
|
||||
res_json = res.json()
|
||||
assert res_json is None
|
||||
|
||||
def timeline_preserve_initdb_archive(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
):
|
||||
log.info(
|
||||
f"Requesting initdb archive preservation for tenant {tenant_id} and timeline {timeline_id}"
|
||||
)
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/preserve_initdb_archive",
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
def timeline_get_lsn_by_timestamp(
|
||||
self,
|
||||
tenant_id: Union[TenantId, TenantShardId],
|
||||
|
||||
@@ -7,13 +7,11 @@ from typing import List, Optional
|
||||
|
||||
import pytest
|
||||
import toml
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import (
|
||||
timeline_delete_wait_completed,
|
||||
wait_for_last_record_lsn,
|
||||
@@ -271,20 +269,14 @@ def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, r
|
||||
timeline_id = env.initial_timeline
|
||||
pg_version = env.pg_version
|
||||
|
||||
try:
|
||||
pageserver_http.timeline_preserve_initdb_archive(tenant_id, timeline_id)
|
||||
except PageserverApiException as e:
|
||||
# Allow the error as we might be running the old pageserver binary
|
||||
log.info(f"Got allowed error: '{e}'")
|
||||
|
||||
# Delete all files from local_fs_remote_storage except initdb-preserved.tar.zst,
|
||||
# Delete all files from local_fs_remote_storage except initdb.tar.zst,
|
||||
# the file is required for `timeline_create` with `existing_initdb_timeline_id`.
|
||||
#
|
||||
# TODO: switch to Path.walk() in Python 3.12
|
||||
# for dirpath, _dirnames, filenames in (repo_dir / "local_fs_remote_storage").walk():
|
||||
for dirpath, _dirnames, filenames in os.walk(repo_dir / "local_fs_remote_storage"):
|
||||
for filename in filenames:
|
||||
if filename != "initdb-preserved.tar.zst" and filename != "initdb.tar.zst":
|
||||
if filename != "initdb.tar.zst":
|
||||
(Path(dirpath) / filename).unlink()
|
||||
|
||||
timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id)
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
import time
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_neon_superuser", "empty")
|
||||
endpoint = env.endpoints.create("test_neon_superuser")
|
||||
endpoint.respec(skip_pg_catalog_updates=False, features=["migrations"])
|
||||
endpoint.start()
|
||||
|
||||
time.sleep(1) # Sleep to let migrations run
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute(
|
||||
"CREATE ROLE mr_whiskers WITH PASSWORD 'cat' LOGIN INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser"
|
||||
)
|
||||
cur.execute("CREATE DATABASE neondb WITH OWNER mr_whiskers")
|
||||
cur.execute("GRANT ALL PRIVILEGES ON DATABASE neondb TO neon_superuser")
|
||||
|
||||
with endpoint.cursor(dbname="neondb", user="mr_whiskers", password="cat") as cur:
|
||||
cur.execute("SELECT pg_has_role('mr_whiskers', 'neon_superuser', 'member')")
|
||||
assert cur.fetchall()[0][0]
|
||||
cur.execute("SELECT pg_has_role('mr_whiskers', 'neon_superuser', 'usage')")
|
||||
assert cur.fetchall()[0][0]
|
||||
|
||||
if pg_version == PgVersion.V16:
|
||||
cur.execute("SELECT pg_has_role('mr_whiskers', 'neon_superuser', 'set')")
|
||||
assert cur.fetchall()[0][0]
|
||||
|
||||
cur.execute("CREATE PUBLICATION pub FOR ALL TABLES")
|
||||
cur.execute("CREATE ROLE definitely_not_a_superuser WITH PASSWORD 'nope'")
|
||||
@@ -953,10 +953,6 @@ def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
endpoint.stop()
|
||||
env.pageserver.tenant_detach(tenant_id)
|
||||
env.pageserver.allowed_errors.append(
|
||||
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start(
|
||||
extra_env_vars={
|
||||
|
||||
@@ -137,9 +137,6 @@ def test_wal_restore_http(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
ps_client = env.pageserver.http_client()
|
||||
|
||||
# Mark the initdb archive for preservation
|
||||
ps_client.timeline_preserve_initdb_archive(tenant_id, timeline_id)
|
||||
|
||||
# shut down the endpoint and delete the timeline from the pageserver
|
||||
endpoint.stop()
|
||||
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 11e970fe2b...8207291128
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 731b4d1609...c1c2272f43
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: cf302768b2...7be4a52d72
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"postgres-v16": "cf302768b2890569956641e0e5ba112ae1445351",
|
||||
"postgres-v15": "731b4d1609d6db1c953755810a41e0e67ea3db7b",
|
||||
"postgres-v14": "11e970fe2be56804f0a786ec5fc8141ffefa4ca7"
|
||||
"postgres-v16": "7be4a52d728459b79b59343c57d338c3073059c8",
|
||||
"postgres-v15": "c1c2272f436ed9231f6172f49de219fe71a9280d",
|
||||
"postgres-v14": "82072911287cabb32018cf92c8425fa1c744def4"
|
||||
}
|
||||
|
||||
@@ -29,8 +29,10 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
|
||||
clap = { version = "4", features = ["derive", "string"] }
|
||||
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
|
||||
crossbeam-utils = { version = "0.8" }
|
||||
dashmap = { version = "5", default-features = false, features = ["raw-api"] }
|
||||
either = { version = "1" }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
futures = { version = "0.3" }
|
||||
futures-channel = { version = "0.3", features = ["sink"] }
|
||||
futures-core = { version = "0.3" }
|
||||
futures-executor = { version = "0.3" }
|
||||
@@ -72,7 +74,6 @@ tokio-rustls = { version = "0.24" }
|
||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
||||
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
|
||||
toml_edit = { version = "0.19", features = ["serde"] }
|
||||
tonic = { version = "0.9", features = ["tls-roots"] }
|
||||
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "timeout", "util"] }
|
||||
tracing = { version = "0.1", features = ["log"] }
|
||||
tracing-core = { version = "0.1" }
|
||||
|
||||
Reference in New Issue
Block a user