mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 06:30:43 +00:00
Compare commits
1 Commits
erik/prod-
...
erik/batch
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4308ffe5c0 |
229
Cargo.lock
generated
229
Cargo.lock
generated
@@ -46,15 +46,6 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aligned-vec"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e0966165eaf052580bd70eb1b32cb3d6245774c0104d1b2793e9650bf83b52a"
|
||||
dependencies = [
|
||||
"equator",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "allocator-api2"
|
||||
version = "0.2.16"
|
||||
@@ -155,12 +146,6 @@ dependencies = [
|
||||
"static_assertions",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "arrayvec"
|
||||
version = "0.7.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
|
||||
|
||||
[[package]]
|
||||
name = "asn1-rs"
|
||||
version = "0.6.2"
|
||||
@@ -757,7 +742,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"paste",
|
||||
"pin-project",
|
||||
"quick-xml 0.31.0",
|
||||
"quick-xml",
|
||||
"rand 0.8.5",
|
||||
"reqwest 0.11.19",
|
||||
"rustc_version",
|
||||
@@ -1009,9 +994,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.5.0"
|
||||
version = "1.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
|
||||
checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
@@ -1396,15 +1381,6 @@ version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa"
|
||||
|
||||
[[package]]
|
||||
name = "cpp_demangle"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "96e58d342ad113c2b878f16d5d034c03be492ae460cdbc02b7f0f2284d310c7d"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cpufeatures"
|
||||
version = "0.2.9"
|
||||
@@ -1928,26 +1904,6 @@ dependencies = [
|
||||
"termcolor",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equator"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c35da53b5a021d2484a7cc49b2ac7f2d840f8236a286f84202369bd338d761ea"
|
||||
dependencies = [
|
||||
"equator-macro",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equator-macro"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3bf679796c0322556351f287a51b49e48f7c4986e727b5dd78c972d30e2e16cc"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.52",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equivalent"
|
||||
version = "1.0.1"
|
||||
@@ -2055,18 +2011,6 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "findshlibs"
|
||||
version = "0.10.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fixedbitset"
|
||||
version = "0.4.2"
|
||||
@@ -2770,24 +2714,6 @@ version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac"
|
||||
|
||||
[[package]]
|
||||
name = "inferno"
|
||||
version = "0.11.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"indexmap 2.0.1",
|
||||
"is-terminal",
|
||||
"itoa",
|
||||
"log",
|
||||
"num-format",
|
||||
"once_cell",
|
||||
"quick-xml 0.26.0",
|
||||
"rgb",
|
||||
"str_stack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify"
|
||||
version = "0.9.6"
|
||||
@@ -3127,15 +3053,6 @@ version = "2.6.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
|
||||
|
||||
[[package]]
|
||||
name = "memmap2"
|
||||
version = "0.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "45fd3a57831bf88bc63f8cebc0cf956116276e97fef3966103e96416209f7c92"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.7.1"
|
||||
@@ -3361,16 +3278,6 @@ version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
|
||||
|
||||
[[package]]
|
||||
name = "num-format"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3"
|
||||
dependencies = [
|
||||
"arrayvec",
|
||||
"itoa",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-integer"
|
||||
version = "0.1.45"
|
||||
@@ -4102,7 +4009,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -4115,7 +4022,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
dependencies = [
|
||||
"base64 0.20.0",
|
||||
"byteorder",
|
||||
@@ -4134,7 +4041,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -4201,31 +4108,6 @@ version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
|
||||
|
||||
[[package]]
|
||||
name = "pprof"
|
||||
version = "0.14.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ebbe2f8898beba44815fdc9e5a4ae9c929e21c5dc29b0c774a15555f7f58d6d0"
|
||||
dependencies = [
|
||||
"aligned-vec",
|
||||
"backtrace",
|
||||
"cfg-if",
|
||||
"criterion",
|
||||
"findshlibs",
|
||||
"inferno",
|
||||
"libc",
|
||||
"log",
|
||||
"nix 0.26.4",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"protobuf",
|
||||
"protobuf-codegen-pure",
|
||||
"smallvec",
|
||||
"symbolic-demangle",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.17"
|
||||
@@ -4378,31 +4260,6 @@ dependencies = [
|
||||
"prost",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf"
|
||||
version = "2.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
|
||||
|
||||
[[package]]
|
||||
name = "protobuf-codegen"
|
||||
version = "2.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "033460afb75cf755fcfc16dfaed20b86468082a2ea24e05ac35ab4a099a017d6"
|
||||
dependencies = [
|
||||
"protobuf",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf-codegen-pure"
|
||||
version = "2.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "95a29399fc94bcd3eeaa951c715f7bea69409b2445356b00519740bcd6ddd865"
|
||||
dependencies = [
|
||||
"protobuf",
|
||||
"protobuf-codegen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proxy"
|
||||
version = "0.1.0"
|
||||
@@ -4514,15 +4371,6 @@ dependencies = [
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.31.0"
|
||||
@@ -5005,15 +4853,6 @@ dependencies = [
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rgb"
|
||||
version = "0.8.50"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57397d16646700483b67d2dd6511d79318f9d057fdbd21a4066aeac8b41d310a"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.17.6"
|
||||
@@ -5327,7 +5166,6 @@ dependencies = [
|
||||
"postgres-protocol",
|
||||
"postgres_backend",
|
||||
"postgres_ffi",
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
@@ -5825,9 +5663,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "smallvec"
|
||||
version = "1.13.2"
|
||||
version = "1.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
|
||||
checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7"
|
||||
|
||||
[[package]]
|
||||
name = "smol_str"
|
||||
@@ -5874,12 +5712,6 @@ dependencies = [
|
||||
"der 0.7.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "stable_deref_trait"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
|
||||
|
||||
[[package]]
|
||||
name = "static_assertions"
|
||||
version = "1.1.0"
|
||||
@@ -6026,12 +5858,6 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "str_stack"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
|
||||
|
||||
[[package]]
|
||||
name = "stringprep"
|
||||
version = "0.1.2"
|
||||
@@ -6079,29 +5905,6 @@ version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "20e16a0f46cf5fd675563ef54f26e83e20f2366bcf027bcb3cc3ed2b98aaf2ca"
|
||||
|
||||
[[package]]
|
||||
name = "symbolic-common"
|
||||
version = "12.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "366f1b4c6baf6cfefc234bbd4899535fca0b06c74443039a73f6dfb2fad88d77"
|
||||
dependencies = [
|
||||
"debugid",
|
||||
"memmap2",
|
||||
"stable_deref_trait",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "symbolic-demangle"
|
||||
version = "12.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aba05ba5b9962ea5617baf556293720a8b2d0a282aa14ee4bf10e22efc7da8c8"
|
||||
dependencies = [
|
||||
"cpp_demangle",
|
||||
"rustc-demangle",
|
||||
"symbolic-common",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "1.0.109"
|
||||
@@ -6271,9 +6074,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tikv-jemalloc-ctl"
|
||||
version = "0.6.0"
|
||||
version = "0.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f21f216790c8df74ce3ab25b534e0718da5a1916719771d3fec23315c99e468b"
|
||||
checksum = "619bfed27d807b54f7f776b9430d4f8060e66ee138a28632ca898584d462c31c"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"paste",
|
||||
@@ -6282,9 +6085,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tikv-jemalloc-sys"
|
||||
version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7"
|
||||
version = "0.5.4+5.3.0-patched"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d"
|
||||
checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
@@ -6292,9 +6095,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tikv-jemallocator"
|
||||
version = "0.6.0"
|
||||
version = "0.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865"
|
||||
checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"tikv-jemalloc-sys",
|
||||
@@ -6424,7 +6227,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
@@ -6969,7 +6772,6 @@ dependencies = [
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"postgres_connection",
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
@@ -7538,7 +7340,6 @@ dependencies = [
|
||||
"libc",
|
||||
"log",
|
||||
"memchr",
|
||||
"nix 0.26.4",
|
||||
"nom",
|
||||
"num-bigint",
|
||||
"num-integer",
|
||||
|
||||
26
Cargo.toml
26
Cargo.toml
@@ -130,7 +130,6 @@ parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
parquet_derive = "53"
|
||||
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
|
||||
pin-project-lite = "0.2"
|
||||
pprof = { version = "0.14", features = ["criterion", "flamegraph", "protobuf", "protobuf-codec"] }
|
||||
procfs = "0.16"
|
||||
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
|
||||
prost = "0.13"
|
||||
@@ -169,8 +168,8 @@ sync_wrapper = "0.1.2"
|
||||
tar = "0.4"
|
||||
test-context = "0.3"
|
||||
thiserror = "1.0"
|
||||
tikv-jemallocator = { version = "0.6", features = ["stats"] }
|
||||
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
|
||||
tikv-jemallocator = "0.5"
|
||||
tikv-jemalloc-ctl = "0.5"
|
||||
tokio = { version = "1.17", features = ["macros"] }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
|
||||
tokio-io-timeout = "1.2.0"
|
||||
@@ -204,10 +203,21 @@ env_logger = "0.10"
|
||||
log = "0.4"
|
||||
|
||||
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
|
||||
# We want to use the 'neon' branch for these, but there's currently one
|
||||
# incompatible change on the branch. See:
|
||||
#
|
||||
# - PR #8076 which contained changes that depended on the new changes in
|
||||
# the rust-postgres crate, and
|
||||
# - PR #8654 which reverted those changes and made the code in proxy incompatible
|
||||
# with the tip of the 'neon' branch again.
|
||||
#
|
||||
# When those proxy changes are re-applied (see PR #8747), we can switch using
|
||||
# the tip of the 'neon' branch again.
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
|
||||
|
||||
## Local libraries
|
||||
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
||||
@@ -245,7 +255,7 @@ tonic-build = "0.12"
|
||||
[patch.crates-io]
|
||||
|
||||
# Needed to get `tokio-postgres-rustls` to depend on our fork.
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
|
||||
|
||||
################# Binary contents sections
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
import 'sql_exporter/compute_backpressure_throttling_seconds.libsonnet',
|
||||
import 'sql_exporter/compute_current_lsn.libsonnet',
|
||||
import 'sql_exporter/compute_logical_snapshot_files.libsonnet',
|
||||
import 'sql_exporter/compute_max_connections.libsonnet',
|
||||
import 'sql_exporter/compute_receive_lsn.libsonnet',
|
||||
import 'sql_exporter/compute_subscriptions_count.libsonnet',
|
||||
import 'sql_exporter/connection_counts.libsonnet',
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
{
|
||||
metric_name: 'compute_max_connections',
|
||||
type: 'gauge',
|
||||
help: 'Max connections allowed for Postgres',
|
||||
key_labels: null,
|
||||
values: [
|
||||
'max_connections',
|
||||
],
|
||||
query: importstr 'sql_exporter/compute_max_connections.sql',
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
SELECT current_setting('max_connections') as max_connections;
|
||||
@@ -1,45 +1,3 @@
|
||||
commit 00aa659afc9c7336ab81036edec3017168aabf40
|
||||
Author: Heikki Linnakangas <heikki@neon.tech>
|
||||
Date: Tue Nov 12 16:59:19 2024 +0200
|
||||
|
||||
Temporarily disable test that depends on timezone
|
||||
|
||||
diff --git a/tests/expected/generalization.out b/tests/expected/generalization.out
|
||||
index 23ef5fa..9e60deb 100644
|
||||
--- a/ext-src/pg_anon-src/tests/expected/generalization.out
|
||||
+++ b/ext-src/pg_anon-src/tests/expected/generalization.out
|
||||
@@ -284,12 +284,9 @@ SELECT anon.generalize_tstzrange('19041107','century');
|
||||
["Tue Jan 01 00:00:00 1901 PST","Mon Jan 01 00:00:00 2001 PST")
|
||||
(1 row)
|
||||
|
||||
-SELECT anon.generalize_tstzrange('19041107','millennium');
|
||||
- generalize_tstzrange
|
||||
------------------------------------------------------------------
|
||||
- ["Thu Jan 01 00:00:00 1001 PST","Mon Jan 01 00:00:00 2001 PST")
|
||||
-(1 row)
|
||||
-
|
||||
+-- temporarily disabled, see:
|
||||
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
|
||||
+--SELECT anon.generalize_tstzrange('19041107','millennium');
|
||||
-- generalize_daterange
|
||||
SELECT anon.generalize_daterange('19041107');
|
||||
generalize_daterange
|
||||
diff --git a/tests/sql/generalization.sql b/tests/sql/generalization.sql
|
||||
index b868344..b4fc977 100644
|
||||
--- a/ext-src/pg_anon-src/tests/sql/generalization.sql
|
||||
+++ b/ext-src/pg_anon-src/tests/sql/generalization.sql
|
||||
@@ -61,7 +61,9 @@ SELECT anon.generalize_tstzrange('19041107','month');
|
||||
SELECT anon.generalize_tstzrange('19041107','year');
|
||||
SELECT anon.generalize_tstzrange('19041107','decade');
|
||||
SELECT anon.generalize_tstzrange('19041107','century');
|
||||
-SELECT anon.generalize_tstzrange('19041107','millennium');
|
||||
+-- temporarily disabled, see:
|
||||
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
|
||||
+--SELECT anon.generalize_tstzrange('19041107','millennium');
|
||||
|
||||
-- generalize_daterange
|
||||
SELECT anon.generalize_daterange('19041107');
|
||||
|
||||
commit 7dd414ee75f2875cffb1d6ba474df1f135a6fc6f
|
||||
Author: Alexey Masterov <alexeymasterov@neon.tech>
|
||||
Date: Fri May 31 06:34:26 2024 +0000
|
||||
|
||||
@@ -37,7 +37,6 @@ allow = [
|
||||
"BSD-2-Clause",
|
||||
"BSD-3-Clause",
|
||||
"CC0-1.0",
|
||||
"CDDL-1.0",
|
||||
"ISC",
|
||||
"MIT",
|
||||
"MPL-2.0",
|
||||
|
||||
@@ -24,7 +24,7 @@ pub struct Key {
|
||||
|
||||
/// When working with large numbers of Keys in-memory, it is more efficient to handle them as i128 than as
|
||||
/// a struct of fields.
|
||||
#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
|
||||
#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd)]
|
||||
pub struct CompactKey(i128);
|
||||
|
||||
/// The storage key size.
|
||||
|
||||
@@ -41,11 +41,6 @@ pub enum NeonWalRecord {
|
||||
file_path: String,
|
||||
content: Option<Bytes>,
|
||||
},
|
||||
// Truncate visibility map page
|
||||
TruncateVisibilityMap {
|
||||
trunc_byte: usize,
|
||||
trunc_offs: usize,
|
||||
},
|
||||
|
||||
/// A testing record for unit testing purposes. It supports append data to an existing image, or clear it.
|
||||
#[cfg(feature = "testing")]
|
||||
|
||||
@@ -24,7 +24,7 @@ use postgres_ffi::Oid;
|
||||
// FIXME: should move 'forknum' as last field to keep this consistent with Postgres.
|
||||
// Then we could replace the custom Ord and PartialOrd implementations below with
|
||||
// deriving them. This will require changes in walredoproc.c.
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize)]
|
||||
pub struct RelTag {
|
||||
pub forknum: u8,
|
||||
pub spcnode: Oid,
|
||||
|
||||
@@ -243,11 +243,8 @@ const FSM_LEAF_NODES_PER_PAGE: usize = FSM_NODES_PER_PAGE - FSM_NON_LEAF_NODES_P
|
||||
pub const SLOTS_PER_FSM_PAGE: u32 = FSM_LEAF_NODES_PER_PAGE as u32;
|
||||
|
||||
/* From visibilitymap.c */
|
||||
|
||||
pub const VM_MAPSIZE: usize = BLCKSZ as usize - MAXALIGN_SIZE_OF_PAGE_HEADER_DATA;
|
||||
pub const VM_BITS_PER_HEAPBLOCK: usize = 2;
|
||||
pub const VM_HEAPBLOCKS_PER_BYTE: usize = 8 / VM_BITS_PER_HEAPBLOCK;
|
||||
pub const VM_HEAPBLOCKS_PER_PAGE: usize = VM_MAPSIZE * VM_HEAPBLOCKS_PER_BYTE;
|
||||
pub const VM_HEAPBLOCKS_PER_PAGE: u32 =
|
||||
(BLCKSZ as usize - SIZEOF_PAGE_HEADER_DATA) as u32 * (8 / 2); // MAPSIZE * (BITS_PER_BYTE / BITS_PER_HEAPBLOCK)
|
||||
|
||||
/* From origin.c */
|
||||
pub const REPLICATION_STATE_MAGIC: u32 = 0x1257DADE;
|
||||
|
||||
@@ -16,7 +16,7 @@ use utils::bin_ser::DeserializeError;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlMultiXactCreate {
|
||||
pub mid: MultiXactId,
|
||||
/* new MultiXact's ID */
|
||||
@@ -46,7 +46,7 @@ impl XlMultiXactCreate {
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlMultiXactTruncate {
|
||||
pub oldest_multi_db: Oid,
|
||||
/* to-be-truncated range of multixact offsets */
|
||||
@@ -72,7 +72,7 @@ impl XlMultiXactTruncate {
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlRelmapUpdate {
|
||||
pub dbid: Oid, /* database ID, or 0 for shared map */
|
||||
pub tsid: Oid, /* database's tablespace, or pg_global */
|
||||
@@ -90,7 +90,7 @@ impl XlRelmapUpdate {
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlReploriginDrop {
|
||||
pub node_id: RepOriginId,
|
||||
}
|
||||
@@ -104,7 +104,7 @@ impl XlReploriginDrop {
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlReploriginSet {
|
||||
pub remote_lsn: Lsn,
|
||||
pub node_id: RepOriginId,
|
||||
@@ -120,7 +120,7 @@ impl XlReploriginSet {
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct RelFileNode {
|
||||
pub spcnode: Oid, /* tablespace */
|
||||
pub dbnode: Oid, /* database */
|
||||
@@ -911,7 +911,7 @@ impl XlSmgrCreate {
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlSmgrTruncate {
|
||||
pub blkno: BlockNumber,
|
||||
pub rnode: RelFileNode,
|
||||
@@ -984,7 +984,7 @@ impl XlDropDatabase {
|
||||
/// xl_xact_parsed_abort structs in PostgreSQL, but we use the same
|
||||
/// struct for commits and aborts.
|
||||
///
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlXactParsedRecord {
|
||||
pub xid: TransactionId,
|
||||
pub info: u8,
|
||||
|
||||
@@ -15,9 +15,6 @@ pub enum DownloadError {
|
||||
///
|
||||
/// Concurrency control is not timed within timeout.
|
||||
Timeout,
|
||||
/// Some integrity/consistency check failed during download. This is used during
|
||||
/// timeline loads to cancel the load of a tenant if some timeline detects fatal corruption.
|
||||
Fatal(String),
|
||||
/// The file was found in the remote storage, but the download failed.
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
@@ -32,7 +29,6 @@ impl std::fmt::Display for DownloadError {
|
||||
DownloadError::Unmodified => write!(f, "File was not modified"),
|
||||
DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
|
||||
DownloadError::Timeout => write!(f, "timeout"),
|
||||
DownloadError::Fatal(why) => write!(f, "Fatal read error: {why}"),
|
||||
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
|
||||
}
|
||||
}
|
||||
@@ -45,7 +41,7 @@ impl DownloadError {
|
||||
pub fn is_permanent(&self) -> bool {
|
||||
use DownloadError::*;
|
||||
match self {
|
||||
BadInput(_) | NotFound | Unmodified | Fatal(_) | Cancelled => true,
|
||||
BadInput(_) | NotFound | Unmodified | Cancelled => true,
|
||||
Timeout | Other(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,6 @@ jsonwebtoken.workspace = true
|
||||
nix.workspace = true
|
||||
once_cell.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
pprof.workspace = true
|
||||
regex.workspace = true
|
||||
routerify.workspace = true
|
||||
serde.workspace = true
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use crate::auth::{AuthError, Claims, SwappableJwtAuth};
|
||||
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
|
||||
use crate::http::request::{get_query_param, parse_query_param};
|
||||
use anyhow::{anyhow, Context};
|
||||
use hyper::header::{HeaderName, AUTHORIZATION, CONTENT_DISPOSITION};
|
||||
use anyhow::Context;
|
||||
use hyper::header::{HeaderName, AUTHORIZATION};
|
||||
use hyper::http::HeaderValue;
|
||||
use hyper::Method;
|
||||
use hyper::{header::CONTENT_TYPE, Body, Request, Response};
|
||||
@@ -13,13 +12,11 @@ use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
use std::future::Future;
|
||||
use std::io::Write as _;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use pprof::protos::Message as _;
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use std::io::Write as _;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
@@ -331,82 +328,6 @@ pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// Generates CPU profiles.
|
||||
pub async fn profile_cpu_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
enum Format {
|
||||
Pprof,
|
||||
Svg,
|
||||
}
|
||||
|
||||
// Parameters.
|
||||
let format = match get_query_param(&req, "format")?.as_deref() {
|
||||
None => Format::Pprof,
|
||||
Some("pprof") => Format::Pprof,
|
||||
Some("svg") => Format::Svg,
|
||||
Some(format) => return Err(ApiError::BadRequest(anyhow!("invalid format {format}"))),
|
||||
};
|
||||
let seconds = match parse_query_param(&req, "seconds")? {
|
||||
None => 5,
|
||||
Some(seconds @ 1..=30) => seconds,
|
||||
Some(_) => return Err(ApiError::BadRequest(anyhow!("duration must be 1-30 secs"))),
|
||||
};
|
||||
let frequency_hz = match parse_query_param(&req, "frequency")? {
|
||||
None => 99,
|
||||
Some(1001..) => return Err(ApiError::BadRequest(anyhow!("frequency must be <=1000 Hz"))),
|
||||
Some(frequency) => frequency,
|
||||
};
|
||||
|
||||
// Only allow one profiler at a time.
|
||||
static PROFILE_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
|
||||
let _lock = PROFILE_LOCK
|
||||
.try_lock()
|
||||
.map_err(|_| ApiError::Conflict("profiler already running".into()))?;
|
||||
|
||||
// Take the profile.
|
||||
let report = tokio::task::spawn_blocking(move || {
|
||||
let guard = pprof::ProfilerGuardBuilder::default()
|
||||
.frequency(frequency_hz)
|
||||
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
|
||||
.build()?;
|
||||
std::thread::sleep(Duration::from_secs(seconds));
|
||||
guard.report().build()
|
||||
})
|
||||
.await
|
||||
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
|
||||
.map_err(|pprof_err| ApiError::InternalServerError(pprof_err.into()))?;
|
||||
|
||||
// Return the report in the requested format.
|
||||
match format {
|
||||
Format::Pprof => {
|
||||
let mut body = Vec::new();
|
||||
report
|
||||
.pprof()
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))?
|
||||
.write_to_vec(&mut body)
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))?;
|
||||
|
||||
Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, "application/octet-stream")
|
||||
.header(CONTENT_DISPOSITION, "attachment; filename=\"profile.pb\"")
|
||||
.body(Body::from(body))
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))
|
||||
}
|
||||
|
||||
Format::Svg => {
|
||||
let mut body = Vec::new();
|
||||
report
|
||||
.flamegraph(&mut body)
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))?;
|
||||
Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, "image/svg+xml")
|
||||
.body(Body::from(body))
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
|
||||
) -> Middleware<B, ApiError> {
|
||||
Middleware::pre(move |req| async move {
|
||||
|
||||
@@ -30,7 +30,7 @@ pub fn parse_request_param<T: FromStr>(
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_query_param<'a>(
|
||||
fn get_query_param<'a>(
|
||||
request: &'a Request<Body>,
|
||||
param_name: &str,
|
||||
) -> Result<Option<Cow<'a, str>>, ApiError> {
|
||||
|
||||
@@ -19,7 +19,7 @@ impl InterpretedWalRecord {
|
||||
pub fn from_bytes_filtered(
|
||||
buf: Bytes,
|
||||
shard: &ShardIdentity,
|
||||
next_record_lsn: Lsn,
|
||||
record_end_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<InterpretedWalRecord> {
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
@@ -32,18 +32,18 @@ impl InterpretedWalRecord {
|
||||
FlushUncommittedRecords::No
|
||||
};
|
||||
|
||||
let metadata_record = MetadataRecord::from_decoded(&decoded, next_record_lsn, pg_version)?;
|
||||
let metadata_record = MetadataRecord::from_decoded(&decoded, record_end_lsn, pg_version)?;
|
||||
let batch = SerializedValueBatch::from_decoded_filtered(
|
||||
decoded,
|
||||
shard,
|
||||
next_record_lsn,
|
||||
record_end_lsn,
|
||||
pg_version,
|
||||
)?;
|
||||
|
||||
Ok(InterpretedWalRecord {
|
||||
metadata_record,
|
||||
batch,
|
||||
next_record_lsn,
|
||||
end_lsn: record_end_lsn,
|
||||
flush_uncommitted,
|
||||
xid,
|
||||
})
|
||||
@@ -53,7 +53,7 @@ impl InterpretedWalRecord {
|
||||
impl MetadataRecord {
|
||||
fn from_decoded(
|
||||
decoded: &DecodedWALRecord,
|
||||
next_record_lsn: Lsn,
|
||||
record_end_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
// Note: this doesn't actually copy the bytes since
|
||||
@@ -74,9 +74,7 @@ impl MetadataRecord {
|
||||
Ok(None)
|
||||
}
|
||||
pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version),
|
||||
pg_constants::RM_XACT_ID => {
|
||||
Self::decode_xact_record(&mut buf, decoded, next_record_lsn)
|
||||
}
|
||||
pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, record_end_lsn),
|
||||
pg_constants::RM_MULTIXACT_ID => {
|
||||
Self::decode_multixact_record(&mut buf, decoded, pg_version)
|
||||
}
|
||||
@@ -88,9 +86,7 @@ impl MetadataRecord {
|
||||
//
|
||||
// Alternatively, one can make the checkpoint part of the subscription protocol
|
||||
// to the pageserver. This should work fine, but can be done at a later point.
|
||||
pg_constants::RM_XLOG_ID => {
|
||||
Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)
|
||||
}
|
||||
pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, record_end_lsn),
|
||||
pg_constants::RM_LOGICALMSG_ID => {
|
||||
Self::decode_logical_message_record(&mut buf, decoded)
|
||||
}
|
||||
|
||||
@@ -32,19 +32,16 @@ use postgres_ffi::walrecord::{
|
||||
XlSmgrTruncate, XlXactParsedRecord,
|
||||
};
|
||||
use postgres_ffi::{Oid, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::serialized_batch::SerializedValueBatch;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum FlushUncommittedRecords {
|
||||
Yes,
|
||||
No,
|
||||
}
|
||||
|
||||
/// An interpreted Postgres WAL record, ready to be handled by the pageserver
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct InterpretedWalRecord {
|
||||
/// Optional metadata record - may cause writes to metadata keys
|
||||
/// in the storage engine
|
||||
@@ -52,10 +49,8 @@ pub struct InterpretedWalRecord {
|
||||
/// A pre-serialized batch along with the required metadata for ingestion
|
||||
/// by the pageserver
|
||||
pub batch: SerializedValueBatch,
|
||||
/// Byte offset within WAL for the start of the next PG WAL record.
|
||||
/// Usually this is the end LSN of the current record, but in case of
|
||||
/// XLOG SWITCH records it will be within the next segment.
|
||||
pub next_record_lsn: Lsn,
|
||||
/// Byte offset within WAL for the end of the original PG WAL record
|
||||
pub end_lsn: Lsn,
|
||||
/// Whether to flush all uncommitted modifications to the storage engine
|
||||
/// before ingesting this record. This is currently only used for legacy PG
|
||||
/// database creations which read pages from a template database. Such WAL
|
||||
@@ -67,7 +62,6 @@ pub struct InterpretedWalRecord {
|
||||
|
||||
/// The interpreted part of the Postgres WAL record which requires metadata
|
||||
/// writes to the underlying storage engine.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum MetadataRecord {
|
||||
Heapam(HeapamRecord),
|
||||
Neonrmgr(NeonrmgrRecord),
|
||||
@@ -83,12 +77,10 @@ pub enum MetadataRecord {
|
||||
Replorigin(ReploriginRecord),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum HeapamRecord {
|
||||
ClearVmBits(ClearVmBits),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ClearVmBits {
|
||||
pub new_heap_blkno: Option<u32>,
|
||||
pub old_heap_blkno: Option<u32>,
|
||||
@@ -96,29 +88,24 @@ pub struct ClearVmBits {
|
||||
pub flags: u8,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum NeonrmgrRecord {
|
||||
ClearVmBits(ClearVmBits),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum SmgrRecord {
|
||||
Create(SmgrCreate),
|
||||
Truncate(XlSmgrTruncate),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct SmgrCreate {
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum DbaseRecord {
|
||||
Create(DbaseCreate),
|
||||
Drop(DbaseDrop),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct DbaseCreate {
|
||||
pub db_id: Oid,
|
||||
pub tablespace_id: Oid,
|
||||
@@ -126,32 +113,27 @@ pub struct DbaseCreate {
|
||||
pub src_tablespace_id: Oid,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct DbaseDrop {
|
||||
pub db_id: Oid,
|
||||
pub tablespace_ids: Vec<Oid>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum ClogRecord {
|
||||
ZeroPage(ClogZeroPage),
|
||||
Truncate(ClogTruncate),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ClogZeroPage {
|
||||
pub segno: u32,
|
||||
pub rpageno: u32,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ClogTruncate {
|
||||
pub pageno: u32,
|
||||
pub oldest_xid: TransactionId,
|
||||
pub oldest_xid_db: Oid,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum XactRecord {
|
||||
Commit(XactCommon),
|
||||
Abort(XactCommon),
|
||||
@@ -160,7 +142,6 @@ pub enum XactRecord {
|
||||
Prepare(XactPrepare),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct XactCommon {
|
||||
pub parsed: XlXactParsedRecord,
|
||||
pub origin_id: u16,
|
||||
@@ -169,73 +150,61 @@ pub struct XactCommon {
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct XactPrepare {
|
||||
pub xl_xid: TransactionId,
|
||||
pub data: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum MultiXactRecord {
|
||||
ZeroPage(MultiXactZeroPage),
|
||||
Create(XlMultiXactCreate),
|
||||
Truncate(XlMultiXactTruncate),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct MultiXactZeroPage {
|
||||
pub slru_kind: SlruKind,
|
||||
pub segno: u32,
|
||||
pub rpageno: u32,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum RelmapRecord {
|
||||
Update(RelmapUpdate),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct RelmapUpdate {
|
||||
pub update: XlRelmapUpdate,
|
||||
pub buf: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum XlogRecord {
|
||||
Raw(RawXlogRecord),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct RawXlogRecord {
|
||||
pub info: u8,
|
||||
pub lsn: Lsn,
|
||||
pub buf: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum LogicalMessageRecord {
|
||||
Put(PutLogicalMessage),
|
||||
#[cfg(feature = "testing")]
|
||||
Failpoint,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct PutLogicalMessage {
|
||||
pub path: String,
|
||||
pub buf: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum StandbyRecord {
|
||||
RunningXacts(StandbyRunningXacts),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct StandbyRunningXacts {
|
||||
pub oldest_running_xid: TransactionId,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum ReploriginRecord {
|
||||
Set(XlReploriginSet),
|
||||
Drop(XlReploriginDrop),
|
||||
|
||||
@@ -16,7 +16,6 @@ use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_api::{key::CompactKey, value::Value};
|
||||
use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
|
||||
use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -30,7 +29,6 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
|
||||
/// relation sizes. In the case of "observed" values, we only need to know
|
||||
/// the key and LSN, so two types of metadata are supported to save on network
|
||||
/// bandwidth.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum ValueMeta {
|
||||
Serialized(SerializedValueMeta),
|
||||
Observed(ObservedValueMeta),
|
||||
@@ -77,7 +75,6 @@ impl PartialEq for OrderedValueMeta {
|
||||
impl Eq for OrderedValueMeta {}
|
||||
|
||||
/// Metadata for a [`Value`] serialized into the batch.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct SerializedValueMeta {
|
||||
pub key: CompactKey,
|
||||
pub lsn: Lsn,
|
||||
@@ -89,14 +86,12 @@ pub struct SerializedValueMeta {
|
||||
}
|
||||
|
||||
/// Metadata for a [`Value`] observed by the batch
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ObservedValueMeta {
|
||||
pub key: CompactKey,
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Batch of serialized [`Value`]s.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct SerializedValueBatch {
|
||||
/// [`Value`]s serialized in EphemeralFile's native format,
|
||||
/// ready for disk write by the pageserver
|
||||
@@ -137,7 +132,7 @@ impl SerializedValueBatch {
|
||||
pub(crate) fn from_decoded_filtered(
|
||||
decoded: DecodedWALRecord,
|
||||
shard: &ShardIdentity,
|
||||
next_record_lsn: Lsn,
|
||||
record_end_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<SerializedValueBatch> {
|
||||
// First determine how big the buffer needs to be and allocate it up-front.
|
||||
@@ -161,17 +156,13 @@ impl SerializedValueBatch {
|
||||
let key = rel_block_to_key(rel, blk.blkno);
|
||||
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!(
|
||||
"Unsupported key decoded at LSN {}: {}",
|
||||
next_record_lsn,
|
||||
key
|
||||
);
|
||||
anyhow::bail!("Unsupported key decoded at LSN {}: {}", record_end_lsn, key);
|
||||
}
|
||||
|
||||
let key_is_local = shard.is_key_local(&key);
|
||||
|
||||
tracing::debug!(
|
||||
lsn=%next_record_lsn,
|
||||
lsn=%record_end_lsn,
|
||||
key=%key,
|
||||
"ingest: shard decision {}",
|
||||
if !key_is_local { "drop" } else { "keep" },
|
||||
@@ -183,7 +174,7 @@ impl SerializedValueBatch {
|
||||
// its blkno in case it implicitly extends a relation.
|
||||
metadata.push(ValueMeta::Observed(ObservedValueMeta {
|
||||
key: key.to_compact(),
|
||||
lsn: next_record_lsn,
|
||||
lsn: record_end_lsn,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -214,7 +205,7 @@ impl SerializedValueBatch {
|
||||
// that would corrupt the page.
|
||||
//
|
||||
if !page_is_new(&image) {
|
||||
page_set_lsn(&mut image, next_record_lsn)
|
||||
page_set_lsn(&mut image, record_end_lsn)
|
||||
}
|
||||
assert_eq!(image.len(), BLCKSZ as usize);
|
||||
|
||||
@@ -233,12 +224,12 @@ impl SerializedValueBatch {
|
||||
|
||||
metadata.push(ValueMeta::Serialized(SerializedValueMeta {
|
||||
key: key.to_compact(),
|
||||
lsn: next_record_lsn,
|
||||
lsn: record_end_lsn,
|
||||
batch_offset: relative_off,
|
||||
len: val_ser_size,
|
||||
will_init: val.will_init(),
|
||||
}));
|
||||
max_lsn = std::cmp::max(max_lsn, next_record_lsn);
|
||||
max_lsn = std::cmp::max(max_lsn, record_end_lsn);
|
||||
len += 1;
|
||||
}
|
||||
|
||||
|
||||
@@ -55,7 +55,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::auth::JwtAuth;
|
||||
use utils::failpoint_support::failpoints_handler;
|
||||
use utils::http::endpoint::profile_cpu_handler;
|
||||
use utils::http::endpoint::prometheus_metrics_handler;
|
||||
use utils::http::endpoint::request_span;
|
||||
use utils::http::request::must_parse_query_param;
|
||||
@@ -145,16 +144,10 @@ impl State {
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
secondary_controller: SecondaryController,
|
||||
) -> anyhow::Result<Self> {
|
||||
let allowlist_routes = [
|
||||
"/v1/status",
|
||||
"/v1/doc",
|
||||
"/swagger.yml",
|
||||
"/metrics",
|
||||
"/profile/cpu",
|
||||
]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
Ok(Self {
|
||||
conf,
|
||||
tenant_manager,
|
||||
@@ -331,7 +324,6 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
|
||||
.into_boxed_str(),
|
||||
),
|
||||
a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()),
|
||||
Cancelled => ApiError::ResourceUnavailable("shutting down".into()),
|
||||
Other(e) => ApiError::InternalServerError(e),
|
||||
}
|
||||
}
|
||||
@@ -3165,7 +3157,6 @@ pub fn make_router(
|
||||
Ok(router
|
||||
.data(state)
|
||||
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
|
||||
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
|
||||
.get("/v1/status", |r| api_handler(r, status_handler))
|
||||
.put("/v1/failpoints", |r| {
|
||||
testing_api_handler("manage failpoints", r, failpoints_handler)
|
||||
|
||||
@@ -653,35 +653,6 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy<IntCounter> = Lazy::new(|
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
register_uint_gauge!(
|
||||
"pageserver_relsize_cache_entries",
|
||||
"Number of entries in the relation size cache",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_cache_misses",
|
||||
"Relation size cache misses",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_MISSES_UPDATED: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_cache_misses_updated",
|
||||
"Relation size cache misses where relation update LSN is after lookup LSN",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) mod initial_logical_size {
|
||||
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
@@ -10,9 +10,6 @@ use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::aux_file;
|
||||
use crate::context::RequestContext;
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_UPDATED,
|
||||
};
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
@@ -958,12 +955,9 @@ impl Timeline {
|
||||
let rel_size_cache = self.rel_size_cache.read().unwrap();
|
||||
if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
|
||||
if lsn >= *cached_lsn {
|
||||
RELSIZE_CACHE_HITS.inc();
|
||||
return Some(*nblocks);
|
||||
}
|
||||
RELSIZE_CACHE_MISSES_UPDATED.inc();
|
||||
}
|
||||
RELSIZE_CACHE_MISSES.inc();
|
||||
None
|
||||
}
|
||||
|
||||
@@ -988,7 +982,6 @@ impl Timeline {
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert((lsn, nblocks));
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -996,17 +989,13 @@ impl Timeline {
|
||||
/// Store cached relation size
|
||||
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
}
|
||||
rel_size_cache.map.insert(tag, (lsn, nblocks));
|
||||
}
|
||||
|
||||
/// Remove cached relation size
|
||||
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
if rel_size_cache.map.remove(tag).is_some() {
|
||||
RELSIZE_CACHE_ENTRIES.dec();
|
||||
}
|
||||
rel_size_cache.map.remove(tag);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1022,7 +1011,7 @@ pub struct DatadirModification<'a> {
|
||||
pub tline: &'a Timeline,
|
||||
|
||||
/// Current LSN of the modification
|
||||
pub lsn: Lsn,
|
||||
lsn: Lsn,
|
||||
|
||||
// The modifications are not applied directly to the underlying key-value store.
|
||||
// The put-functions add the modifications here, and they are flushed to the
|
||||
|
||||
@@ -39,7 +39,6 @@ use remote_timeline_client::UploadQueueNotReadyError;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Weak;
|
||||
use std::time::SystemTime;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
@@ -525,9 +524,6 @@ pub struct OffloadedTimeline {
|
||||
/// Prevent two tasks from deleting the timeline at the same time. If held, the
|
||||
/// timeline is being deleted. If 'true', the timeline has already been deleted.
|
||||
pub delete_progress: TimelineDeleteProgress,
|
||||
|
||||
/// Part of the `OffloadedTimeline` object's lifecycle: this needs to be set before we drop it
|
||||
pub deleted_from_ancestor: AtomicBool,
|
||||
}
|
||||
|
||||
impl OffloadedTimeline {
|
||||
@@ -537,16 +533,9 @@ impl OffloadedTimeline {
|
||||
/// the timeline is not in a stopped state.
|
||||
/// Panics if the timeline is not archived.
|
||||
fn from_timeline(timeline: &Timeline) -> Result<Self, UploadQueueNotReadyError> {
|
||||
let (ancestor_retain_lsn, ancestor_timeline_id) =
|
||||
if let Some(ancestor_timeline) = timeline.ancestor_timeline() {
|
||||
let ancestor_lsn = timeline.get_ancestor_lsn();
|
||||
let ancestor_timeline_id = ancestor_timeline.timeline_id;
|
||||
let mut gc_info = ancestor_timeline.gc_info.write().unwrap();
|
||||
gc_info.insert_child(timeline.timeline_id, ancestor_lsn, MaybeOffloaded::Yes);
|
||||
(Some(ancestor_lsn), Some(ancestor_timeline_id))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
let ancestor_retain_lsn = timeline
|
||||
.get_ancestor_timeline_id()
|
||||
.map(|_timeline_id| timeline.get_ancestor_lsn());
|
||||
let archived_at = timeline
|
||||
.remote_client
|
||||
.archived_at_stopped_queue()?
|
||||
@@ -554,17 +543,14 @@ impl OffloadedTimeline {
|
||||
Ok(Self {
|
||||
tenant_shard_id: timeline.tenant_shard_id,
|
||||
timeline_id: timeline.timeline_id,
|
||||
ancestor_timeline_id,
|
||||
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
|
||||
ancestor_retain_lsn,
|
||||
archived_at,
|
||||
|
||||
delete_progress: timeline.delete_progress.clone(),
|
||||
deleted_from_ancestor: AtomicBool::new(false),
|
||||
})
|
||||
}
|
||||
fn from_manifest(tenant_shard_id: TenantShardId, manifest: &OffloadedTimelineManifest) -> Self {
|
||||
// We expect to reach this case in tenant loading, where the `retain_lsn` is populated in the parent's `gc_info`
|
||||
// by the `initialize_gc_info` function.
|
||||
let OffloadedTimelineManifest {
|
||||
timeline_id,
|
||||
ancestor_timeline_id,
|
||||
@@ -578,7 +564,6 @@ impl OffloadedTimeline {
|
||||
ancestor_retain_lsn,
|
||||
archived_at,
|
||||
delete_progress: TimelineDeleteProgress::default(),
|
||||
deleted_from_ancestor: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
fn manifest(&self) -> OffloadedTimelineManifest {
|
||||
@@ -596,33 +581,6 @@ impl OffloadedTimeline {
|
||||
archived_at: *archived_at,
|
||||
}
|
||||
}
|
||||
/// Delete this timeline's retain_lsn from its ancestor, if present in the given tenant
|
||||
fn delete_from_ancestor_with_timelines(
|
||||
&self,
|
||||
timelines: &std::sync::MutexGuard<'_, HashMap<TimelineId, Arc<Timeline>>>,
|
||||
) {
|
||||
if let (Some(_retain_lsn), Some(ancestor_timeline_id)) =
|
||||
(self.ancestor_retain_lsn, self.ancestor_timeline_id)
|
||||
{
|
||||
if let Some((_, ancestor_timeline)) = timelines
|
||||
.iter()
|
||||
.find(|(tid, _tl)| **tid == ancestor_timeline_id)
|
||||
{
|
||||
ancestor_timeline
|
||||
.gc_info
|
||||
.write()
|
||||
.unwrap()
|
||||
.remove_child_offloaded(self.timeline_id);
|
||||
}
|
||||
}
|
||||
self.deleted_from_ancestor.store(true, Ordering::Release);
|
||||
}
|
||||
/// Call [`Self::delete_from_ancestor_with_timelines`] instead if possible.
|
||||
///
|
||||
/// As the entire tenant is being dropped, don't bother deregistering the `retain_lsn` from the ancestor.
|
||||
fn defuse_for_tenant_drop(&self) {
|
||||
self.deleted_from_ancestor.store(true, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for OffloadedTimeline {
|
||||
@@ -631,17 +589,6 @@ impl fmt::Debug for OffloadedTimeline {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for OffloadedTimeline {
|
||||
fn drop(&mut self) {
|
||||
if !self.deleted_from_ancestor.load(Ordering::Acquire) {
|
||||
tracing::warn!(
|
||||
"offloaded timeline {} was dropped without having cleaned it up at the ancestor",
|
||||
self.timeline_id
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
|
||||
pub enum MaybeOffloaded {
|
||||
Yes,
|
||||
@@ -753,9 +700,6 @@ pub enum DeleteTimelineError {
|
||||
#[error("Timeline deletion is already in progress")]
|
||||
AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
|
||||
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
@@ -766,7 +710,6 @@ impl Debug for DeleteTimelineError {
|
||||
Self::NotFound => write!(f, "NotFound"),
|
||||
Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
|
||||
Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
|
||||
Self::Cancelled => f.debug_tuple("Cancelled").finish(),
|
||||
Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
|
||||
}
|
||||
}
|
||||
@@ -1490,12 +1433,6 @@ impl Tenant {
|
||||
info!(%timeline_id, "index_part not found on remote");
|
||||
continue;
|
||||
}
|
||||
Err(DownloadError::Fatal(why)) => {
|
||||
// If, while loading one remote timeline, we saw an indication that our generation
|
||||
// number is likely invalid, then we should not load the whole tenant.
|
||||
error!(%timeline_id, "Fatal error loading timeline: {why}");
|
||||
anyhow::bail!(why.to_string());
|
||||
}
|
||||
Err(e) => {
|
||||
// Some (possibly ephemeral) error happened during index_part download.
|
||||
// Pretend the timeline exists to not delete the timeline directory,
|
||||
@@ -1584,7 +1521,7 @@ impl Tenant {
|
||||
}
|
||||
// Complete deletions for offloaded timeline id's.
|
||||
offloaded_timelines_list
|
||||
.retain(|(offloaded_id, offloaded)| {
|
||||
.retain(|(offloaded_id, _offloaded)| {
|
||||
// At this point, offloaded_timeline_ids has the list of all offloaded timelines
|
||||
// without a prefix in S3, so they are inexistent.
|
||||
// In the end, existence of a timeline is finally determined by the existence of an index-part.json in remote storage.
|
||||
@@ -1592,7 +1529,6 @@ impl Tenant {
|
||||
let delete = offloaded_timeline_ids.contains(offloaded_id);
|
||||
if delete {
|
||||
tracing::info!("Removing offloaded timeline {offloaded_id} from manifest as no remote prefix was found");
|
||||
offloaded.defuse_for_tenant_drop();
|
||||
}
|
||||
!delete
|
||||
});
|
||||
@@ -1981,15 +1917,9 @@ impl Tenant {
|
||||
)));
|
||||
};
|
||||
let mut offloaded_timelines = self.timelines_offloaded.lock().unwrap();
|
||||
match offloaded_timelines.remove(&timeline_id) {
|
||||
Some(offloaded) => {
|
||||
offloaded.delete_from_ancestor_with_timelines(&timelines);
|
||||
}
|
||||
None => warn!("timeline already removed from offloaded timelines"),
|
||||
if offloaded_timelines.remove(&timeline_id).is_none() {
|
||||
warn!("timeline already removed from offloaded timelines");
|
||||
}
|
||||
|
||||
self.initialize_gc_info(&timelines, &offloaded_timelines, Some(timeline_id));
|
||||
|
||||
Arc::clone(timeline)
|
||||
};
|
||||
|
||||
@@ -2727,7 +2657,7 @@ impl Tenant {
|
||||
.filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
|
||||
|
||||
// Before activation, populate each Timeline's GcInfo with information about its children
|
||||
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None);
|
||||
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor);
|
||||
|
||||
// Spawn gc and compaction loops. The loops will shut themselves
|
||||
// down when they notice that the tenant is inactive.
|
||||
@@ -2842,14 +2772,8 @@ impl Tenant {
|
||||
let timeline_id = timeline.timeline_id;
|
||||
let span = tracing::info_span!("timeline_shutdown", %timeline_id, ?shutdown_mode);
|
||||
js.spawn(async move { timeline.shutdown(shutdown_mode).instrument(span).await });
|
||||
});
|
||||
}
|
||||
{
|
||||
let timelines_offloaded = self.timelines_offloaded.lock().unwrap();
|
||||
timelines_offloaded.values().for_each(|timeline| {
|
||||
timeline.defuse_for_tenant_drop();
|
||||
});
|
||||
}
|
||||
})
|
||||
};
|
||||
// test_long_timeline_create_then_tenant_delete is leaning on this message
|
||||
tracing::info!("Waiting for timelines...");
|
||||
while let Some(res) = js.join_next().await {
|
||||
@@ -3833,13 +3757,10 @@ impl Tenant {
|
||||
&self,
|
||||
timelines: &std::sync::MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
timelines_offloaded: &std::sync::MutexGuard<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
|
||||
restrict_to_timeline: Option<TimelineId>,
|
||||
) {
|
||||
if restrict_to_timeline.is_none() {
|
||||
// This function must be called before activation: after activation timeline create/delete operations
|
||||
// might happen, and this function is not safe to run concurrently with those.
|
||||
assert!(!self.is_active());
|
||||
}
|
||||
// This function must be called before activation: after activation timeline create/delete operations
|
||||
// might happen, and this function is not safe to run concurrently with those.
|
||||
assert!(!self.is_active());
|
||||
|
||||
// Scan all timelines. For each timeline, remember the timeline ID and
|
||||
// the branch point where it was created.
|
||||
@@ -3872,12 +3793,7 @@ impl Tenant {
|
||||
let horizon = self.get_gc_horizon();
|
||||
|
||||
// Populate each timeline's GcInfo with information about its child branches
|
||||
let timelines_to_write = if let Some(timeline_id) = restrict_to_timeline {
|
||||
itertools::Either::Left(timelines.get(&timeline_id).into_iter())
|
||||
} else {
|
||||
itertools::Either::Right(timelines.values())
|
||||
};
|
||||
for timeline in timelines_to_write {
|
||||
for timeline in timelines.values() {
|
||||
let mut branchpoints: Vec<(Lsn, TimelineId, MaybeOffloaded)> = all_branchpoints
|
||||
.remove(&timeline.timeline_id)
|
||||
.unwrap_or_default();
|
||||
@@ -9724,54 +9640,4 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[tokio::test]
|
||||
async fn test_timeline_offload_retain_lsn() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_timeline_offload_retain_lsn")
|
||||
.await
|
||||
.unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline_parent = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let tline_child = tenant
|
||||
.branch_timeline_test(&tline_parent, NEW_TIMELINE_ID, Some(Lsn(0x20)), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
{
|
||||
let gc_info_parent = tline_parent.gc_info.read().unwrap();
|
||||
assert_eq!(
|
||||
gc_info_parent.retain_lsns,
|
||||
vec![(Lsn(0x20), tline_child.timeline_id, MaybeOffloaded::No)]
|
||||
);
|
||||
}
|
||||
// We have to directly call the remote_client instead of using the archive function to avoid constructing broker client...
|
||||
tline_child
|
||||
.remote_client
|
||||
.schedule_index_upload_for_timeline_archival_state(TimelineArchivalState::Archived)
|
||||
.unwrap();
|
||||
tline_child.remote_client.wait_completion().await.unwrap();
|
||||
offload_timeline(&tenant, &tline_child)
|
||||
.instrument(tracing::info_span!(parent: None, "offload_test", tenant_id=%"test", shard_id=%"test", timeline_id=%"test"))
|
||||
.await.unwrap();
|
||||
let child_timeline_id = tline_child.timeline_id;
|
||||
Arc::try_unwrap(tline_child).unwrap();
|
||||
|
||||
{
|
||||
let gc_info_parent = tline_parent.gc_info.read().unwrap();
|
||||
assert_eq!(
|
||||
gc_info_parent.retain_lsns,
|
||||
vec![(Lsn(0x20), child_timeline_id, MaybeOffloaded::Yes)]
|
||||
);
|
||||
}
|
||||
|
||||
tenant
|
||||
.get_offloaded_timeline(child_timeline_id)
|
||||
.unwrap()
|
||||
.defuse_for_tenant_drop();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -243,7 +243,7 @@ use self::index::IndexPart;
|
||||
use super::metadata::MetadataUpdate;
|
||||
use super::storage_layer::{Layer, LayerName, ResidentLayer};
|
||||
use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
|
||||
use super::{DeleteTimelineError, Generation};
|
||||
use super::Generation;
|
||||
|
||||
pub(crate) use download::{
|
||||
download_index_part, download_tenant_manifest, is_temp_download_file,
|
||||
@@ -574,18 +574,12 @@ impl RemoteTimelineClient {
|
||||
|
||||
if latest_index_generation > index_generation {
|
||||
// Unexpected! Why are we loading such an old index if a more recent one exists?
|
||||
// We will refuse to proceed, as there is no reasonable scenario where this should happen, but
|
||||
// there _is_ a clear bug/corruption scenario where it would happen (controller sets the generation
|
||||
// backwards).
|
||||
tracing::error!(
|
||||
tracing::warn!(
|
||||
?index_generation,
|
||||
?latest_index_generation,
|
||||
?latest_index_mtime,
|
||||
"Found a newer index while loading an old one"
|
||||
);
|
||||
return Err(DownloadError::Fatal(
|
||||
"Index age exceeds threshold and a newer index exists".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1550,17 +1544,15 @@ impl RemoteTimelineClient {
|
||||
/// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfuly set.
|
||||
/// The function deletes layer files one by one, then lists the prefix to see if we leaked something
|
||||
/// deletes leaked files if any and proceeds with deletion of index file at the end.
|
||||
pub(crate) async fn delete_all(self: &Arc<Self>) -> Result<(), DeleteTimelineError> {
|
||||
pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let layers: Vec<RemotePath> = {
|
||||
let mut locked = self.upload_queue.lock().unwrap();
|
||||
let stopped = locked.stopped_mut().map_err(DeleteTimelineError::Other)?;
|
||||
let stopped = locked.stopped_mut()?;
|
||||
|
||||
if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
|
||||
return Err(DeleteTimelineError::Other(anyhow::anyhow!(
|
||||
"deleted_at is not set"
|
||||
)));
|
||||
anyhow::bail!("deleted_at is not set")
|
||||
}
|
||||
|
||||
debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
|
||||
@@ -1595,10 +1587,7 @@ impl RemoteTimelineClient {
|
||||
};
|
||||
|
||||
let layer_deletion_count = layers.len();
|
||||
self.deletion_queue_client
|
||||
.push_immediate(layers)
|
||||
.await
|
||||
.map_err(|_| DeleteTimelineError::Cancelled)?;
|
||||
self.deletion_queue_client.push_immediate(layers).await?;
|
||||
|
||||
// Delete the initdb.tar.zst, which is not always present, but deletion attempts of
|
||||
// nonexistent objects are not considered errors.
|
||||
@@ -1606,8 +1595,7 @@ impl RemoteTimelineClient {
|
||||
remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id);
|
||||
self.deletion_queue_client
|
||||
.push_immediate(vec![initdb_path])
|
||||
.await
|
||||
.map_err(|_| DeleteTimelineError::Cancelled)?;
|
||||
.await?;
|
||||
|
||||
// Do not delete index part yet, it is needed for possible retry. If we remove it first
|
||||
// and the retry arrives at a different pageserver, there won't be any traces of it on remote storage
|
||||
@@ -1615,9 +1603,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
// Execute all pending deletions, so that when we proceed to do a listing below, we aren't
|
||||
// taking the burden of listing all the layers that we already know we should delete.
|
||||
self.flush_deletion_queue()
|
||||
.await
|
||||
.map_err(|_| DeleteTimelineError::Cancelled)?;
|
||||
self.flush_deletion_queue().await?;
|
||||
|
||||
let cancel = shutdown_token();
|
||||
|
||||
@@ -1680,32 +1666,28 @@ impl RemoteTimelineClient {
|
||||
if !remaining_layers.is_empty() {
|
||||
self.deletion_queue_client
|
||||
.push_immediate(remaining_layers)
|
||||
.await
|
||||
.map_err(|_| DeleteTimelineError::Cancelled)?;
|
||||
.await?;
|
||||
}
|
||||
|
||||
fail::fail_point!("timeline-delete-before-index-delete", |_| {
|
||||
Err(DeleteTimelineError::Other(anyhow::anyhow!(
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: timeline-delete-before-index-delete"
|
||||
)))?
|
||||
))?
|
||||
});
|
||||
|
||||
debug!("enqueuing index part deletion");
|
||||
self.deletion_queue_client
|
||||
.push_immediate([latest_index].to_vec())
|
||||
.await
|
||||
.map_err(|_| DeleteTimelineError::Cancelled)?;
|
||||
.await?;
|
||||
|
||||
// Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
|
||||
// for a flush to a persistent deletion list so that we may be sure deletion will occur.
|
||||
self.flush_deletion_queue()
|
||||
.await
|
||||
.map_err(|_| DeleteTimelineError::Cancelled)?;
|
||||
self.flush_deletion_queue().await?;
|
||||
|
||||
fail::fail_point!("timeline-delete-after-index-delete", |_| {
|
||||
Err(DeleteTimelineError::Other(anyhow::anyhow!(
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: timeline-delete-after-index-delete"
|
||||
)))?
|
||||
))?
|
||||
});
|
||||
|
||||
info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
|
||||
|
||||
@@ -477,21 +477,8 @@ impl GcInfo {
|
||||
self.retain_lsns.sort_by_key(|i| i.0);
|
||||
}
|
||||
|
||||
pub(super) fn remove_child_maybe_offloaded(
|
||||
&mut self,
|
||||
child_id: TimelineId,
|
||||
maybe_offloaded: MaybeOffloaded,
|
||||
) {
|
||||
self.retain_lsns
|
||||
.retain(|i| !(i.1 == child_id && i.2 == maybe_offloaded));
|
||||
}
|
||||
|
||||
pub(super) fn remove_child_not_offloaded(&mut self, child_id: TimelineId) {
|
||||
self.remove_child_maybe_offloaded(child_id, MaybeOffloaded::No);
|
||||
}
|
||||
|
||||
pub(super) fn remove_child_offloaded(&mut self, child_id: TimelineId) {
|
||||
self.remove_child_maybe_offloaded(child_id, MaybeOffloaded::Yes);
|
||||
pub(super) fn remove_child(&mut self, child_id: TimelineId) {
|
||||
self.retain_lsns.retain(|i| i.1 != child_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4514,7 +4501,7 @@ impl Drop for Timeline {
|
||||
// This lock should never be poisoned, but in case it is we use `if let Ok(..)` instead of
|
||||
// an unwrap(), to avoid panicking in a destructor and thereby aborting the process.
|
||||
if let Ok(mut gc_info) = ancestor.gc_info.write() {
|
||||
gc_info.remove_child_not_offloaded(self.timeline_id)
|
||||
gc_info.remove_child(self.timeline_id)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5043,7 +5030,7 @@ impl Timeline {
|
||||
|
||||
// 1. Is it newer than GC horizon cutoff point?
|
||||
if l.get_lsn_range().end > space_cutoff {
|
||||
info!(
|
||||
debug!(
|
||||
"keeping {} because it's newer than space_cutoff {}",
|
||||
l.layer_name(),
|
||||
space_cutoff,
|
||||
@@ -5054,7 +5041,7 @@ impl Timeline {
|
||||
|
||||
// 2. Is it newer than PiTR cutoff point?
|
||||
if l.get_lsn_range().end > time_cutoff {
|
||||
info!(
|
||||
debug!(
|
||||
"keeping {} because it's newer than time_cutoff {}",
|
||||
l.layer_name(),
|
||||
time_cutoff,
|
||||
@@ -5073,7 +5060,7 @@ impl Timeline {
|
||||
for retain_lsn in &retain_lsns {
|
||||
// start_lsn is inclusive
|
||||
if &l.get_lsn_range().start <= retain_lsn {
|
||||
info!(
|
||||
debug!(
|
||||
"keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
|
||||
l.layer_name(),
|
||||
retain_lsn,
|
||||
@@ -5088,7 +5075,7 @@ impl Timeline {
|
||||
if let Some(lsn) = &max_lsn_with_valid_lease {
|
||||
// keep if layer start <= any of the lease
|
||||
if &l.get_lsn_range().start <= lsn {
|
||||
info!(
|
||||
debug!(
|
||||
"keeping {} because there is a valid lease preventing GC at {}",
|
||||
l.layer_name(),
|
||||
lsn,
|
||||
@@ -5120,13 +5107,13 @@ impl Timeline {
|
||||
if !layers
|
||||
.image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
|
||||
{
|
||||
info!("keeping {} because it is the latest layer", l.layer_name());
|
||||
debug!("keeping {} because it is the latest layer", l.layer_name());
|
||||
result.layers_not_updated += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
|
||||
// We didn't find any reason to keep this file, so remove it.
|
||||
info!(
|
||||
debug!(
|
||||
"garbage collecting {} is_dropped: xx is_incremental: {}",
|
||||
l.layer_name(),
|
||||
l.is_incremental(),
|
||||
|
||||
@@ -5,7 +5,6 @@ use std::{
|
||||
|
||||
use anyhow::Context;
|
||||
use pageserver_api::{models::TimelineState, shard::TenantShardId};
|
||||
use remote_storage::DownloadError;
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tracing::{error, info, info_span, instrument, Instrument};
|
||||
use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
|
||||
@@ -17,7 +16,7 @@ use crate::{
|
||||
metadata::TimelineMetadata,
|
||||
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
|
||||
CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, Tenant,
|
||||
TenantManifestError, TimelineOrOffloaded,
|
||||
TimelineOrOffloaded,
|
||||
},
|
||||
virtual_file::MaybeFatalIo,
|
||||
};
|
||||
@@ -111,6 +110,13 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
info!("finished deleting layer files, releasing locks");
|
||||
}
|
||||
|
||||
/// Removes remote layers and an index file after them.
|
||||
async fn delete_remote_layers_and_index(
|
||||
remote_client: &Arc<RemoteTimelineClient>,
|
||||
) -> anyhow::Result<()> {
|
||||
remote_client.delete_all().await.context("delete_all")
|
||||
}
|
||||
|
||||
/// It is important that this gets called when DeletionGuard is being held.
|
||||
/// For more context see comments in [`DeleteTimelineFlow::prepare`]
|
||||
async fn remove_maybe_offloaded_timeline_from_tenant(
|
||||
@@ -141,10 +147,9 @@ async fn remove_maybe_offloaded_timeline_from_tenant(
|
||||
);
|
||||
}
|
||||
TimelineOrOffloaded::Offloaded(timeline) => {
|
||||
let offloaded_timeline = timelines_offloaded
|
||||
timelines_offloaded
|
||||
.remove(&timeline.timeline_id)
|
||||
.expect("timeline that we were deleting was concurrently removed from 'timelines_offloaded' map");
|
||||
offloaded_timeline.delete_from_ancestor_with_timelines(&timelines);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,24 +221,11 @@ impl DeleteTimelineFlow {
|
||||
None => {
|
||||
let remote_client = tenant
|
||||
.build_timeline_client(timeline.timeline_id(), tenant.remote_storage.clone());
|
||||
let result = match remote_client
|
||||
let result = remote_client
|
||||
.download_index_file(&tenant.cancel)
|
||||
.instrument(info_span!("download_index_file"))
|
||||
.await
|
||||
{
|
||||
Ok(r) => r,
|
||||
Err(DownloadError::NotFound) => {
|
||||
// Deletion is already complete
|
||||
tracing::info!("Timeline already deleted in remote storage");
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(DeleteTimelineError::Other(anyhow::anyhow!(
|
||||
"error: {:?}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
.map_err(|e| DeleteTimelineError::Other(anyhow::anyhow!("error: {:?}", e)))?;
|
||||
let index_part = match result {
|
||||
MaybeDeletedIndexPart::Deleted(p) => {
|
||||
tracing::info!("Timeline already set as deleted in remote index");
|
||||
@@ -414,12 +406,7 @@ impl DeleteTimelineFlow {
|
||||
"timeline_delete",
|
||||
async move {
|
||||
if let Err(err) = Self::background(guard, conf, &tenant, &timeline, remote_client).await {
|
||||
// Only log as an error if it's not a cancellation.
|
||||
if matches!(err, DeleteTimelineError::Cancelled) {
|
||||
info!("Shutdown during timeline deletion");
|
||||
}else {
|
||||
error!("Error: {err:#}");
|
||||
}
|
||||
error!("Error: {err:#}");
|
||||
if let TimelineOrOffloaded::Timeline(timeline) = timeline {
|
||||
timeline.set_broken(format!("{err:#}"))
|
||||
}
|
||||
@@ -451,7 +438,7 @@ impl DeleteTimelineFlow {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
|
||||
});
|
||||
|
||||
remote_client.delete_all().await?;
|
||||
delete_remote_layers_and_index(&remote_client).await?;
|
||||
|
||||
pausable_failpoint!("in_progress_delete");
|
||||
|
||||
@@ -462,10 +449,10 @@ impl DeleteTimelineFlow {
|
||||
// So indeed, the tenant manifest might refer to an offloaded timeline which has already been deleted.
|
||||
// However, we handle this case in tenant loading code so the next time we attach, the issue is
|
||||
// resolved.
|
||||
tenant.store_tenant_manifest().await.map_err(|e| match e {
|
||||
TenantManifestError::Cancelled => DeleteTimelineError::Cancelled,
|
||||
_ => DeleteTimelineError::Other(e.into()),
|
||||
})?;
|
||||
tenant
|
||||
.store_tenant_manifest()
|
||||
.await
|
||||
.map_err(|e| DeleteTimelineError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
*guard = Self::Finished;
|
||||
|
||||
|
||||
@@ -66,7 +66,7 @@ pub(crate) async fn offload_timeline(
|
||||
let conf = &tenant.conf;
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, &timeline).await;
|
||||
|
||||
let remaining_refcount = remove_timeline_from_tenant(tenant, &timeline, &guard);
|
||||
remove_timeline_from_tenant(tenant, &timeline, &guard);
|
||||
|
||||
{
|
||||
let mut offloaded_timelines = tenant.timelines_offloaded.lock().unwrap();
|
||||
@@ -87,20 +87,16 @@ pub(crate) async fn offload_timeline(
|
||||
// not our actual state of offloaded timelines.
|
||||
tenant.store_tenant_manifest().await?;
|
||||
|
||||
tracing::info!("Timeline offload complete (remaining arc refcount: {remaining_refcount})");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// It is important that this gets called when DeletionGuard is being held.
|
||||
/// For more context see comments in [`DeleteTimelineFlow::prepare`]
|
||||
///
|
||||
/// Returns the strong count of the timeline `Arc`
|
||||
fn remove_timeline_from_tenant(
|
||||
tenant: &Tenant,
|
||||
timeline: &Timeline,
|
||||
_: &DeletionGuard, // using it as a witness
|
||||
) -> usize {
|
||||
) {
|
||||
// Remove the timeline from the map.
|
||||
let mut timelines = tenant.timelines.lock().unwrap();
|
||||
let children_exist = timelines
|
||||
@@ -113,9 +109,7 @@ fn remove_timeline_from_tenant(
|
||||
panic!("Timeline grew children while we removed layer files");
|
||||
}
|
||||
|
||||
let timeline = timelines
|
||||
timelines
|
||||
.remove(&timeline.timeline_id)
|
||||
.expect("timeline that we were deleting was concurrently removed from 'timelines' map");
|
||||
|
||||
Arc::strong_count(&timeline)
|
||||
}
|
||||
|
||||
@@ -331,11 +331,11 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
while let Some((record_end_lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
// at risk of hitting a deadlock.
|
||||
if !next_record_lsn.is_aligned() {
|
||||
if !record_end_lsn.is_aligned() {
|
||||
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
|
||||
}
|
||||
|
||||
@@ -343,7 +343,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
modification.tline.get_shard_identity(),
|
||||
next_record_lsn,
|
||||
record_end_lsn,
|
||||
modification.tline.pg_version,
|
||||
)?;
|
||||
|
||||
@@ -367,10 +367,10 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("could not ingest record at {next_record_lsn}")
|
||||
format!("could not ingest record at {record_end_lsn}")
|
||||
})?;
|
||||
if !ingested {
|
||||
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
|
||||
tracing::debug!("ingest: filtered out record @ LSN {record_end_lsn}");
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
filtered_records += 1;
|
||||
}
|
||||
@@ -380,7 +380,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// to timeout the tests.
|
||||
fail_point!("walreceiver-after-ingest");
|
||||
|
||||
last_rec_lsn = next_record_lsn;
|
||||
last_rec_lsn = record_end_lsn;
|
||||
|
||||
// Commit every ingest_batch_size records. Even if we filtered out
|
||||
// all records, we still need to call commit to advance the LSN.
|
||||
|
||||
@@ -154,7 +154,7 @@ impl WalIngest {
|
||||
WAL_INGEST.records_received.inc();
|
||||
let prev_len = modification.len();
|
||||
|
||||
modification.set_lsn(interpreted.next_record_lsn)?;
|
||||
modification.set_lsn(interpreted.end_lsn)?;
|
||||
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
|
||||
// Records of this type should always be preceded by a commit(), as they
|
||||
@@ -327,25 +327,6 @@ impl WalIngest {
|
||||
let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
|
||||
// Only apply the writes if this shard owns either of the VM pages. These WAL records are
|
||||
// broadcast to all shards, but should only be applied on the relevant ones. See:
|
||||
// https://github.com/neondatabase/neon/issues/9855
|
||||
//
|
||||
// TODO: we should differentiate on old_vm_blk and new_vm_blk since we may only own one of
|
||||
// them, but this is no worse than the old behavior where we applied all records, and will
|
||||
// avoid the performance penalty of the relsize cache miss and DbDir deserialization.
|
||||
//
|
||||
// TODO: consider filtering this during decoding, and avoid the broadcast.
|
||||
let old_is_local = old_vm_blk
|
||||
.map(|blk| self.shard.is_key_local(&rel_block_to_key(vm_rel, blk)))
|
||||
.unwrap_or_default();
|
||||
let new_is_local = new_vm_blk
|
||||
.map(|blk| self.shard.is_key_local(&rel_block_to_key(vm_rel, blk)))
|
||||
.unwrap_or_default();
|
||||
if !old_is_local && !new_is_local {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Sometimes, Postgres seems to create heap WAL records with the
|
||||
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
|
||||
// not set. In fact, it's possible that the VM page does not exist at all.
|
||||
@@ -353,11 +334,7 @@ impl WalIngest {
|
||||
// replaying it would fail to find the previous image of the page, because
|
||||
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
|
||||
// record if it doesn't.
|
||||
let (vm_size, vm_exists) = get_relsize(modification, vm_rel, ctx).await?;
|
||||
if vm_size == 0 {
|
||||
let lsn = modification.lsn;
|
||||
log::info!("ingest_clear_vm_bits: vm_rel {vm_rel} has size={vm_size} exists={vm_exists} at LSN {lsn}, flags={flags} old={old_heap_blkno:?} new={new_heap_blkno:?}");
|
||||
}
|
||||
let vm_size = get_relsize(modification, vm_rel, ctx).await?;
|
||||
if let Some(blknum) = new_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
new_vm_blk = None;
|
||||
@@ -595,7 +572,7 @@ impl WalIngest {
|
||||
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
|
||||
fsm_physical_page_no += 1;
|
||||
}
|
||||
let (nblocks, _) = get_relsize(modification, rel, ctx).await?;
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
if nblocks > fsm_physical_page_no {
|
||||
// check if something to do: FSM is larger than truncate position
|
||||
self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
|
||||
@@ -610,32 +587,14 @@ impl WalIngest {
|
||||
forknum: VISIBILITYMAP_FORKNUM,
|
||||
};
|
||||
|
||||
// last remaining block, byte, and bit
|
||||
let mut vm_page_no = blkno / (pg_constants::VM_HEAPBLOCKS_PER_PAGE as u32);
|
||||
let trunc_byte = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_PAGE
|
||||
/ pg_constants::VM_HEAPBLOCKS_PER_BYTE;
|
||||
let trunc_offs = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_BYTE
|
||||
* pg_constants::VM_BITS_PER_HEAPBLOCK;
|
||||
|
||||
// Unless the new size is exactly at a visibility map page boundary, the
|
||||
// tail bits in the last remaining map page, representing truncated heap
|
||||
// blocks, need to be cleared. This is not only tidy, but also necessary
|
||||
// because we don't get a chance to clear the bits if the heap is extended
|
||||
// again.
|
||||
if (trunc_byte != 0 || trunc_offs != 0)
|
||||
&& self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no))
|
||||
{
|
||||
modification.put_rel_wal_record(
|
||||
rel,
|
||||
vm_page_no,
|
||||
NeonWalRecord::TruncateVisibilityMap {
|
||||
trunc_byte,
|
||||
trunc_offs,
|
||||
},
|
||||
)?;
|
||||
let mut vm_page_no = blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
|
||||
if blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
|
||||
// Tail of last remaining vm page has to be zeroed.
|
||||
// We are not precise here and instead of digging in VM bitmap format just clear the whole page.
|
||||
modification.put_rel_page_image_zero(rel, vm_page_no)?;
|
||||
vm_page_no += 1;
|
||||
}
|
||||
let (nblocks, _) = get_relsize(modification, rel, ctx).await?;
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
if nblocks > vm_page_no {
|
||||
// check if something to do: VM is larger than truncate position
|
||||
self.put_rel_truncation(modification, rel, vm_page_no, ctx)
|
||||
@@ -1457,22 +1416,20 @@ async fn get_relsize(
|
||||
modification: &DatadirModification<'_>,
|
||||
rel: RelTag,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(BlockNumber, bool), PageReconstructError> {
|
||||
if !modification
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
let nblocks = if !modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
{
|
||||
Ok((0, false))
|
||||
0
|
||||
} else {
|
||||
Ok((
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), ctx)
|
||||
.await?,
|
||||
true,
|
||||
))
|
||||
}
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
};
|
||||
Ok(nblocks)
|
||||
}
|
||||
|
||||
#[allow(clippy::bool_assert_comparison)]
|
||||
|
||||
@@ -42,34 +42,6 @@ pub(crate) fn apply_in_neon(
|
||||
} => {
|
||||
anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
|
||||
}
|
||||
//
|
||||
// Code copied from PostgreSQL `visibilitymap_prepare_truncate` function in `visibilitymap.c`
|
||||
//
|
||||
NeonWalRecord::TruncateVisibilityMap {
|
||||
trunc_byte,
|
||||
trunc_offs,
|
||||
} => {
|
||||
// sanity check that this is modifying the correct relation
|
||||
let (rel, _) = key.to_rel_block().context("invalid record")?;
|
||||
assert!(
|
||||
rel.forknum == VISIBILITYMAP_FORKNUM,
|
||||
"TruncateVisibilityMap record on unexpected rel {}",
|
||||
rel
|
||||
);
|
||||
let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
|
||||
map[*trunc_byte + 1..].fill(0u8);
|
||||
/*----
|
||||
* Mask out the unwanted bits of the last remaining byte.
|
||||
*
|
||||
* ((1 << 0) - 1) = 00000000
|
||||
* ((1 << 1) - 1) = 00000001
|
||||
* ...
|
||||
* ((1 << 6) - 1) = 00111111
|
||||
* ((1 << 7) - 1) = 01111111
|
||||
*----
|
||||
*/
|
||||
map[*trunc_byte] &= (1 << *trunc_offs) - 1;
|
||||
}
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
|
||||
@@ -512,7 +512,7 @@ neon_shmem_startup_hook(void)
|
||||
if (prev_shmem_startup_hook)
|
||||
prev_shmem_startup_hook();
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
#if PG_PG_MAJORVERSION_NUM >= 17
|
||||
WAIT_EVENT_NEON_LFC_MAINTENANCE = WaitEventExtensionNew("Neon/FileCache_Maintenance");
|
||||
WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read");
|
||||
WAIT_EVENT_NEON_LFC_TRUNCATE = WaitEventExtensionNew("Neon/FileCache_Truncate");
|
||||
|
||||
@@ -30,7 +30,6 @@ once_cell.workspace = true
|
||||
parking_lot.workspace = true
|
||||
postgres.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
pprof.workspace = true
|
||||
rand.workspace = true
|
||||
regex.workspace = true
|
||||
scopeguard.workspace = true
|
||||
|
||||
@@ -14,10 +14,6 @@ cargo bench --package safekeeper --bench receive_wal process_msg/fsync=false
|
||||
|
||||
# List available benchmarks.
|
||||
cargo bench --package safekeeper --benches -- --list
|
||||
|
||||
# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
|
||||
# Output in target/criterion/*/profile/flamegraph.svg.
|
||||
cargo bench --package safekeeper --bench receive_wal process_msg/fsync=false --profile-time 10
|
||||
```
|
||||
|
||||
Additional charts and statistics are available in `target/criterion/report/index.html`.
|
||||
|
||||
@@ -10,7 +10,6 @@ use camino_tempfile::tempfile;
|
||||
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
|
||||
use itertools::Itertools as _;
|
||||
use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
|
||||
use pprof::criterion::{Output, PProfProfiler};
|
||||
use safekeeper::receive_wal::{self, WalAcceptor};
|
||||
use safekeeper::safekeeper::{
|
||||
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
|
||||
@@ -25,9 +24,8 @@ const GB: usize = 1024 * MB;
|
||||
|
||||
// Register benchmarks with Criterion.
|
||||
criterion_group!(
|
||||
name = benches;
|
||||
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
|
||||
targets = bench_process_msg,
|
||||
benches,
|
||||
bench_process_msg,
|
||||
bench_wal_acceptor,
|
||||
bench_wal_acceptor_throughput,
|
||||
bench_file_write
|
||||
@@ -264,7 +262,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
|
||||
|
||||
// Send requests.
|
||||
for req in reqgen {
|
||||
_ = reply_rx.try_recv(); // discard any replies, to avoid blocking
|
||||
while reply_rx.try_recv().is_ok() {} // discard replies, to avoid blocking
|
||||
let msg = ProposerAcceptorMessage::AppendRequest(req);
|
||||
msg_tx.send(msg).await.expect("send failed");
|
||||
}
|
||||
|
||||
@@ -14,9 +14,7 @@ use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, Instrument};
|
||||
use utils::failpoint_support::failpoints_handler;
|
||||
use utils::http::endpoint::{
|
||||
profile_cpu_handler, prometheus_metrics_handler, request_span, ChannelWriter,
|
||||
};
|
||||
use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter};
|
||||
use utils::http::request::parse_query_param;
|
||||
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
@@ -576,7 +574,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
router = router.middleware(auth_middleware(|request| {
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
|
||||
["/v1/status", "/metrics", "/pprof/profile"]
|
||||
["/v1/status", "/metrics"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect()
|
||||
@@ -600,7 +598,6 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
.data(Arc::new(conf))
|
||||
.data(auth)
|
||||
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
|
||||
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
|
||||
.get("/v1/status", |r| request_span(r, status_handler))
|
||||
.put("/v1/failpoints", |r| {
|
||||
request_span(r, move |r| async {
|
||||
|
||||
@@ -217,7 +217,8 @@ pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
|
||||
let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
|
||||
buckets.insert(0, 0.0);
|
||||
buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
|
||||
assert!(buckets.len() <= 12, "too many histogram buckets");
|
||||
// TODO: tweak this.
|
||||
assert!(buckets.len() <= 16, "too many histogram buckets");
|
||||
|
||||
register_histogram!(
|
||||
"safekeeper_wal_receiver_queue_depth",
|
||||
|
||||
@@ -7,14 +7,15 @@ use crate::metrics::{
|
||||
WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL,
|
||||
WAL_RECEIVER_QUEUE_SIZE_TOTAL,
|
||||
};
|
||||
use crate::safekeeper::AcceptorProposerMessage;
|
||||
use crate::safekeeper::ProposerAcceptorMessage;
|
||||
use crate::safekeeper::ServerInfo;
|
||||
use crate::safekeeper::{
|
||||
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
|
||||
ServerInfo,
|
||||
};
|
||||
use crate::timeline::WalResidentTimeline;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::GlobalTimelines;
|
||||
use anyhow::{anyhow, Context};
|
||||
use bytes::BytesMut;
|
||||
use bytes::{BufMut as _, Bytes, BytesMut};
|
||||
use parking_lot::MappedMutexGuard;
|
||||
use parking_lot::Mutex;
|
||||
use parking_lot::MutexGuard;
|
||||
@@ -206,7 +207,8 @@ impl Drop for WalReceiverGuard {
|
||||
}
|
||||
}
|
||||
|
||||
pub const MSG_QUEUE_SIZE: usize = 256;
|
||||
// TODO: reconsider this.
|
||||
pub const MSG_QUEUE_SIZE: usize = 4096;
|
||||
pub const REPLY_QUEUE_SIZE: usize = 16;
|
||||
|
||||
impl SafekeeperPostgresHandler {
|
||||
@@ -484,6 +486,9 @@ const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
|
||||
/// every 5 seconds, for 12 samples per poll. This will give a count of up to 12x active timelines.
|
||||
const METRICS_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
/// The AppendRequest buffer size.
|
||||
const APPEND_BUFFER_SIZE: usize = 1024 * 1024;
|
||||
|
||||
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
|
||||
/// replies to reply_tx.
|
||||
///
|
||||
@@ -530,6 +535,9 @@ impl WalAcceptor {
|
||||
async fn run(&mut self) -> anyhow::Result<()> {
|
||||
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
|
||||
|
||||
// Buffer AppendRequests to submit them as a single large write.
|
||||
let mut append_buf = BufferedAppendRequest::new(APPEND_BUFFER_SIZE);
|
||||
|
||||
// Periodically flush the WAL and compute metrics.
|
||||
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
|
||||
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||
@@ -546,7 +554,7 @@ impl WalAcceptor {
|
||||
// Process inbound message.
|
||||
msg = self.msg_rx.recv() => {
|
||||
// If disconnected, break to flush WAL and return.
|
||||
let Some(mut msg) = msg else {
|
||||
let Some(msg) = msg else {
|
||||
break;
|
||||
};
|
||||
|
||||
@@ -562,15 +570,45 @@ impl WalAcceptor {
|
||||
// Don't flush the WAL on every append, only periodically via flush_ticker.
|
||||
// This batches multiple appends per fsync. If the channel is empty after
|
||||
// sending the reply, we'll schedule an immediate flush.
|
||||
//
|
||||
// Note that a flush can still happen on segment bounds, which will result
|
||||
// in an AppendResponse.
|
||||
if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
|
||||
msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
|
||||
dirty = true;
|
||||
}
|
||||
// Try to batch multiple messages into a single large write.
|
||||
if !append_buf.is_empty() || !self.msg_rx.is_empty() {
|
||||
if append_buf.add(&append_request) {
|
||||
continue; // message buffered, go get next message
|
||||
}
|
||||
|
||||
self.tli.process_msg(&msg).await?
|
||||
// Full buffer, write it and buffer this message for next iteration.
|
||||
dirty = true;
|
||||
let buf_req = append_buf.take().expect("empty buffer");
|
||||
let buf_msg = ProposerAcceptorMessage::NoFlushAppendRequest(buf_req);
|
||||
let reply = self.tli.process_msg(&buf_msg).await?;
|
||||
drop(buf_msg); // allow reusing buffer for add
|
||||
assert!(append_buf.add(&append_request), "empty buffer rejected msg");
|
||||
reply
|
||||
} else {
|
||||
dirty = true;
|
||||
let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
|
||||
self.tli.process_msg(&msg).await?
|
||||
}
|
||||
} else {
|
||||
self.tli.process_msg(&msg).await?
|
||||
}
|
||||
}
|
||||
|
||||
// If there are no pending messages, write the append buffer.
|
||||
//
|
||||
// NB: we don't also flush the WAL here. Otherwise we can get into a regime where we
|
||||
// quickly drain msg_rx and fsync before the sender is able to repopulate msg_rx.
|
||||
// This happens consistently due to Tokio scheduling, leading to overeager fsyncing.
|
||||
// Instead, we perform the write without fsyncing and give the sender a chance to
|
||||
// get scheduled and populate msg_rx for the next iteration. If there are no further
|
||||
// messages, the next iteration will flush the WAL.
|
||||
_ = future::ready(()), if self.msg_rx.is_empty() && !append_buf.is_empty() => {
|
||||
dirty = true;
|
||||
let buf_req = append_buf.take().expect("empty buffer");
|
||||
self.tli
|
||||
.process_msg(&ProposerAcceptorMessage::NoFlushAppendRequest(buf_req))
|
||||
.await?
|
||||
}
|
||||
|
||||
// While receiving AppendRequests, flush the WAL periodically and respond with an
|
||||
@@ -582,11 +620,11 @@ impl WalAcceptor {
|
||||
.await?
|
||||
}
|
||||
|
||||
// If there are no pending messages, flush the WAL immediately.
|
||||
// If there are no pending messages, flush the WAL and append buffer immediately.
|
||||
//
|
||||
// TODO: this should be done via flush_ticker.reset_immediately(), but that's always
|
||||
// delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
|
||||
_ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
|
||||
_ = future::ready(()), if self.msg_rx.is_empty() && dirty => {
|
||||
dirty = false;
|
||||
flush_ticker.reset();
|
||||
self.tli
|
||||
@@ -630,3 +668,115 @@ impl Drop for WalAcceptor {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Buffers WAL data for multiple AppendRequests, to submit them as a single write.
|
||||
struct BufferedAppendRequest {
|
||||
/// The buffer capacity.
|
||||
capacity: usize,
|
||||
/// The buffered header and WAL data.
|
||||
buf: Option<(AppendRequestHeader, BytesMut)>,
|
||||
/// A previous buffer that can be reused when the returned message is dropped.
|
||||
reuse_buf: Option<Bytes>,
|
||||
/// If an AppendRequest is larger than the buffer capacity (when empty), just stash it here to
|
||||
/// avoid growing the buffer and copying it. This will be returned as-is.
|
||||
large: Option<AppendRequest>,
|
||||
}
|
||||
|
||||
impl BufferedAppendRequest {
|
||||
/// Creates a new append request buffer with the given capacity.
|
||||
fn new(capacity: usize) -> Self {
|
||||
Self {
|
||||
capacity,
|
||||
buf: None,
|
||||
reuse_buf: None,
|
||||
large: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds the given append request to the buffer, if possible. Returns `false` if the message
|
||||
/// can't be buffered, leaving self unmodified. An empty buffer will always accept a message.
|
||||
///
|
||||
/// If the buffer is not empty, the message must have the same term and proposer and contiguous
|
||||
/// `begin_lsn` and `end_lsn`. The buffer must have available capacity for the entire
|
||||
/// `wal_data`. If the message is greater than an empty buffer's capacity, it is accepted but
|
||||
/// simply stashed away in `large` without growing the buffer.
|
||||
pub fn add(&mut self, msg: &AppendRequest) -> bool {
|
||||
// If there is a stashed large message, reject further messages.
|
||||
if self.large.is_some() {
|
||||
return false;
|
||||
}
|
||||
|
||||
// If there is no existing buffer, initialize one with the message.
|
||||
let Some((ref mut h, ref mut wal_data)) = self.buf else {
|
||||
// If the message is larger than the buffer capacity, just stash it instead of growing.
|
||||
if msg.wal_data.len() > self.capacity {
|
||||
assert!(self.large.is_none());
|
||||
self.large = Some(msg.clone()); // clone is cheap with Bytes
|
||||
return true;
|
||||
}
|
||||
|
||||
// Reuse a previous buffer, if any, or allocate a new one.
|
||||
//
|
||||
// TODO: try_into_mut() is essentially runtime borrow checking. If AppendRequest used a
|
||||
// normal Vec<u8> we could do compile-time borrow checking instead and avoid panic.
|
||||
let mut wal_data = match self.reuse_buf.take() {
|
||||
Some(reuse_buf) => match reuse_buf.try_into_mut() {
|
||||
Ok(mut reuse_buf) => {
|
||||
assert_eq!(reuse_buf.capacity(), self.capacity);
|
||||
reuse_buf.clear();
|
||||
reuse_buf
|
||||
}
|
||||
Err(_) => panic!("couldn't reuse buffer, still in use"),
|
||||
},
|
||||
None => BytesMut::with_capacity(self.capacity),
|
||||
};
|
||||
// Copy the append request into the buffer.
|
||||
wal_data.put_slice(&msg.wal_data);
|
||||
self.buf = Some((msg.h, wal_data));
|
||||
return true;
|
||||
};
|
||||
|
||||
// The messages must have the same term and proposer.
|
||||
if h.term != msg.h.term || h.proposer_uuid != msg.h.proposer_uuid {
|
||||
return false;
|
||||
}
|
||||
// The messages must be contiguous.
|
||||
if h.end_lsn != msg.h.begin_lsn {
|
||||
return false;
|
||||
}
|
||||
// The message must fit in the buffer.
|
||||
if wal_data.len() + msg.wal_data.len() > self.capacity {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Add the message to the buffer, bumping the commit and truncate LSNs. We assume that later
|
||||
// messages have later commit/truncate LSNs.
|
||||
h.end_lsn = msg.h.end_lsn;
|
||||
h.commit_lsn = msg.h.commit_lsn;
|
||||
h.truncate_lsn = msg.h.truncate_lsn;
|
||||
wal_data.put_slice(&msg.wal_data);
|
||||
true
|
||||
}
|
||||
|
||||
/// Returns true if there is no buffered message.
|
||||
fn is_empty(&self) -> bool {
|
||||
self.buf.is_none() && self.large.is_none()
|
||||
}
|
||||
|
||||
/// Takes the buffered AppendRequest (if any), leaving a None in its place.
|
||||
///
|
||||
/// NB: The returned `wal_data` Bytes must be dropped before the next call to `add()`, in order
|
||||
/// to reuse the buffer. This is basically runtime borrow checking, because of Bytes.
|
||||
fn take(&mut self) -> Option<AppendRequest> {
|
||||
// If there is a stashed large message, return it.
|
||||
if let Some(large) = self.large.take() {
|
||||
assert!(self.buf.is_none(), "both buf and large are set");
|
||||
return Some(large);
|
||||
}
|
||||
|
||||
let (h, wal_data) = self.buf.take()?;
|
||||
let wal_data = wal_data.freeze();
|
||||
self.reuse_buf = Some(wal_data.clone()); // keep a reference to the buffer
|
||||
Some(AppendRequest { h, wal_data })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -296,12 +296,13 @@ pub struct ProposerElected {
|
||||
|
||||
/// Request with WAL message sent from proposer to safekeeper. Along the way it
|
||||
/// communicates commit_lsn.
|
||||
#[derive(Debug)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AppendRequest {
|
||||
pub h: AppendRequestHeader,
|
||||
pub wal_data: Bytes,
|
||||
}
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
|
||||
#[derive(Debug, Clone, Copy, Deserialize)]
|
||||
pub struct AppendRequestHeader {
|
||||
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
pub term: Term,
|
||||
@@ -947,7 +948,6 @@ where
|
||||
// while first connection still gets some packets later. It might be
|
||||
// better to not log this as error! above.
|
||||
let write_lsn = self.wal_store.write_lsn();
|
||||
let flush_lsn = self.wal_store.flush_lsn();
|
||||
if write_lsn > msg.h.begin_lsn {
|
||||
bail!(
|
||||
"append request rewrites WAL written before, write_lsn={}, msg lsn={}",
|
||||
@@ -1005,9 +1005,7 @@ where
|
||||
);
|
||||
|
||||
// If flush_lsn hasn't updated, AppendResponse is not very useful.
|
||||
// This is the common case for !require_flush, but a flush can still
|
||||
// happen on segment bounds.
|
||||
if !require_flush && flush_lsn == self.flush_lsn() {
|
||||
if !require_flush {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -1169,7 +1167,7 @@ mod tests {
|
||||
proposer_uuid: [0; 16],
|
||||
};
|
||||
let mut append_request = AppendRequest {
|
||||
h: ar_hdr.clone(),
|
||||
h: ar_hdr,
|
||||
wal_data: Bytes::from_static(b"b"),
|
||||
};
|
||||
|
||||
@@ -1243,7 +1241,7 @@ mod tests {
|
||||
proposer_uuid: [0; 16],
|
||||
};
|
||||
let append_request = AppendRequest {
|
||||
h: ar_hdr.clone(),
|
||||
h: ar_hdr,
|
||||
wal_data: Bytes::from_static(b"b"),
|
||||
};
|
||||
|
||||
@@ -1251,7 +1249,7 @@ mod tests {
|
||||
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
|
||||
.await
|
||||
.unwrap();
|
||||
let mut ar_hrd2 = ar_hdr.clone();
|
||||
let mut ar_hrd2 = ar_hdr;
|
||||
ar_hrd2.begin_lsn = Lsn(4);
|
||||
ar_hrd2.end_lsn = Lsn(5);
|
||||
let append_request = AppendRequest {
|
||||
|
||||
@@ -113,13 +113,6 @@ pub struct PhysicalStorage {
|
||||
/// non-aligned chunks of data.
|
||||
write_record_lsn: Lsn,
|
||||
|
||||
/// The last LSN flushed to disk. May be in the middle of a record.
|
||||
///
|
||||
/// NB: when the rest of the system refers to `flush_lsn`, it usually
|
||||
/// actually refers to `flush_record_lsn`. This ambiguity can be dangerous
|
||||
/// and should be resolved.
|
||||
flush_lsn: Lsn,
|
||||
|
||||
/// The LSN of the last WAL record flushed to disk.
|
||||
flush_record_lsn: Lsn,
|
||||
|
||||
@@ -134,29 +127,23 @@ pub struct PhysicalStorage {
|
||||
/// - doesn't point to the end of the segment
|
||||
file: Option<File>,
|
||||
|
||||
/// When true, WAL truncation potentially has been interrupted and we need
|
||||
/// to finish it before allowing WAL writes; see truncate_wal for details.
|
||||
/// In this case [`write_lsn`] can be less than actually written WAL on
|
||||
/// disk. In particular, there can be a case with unexpected .partial file.
|
||||
/// When false, we have just initialized storage using the LSN from find_end_of_wal().
|
||||
/// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
|
||||
/// there can be a case with unexpected .partial file.
|
||||
///
|
||||
/// Imagine the following:
|
||||
/// - 000000010000000000000001
|
||||
/// - it was fully written, but the last record is split between 2
|
||||
/// segments
|
||||
/// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in
|
||||
/// the end of this segment
|
||||
/// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were
|
||||
/// initialized to 0/1FFFFF0
|
||||
/// - it was fully written, but the last record is split between 2 segments
|
||||
/// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in the end of this segment
|
||||
/// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were initialized to 0/1FFFFF0
|
||||
/// - 000000010000000000000002.partial
|
||||
/// - it has only 1 byte written, which is not enough to make a full WAL
|
||||
/// record
|
||||
/// - it has only 1 byte written, which is not enough to make a full WAL record
|
||||
///
|
||||
/// Partial segment 002 has no WAL records, and it will be removed by the
|
||||
/// next truncate_wal(). This flag will be set to true after the first
|
||||
/// truncate_wal() call.
|
||||
/// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal().
|
||||
/// This flag will be set to true after the first truncate_wal() call.
|
||||
///
|
||||
/// [`write_lsn`]: Self::write_lsn
|
||||
pending_wal_truncation: bool,
|
||||
is_truncated_after_restart: bool,
|
||||
}
|
||||
|
||||
impl PhysicalStorage {
|
||||
@@ -218,11 +205,10 @@ impl PhysicalStorage {
|
||||
system_id: state.server.system_id,
|
||||
write_lsn,
|
||||
write_record_lsn: write_lsn,
|
||||
flush_lsn,
|
||||
flush_record_lsn: flush_lsn,
|
||||
decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
|
||||
file: None,
|
||||
pending_wal_truncation: true,
|
||||
is_truncated_after_restart: false,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -303,9 +289,8 @@ impl PhysicalStorage {
|
||||
}
|
||||
}
|
||||
|
||||
/// Write WAL bytes, which are known to be located in a single WAL segment. Returns true if the
|
||||
/// segment was completed, closed, and flushed to disk.
|
||||
async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<bool> {
|
||||
/// Write WAL bytes, which are known to be located in a single WAL segment.
|
||||
async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
|
||||
let mut file = if let Some(file) = self.file.take() {
|
||||
file
|
||||
} else {
|
||||
@@ -329,24 +314,20 @@ impl PhysicalStorage {
|
||||
let (wal_file_path, wal_file_partial_path) =
|
||||
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
|
||||
fs::rename(wal_file_partial_path, wal_file_path).await?;
|
||||
Ok(true)
|
||||
} else {
|
||||
// otherwise, file can be reused later
|
||||
self.file = Some(file);
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes WAL to the segment files, until everything is written. If some segments
|
||||
/// are fully written, they are flushed to disk. The last (partial) segment can
|
||||
/// be flushed separately later.
|
||||
///
|
||||
/// Updates `write_lsn` and `flush_lsn`.
|
||||
/// Updates `write_lsn`.
|
||||
async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
|
||||
// TODO: this shouldn't be possible, except possibly with write_lsn == 0.
|
||||
// Rename this method to `append_exact`, and make it append-only, removing
|
||||
// the `pos` parameter and this check. For this reason, we don't update
|
||||
// `flush_lsn` here.
|
||||
if self.write_lsn != pos {
|
||||
// need to flush the file before discarding it
|
||||
if let Some(file) = self.file.take() {
|
||||
@@ -368,13 +349,9 @@ impl PhysicalStorage {
|
||||
buf.len()
|
||||
};
|
||||
|
||||
let flushed = self
|
||||
.write_in_segment(segno, xlogoff, &buf[..bytes_write])
|
||||
self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
|
||||
.await?;
|
||||
self.write_lsn += bytes_write as u64;
|
||||
if flushed {
|
||||
self.flush_lsn = self.write_lsn;
|
||||
}
|
||||
buf = &buf[bytes_write..];
|
||||
}
|
||||
|
||||
@@ -388,9 +365,6 @@ impl Storage for PhysicalStorage {
|
||||
self.write_lsn
|
||||
}
|
||||
/// flush_lsn returns LSN of last durably stored WAL record.
|
||||
///
|
||||
/// TODO: flush_lsn() returns flush_record_lsn, but write_lsn() returns write_lsn: confusing.
|
||||
#[allow(clippy::misnamed_getters)]
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
self.flush_record_lsn
|
||||
}
|
||||
@@ -431,22 +405,14 @@ impl Storage for PhysicalStorage {
|
||||
startpos
|
||||
);
|
||||
}
|
||||
if self.pending_wal_truncation {
|
||||
bail!(
|
||||
"write_wal called with pending WAL truncation, write_lsn={}, startpos={}",
|
||||
self.write_lsn,
|
||||
startpos
|
||||
);
|
||||
}
|
||||
|
||||
let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
|
||||
// WAL is written, updating write metrics
|
||||
self.metrics.observe_write_seconds(write_seconds);
|
||||
self.metrics.observe_write_bytes(buf.len());
|
||||
|
||||
// Figure out the last record's end LSN and update `write_record_lsn`
|
||||
// (if we got a whole record). The write may also have closed and
|
||||
// flushed a segment, so update `flush_record_lsn` as well.
|
||||
// figure out last record's end lsn for reporting (if we got the
|
||||
// whole record)
|
||||
if self.decoder.available() != startpos {
|
||||
info!(
|
||||
"restart decoder from {} to {}",
|
||||
@@ -457,15 +423,12 @@ impl Storage for PhysicalStorage {
|
||||
self.decoder = WalStreamDecoder::new(startpos, pg_version);
|
||||
}
|
||||
self.decoder.feed_bytes(buf);
|
||||
|
||||
if self.write_record_lsn <= self.flush_lsn {
|
||||
// We may have flushed a previously written record.
|
||||
self.flush_record_lsn = self.write_record_lsn;
|
||||
}
|
||||
while let Some((lsn, _rec)) = self.decoder.poll_decode()? {
|
||||
self.write_record_lsn = lsn;
|
||||
if lsn <= self.flush_lsn {
|
||||
self.flush_record_lsn = lsn;
|
||||
loop {
|
||||
match self.decoder.poll_decode()? {
|
||||
None => break, // no full record yet
|
||||
Some((lsn, _rec)) => {
|
||||
self.write_record_lsn = lsn;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -482,17 +445,19 @@ impl Storage for PhysicalStorage {
|
||||
self.fdatasync_file(&unflushed_file).await?;
|
||||
self.file = Some(unflushed_file);
|
||||
} else {
|
||||
// We have unflushed data (write_lsn != flush_lsn), but no file. This
|
||||
// shouldn't happen, since the segment is flushed on close.
|
||||
bail!(
|
||||
"unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
|
||||
self.write_lsn,
|
||||
self.flush_record_lsn
|
||||
);
|
||||
// We have unflushed data (write_lsn != flush_lsn), but no file.
|
||||
// This should only happen if last file was fully written and flushed,
|
||||
// but flush_lsn hasn't been updated yet.
|
||||
if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
|
||||
bail!(
|
||||
"unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
|
||||
self.write_lsn,
|
||||
self.flush_record_lsn
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// everything is flushed now, let's update flush_lsn
|
||||
self.flush_lsn = self.write_lsn;
|
||||
self.flush_record_lsn = self.write_record_lsn;
|
||||
Ok(())
|
||||
}
|
||||
@@ -514,35 +479,15 @@ impl Storage for PhysicalStorage {
|
||||
);
|
||||
}
|
||||
|
||||
// Quick exit if nothing to do and we know that the state is clean to
|
||||
// avoid writing up to 16 MiB of zeros on disk (this happens on each
|
||||
// connect).
|
||||
if !self.pending_wal_truncation
|
||||
// Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
|
||||
// disk (this happens on each connect).
|
||||
if self.is_truncated_after_restart
|
||||
&& end_pos == self.write_lsn
|
||||
&& end_pos == self.flush_record_lsn
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Atomicity: we start with LSNs reset because once on disk deletion is
|
||||
// started it can't be reversed. However, we might crash/error in the
|
||||
// middle, leaving garbage above the truncation point. In theory,
|
||||
// concatenated with previous records it might form bogus WAL (though
|
||||
// very unlikely in practice because CRC would guard from that). To
|
||||
// protect, set pending_wal_truncation flag before beginning: it means
|
||||
// truncation must be retried and WAL writes are prohibited until it
|
||||
// succeeds. Flag is also set on boot because we don't know if the last
|
||||
// state was clean.
|
||||
//
|
||||
// Protocol (HandleElected before first AppendRequest) ensures we'll
|
||||
// always try to ensure clean truncation before any writes.
|
||||
self.pending_wal_truncation = true;
|
||||
|
||||
self.write_lsn = end_pos;
|
||||
self.flush_lsn = end_pos;
|
||||
self.write_record_lsn = end_pos;
|
||||
self.flush_record_lsn = end_pos;
|
||||
|
||||
// Close previously opened file, if any
|
||||
if let Some(unflushed_file) = self.file.take() {
|
||||
self.fdatasync_file(&unflushed_file).await?;
|
||||
@@ -568,7 +513,11 @@ impl Storage for PhysicalStorage {
|
||||
fs::rename(wal_file_path, wal_file_partial_path).await?;
|
||||
}
|
||||
|
||||
self.pending_wal_truncation = false;
|
||||
// Update LSNs
|
||||
self.write_lsn = end_pos;
|
||||
self.write_record_lsn = end_pos;
|
||||
self.flush_record_lsn = end_pos;
|
||||
self.is_truncated_after_restart = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -3642,7 +3642,6 @@ impl Service {
|
||||
match res {
|
||||
Ok(ok) => Ok(ok),
|
||||
Err(mgmt_api::Error::ApiError(StatusCode::CONFLICT, _)) => Ok(StatusCode::CONFLICT),
|
||||
Err(mgmt_api::Error::ApiError(StatusCode::SERVICE_UNAVAILABLE, msg)) => Err(ApiError::ResourceUnavailable(msg.into())),
|
||||
Err(e) => {
|
||||
Err(
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
@@ -6356,19 +6355,6 @@ impl Service {
|
||||
|
||||
// Pick the biggest tenant to split first
|
||||
top_n.sort_by_key(|i| i.resident_size);
|
||||
|
||||
// Filter out tenants in a prohibiting scheduling mode
|
||||
{
|
||||
let locked = self.inner.read().unwrap();
|
||||
top_n.retain(|i| {
|
||||
if let Some(shard) = locked.tenants.get(&i.id) {
|
||||
matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let Some(split_candidate) = top_n.into_iter().next() else {
|
||||
tracing::debug!("No split-elegible shards found");
|
||||
return;
|
||||
|
||||
@@ -5,8 +5,6 @@ from typing import TYPE_CHECKING, cast, final
|
||||
|
||||
import requests
|
||||
|
||||
from fixtures.log_helper import log
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Any, Literal, Optional
|
||||
|
||||
@@ -32,11 +30,7 @@ class NeonAPI:
|
||||
kwargs["headers"] = {}
|
||||
kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}"
|
||||
|
||||
resp = requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs)
|
||||
log.debug("%s %s returned a %d: %s", method, endpoint, resp.status_code, resp.text)
|
||||
resp.raise_for_status()
|
||||
|
||||
return resp
|
||||
return requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs)
|
||||
|
||||
def create_project(
|
||||
self,
|
||||
@@ -72,6 +66,8 @@ class NeonAPI:
|
||||
json=data,
|
||||
)
|
||||
|
||||
assert resp.status_code == 201
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def get_project_details(self, project_id: str) -> dict[str, Any]:
|
||||
@@ -83,7 +79,7 @@ class NeonAPI:
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
assert resp.status_code == 200
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def delete_project(
|
||||
@@ -99,6 +95,8 @@ class NeonAPI:
|
||||
},
|
||||
)
|
||||
|
||||
assert resp.status_code == 200
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def start_endpoint(
|
||||
@@ -114,6 +112,8 @@ class NeonAPI:
|
||||
},
|
||||
)
|
||||
|
||||
assert resp.status_code == 200
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def suspend_endpoint(
|
||||
@@ -129,6 +129,8 @@ class NeonAPI:
|
||||
},
|
||||
)
|
||||
|
||||
assert resp.status_code == 200
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def restart_endpoint(
|
||||
@@ -144,6 +146,8 @@ class NeonAPI:
|
||||
},
|
||||
)
|
||||
|
||||
assert resp.status_code == 200
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def create_endpoint(
|
||||
@@ -174,6 +178,8 @@ class NeonAPI:
|
||||
json=data,
|
||||
)
|
||||
|
||||
assert resp.status_code == 201
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def get_connection_uri(
|
||||
@@ -200,6 +206,8 @@ class NeonAPI:
|
||||
},
|
||||
)
|
||||
|
||||
assert resp.status_code == 200
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def get_branches(self, project_id: str) -> dict[str, Any]:
|
||||
@@ -211,6 +219,8 @@ class NeonAPI:
|
||||
},
|
||||
)
|
||||
|
||||
assert resp.status_code == 200
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def get_endpoints(self, project_id: str) -> dict[str, Any]:
|
||||
@@ -222,6 +232,8 @@ class NeonAPI:
|
||||
},
|
||||
)
|
||||
|
||||
assert resp.status_code == 200
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def get_operations(self, project_id: str) -> dict[str, Any]:
|
||||
@@ -234,6 +246,8 @@ class NeonAPI:
|
||||
},
|
||||
)
|
||||
|
||||
assert resp.status_code == 200
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def wait_for_operation_to_finish(self, project_id: str):
|
||||
|
||||
@@ -2379,17 +2379,6 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
#
|
||||
# The entries in the list are regular expressions.
|
||||
self.allowed_errors: list[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS)
|
||||
# Store persistent failpoints that should be reapplied on each start
|
||||
self._persistent_failpoints: dict[str, str] = {}
|
||||
|
||||
def add_persistent_failpoint(self, name: str, action: str):
|
||||
"""
|
||||
Add a failpoint that will be automatically reapplied each time the pageserver starts.
|
||||
The failpoint will be set immediately if the pageserver is running.
|
||||
"""
|
||||
self._persistent_failpoints[name] = action
|
||||
if self.running:
|
||||
self.http_client().configure_failpoints([(name, action)])
|
||||
|
||||
def timeline_dir(
|
||||
self,
|
||||
@@ -2457,15 +2446,6 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
"""
|
||||
assert self.running is False
|
||||
|
||||
if self._persistent_failpoints:
|
||||
# Tests shouldn't use this mechanism _and_ set FAILPOINTS explicitly
|
||||
assert extra_env_vars is None or "FAILPOINTS" not in extra_env_vars
|
||||
if extra_env_vars is None:
|
||||
extra_env_vars = {}
|
||||
extra_env_vars["FAILPOINTS"] = ",".join(
|
||||
f"{k}={v}" for (k, v) in self._persistent_failpoints.items()
|
||||
)
|
||||
|
||||
storage = self.env.pageserver_remote_storage
|
||||
if isinstance(storage, S3Storage):
|
||||
s3_env_vars = storage.access_env_vars()
|
||||
@@ -4542,7 +4522,7 @@ def pytest_addoption(parser: Parser):
|
||||
|
||||
|
||||
SMALL_DB_FILE_NAME_REGEX: re.Pattern[str] = re.compile(
|
||||
r"config-v1|heatmap-v1|tenant-manifest|metadata|.+\.(?:toml|pid|json|sql|conf)"
|
||||
r"config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql|conf)"
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
import psycopg2
|
||||
@@ -20,7 +18,7 @@ if TYPE_CHECKING:
|
||||
from fixtures.benchmark_fixture import NeonBenchmarker
|
||||
from fixtures.neon_api import NeonApiEndpoint
|
||||
from fixtures.neon_fixtures import NeonEnv, PgBin, VanillaPostgres
|
||||
from psycopg2.extensions import connection, cursor
|
||||
from psycopg2.extensions import cursor
|
||||
|
||||
|
||||
@pytest.mark.timeout(1000)
|
||||
@@ -294,48 +292,6 @@ def test_snap_files(
|
||||
then runs pgbench inserts while generating large numbers of snapfiles. Then restarts
|
||||
the node and tries to peek the replication changes.
|
||||
"""
|
||||
|
||||
@contextmanager
|
||||
def replication_slot(conn: connection, slot_name: str) -> Iterator[None]:
|
||||
"""
|
||||
Make sure that the replication slot doesn't outlive the test. Normally
|
||||
we wouldn't want this behavior, but since the test creates and drops
|
||||
the replication slot, we do.
|
||||
|
||||
We've had problems in the past where this slot sticking around caused
|
||||
issues with the publisher retaining WAL during the execution of the
|
||||
other benchmarks in this suite.
|
||||
"""
|
||||
|
||||
def __drop_replication_slot(c: cursor) -> None:
|
||||
c.execute(
|
||||
"""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS (
|
||||
SELECT 1
|
||||
FROM pg_replication_slots
|
||||
WHERE slot_name = %(slot_name)s
|
||||
) THEN
|
||||
PERFORM pg_drop_replication_slot(%(slot_name)s);
|
||||
END IF;
|
||||
END $$;
|
||||
""",
|
||||
{"slot_name": slot_name},
|
||||
)
|
||||
|
||||
with conn.cursor() as c:
|
||||
__drop_replication_slot(c)
|
||||
c.execute(
|
||||
"SELECT pg_create_logical_replication_slot(%(slot_name)s, 'test_decoding')",
|
||||
{"slot_name": slot_name},
|
||||
)
|
||||
|
||||
yield
|
||||
|
||||
with conn.cursor() as c:
|
||||
__drop_replication_slot(c)
|
||||
|
||||
test_duration_min = 60
|
||||
test_interval_min = 5
|
||||
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
|
||||
@@ -358,35 +314,48 @@ def test_snap_files(
|
||||
conn = psycopg2.connect(connstr)
|
||||
conn.autocommit = True
|
||||
|
||||
with replication_slot(conn, "slotter"):
|
||||
workload = pg_bin.run_nonblocking(
|
||||
["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS (
|
||||
SELECT 1
|
||||
FROM pg_replication_slots
|
||||
WHERE slot_name = 'slotter'
|
||||
) THEN
|
||||
PERFORM pg_drop_replication_slot('slotter');
|
||||
END IF;
|
||||
END $$;
|
||||
"""
|
||||
)
|
||||
try:
|
||||
start = time.time()
|
||||
prev_measurement = time.time()
|
||||
while time.time() - start < test_duration_min * 60:
|
||||
conn = psycopg2.connect(connstr)
|
||||
conn.autocommit = True
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
|
||||
)
|
||||
check_pgbench_still_running(workload)
|
||||
cur.execute(
|
||||
"SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())"
|
||||
)
|
||||
|
||||
conn.close()
|
||||
|
||||
# Measure storage
|
||||
if time.time() - prev_measurement > test_interval_min * 60:
|
||||
storage = benchmark_project_pub.get_synthetic_storage_size()
|
||||
zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER)
|
||||
prev_measurement = time.time()
|
||||
time.sleep(test_interval_min * 60 / 3)
|
||||
finally:
|
||||
workload.terminate()
|
||||
cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')")
|
||||
|
||||
conn.close()
|
||||
|
||||
workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env)
|
||||
try:
|
||||
start = time.time()
|
||||
prev_measurement = time.time()
|
||||
while time.time() - start < test_duration_min * 60:
|
||||
conn = psycopg2.connect(connstr)
|
||||
conn.autocommit = True
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
|
||||
)
|
||||
check_pgbench_still_running(workload)
|
||||
cur.execute("SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())")
|
||||
|
||||
conn.close()
|
||||
|
||||
# Measure storage
|
||||
if time.time() - prev_measurement > test_interval_min * 60:
|
||||
storage = benchmark_project_pub.get_synthetic_storage_size()
|
||||
zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER)
|
||||
prev_measurement = time.time()
|
||||
time.sleep(test_interval_min * 60 / 3)
|
||||
|
||||
finally:
|
||||
workload.terminate()
|
||||
|
||||
@@ -35,10 +35,9 @@ from fixtures.pageserver.utils import (
|
||||
wait_for_upload,
|
||||
)
|
||||
from fixtures.remote_storage import (
|
||||
LocalFsStorage,
|
||||
RemoteStorageKind,
|
||||
)
|
||||
from fixtures.utils import run_only_on_default_postgres, wait_until
|
||||
from fixtures.utils import wait_until
|
||||
from fixtures.workload import Workload
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -729,68 +728,3 @@ def test_upgrade_generationless_local_file_paths(
|
||||
)
|
||||
# We should download into the same local path we started with
|
||||
assert os.path.exists(victim_path)
|
||||
|
||||
|
||||
@run_only_on_default_postgres("Only tests index logic")
|
||||
def test_old_index_time_threshold(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Exercise pageserver's detection of trying to load an ancient non-latest index.
|
||||
(see https://github.com/neondatabase/neon/issues/6951)
|
||||
"""
|
||||
|
||||
# Run with local_fs because we will interfere with mtimes by local filesystem access
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(32)
|
||||
|
||||
# Remember generation 1's index path
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
index_path = env.pageserver_remote_storage.index_path(tenant_id, timeline_id)
|
||||
|
||||
# Increment generation by detaching+attaching, and write+flush some data to get a new remote index
|
||||
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
|
||||
env.storage_controller.tenant_policy_update(tenant_id, {"placement": {"Attached": 0}})
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
workload.churn_rows(32)
|
||||
|
||||
# A new index should have been written
|
||||
assert env.pageserver_remote_storage.index_path(tenant_id, timeline_id) != index_path
|
||||
|
||||
# Hack the mtime on the generation 1 index
|
||||
log.info(f"Setting old mtime on {index_path}")
|
||||
os.utime(index_path, times=(time.time(), time.time() - 30 * 24 * 3600))
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*Found a newer index while loading an old one.*",
|
||||
".*Index age exceeds threshold and a newer index exists.*",
|
||||
]
|
||||
)
|
||||
|
||||
# Detach from storage controller + attach in an old generation directly on the pageserver.
|
||||
workload.stop()
|
||||
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
env.storage_controller.tenant_policy_update(tenant_id, {"scheduling": "Stop"})
|
||||
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy")
|
||||
|
||||
# The controller would not do this (attach in an old generation): we are doing it to simulate
|
||||
# a hypothetical profound bug in the controller.
|
||||
env.pageserver.http_client().tenant_location_conf(
|
||||
tenant_id, {"generation": 1, "mode": "AttachedSingle", "tenant_conf": {}}
|
||||
)
|
||||
|
||||
# The pageserver should react to this situation by refusing to attach the tenant and putting
|
||||
# it into Broken state
|
||||
env.pageserver.allowed_errors.append(".*tenant is broken.*")
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match="tenant is broken: Index age exceeds threshold and a newer index exists",
|
||||
):
|
||||
env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)
|
||||
|
||||
@@ -122,7 +122,6 @@ def test_readonly_node(neon_simple_env: NeonEnv):
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/9754")
|
||||
def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test static endpoint is protected from GC by acquiring and renewing lsn leases.
|
||||
|
||||
@@ -1,33 +1,22 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.common_types import TenantId, TenantShardId, TimelineArchivalState, TimelineId
|
||||
from fixtures.common_types import TenantId, TimelineArchivalState, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
last_flush_lsn_upload,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_prefix_empty,
|
||||
assert_prefix_not_empty,
|
||||
list_prefix,
|
||||
wait_until_tenant_active,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty, list_prefix
|
||||
from fixtures.remote_storage import S3Storage, s3_storage
|
||||
from fixtures.utils import run_only_on_default_postgres, wait_until
|
||||
from fixtures.utils import wait_until
|
||||
from mypy_boto3_s3.type_defs import (
|
||||
ObjectTypeDef,
|
||||
)
|
||||
from psycopg2.errors import IoError, UndefinedTable
|
||||
|
||||
|
||||
@pytest.mark.parametrize("shard_count", [0, 4])
|
||||
@@ -389,279 +378,8 @@ def test_timeline_offload_persist(neon_env_builder: NeonEnvBuilder, delete_timel
|
||||
)
|
||||
|
||||
|
||||
@run_only_on_default_postgres("this test isn't sensitive to the contents of timelines")
|
||||
def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
A general consistency check on archival/offload timeline state, and its intersection
|
||||
with tenant migrations and timeline deletions.
|
||||
"""
|
||||
|
||||
# Offloading is off by default at time of writing: remove this line when it's on by default
|
||||
neon_env_builder.pageserver_config_override = "timeline_offloading = true"
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
|
||||
# We will exercise migrations, so need multiple pageservers
|
||||
neon_env_builder.num_pageservers = 2
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
"compaction_period": "1s",
|
||||
}
|
||||
)
|
||||
tenant_id = env.initial_tenant
|
||||
tenant_shard_id = TenantShardId(tenant_id, 0, 0)
|
||||
|
||||
# Unavailable pageservers during timeline CRUD operations can be logged as errors on the storage controller
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
".*error sending request.*",
|
||||
# FIXME: the pageserver should not return 500s on cancellation (https://github.com/neondatabase/neon/issues/97680)
|
||||
".*InternalServerError(Error deleting timeline .* on .* on .*: pageserver API: error: Cancelled",
|
||||
]
|
||||
)
|
||||
|
||||
for ps in env.pageservers:
|
||||
# We will do unclean restarts, which results in these messages when cleaning up files
|
||||
ps.allowed_errors.extend(
|
||||
[
|
||||
".*removing local file.*because it has unexpected length.*",
|
||||
".*__temp.*",
|
||||
# FIXME: there are still anyhow::Error paths in timeline creation/deletion which
|
||||
# generate 500 results when called during shutdown (https://github.com/neondatabase/neon/issues/9768)
|
||||
".*InternalServerError.*",
|
||||
# FIXME: there are still anyhow::Error paths in timeline deletion that generate
|
||||
# log lines at error severity (https://github.com/neondatabase/neon/issues/9768)
|
||||
".*delete_timeline.*Error",
|
||||
]
|
||||
)
|
||||
|
||||
class TimelineState:
|
||||
def __init__(self):
|
||||
self.timeline_id = TimelineId.generate()
|
||||
self.created = False
|
||||
self.archived = False
|
||||
self.offloaded = False
|
||||
self.deleted = False
|
||||
|
||||
controller_ps_api = env.storage_controller.pageserver_api()
|
||||
|
||||
shutdown = threading.Event()
|
||||
|
||||
violations = []
|
||||
|
||||
timelines_deleted = []
|
||||
|
||||
def list_timelines(tenant_id) -> tuple[set[TimelineId], set[TimelineId]]:
|
||||
"""Get the list of active and offloaded TimelineId"""
|
||||
listing = controller_ps_api.timeline_and_offloaded_list(tenant_id)
|
||||
active_ids = set([TimelineId(t["timeline_id"]) for t in listing.timelines])
|
||||
offloaded_ids = set([TimelineId(t["timeline_id"]) for t in listing.offloaded])
|
||||
|
||||
return (active_ids, offloaded_ids)
|
||||
|
||||
def timeline_objects(tenant_shard_id, timeline_id):
|
||||
response = list_prefix(
|
||||
env.pageserver_remote_storage, # type: ignore
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(tenant_shard_id),
|
||||
"timelines",
|
||||
str(timeline_id),
|
||||
)
|
||||
)
|
||||
+ "/",
|
||||
)
|
||||
|
||||
return [k["Key"] for k in response.get("Contents", [])]
|
||||
|
||||
def worker():
|
||||
"""
|
||||
Background thread which drives timeline lifecycle operations, and checks that between steps
|
||||
it obeys invariants. This should detect errors in pageserver persistence and in errors in
|
||||
concurrent operations on different timelines when it is run many times in parallel.
|
||||
"""
|
||||
state = TimelineState()
|
||||
|
||||
# Jitter worker startup, we're not interested in exercising lots of concurrent creations
|
||||
# as we know that's I/O bound.
|
||||
shutdown.wait(random.random() * 10)
|
||||
|
||||
while not shutdown.is_set():
|
||||
# A little wait between actions to jitter out the API calls rather than having them
|
||||
# all queue up at once
|
||||
shutdown.wait(random.random())
|
||||
|
||||
try:
|
||||
if not state.created:
|
||||
log.info(f"Creating timeline {state.timeline_id}")
|
||||
controller_ps_api.timeline_create(
|
||||
PgVersion.NOT_SET, tenant_id=tenant_id, new_timeline_id=state.timeline_id
|
||||
)
|
||||
state.created = True
|
||||
|
||||
if (
|
||||
timeline_objects(
|
||||
tenant_shard_id=tenant_shard_id, timeline_id=state.timeline_id
|
||||
)
|
||||
== []
|
||||
):
|
||||
msg = f"Timeline {state.timeline_id} unexpectedly not present in remote storage"
|
||||
violations.append(msg)
|
||||
|
||||
elif state.deleted:
|
||||
# Try to confirm its deletion completed.
|
||||
# Deleted timeline should not appear in listing API, either as offloaded or active
|
||||
(active_ids, offloaded_ids) = list_timelines(tenant_id)
|
||||
if state.timeline_id in active_ids or state.timeline_id in offloaded_ids:
|
||||
msg = f"Timeline {state.timeline_id} appeared in listing after deletion was acked"
|
||||
violations.append(msg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
objects = timeline_objects(tenant_shard_id, state.timeline_id)
|
||||
if len(objects) == 0:
|
||||
log.info(f"Confirmed deletion of timeline {state.timeline_id}")
|
||||
timelines_deleted.append(state.timeline_id)
|
||||
state = TimelineState() # A new timeline ID to create on next iteration
|
||||
else:
|
||||
# Deletion of objects doesn't have to be synchronous, we will keep polling
|
||||
log.info(f"Timeline {state.timeline_id} objects still exist: {objects}")
|
||||
shutdown.wait(random.random())
|
||||
else:
|
||||
# The main lifetime of a timeline: proceed active->archived->offloaded->deleted
|
||||
if not state.archived:
|
||||
log.info(f"Archiving timeline {state.timeline_id}")
|
||||
controller_ps_api.timeline_archival_config(
|
||||
tenant_id, state.timeline_id, TimelineArchivalState.ARCHIVED
|
||||
)
|
||||
state.archived = True
|
||||
elif state.archived and not state.offloaded:
|
||||
log.info(f"Waiting for offload of timeline {state.timeline_id}")
|
||||
# Wait for offload: this should happen fast because we configured a short compaction interval
|
||||
while not shutdown.is_set():
|
||||
(active_ids, offloaded_ids) = list_timelines(tenant_id)
|
||||
if state.timeline_id in active_ids:
|
||||
log.info(f"Timeline {state.timeline_id} is still active")
|
||||
shutdown.wait(0.5)
|
||||
elif state.timeline_id in offloaded_ids:
|
||||
log.info(f"Timeline {state.timeline_id} is now offloaded")
|
||||
state.offloaded = True
|
||||
break
|
||||
else:
|
||||
# Timeline is neither offloaded nor active, this is unexpected: the pageserver
|
||||
# should ensure that the timeline appears in either the offloaded list or main list
|
||||
msg = f"Timeline {state.timeline_id} disappeared!"
|
||||
violations.append(msg)
|
||||
raise RuntimeError(msg)
|
||||
elif state.offloaded:
|
||||
# Once it's offloaded it should only be in offloaded or deleted state: check
|
||||
# it didn't revert back to active. This tests that the manfiest is doing its
|
||||
# job to suppress loading of offloaded timelines as active.
|
||||
(active_ids, offloaded_ids) = list_timelines(tenant_id)
|
||||
if state.timeline_id in active_ids:
|
||||
msg = f"Timeline {state.timeline_id} is active, should be offloaded or deleted"
|
||||
violations.append(msg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
log.info(f"Deleting timeline {state.timeline_id}")
|
||||
controller_ps_api.timeline_delete(tenant_id, state.timeline_id)
|
||||
state.deleted = True
|
||||
else:
|
||||
raise RuntimeError("State should be unreachable")
|
||||
except PageserverApiException as e:
|
||||
# This is expected: we are injecting chaos, API calls will sometimes fail.
|
||||
# TODO: can we narrow this to assert we are getting friendly 503s?
|
||||
log.info(f"Iteration error, will retry: {e}")
|
||||
shutdown.wait(random.random())
|
||||
except requests.exceptions.RetryError as e:
|
||||
# Retryable error repeated more times than `requests` is configured to tolerate, this
|
||||
# is expected when a pageserver remains unavailable for a couple seconds
|
||||
log.info(f"Iteration error, will retry: {e}")
|
||||
shutdown.wait(random.random())
|
||||
except Exception as e:
|
||||
log.warning(
|
||||
f"Unexpected worker exception (current timeline {state.timeline_id}): {e}"
|
||||
)
|
||||
else:
|
||||
# In the non-error case, use a jitterd but small wait, we want to keep
|
||||
# a high rate of operations going
|
||||
shutdown.wait(random.random() * 0.1)
|
||||
|
||||
n_workers = 4
|
||||
threads = []
|
||||
for _i in range(0, n_workers):
|
||||
t = threading.Thread(target=worker)
|
||||
t.start()
|
||||
threads.append(t)
|
||||
|
||||
# Set delay failpoints so that deletions and migrations take some time, and have a good
|
||||
# chance to interact with other concurrent timeline mutations.
|
||||
env.storage_controller.configure_failpoints(
|
||||
[("reconciler-live-migrate-pre-await-lsn", "sleep(1)")]
|
||||
)
|
||||
for ps in env.pageservers:
|
||||
ps.add_persistent_failpoint("in_progress_delete", "sleep(1)")
|
||||
|
||||
# Generate some chaos, while our workers are trying to complete their timeline operations
|
||||
rng = random.Random()
|
||||
try:
|
||||
chaos_rounds = 48
|
||||
for _i in range(0, chaos_rounds):
|
||||
action = rng.choice([0, 1])
|
||||
if action == 0:
|
||||
# Pick a random pageserver to gracefully restart
|
||||
pageserver = rng.choice(env.pageservers)
|
||||
|
||||
# Whether to use a graceful shutdown or SIGKILL
|
||||
immediate = random.choice([True, False])
|
||||
log.info(f"Restarting pageserver {pageserver.id}, immediate={immediate}")
|
||||
|
||||
t1 = time.time()
|
||||
pageserver.restart(immediate=immediate)
|
||||
restart_duration = time.time() - t1
|
||||
|
||||
# Make sure we're up for as long as we spent restarting, to ensure operations can make progress
|
||||
log.info(f"Staying alive for {restart_duration}s")
|
||||
time.sleep(restart_duration)
|
||||
else:
|
||||
# Migrate our tenant between pageservers
|
||||
origin_ps = env.get_tenant_pageserver(tenant_shard_id)
|
||||
dest_ps = rng.choice([ps for ps in env.pageservers if ps.id != origin_ps.id])
|
||||
log.info(f"Migrating {tenant_shard_id} {origin_ps.id}->{dest_ps.id}")
|
||||
env.storage_controller.tenant_shard_migrate(
|
||||
tenant_shard_id=tenant_shard_id, dest_ps_id=dest_ps.id
|
||||
)
|
||||
|
||||
log.info(f"Full timeline lifecycles so far: {len(timelines_deleted)}")
|
||||
finally:
|
||||
shutdown.set()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
# Sanity check that during our run we did exercise some full timeline lifecycles, in case
|
||||
# one of our workers got stuck
|
||||
assert len(timelines_deleted) > 10
|
||||
|
||||
# That no invariant-violations were reported by workers
|
||||
assert violations == []
|
||||
|
||||
|
||||
@pytest.mark.parametrize("with_intermediary", [False, True])
|
||||
@pytest.mark.parametrize(
|
||||
"offload_child",
|
||||
[
|
||||
"offload",
|
||||
"offload-corrupt",
|
||||
"offload-no-restart",
|
||||
"offload-parent",
|
||||
"archive",
|
||||
None,
|
||||
],
|
||||
)
|
||||
def test_timeline_retain_lsn(
|
||||
neon_env_builder: NeonEnvBuilder, with_intermediary: bool, offload_child: Optional[str]
|
||||
):
|
||||
@pytest.mark.parametrize("offload_child", ["offload", "offload-corrupt", "archive", None])
|
||||
def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Optional[str]):
|
||||
"""
|
||||
Ensure that retain_lsn functionality for timelines works, both for offloaded and non-offloaded ones
|
||||
"""
|
||||
@@ -669,7 +387,6 @@ def test_timeline_retain_lsn(
|
||||
# Our corruption code only works with S3 compatible storage
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
|
||||
neon_env_builder.rust_log_override = "info,[gc_timeline]=debug"
|
||||
env = neon_env_builder.init_start()
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
@@ -677,30 +394,22 @@ def test_timeline_retain_lsn(
|
||||
tenant_id, root_timeline_id = env.create_tenant(
|
||||
conf={
|
||||
# small checkpointing and compaction targets to ensure we generate many upload operations
|
||||
"checkpoint_distance": 32 * 1024,
|
||||
"checkpoint_distance": 128 * 1024,
|
||||
"compaction_threshold": 1,
|
||||
"compaction_target_size": 32 * 1024,
|
||||
"compaction_target_size": 128 * 1024,
|
||||
# set small image creation thresholds so that gc deletes data
|
||||
"image_creation_threshold": 1,
|
||||
"image_creation_threshold": 2,
|
||||
# disable background compaction and GC. We invoke it manually when we want it to happen.
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
# Disable pitr, we only want the latest lsn
|
||||
"pitr_interval": "0s",
|
||||
"gc_horizon": 0,
|
||||
# Don't rely on endpoint lsn leases
|
||||
"lsn_lease_length": "0s",
|
||||
}
|
||||
)
|
||||
|
||||
if with_intermediary:
|
||||
parent_branch_name = "test_archived_parent"
|
||||
parent_timeline_id = env.create_branch("test_archived_parent", tenant_id)
|
||||
else:
|
||||
parent_branch_name = "main"
|
||||
parent_timeline_id = root_timeline_id
|
||||
|
||||
with env.endpoints.create_start(parent_branch_name, tenant_id=tenant_id) as endpoint:
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
endpoint.safe_psql_many(
|
||||
[
|
||||
"CREATE TABLE foo(v int, key serial primary key, t text default 'data_content')",
|
||||
@@ -710,16 +419,14 @@ def test_timeline_retain_lsn(
|
||||
)
|
||||
pre_branch_sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200")
|
||||
log.info(f"Pre branch sum: {pre_branch_sum}")
|
||||
last_flush_lsn_upload(env, endpoint, tenant_id, parent_timeline_id)
|
||||
last_flush_lsn_upload(env, endpoint, tenant_id, root_timeline_id)
|
||||
|
||||
# Create a branch and write some additional data to the parent
|
||||
child_timeline_id = env.create_branch(
|
||||
"test_archived_branch", tenant_id, ancestor_branch_name=parent_branch_name
|
||||
)
|
||||
child_timeline_id = env.create_branch("test_archived_branch", tenant_id)
|
||||
|
||||
with env.endpoints.create_start(parent_branch_name, tenant_id=tenant_id) as endpoint:
|
||||
# Do some overwriting churn with compactions in between. This is important so that we can overwrite image layers.
|
||||
for i in range(5):
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
# Do some churn of the data. This is important so that we can overwrite image layers.
|
||||
for i in range(10):
|
||||
endpoint.safe_psql_many(
|
||||
[
|
||||
f"SELECT setseed(0.23{i})",
|
||||
@@ -728,9 +435,9 @@ def test_timeline_retain_lsn(
|
||||
"UPDATE foo SET v=(random() * 409600)::int WHERE v % 3 = 0",
|
||||
]
|
||||
)
|
||||
last_flush_lsn_upload(env, endpoint, tenant_id, parent_timeline_id)
|
||||
post_branch_sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200")
|
||||
log.info(f"Post branch sum: {post_branch_sum}")
|
||||
last_flush_lsn_upload(env, endpoint, tenant_id, root_timeline_id)
|
||||
|
||||
if offload_child is not None:
|
||||
ps_http.timeline_archival_config(
|
||||
@@ -745,19 +452,9 @@ def test_timeline_retain_lsn(
|
||||
assert leaf_detail["is_archived"] is True
|
||||
if "offload" in offload_child:
|
||||
ps_http.timeline_offload(tenant_id, child_timeline_id)
|
||||
if "offload-parent" in offload_child:
|
||||
# Also offload the parent to ensure the retain_lsn of the child
|
||||
# is entered in the parent at unoffloading
|
||||
ps_http.timeline_archival_config(
|
||||
tenant_id,
|
||||
parent_timeline_id,
|
||||
state=TimelineArchivalState.ARCHIVED,
|
||||
)
|
||||
ps_http.timeline_offload(tenant_id, parent_timeline_id)
|
||||
|
||||
# Do a restart to get rid of any in-memory objects (we only init gc info once, at attach)
|
||||
if offload_child is None or "no-restart" not in offload_child:
|
||||
env.pageserver.stop()
|
||||
env.pageserver.stop()
|
||||
if offload_child == "offload-corrupt":
|
||||
assert isinstance(env.pageserver_remote_storage, S3Storage)
|
||||
listing = list_prefix(
|
||||
@@ -792,21 +489,13 @@ def test_timeline_retain_lsn(
|
||||
".*page_service_conn_main.*could not find data for key.*",
|
||||
]
|
||||
)
|
||||
if offload_child is None or "no-restart" not in offload_child:
|
||||
env.pageserver.start()
|
||||
if offload_child == "offload-parent":
|
||||
wait_until_tenant_active(ps_http, tenant_id=tenant_id)
|
||||
ps_http.timeline_archival_config(
|
||||
tenant_id,
|
||||
parent_timeline_id,
|
||||
state=TimelineArchivalState.UNARCHIVED,
|
||||
)
|
||||
env.pageserver.start()
|
||||
|
||||
# Do an agressive gc and compaction of the parent branch
|
||||
ps_http.timeline_gc(tenant_id=tenant_id, timeline_id=parent_timeline_id, gc_horizon=0)
|
||||
ps_http.timeline_gc(tenant_id=tenant_id, timeline_id=root_timeline_id, gc_horizon=0)
|
||||
ps_http.timeline_checkpoint(
|
||||
tenant_id,
|
||||
parent_timeline_id,
|
||||
root_timeline_id,
|
||||
force_l0_compaction=True,
|
||||
force_repartition=True,
|
||||
wait_until_uploaded=True,
|
||||
@@ -822,15 +511,10 @@ def test_timeline_retain_lsn(
|
||||
|
||||
# Now, after unarchival, the child timeline should still have its data accessible (or corrupted)
|
||||
if offload_child == "offload-corrupt":
|
||||
if with_intermediary:
|
||||
error_regex = "(.*could not read .* from page server.*|.*relation .* does not exist)"
|
||||
else:
|
||||
error_regex = ".*failed to get basebackup.*"
|
||||
with pytest.raises((RuntimeError, IoError, UndefinedTable), match=error_regex):
|
||||
with env.endpoints.create_start(
|
||||
with pytest.raises(RuntimeError, match=".*failed to get basebackup.*"):
|
||||
env.endpoints.create_start(
|
||||
"test_archived_branch", tenant_id=tenant_id, basebackup_request_tries=1
|
||||
) as endpoint:
|
||||
endpoint.safe_psql("SELECT sum(key) from foo where v < 51200")
|
||||
)
|
||||
else:
|
||||
with env.endpoints.create_start("test_archived_branch", tenant_id=tenant_id) as endpoint:
|
||||
sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200")
|
||||
|
||||
@@ -1,33 +0,0 @@
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
#
|
||||
# Test that VM is properly truncated
|
||||
#
|
||||
def test_vm_truncate(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
con = endpoint.connect()
|
||||
cur = con.cursor()
|
||||
cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
cur.execute("CREATE EXTENSION pageinspect")
|
||||
|
||||
cur.execute(
|
||||
"create table t(pk integer primary key, counter integer default 0, filler text default repeat('?', 200))"
|
||||
)
|
||||
cur.execute("insert into t (pk) values (generate_series(1,1000))")
|
||||
cur.execute("delete from t where pk>10")
|
||||
cur.execute("vacuum t") # truncates the relation, including its VM and FSM
|
||||
# get image of the first block of the VM excluding the page header. It's expected
|
||||
# to still be in the buffer cache.
|
||||
# ignore page header (24 bytes, 48 - it's hex representation)
|
||||
cur.execute("select substr(encode(get_raw_page('t', 'vm', 0), 'hex'), 48)")
|
||||
pg_bitmap = cur.fetchall()[0][0]
|
||||
# flush shared buffers
|
||||
cur.execute("SELECT clear_buffer_cache()")
|
||||
# now download the first block of the VM from the pageserver ...
|
||||
cur.execute("select substr(encode(get_raw_page('t', 'vm', 0), 'hex'), 48)")
|
||||
ps_bitmap = cur.fetchall()[0][0]
|
||||
# and check that content of bitmaps are equal, i.e. PS is producing the same VM page as Postgres
|
||||
assert pg_bitmap == ps_bitmap
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: c5e0d642ef...de0a000daf
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 1feff6b60f...fd631a9590
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: b0b693ea29...03b43900ed
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: aa2e29f2b6...ae4cc30dba
16
vendor/revisions.json
vendored
16
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.1",
|
||||
"aa2e29f2b6952140dfe51876bbd11054acae776f"
|
||||
"17.0",
|
||||
"ae4cc30dba24f3910533e5a48e8103c3f2fff300"
|
||||
],
|
||||
"v16": [
|
||||
"16.5",
|
||||
"b0b693ea298454e95e6b154780d1fd586a244dfd"
|
||||
"16.4",
|
||||
"03b43900edc5d8d6eecec460bfc89aec7174bd84"
|
||||
],
|
||||
"v15": [
|
||||
"15.9",
|
||||
"1feff6b60f07cb71b665d0f5ead71a4320a71743"
|
||||
"15.8",
|
||||
"fd631a959049dfe2b82f67409c8b8b0d3e0016d1"
|
||||
],
|
||||
"v14": [
|
||||
"14.14",
|
||||
"c5e0d642efb02e4bfedc283b0a7707fe6c79cc89"
|
||||
"14.13",
|
||||
"de0a000dafc2e66ce2e39282d3aa1c704fe0390e"
|
||||
]
|
||||
}
|
||||
|
||||
@@ -52,14 +52,13 @@ lazy_static = { version = "1", default-features = false, features = ["spin_no_st
|
||||
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
memchr = { version = "2" }
|
||||
nix = { version = "0.26" }
|
||||
nom = { version = "7" }
|
||||
num-bigint = { version = "0.4" }
|
||||
num-integer = { version = "0.1", features = ["i128"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "libm"] }
|
||||
once_cell = { version = "1" }
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", default-features = false, features = ["with-serde_json-1"] }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", default-features = false, features = ["with-serde_json-1"] }
|
||||
prost = { version = "0.13", features = ["prost-derive"] }
|
||||
rand = { version = "0.8", features = ["small_rng"] }
|
||||
regex = { version = "1" }
|
||||
@@ -76,10 +75,10 @@ smallvec = { version = "1", default-features = false, features = ["const_new", "
|
||||
spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
|
||||
subtle = { version = "2" }
|
||||
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
|
||||
tikv-jemalloc-sys = { version = "0.6", features = ["stats"] }
|
||||
tikv-jemalloc-sys = { version = "0.5" }
|
||||
time = { version = "0.3", features = ["macros", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", features = ["with-serde_json-1"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", features = ["with-serde_json-1"] }
|
||||
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
|
||||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
||||
|
||||
Reference in New Issue
Block a user