mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-13 23:50:36 +00:00
Compare commits
1 Commits
arpad/musl
...
skyzh/flus
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a6898f8deb |
2
.github/workflows/release.yml
vendored
2
.github/workflows/release.yml
vendored
@@ -53,7 +53,7 @@ jobs:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
run: |
|
||||
cat << EOF > body.md
|
||||
## Storage & Compute release ${RELEASE_DATE}
|
||||
## Release ${RELEASE_DATE}
|
||||
|
||||
**Please merge this Pull Request using 'Create a merge commit' button**
|
||||
EOF
|
||||
|
||||
167
Cargo.lock
generated
167
Cargo.lock
generated
@@ -734,6 +734,8 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_core"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "70fd680c0d0424a518229b1150922f92653ba2ac933aa000abc8bf1ca08105f7"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.21.1",
|
||||
@@ -762,6 +764,8 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_identity"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6d2060f5b2e1c664026ca4edd561306c473be887c1f7a81f10bf06f9b71c63f"
|
||||
dependencies = [
|
||||
"async-lock",
|
||||
"async-trait",
|
||||
@@ -780,6 +784,8 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_storage"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "15d3da73bfa09350e1bd6ae2a260806fcf90048c7e78cd2d8f88be60b19a7266"
|
||||
dependencies = [
|
||||
"RustyXML",
|
||||
"async-lock",
|
||||
@@ -797,6 +803,8 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_storage_blobs"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "149c21834a4105d761e3dd33d91c2a3064acc05a3c978848ea8089102ae45c94"
|
||||
dependencies = [
|
||||
"RustyXML",
|
||||
"azure_core",
|
||||
@@ -816,6 +824,8 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_svc_blobstorage"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "88c888b7bf522d5405218b8613bf0fae7ddaae6ef3bf4ad42ae005993c96ab8b"
|
||||
dependencies = [
|
||||
"azure_core",
|
||||
"bytes",
|
||||
@@ -1966,6 +1976,21 @@ version = "1.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
||||
|
||||
[[package]]
|
||||
name = "foreign-types"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
|
||||
dependencies = [
|
||||
"foreign-types-shared",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "foreign-types-shared"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
|
||||
|
||||
[[package]]
|
||||
name = "form_urlencoded"
|
||||
version = "1.1.0"
|
||||
@@ -2595,6 +2620,19 @@ dependencies = [
|
||||
"tokio-io-timeout",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-tls"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"hyper 0.14.26",
|
||||
"native-tls",
|
||||
"tokio",
|
||||
"tokio-native-tls",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-util"
|
||||
version = "0.1.3"
|
||||
@@ -3130,6 +3168,24 @@ version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
|
||||
|
||||
[[package]]
|
||||
name = "native-tls"
|
||||
version = "0.2.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"log",
|
||||
"openssl",
|
||||
"openssl-probe",
|
||||
"openssl-sys",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
"security-framework-sys",
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.25.1"
|
||||
@@ -3358,15 +3414,55 @@ version = "11.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
|
||||
|
||||
[[package]]
|
||||
name = "openssl"
|
||||
version = "0.10.60"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800"
|
||||
dependencies = [
|
||||
"bitflags 2.4.1",
|
||||
"cfg-if",
|
||||
"foreign-types",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"openssl-macros",
|
||||
"openssl-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-macros"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.52",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-probe"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.96"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"pkg-config",
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.20.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9591d937bc0e6d2feb6f71a559540ab300ea49955229c347a517a28d27784c54"
|
||||
dependencies = [
|
||||
"opentelemetry_api",
|
||||
"opentelemetry_sdk",
|
||||
@@ -3375,6 +3471,8 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "opentelemetry-http"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c7594ec0e11d8e33faf03530a4c49af7064ebba81c1480e01be67d90b356508b"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
@@ -3386,6 +3484,8 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "opentelemetry-otlp"
|
||||
version = "0.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e5e5a5c4135864099f3faafbe939eb4d7f9b80ebf68a8448da961b32a7c1275"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"futures-core",
|
||||
@@ -3405,6 +3505,8 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "opentelemetry-proto"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b1e3f814aa9f8c905d0ee4bde026afd3b2577a97c10e1699912e3e44f0c4cbeb"
|
||||
dependencies = [
|
||||
"opentelemetry_api",
|
||||
"opentelemetry_sdk",
|
||||
@@ -3415,6 +3517,8 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "opentelemetry-semantic-conventions"
|
||||
version = "0.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "73c9f9340ad135068800e7f1b24e9e09ed9e7143f5bf8518ded3d3ec69789269"
|
||||
dependencies = [
|
||||
"opentelemetry",
|
||||
]
|
||||
@@ -3422,6 +3526,8 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "opentelemetry_api"
|
||||
version = "0.20.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a81f725323db1b1206ca3da8bb19874bbd3f57c3bcd59471bfb04525b265b9b"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
@@ -3436,6 +3542,8 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "opentelemetry_sdk"
|
||||
version = "0.20.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fa8e705a0612d48139799fcbaba0d4a90f06277153e43dd2bdc16c6f0edd8026"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"crossbeam-channel",
|
||||
@@ -3997,6 +4105,17 @@ dependencies = [
|
||||
"tokio-postgres",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-native-tls"
|
||||
version = "0.5.0"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
dependencies = [
|
||||
"native-tls",
|
||||
"tokio",
|
||||
"tokio-native-tls",
|
||||
"tokio-postgres",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
@@ -4304,6 +4423,7 @@ dependencies = [
|
||||
"md5",
|
||||
"measured",
|
||||
"metrics",
|
||||
"native-tls",
|
||||
"once_cell",
|
||||
"opentelemetry",
|
||||
"parking_lot 0.12.1",
|
||||
@@ -4311,6 +4431,7 @@ dependencies = [
|
||||
"parquet_derive",
|
||||
"pbkdf2",
|
||||
"pin-project-lite",
|
||||
"postgres-native-tls",
|
||||
"postgres-protocol",
|
||||
"postgres_backend",
|
||||
"pq_proto",
|
||||
@@ -4358,7 +4479,7 @@ dependencies = [
|
||||
"utils",
|
||||
"uuid",
|
||||
"walkdir",
|
||||
"webpki-roots 0.26.1",
|
||||
"webpki-roots 0.25.2",
|
||||
"workspace_hack",
|
||||
"x509-parser",
|
||||
]
|
||||
@@ -4665,21 +4786,20 @@ dependencies = [
|
||||
"http 0.2.9",
|
||||
"http-body 0.4.5",
|
||||
"hyper 0.14.26",
|
||||
"hyper-rustls 0.24.0",
|
||||
"hyper-tls",
|
||||
"ipnet",
|
||||
"js-sys",
|
||||
"log",
|
||||
"mime",
|
||||
"native-tls",
|
||||
"once_cell",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustls 0.21.11",
|
||||
"rustls-pemfile 1.0.2",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_urlencoded",
|
||||
"tokio",
|
||||
"tokio-rustls 0.24.0",
|
||||
"tokio-native-tls",
|
||||
"tokio-util",
|
||||
"tower-service",
|
||||
"url",
|
||||
@@ -4687,7 +4807,6 @@ dependencies = [
|
||||
"wasm-bindgen-futures",
|
||||
"wasm-streams 0.3.0",
|
||||
"web-sys",
|
||||
"webpki-roots 0.25.2",
|
||||
"winreg 0.50.0",
|
||||
]
|
||||
|
||||
@@ -5113,20 +5232,20 @@ dependencies = [
|
||||
"hex",
|
||||
"histogram",
|
||||
"itertools",
|
||||
"native-tls",
|
||||
"pageserver",
|
||||
"pageserver_api",
|
||||
"postgres-native-tls",
|
||||
"postgres_ffi",
|
||||
"rand 0.8.5",
|
||||
"remote_storage",
|
||||
"reqwest 0.12.4",
|
||||
"rustls 0.22.4",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres-rustls",
|
||||
"tokio-rustls 0.25.0",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
@@ -5134,7 +5253,6 @@ dependencies = [
|
||||
"tracing-appender",
|
||||
"tracing-subscriber",
|
||||
"utils",
|
||||
"webpki-roots 0.26.1",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -5725,15 +5843,6 @@ version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
|
||||
|
||||
[[package]]
|
||||
name = "statx-sys"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "69c325f46f705b7a66fb87f0ebb999524a7363f30f05d373277b4ef7f409fe87"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "storage_broker"
|
||||
version = "0.1.0"
|
||||
@@ -6157,7 +6266,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-epoll-uring"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=arpad/statx_sys#ca8446b8edb5e0aef88520f2fc209a13a834fd25"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#342ddd197a060a8354e8f11f4d12994419fff939"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"nix 0.26.4",
|
||||
@@ -6191,6 +6300,16 @@ dependencies = [
|
||||
"syn 2.0.52",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-native-tls"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
|
||||
dependencies = [
|
||||
"native-tls",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
@@ -6217,7 +6336,10 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-postgres-rustls"
|
||||
version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ea13f22eda7127c827983bdaf0d7fff9df21c8817bab02815ac277a21143677"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"ring 0.17.6",
|
||||
"rustls 0.22.4",
|
||||
"tokio",
|
||||
@@ -6675,12 +6797,11 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "uring-common"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=arpad/statx_sys#ca8446b8edb5e0aef88520f2fc209a13a834fd25"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#342ddd197a060a8354e8f11f4d12994419fff939"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"io-uring",
|
||||
"libc",
|
||||
"statx-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7508,9 +7629,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "zeroize"
|
||||
version = "1.7.0"
|
||||
version = "1.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d"
|
||||
checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"
|
||||
dependencies = [
|
||||
"zeroize_derive",
|
||||
]
|
||||
|
||||
24
Cargo.toml
24
Cargo.toml
@@ -46,10 +46,10 @@ anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
arc-swap = "1.6"
|
||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
|
||||
atomic-take = "1.1.0"
|
||||
azure_core = { path = "../azure-sdk-for-rust/sdk/core" } # "0.19"
|
||||
azure_identity = { path = "../azure-sdk-for-rust/sdk/identity" } # "0.19"
|
||||
azure_storage = { path = "../azure-sdk-for-rust/sdk/storage" } # "0.19"
|
||||
azure_storage_blobs = { path = "../azure-sdk-for-rust/sdk/storage_blobs" } # "0.19"
|
||||
azure_core = "0.19"
|
||||
azure_identity = "0.19"
|
||||
azure_storage = "0.19"
|
||||
azure_storage_blobs = "0.19"
|
||||
flate2 = "1.0.26"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
@@ -114,13 +114,14 @@ md5 = "0.7.0"
|
||||
measured = { version = "0.0.21", features=["lasso"] }
|
||||
measured-process = { version = "0.0.21" }
|
||||
memoffset = "0.8"
|
||||
native-tls = "0.2"
|
||||
nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] }
|
||||
notify = "6.0.0"
|
||||
num_cpus = "1.15"
|
||||
num-traits = "0.2.15"
|
||||
once_cell = "1.13"
|
||||
opentelemetry = "0.20.0"
|
||||
opentelemetry-otlp = { path="../opentelemetry-rust/opentelemetry-otlp", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-otlp = { version = "0.13.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions = "0.12.0"
|
||||
parking_lot = "0.12"
|
||||
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
|
||||
@@ -170,9 +171,9 @@ thiserror = "1.0"
|
||||
tikv-jemallocator = "0.5"
|
||||
tikv-jemalloc-ctl = "0.5"
|
||||
tokio = { version = "1.17", features = ["macros"] }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "arpad/statx_sys" }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
|
||||
tokio-io-timeout = "1.2.0"
|
||||
#tokio-postgres-rustls = "0.11.0"
|
||||
tokio-postgres-rustls = "0.11.0"
|
||||
tokio-rustls = "0.25"
|
||||
tokio-stream = "0.1"
|
||||
tokio-tar = "0.3"
|
||||
@@ -190,7 +191,7 @@ url = "2.2"
|
||||
urlencoding = "2.1"
|
||||
uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
|
||||
walkdir = "2.3.2"
|
||||
webpki-roots = "0.26"
|
||||
webpki-roots = "0.25"
|
||||
x509-parser = "0.15"
|
||||
|
||||
## TODO replace this with tracing
|
||||
@@ -199,7 +200,7 @@ log = "0.4"
|
||||
|
||||
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
tokio-postgres-rustls = {path = "../tokio-postgres-rustls"}
|
||||
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
@@ -240,11 +241,6 @@ tonic-build = "0.9"
|
||||
|
||||
[patch.crates-io]
|
||||
|
||||
opentelemetry_api = { path = "../opentelemetry-rust/opentelemetry-api" }
|
||||
opentelemetry_sdk = { path = "../opentelemetry-rust/opentelemetry-sdk" }
|
||||
opentelemetry-semantic-conventions = { path = "../opentelemetry-rust/opentelemetry-semantic-conventions" }
|
||||
opentelemetry = { path = "../opentelemetry-rust/opentelemetry" }
|
||||
|
||||
# This is only needed for proxy's tests.
|
||||
# TODO: we should probably fork `tokio-postgres-rustls` instead.
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
|
||||
@@ -23,7 +23,7 @@ serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
signal-hook.workspace = true
|
||||
tar.workspace = true
|
||||
reqwest = { workspace = true, features = ["json", "rustls-tls"] }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
tokio-postgres.workspace = true
|
||||
tokio-util.workspace = true
|
||||
|
||||
@@ -20,7 +20,7 @@ hex.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
regex.workspace = true
|
||||
reqwest = { workspace = true, features = ["blocking", "json", "rustls-tls"] }
|
||||
reqwest = { workspace = true, features = ["blocking", "json"] }
|
||||
scopeguard.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -12,7 +12,7 @@ comfy-table.workspace = true
|
||||
hyper.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
reqwest = { workspace = true }
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json = { workspace = true, features = ["raw_value"] }
|
||||
thiserror.workspace = true
|
||||
|
||||
@@ -29,6 +29,7 @@ use http_types::{StatusCode, Url};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::RemoteStorageActivity;
|
||||
use crate::{
|
||||
error::Cancelled, s3_bucket::RequestKind, AzureConfig, ConcurrencyLimiter, Download,
|
||||
DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata,
|
||||
@@ -525,6 +526,10 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
// https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
|
||||
Err(TimeTravelError::Unimplemented)
|
||||
}
|
||||
|
||||
fn activity(&self) -> RemoteStorageActivity {
|
||||
self.concurrency_limiter.activity()
|
||||
}
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
|
||||
@@ -263,6 +263,17 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
done_if_after: SystemTime,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), TimeTravelError>;
|
||||
|
||||
/// Query how busy we currently are: may be used by callers which wish to politely
|
||||
/// back off if there are already a lot of operations underway.
|
||||
fn activity(&self) -> RemoteStorageActivity;
|
||||
}
|
||||
|
||||
pub struct RemoteStorageActivity {
|
||||
pub read_available: usize,
|
||||
pub read_total: usize,
|
||||
pub write_available: usize,
|
||||
pub write_total: usize,
|
||||
}
|
||||
|
||||
/// DownloadStream is sensitive to the timeout and cancellation used with the original
|
||||
@@ -444,6 +455,15 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn activity(&self) -> RemoteStorageActivity {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.activity(),
|
||||
Self::AwsS3(s) => s.activity(),
|
||||
Self::AzureBlob(s) => s.activity(),
|
||||
Self::Unreliable(s) => s.activity(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GenericRemoteStorage {
|
||||
@@ -774,6 +794,9 @@ struct ConcurrencyLimiter {
|
||||
// The helps to ensure we don't exceed the thresholds.
|
||||
write: Arc<Semaphore>,
|
||||
read: Arc<Semaphore>,
|
||||
|
||||
write_total: usize,
|
||||
read_total: usize,
|
||||
}
|
||||
|
||||
impl ConcurrencyLimiter {
|
||||
@@ -802,10 +825,21 @@ impl ConcurrencyLimiter {
|
||||
Arc::clone(self.for_kind(kind)).acquire_owned().await
|
||||
}
|
||||
|
||||
fn activity(&self) -> RemoteStorageActivity {
|
||||
RemoteStorageActivity {
|
||||
read_available: self.read.available_permits(),
|
||||
read_total: self.read_total,
|
||||
write_available: self.write.available_permits(),
|
||||
write_total: self.write_total,
|
||||
}
|
||||
}
|
||||
|
||||
fn new(limit: usize) -> ConcurrencyLimiter {
|
||||
Self {
|
||||
read: Arc::new(Semaphore::new(limit)),
|
||||
write: Arc::new(Semaphore::new(limit)),
|
||||
read_total: limit,
|
||||
write_total: limit,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,8 +23,8 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
|
||||
use crate::{
|
||||
Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel,
|
||||
REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorageActivity,
|
||||
TimeTravelError, TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
};
|
||||
|
||||
use super::{RemoteStorage, StorageMetadata};
|
||||
@@ -605,6 +605,16 @@ impl RemoteStorage for LocalFs {
|
||||
) -> Result<(), TimeTravelError> {
|
||||
Err(TimeTravelError::Unimplemented)
|
||||
}
|
||||
|
||||
fn activity(&self) -> RemoteStorageActivity {
|
||||
// LocalFS has no concurrency limiting: give callers the impression that plenty of units are available
|
||||
RemoteStorageActivity {
|
||||
read_available: 16,
|
||||
read_total: 16,
|
||||
write_available: 16,
|
||||
write_total: 16,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
|
||||
|
||||
@@ -47,8 +47,8 @@ use utils::backoff;
|
||||
use super::StorageMetadata;
|
||||
use crate::{
|
||||
error::Cancelled, support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError,
|
||||
Listing, ListingMode, RemotePath, RemoteStorage, S3Config, TimeTravelError, TimeoutOrCancel,
|
||||
MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
Listing, ListingMode, RemotePath, RemoteStorage, RemoteStorageActivity, S3Config,
|
||||
TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
};
|
||||
|
||||
pub(super) mod metrics;
|
||||
@@ -975,6 +975,10 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn activity(&self) -> RemoteStorageActivity {
|
||||
self.concurrency_limiter.activity()
|
||||
}
|
||||
}
|
||||
|
||||
/// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
|
||||
|
||||
@@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{
|
||||
Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
|
||||
StorageMetadata, TimeTravelError,
|
||||
RemoteStorageActivity, StorageMetadata, TimeTravelError,
|
||||
};
|
||||
|
||||
pub struct UnreliableWrapper {
|
||||
@@ -213,4 +213,8 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
fn activity(&self) -> RemoteStorageActivity {
|
||||
self.inner.activity()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,8 +135,7 @@ impl Gate {
|
||||
let started_at = std::time::Instant::now();
|
||||
let mut do_close = std::pin::pin!(self.do_close());
|
||||
|
||||
// with 1s we rarely saw anything, let's try if we get more gate closing reasons with 100ms
|
||||
let nag_after = Duration::from_millis(100);
|
||||
let nag_after = Duration::from_secs(1);
|
||||
|
||||
let Err(_timeout) = tokio::time::timeout(nag_after, &mut do_close).await else {
|
||||
return;
|
||||
|
||||
@@ -84,7 +84,7 @@ storage_broker.workspace = true
|
||||
tenant_size_model.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
reqwest = { workspace = true }
|
||||
reqwest.workspace = true
|
||||
rpds.workspace = true
|
||||
enum-map.workspace = true
|
||||
enumset = { workspace = true, features = ["serde"]}
|
||||
|
||||
@@ -8,7 +8,7 @@ license.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
thiserror.workspace = true
|
||||
async-trait.workspace = true
|
||||
reqwest = { workspace = true }
|
||||
reqwest.workspace = true
|
||||
utils.workspace = true
|
||||
serde.workspace = true
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
@@ -380,8 +380,8 @@ impl interface::CompactionLayer<Key> for MockLayer {
|
||||
}
|
||||
fn file_size(&self) -> u64 {
|
||||
match self {
|
||||
MockLayer::Delta(this) => this.file_size,
|
||||
MockLayer::Image(this) => this.file_size,
|
||||
MockLayer::Delta(this) => this.file_size(),
|
||||
MockLayer::Image(this) => this.file_size(),
|
||||
}
|
||||
}
|
||||
fn short_id(&self) -> String {
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::collections::HashMap;
|
||||
|
||||
use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
|
||||
use pageserver::tenant::storage_layer::LayerName;
|
||||
use pageserver::tenant::{metadata::TimelineMetadata, IndexPart};
|
||||
use utils::lsn::Lsn;
|
||||
@@ -19,7 +19,7 @@ pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
|
||||
let des: IndexPart = IndexPart::from_s3_bytes(&bytes).context("deserialize")?;
|
||||
#[derive(serde::Serialize)]
|
||||
struct Output<'a> {
|
||||
layer_metadata: &'a HashMap<LayerName, LayerFileMetadata>,
|
||||
layer_metadata: &'a HashMap<LayerName, IndexLayerMetadata>,
|
||||
disk_consistent_lsn: Lsn,
|
||||
timeline_metadata: &'a TimelineMetadata,
|
||||
}
|
||||
|
||||
@@ -534,7 +534,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
});
|
||||
}
|
||||
EvictionLayer::Secondary(layer) => {
|
||||
let file_size = layer.metadata.file_size;
|
||||
let file_size = layer.metadata.file_size();
|
||||
|
||||
js.spawn(async move {
|
||||
layer
|
||||
@@ -641,7 +641,7 @@ impl EvictionLayer {
|
||||
pub(crate) fn get_file_size(&self) -> u64 {
|
||||
match self {
|
||||
Self::Attached(l) => l.layer_desc().file_size,
|
||||
Self::Secondary(sl) => sl.metadata.file_size,
|
||||
Self::Secondary(sl) => sl.metadata.file_size(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -260,8 +260,6 @@ async fn page_service_conn_main(
|
||||
socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
|
||||
let socket = std::pin::pin!(socket);
|
||||
|
||||
fail::fail_point!("ps::connection-start::pre-login");
|
||||
|
||||
// XXX: pgbackend.run() should take the connection_ctx,
|
||||
// and create a child per-query context when it invokes process_query.
|
||||
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
|
||||
@@ -605,7 +603,6 @@ impl PageServerHandler {
|
||||
};
|
||||
|
||||
trace!("query: {copy_data_bytes:?}");
|
||||
fail::fail_point!("ps::handle-pagerequest-message");
|
||||
|
||||
// Trace request if needed
|
||||
if let Some(t) = tracer.as_mut() {
|
||||
@@ -620,7 +617,6 @@ impl PageServerHandler {
|
||||
|
||||
let (response, span) = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(req) => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::exists");
|
||||
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
|
||||
@@ -630,7 +626,6 @@ impl PageServerHandler {
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::Nblocks(req) => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
|
||||
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
|
||||
@@ -640,7 +635,6 @@ impl PageServerHandler {
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::getpage");
|
||||
// shard_id is filled in by the handler
|
||||
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
|
||||
(
|
||||
@@ -651,7 +645,6 @@ impl PageServerHandler {
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::DbSize(req) => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
|
||||
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
|
||||
@@ -661,7 +654,6 @@ impl PageServerHandler {
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetSlruSegment(req) => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
|
||||
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
|
||||
(
|
||||
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
|
||||
@@ -1513,7 +1505,6 @@ where
|
||||
_pgb: &mut PostgresBackend<IO>,
|
||||
_sm: &FeStartupPacket,
|
||||
) -> Result<(), QueryError> {
|
||||
fail::fail_point!("ps::connection-start::startup-packet");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1528,8 +1519,6 @@ where
|
||||
Err(QueryError::SimulatedConnectionError)
|
||||
});
|
||||
|
||||
fail::fail_point!("ps::connection-start::process-query");
|
||||
|
||||
let ctx = self.connection_ctx.attached_child();
|
||||
debug!("process query {query_string:?}");
|
||||
let parts = query_string.split_whitespace().collect::<Vec<_>>();
|
||||
|
||||
@@ -1192,7 +1192,7 @@ impl RemoteTimelineClient {
|
||||
&self.storage_impl,
|
||||
uploaded.local_path(),
|
||||
&remote_path,
|
||||
uploaded.metadata().file_size,
|
||||
uploaded.metadata().file_size(),
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
@@ -1573,7 +1573,7 @@ impl RemoteTimelineClient {
|
||||
&self.storage_impl,
|
||||
local_path,
|
||||
&remote_path,
|
||||
layer_metadata.file_size,
|
||||
layer_metadata.file_size(),
|
||||
&self.cancel,
|
||||
)
|
||||
.measure_remote_op(
|
||||
@@ -1768,7 +1768,7 @@ impl RemoteTimelineClient {
|
||||
UploadOp::UploadLayer(_, m) => (
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Upload,
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
|
||||
),
|
||||
UploadOp::UploadMetadata(_, _) => (
|
||||
RemoteOpFileKind::Index,
|
||||
|
||||
@@ -84,7 +84,7 @@ pub async fn download_layer_file<'a>(
|
||||
)
|
||||
.await?;
|
||||
|
||||
let expected = layer_metadata.file_size;
|
||||
let expected = layer_metadata.file_size();
|
||||
if expected != bytes_amount {
|
||||
return Err(DownloadError::Other(anyhow!(
|
||||
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
|
||||
|
||||
@@ -17,6 +17,46 @@ use pageserver_api::shard::ShardIndex;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
/// Metadata gathered for each of the layer files.
|
||||
///
|
||||
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
|
||||
/// might have less or more metadata depending if upgrading or rolling back an upgrade.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
|
||||
//#[cfg_attr(test, derive(Default))]
|
||||
pub struct LayerFileMetadata {
|
||||
file_size: u64,
|
||||
|
||||
pub(crate) generation: Generation,
|
||||
|
||||
pub(crate) shard: ShardIndex,
|
||||
}
|
||||
|
||||
impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
|
||||
fn from(other: &IndexLayerMetadata) -> Self {
|
||||
LayerFileMetadata {
|
||||
file_size: other.file_size,
|
||||
generation: other.generation,
|
||||
shard: other.shard,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerFileMetadata {
|
||||
pub fn new(file_size: u64, generation: Generation, shard: ShardIndex) -> Self {
|
||||
LayerFileMetadata {
|
||||
file_size,
|
||||
generation,
|
||||
shard,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn file_size(&self) -> u64 {
|
||||
self.file_size
|
||||
}
|
||||
}
|
||||
|
||||
// TODO seems like another part of the remote storage file format
|
||||
// compatibility issue, see https://github.com/neondatabase/neon/issues/3072
|
||||
/// In-memory representation of an `index_part.json` file
|
||||
///
|
||||
/// Contains the data about all files in the timeline, present remotely and its metadata.
|
||||
@@ -37,7 +77,7 @@ pub struct IndexPart {
|
||||
///
|
||||
/// Older versions of `IndexPart` will not have this property or have only a part of metadata
|
||||
/// that latest version stores.
|
||||
pub layer_metadata: HashMap<LayerName, LayerFileMetadata>,
|
||||
pub layer_metadata: HashMap<LayerName, IndexLayerMetadata>,
|
||||
|
||||
// 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata.
|
||||
// It's duplicated for convenience when reading the serialized structure, but is
|
||||
@@ -87,7 +127,10 @@ impl IndexPart {
|
||||
lineage: Lineage,
|
||||
last_aux_file_policy: Option<AuxFilePolicy>,
|
||||
) -> Self {
|
||||
let layer_metadata = layers_and_metadata.clone();
|
||||
let layer_metadata = layers_and_metadata
|
||||
.iter()
|
||||
.map(|(k, v)| (k.to_owned(), IndexLayerMetadata::from(v)))
|
||||
.collect();
|
||||
|
||||
Self {
|
||||
version: Self::LATEST_VERSION,
|
||||
@@ -151,12 +194,9 @@ impl From<&UploadQueueInitialized> for IndexPart {
|
||||
}
|
||||
}
|
||||
|
||||
/// Metadata gathered for each of the layer files.
|
||||
///
|
||||
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
|
||||
/// might have less or more metadata depending if upgrading or rolling back an upgrade.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||
pub struct LayerFileMetadata {
|
||||
/// Serialized form of [`LayerFileMetadata`].
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
pub struct IndexLayerMetadata {
|
||||
pub file_size: u64,
|
||||
|
||||
#[serde(default = "Generation::none")]
|
||||
@@ -168,12 +208,12 @@ pub struct LayerFileMetadata {
|
||||
pub shard: ShardIndex,
|
||||
}
|
||||
|
||||
impl LayerFileMetadata {
|
||||
pub fn new(file_size: u64, generation: Generation, shard: ShardIndex) -> Self {
|
||||
LayerFileMetadata {
|
||||
file_size,
|
||||
generation,
|
||||
shard,
|
||||
impl From<&LayerFileMetadata> for IndexLayerMetadata {
|
||||
fn from(other: &LayerFileMetadata) -> Self {
|
||||
IndexLayerMetadata {
|
||||
file_size: other.file_size,
|
||||
generation: other.generation,
|
||||
shard: other.shard,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -267,12 +307,12 @@ mod tests {
|
||||
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
|
||||
version: 1,
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
@@ -309,12 +349,12 @@ mod tests {
|
||||
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
|
||||
version: 1,
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
@@ -352,12 +392,12 @@ mod tests {
|
||||
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
|
||||
version: 2,
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
@@ -440,12 +480,12 @@ mod tests {
|
||||
let expected = IndexPart {
|
||||
version: 4,
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
@@ -482,12 +522,12 @@ mod tests {
|
||||
let expected = IndexPart {
|
||||
version: 5,
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF420-00000000014EF499".parse().unwrap(), LayerFileMetadata {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF420-00000000014EF499".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 23289856,
|
||||
generation: Generation::new(1),
|
||||
shard: ShardIndex::unsharded(),
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619".parse().unwrap(), LayerFileMetadata {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 1015808,
|
||||
generation: Generation::new(1),
|
||||
shard: ShardIndex::unsharded(),
|
||||
@@ -529,12 +569,12 @@ mod tests {
|
||||
let expected = IndexPart {
|
||||
version: 6,
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
|
||||
@@ -45,10 +45,10 @@ use crate::tenant::{
|
||||
|
||||
use camino::Utf8PathBuf;
|
||||
use chrono::format::{DelayedFormat, StrftimeItems};
|
||||
use futures::Future;
|
||||
use futures::{Future, StreamExt};
|
||||
use pageserver_api::models::SecondaryProgress;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::{DownloadError, Etag, GenericRemoteStorage};
|
||||
use remote_storage::{DownloadError, Etag, GenericRemoteStorage, RemoteStorageActivity};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, instrument, warn, Instrument};
|
||||
@@ -67,6 +67,12 @@ use super::{
|
||||
/// download, if the uploader populated it.
|
||||
const DEFAULT_DOWNLOAD_INTERVAL: Duration = Duration::from_millis(60000);
|
||||
|
||||
/// Range of concurrency we may use when downloading layers within a timeline. This is independent
|
||||
/// for each tenant we're downloading: the concurrency of _tenants_ is defined separately in
|
||||
/// `PageServerConf::secondary_download_concurrency`
|
||||
const MAX_LAYER_CONCURRENCY: usize = 16;
|
||||
const MIN_LAYER_CONCURRENCY: usize = 1;
|
||||
|
||||
pub(super) async fn downloader_task(
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
@@ -75,14 +81,15 @@ pub(super) async fn downloader_task(
|
||||
cancel: CancellationToken,
|
||||
root_ctx: RequestContext,
|
||||
) {
|
||||
let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
|
||||
// How many tenants' secondary download operations we will run concurrently
|
||||
let tenant_concurrency = tenant_manager.get_conf().secondary_download_concurrency;
|
||||
|
||||
let generator = SecondaryDownloader {
|
||||
tenant_manager,
|
||||
remote_storage,
|
||||
root_ctx,
|
||||
};
|
||||
let mut scheduler = Scheduler::new(generator, concurrency);
|
||||
let mut scheduler = Scheduler::new(generator, tenant_concurrency);
|
||||
|
||||
scheduler
|
||||
.run(command_queue, background_jobs_can_start, cancel)
|
||||
@@ -407,7 +414,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
tracing::warn!("Insufficient space while downloading. Will retry later.");
|
||||
}
|
||||
Err(UpdateError::Cancelled) => {
|
||||
tracing::info!("Shut down while downloading");
|
||||
tracing::debug!("Shut down while downloading");
|
||||
},
|
||||
Err(UpdateError::Deserialize(e)) => {
|
||||
tracing::error!("Corrupt content while downloading tenant: {e}");
|
||||
@@ -709,7 +716,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
let mut layer_byte_count: u64 = timeline_state
|
||||
.on_disk_layers
|
||||
.values()
|
||||
.map(|l| l.metadata.file_size)
|
||||
.map(|l| l.metadata.file_size())
|
||||
.sum();
|
||||
|
||||
// Remove on-disk layers that are no longer present in heatmap
|
||||
@@ -720,7 +727,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
.get(layer_file_name)
|
||||
.unwrap()
|
||||
.metadata
|
||||
.file_size;
|
||||
.file_size();
|
||||
|
||||
let local_path = local_layer_path(
|
||||
self.conf,
|
||||
@@ -841,6 +848,8 @@ impl<'a> TenantDownloader<'a> {
|
||||
|
||||
tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
|
||||
|
||||
let mut download_futs = Vec::new();
|
||||
|
||||
// Download heatmap layers that are not present on local disk, or update their
|
||||
// access time if they are already present.
|
||||
for layer in timeline.layers {
|
||||
@@ -877,7 +886,9 @@ impl<'a> TenantDownloader<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
if on_disk.metadata != layer.metadata || on_disk.access_time != layer.access_time {
|
||||
if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
|
||||
|| on_disk.access_time != layer.access_time
|
||||
{
|
||||
// We already have this layer on disk. Update its access time.
|
||||
tracing::debug!(
|
||||
"Access time updated for layer {}: {} -> {}",
|
||||
@@ -913,14 +924,31 @@ impl<'a> TenantDownloader<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
match self
|
||||
.download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx)
|
||||
.await?
|
||||
{
|
||||
Some(layer) => touched.push(layer),
|
||||
None => {
|
||||
// Not an error but we didn't download it: remote layer is missing. Don't add it to the list of
|
||||
// things to consider touched.
|
||||
download_futs.push(self.download_layer(
|
||||
tenant_shard_id,
|
||||
&timeline.timeline_id,
|
||||
layer,
|
||||
ctx,
|
||||
));
|
||||
}
|
||||
|
||||
// Break up layer downloads into chunks, so that for each chunk we can re-check how much
|
||||
// concurrency to use based on activity level of remote storage.
|
||||
while !download_futs.is_empty() {
|
||||
let chunk =
|
||||
download_futs.split_off(download_futs.len().saturating_sub(MAX_LAYER_CONCURRENCY));
|
||||
|
||||
let concurrency = Self::layer_concurrency(self.remote_storage.activity());
|
||||
|
||||
let mut result_stream = futures::stream::iter(chunk).buffered(concurrency);
|
||||
let mut result_stream = std::pin::pin!(result_stream);
|
||||
while let Some(result) = result_stream.next().await {
|
||||
match result {
|
||||
Err(e) => return Err(e),
|
||||
Ok(None) => {
|
||||
// No error, but we didn't download the layer. Don't mark it touched
|
||||
}
|
||||
Ok(Some(layer)) => touched.push(layer),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -951,7 +979,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
tenant_shard_id,
|
||||
&timeline.timeline_id,
|
||||
t.name,
|
||||
t.metadata.clone(),
|
||||
LayerFileMetadata::from(&t.metadata),
|
||||
t.access_time,
|
||||
local_path,
|
||||
));
|
||||
@@ -996,7 +1024,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
*tenant_shard_id,
|
||||
*timeline_id,
|
||||
&layer.name,
|
||||
&layer.metadata,
|
||||
&LayerFileMetadata::from(&layer.metadata),
|
||||
&local_path,
|
||||
&self.secondary_state.cancel,
|
||||
ctx,
|
||||
@@ -1055,6 +1083,19 @@ impl<'a> TenantDownloader<'a> {
|
||||
|
||||
Ok(Some(layer))
|
||||
}
|
||||
|
||||
/// Calculate the currently allowed parallelism of layer download tasks, based on activity level of the remote storage
|
||||
fn layer_concurrency(activity: RemoteStorageActivity) -> usize {
|
||||
// When less than 75% of units are available, use minimum concurrency. Else, do a linear mapping
|
||||
// of our concurrency range to the units available within the remaining 25%.
|
||||
let clamp_at = (activity.read_total * 3) / 4;
|
||||
if activity.read_available > clamp_at {
|
||||
(MAX_LAYER_CONCURRENCY * (activity.read_available - clamp_at))
|
||||
/ (activity.read_total - clamp_at)
|
||||
} else {
|
||||
MIN_LAYER_CONCURRENCY
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
|
||||
@@ -1144,7 +1185,7 @@ async fn init_timeline_state(
|
||||
tenant_shard_id,
|
||||
&heatmap.timeline_id,
|
||||
name,
|
||||
remote_meta.metadata.clone(),
|
||||
LayerFileMetadata::from(&remote_meta.metadata),
|
||||
remote_meta.access_time,
|
||||
file_path,
|
||||
),
|
||||
@@ -1178,3 +1219,58 @@ async fn init_timeline_state(
|
||||
|
||||
detail
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn layer_concurrency() {
|
||||
// Totally idle
|
||||
assert_eq!(
|
||||
TenantDownloader::layer_concurrency(RemoteStorageActivity {
|
||||
read_available: 16,
|
||||
read_total: 16,
|
||||
write_available: 16,
|
||||
write_total: 16
|
||||
}),
|
||||
MAX_LAYER_CONCURRENCY
|
||||
);
|
||||
|
||||
// Totally busy
|
||||
assert_eq!(
|
||||
TenantDownloader::layer_concurrency(RemoteStorageActivity {
|
||||
read_available: 0,
|
||||
read_total: 16,
|
||||
|
||||
write_available: 16,
|
||||
write_total: 16
|
||||
}),
|
||||
MIN_LAYER_CONCURRENCY
|
||||
);
|
||||
|
||||
// Edge of the range at which we interpolate
|
||||
assert_eq!(
|
||||
TenantDownloader::layer_concurrency(RemoteStorageActivity {
|
||||
read_available: 12,
|
||||
read_total: 16,
|
||||
|
||||
write_available: 16,
|
||||
write_total: 16
|
||||
}),
|
||||
MIN_LAYER_CONCURRENCY
|
||||
);
|
||||
|
||||
// Midpoint of the range in which we interpolate
|
||||
assert_eq!(
|
||||
TenantDownloader::layer_concurrency(RemoteStorageActivity {
|
||||
read_available: 14,
|
||||
read_total: 16,
|
||||
|
||||
write_available: 16,
|
||||
write_total: 16
|
||||
}),
|
||||
MAX_LAYER_CONCURRENCY / 2
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::time::SystemTime;
|
||||
|
||||
use crate::tenant::{remote_timeline_client::index::LayerFileMetadata, storage_layer::LayerName};
|
||||
use crate::tenant::{remote_timeline_client::index::IndexLayerMetadata, storage_layer::LayerName};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr, TimestampSeconds};
|
||||
@@ -38,7 +38,7 @@ pub(crate) struct HeatMapTimeline {
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub(crate) struct HeatMapLayer {
|
||||
pub(super) name: LayerName,
|
||||
pub(super) metadata: LayerFileMetadata,
|
||||
pub(super) metadata: IndexLayerMetadata,
|
||||
|
||||
#[serde_as(as = "TimestampSeconds<i64>")]
|
||||
pub(super) access_time: SystemTime,
|
||||
@@ -49,7 +49,7 @@ pub(crate) struct HeatMapLayer {
|
||||
impl HeatMapLayer {
|
||||
pub(crate) fn new(
|
||||
name: LayerName,
|
||||
metadata: LayerFileMetadata,
|
||||
metadata: IndexLayerMetadata,
|
||||
access_time: SystemTime,
|
||||
) -> Self {
|
||||
Self {
|
||||
|
||||
@@ -47,7 +47,7 @@ use hex;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::File;
|
||||
@@ -473,7 +473,7 @@ impl ImageLayerInner {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let reads = self
|
||||
.plan_reads(keyspace, None, ctx)
|
||||
.plan_reads(keyspace, ctx)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
|
||||
@@ -485,15 +485,9 @@ impl ImageLayerInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Traverse the layer's index to build read operations on the overlap of the input keyspace
|
||||
/// and the keys in this layer.
|
||||
///
|
||||
/// If shard_identity is provided, it will be used to filter keys down to those stored on
|
||||
/// this shard.
|
||||
async fn plan_reads(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
shard_identity: Option<&ShardIdentity>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<VectoredRead>> {
|
||||
let mut planner = VectoredReadPlanner::new(
|
||||
@@ -513,6 +507,7 @@ impl ImageLayerInner {
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
range.start.write_to_byte_slice(&mut search_key);
|
||||
|
||||
@@ -525,22 +520,12 @@ impl ImageLayerInner {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
assert!(key >= range.start);
|
||||
|
||||
let flag = if let Some(shard_identity) = shard_identity {
|
||||
if shard_identity.is_key_disposable(&key) {
|
||||
BlobFlag::Ignore
|
||||
} else {
|
||||
BlobFlag::None
|
||||
}
|
||||
} else {
|
||||
BlobFlag::None
|
||||
};
|
||||
|
||||
if key >= range.end {
|
||||
planner.handle_range_end(offset);
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
planner.handle(key, self.lsn, offset, flag);
|
||||
planner.handle(key, self.lsn, offset, BlobFlag::None);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -553,50 +538,6 @@ impl ImageLayerInner {
|
||||
Ok(planner.finish())
|
||||
}
|
||||
|
||||
/// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
|
||||
/// then execute vectored GET operations, passing the results of all read keys into the writer.
|
||||
pub(super) async fn filter(
|
||||
&self,
|
||||
shard_identity: &ShardIdentity,
|
||||
writer: &mut ImageLayerWriter,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<usize> {
|
||||
// Fragment the range into the regions owned by this ShardIdentity
|
||||
let plan = self
|
||||
.plan_reads(
|
||||
KeySpace {
|
||||
// If asked for the total key space, plan_reads will give us all the keys in the layer
|
||||
ranges: vec![Key::MIN..Key::MAX],
|
||||
},
|
||||
Some(shard_identity),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
|
||||
let mut key_count = 0;
|
||||
for read in plan.into_iter() {
|
||||
let buf_size = read.size();
|
||||
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
|
||||
|
||||
let frozen_buf = blobs_buf.buf.freeze();
|
||||
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let img_buf = frozen_buf.slice(meta.start..meta.end);
|
||||
|
||||
key_count += 1;
|
||||
writer
|
||||
.put_image(meta.meta.key, img_buf, ctx)
|
||||
.await
|
||||
.context(format!("Storing key {}", meta.meta.key))?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(key_count)
|
||||
}
|
||||
|
||||
async fn do_reads_and_update_state(
|
||||
&self,
|
||||
reads: Vec<VectoredRead>,
|
||||
@@ -914,136 +855,3 @@ impl Drop for ImageLayerWriter {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::{
|
||||
key::Key,
|
||||
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
|
||||
};
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
|
||||
use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
|
||||
|
||||
use super::ImageLayerWriter;
|
||||
|
||||
#[tokio::test]
|
||||
async fn image_layer_rewrite() {
|
||||
let harness = TenantHarness::create("test_image_layer_rewrite").unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
// The LSN at which we will create an image layer to filter
|
||||
let lsn = Lsn(0xdeadbeef0000);
|
||||
|
||||
let timeline_id = TimelineId::generate();
|
||||
let timeline = tenant
|
||||
.create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
|
||||
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
|
||||
let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
|
||||
let range = input_start..input_end;
|
||||
|
||||
// Build an image layer to filter
|
||||
let resident = {
|
||||
let mut writer = ImageLayerWriter::new(
|
||||
harness.conf,
|
||||
timeline_id,
|
||||
harness.tenant_shard_id,
|
||||
&range,
|
||||
lsn,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
writer.finish(&timeline, &ctx).await.unwrap()
|
||||
};
|
||||
let original_size = resident.metadata().file_size;
|
||||
|
||||
// Filter for various shards: this exercises cases like values at start of key range, end of key
|
||||
// range, middle of key range.
|
||||
for shard_number in 0..4 {
|
||||
let mut filtered_writer = ImageLayerWriter::new(
|
||||
harness.conf,
|
||||
timeline_id,
|
||||
harness.tenant_shard_id,
|
||||
&range,
|
||||
lsn,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// TenantHarness gave us an unsharded tenant, but we'll use a sharded ShardIdentity
|
||||
// to exercise filter()
|
||||
let shard_identity = ShardIdentity::new(
|
||||
ShardNumber(shard_number),
|
||||
ShardCount::new(4),
|
||||
ShardStripeSize(0x8000),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let wrote_keys = resident
|
||||
.filter(&shard_identity, &mut filtered_writer, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let replacement = if wrote_keys > 0 {
|
||||
Some(filtered_writer.finish(&timeline, &ctx).await.unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// This exact size and those below will need updating as/when the layer encoding changes, but
|
||||
// should be deterministic for a given version of the format, as we used no randomness generating the input.
|
||||
assert_eq!(original_size, 1597440);
|
||||
|
||||
match shard_number {
|
||||
0 => {
|
||||
// We should have written out just one stripe for our shard identity
|
||||
assert_eq!(wrote_keys, 0x8000);
|
||||
let replacement = replacement.unwrap();
|
||||
|
||||
// We should have dropped some of the data
|
||||
assert!(replacement.metadata().file_size < original_size);
|
||||
assert!(replacement.metadata().file_size > 0);
|
||||
|
||||
// Assert that we dropped ~3/4 of the data.
|
||||
assert_eq!(replacement.metadata().file_size, 417792);
|
||||
}
|
||||
1 => {
|
||||
// Shard 1 has no keys in our input range
|
||||
assert_eq!(wrote_keys, 0x0);
|
||||
assert!(replacement.is_none());
|
||||
}
|
||||
2 => {
|
||||
// Shard 2 has one stripes in the input range
|
||||
assert_eq!(wrote_keys, 0x8000);
|
||||
let replacement = replacement.unwrap();
|
||||
assert!(replacement.metadata().file_size < original_size);
|
||||
assert!(replacement.metadata().file_size > 0);
|
||||
assert_eq!(replacement.metadata().file_size, 417792);
|
||||
}
|
||||
3 => {
|
||||
// Shard 3 has two stripes in the input range
|
||||
assert_eq!(wrote_keys, 0x10000);
|
||||
let replacement = replacement.unwrap();
|
||||
assert!(replacement.metadata().file_size < original_size);
|
||||
assert!(replacement.metadata().file_size > 0);
|
||||
assert_eq!(replacement.metadata().file_size, 811008);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::{
|
||||
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
};
|
||||
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
|
||||
use pageserver_api::shard::{ShardIndex, TenantShardId};
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
@@ -23,10 +23,10 @@ use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
|
||||
|
||||
use super::delta_layer::{self, DeltaEntry};
|
||||
use super::image_layer::{self};
|
||||
use super::image_layer;
|
||||
use super::{
|
||||
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
|
||||
PersistentLayerDesc, ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
|
||||
AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerName, PersistentLayerDesc,
|
||||
ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
|
||||
};
|
||||
|
||||
use utils::generation::Generation;
|
||||
@@ -161,7 +161,7 @@ impl Layer {
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
file_name,
|
||||
metadata.file_size,
|
||||
metadata.file_size(),
|
||||
);
|
||||
|
||||
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
|
||||
@@ -194,7 +194,7 @@ impl Layer {
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
file_name,
|
||||
metadata.file_size,
|
||||
metadata.file_size(),
|
||||
);
|
||||
|
||||
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);
|
||||
@@ -227,7 +227,7 @@ impl Layer {
|
||||
|
||||
timeline
|
||||
.metrics
|
||||
.resident_physical_size_add(metadata.file_size);
|
||||
.resident_physical_size_add(metadata.file_size());
|
||||
|
||||
ResidentLayer { downloaded, owner }
|
||||
}
|
||||
@@ -1802,15 +1802,16 @@ impl ResidentLayer {
|
||||
use LayerKind::*;
|
||||
|
||||
let owner = &self.owner.0;
|
||||
|
||||
match self.downloaded.get(owner, ctx).await? {
|
||||
Delta(ref d) => {
|
||||
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
|
||||
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
|
||||
// while it's being held.
|
||||
owner
|
||||
.access_stats
|
||||
.record_access(LayerAccessKind::KeyIter, ctx);
|
||||
|
||||
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
|
||||
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
|
||||
// while it's being held.
|
||||
delta_layer::DeltaLayerInner::load_keys(d, ctx)
|
||||
.await
|
||||
.with_context(|| format!("Layer index is corrupted for {self}"))
|
||||
@@ -1819,23 +1820,6 @@ impl ResidentLayer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Read all they keys in this layer which match the ShardIdentity, and write them all to
|
||||
/// the provided writer. Return the number of keys written.
|
||||
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
|
||||
pub(crate) async fn filter<'a>(
|
||||
&'a self,
|
||||
shard_identity: &ShardIdentity,
|
||||
writer: &mut ImageLayerWriter,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<usize> {
|
||||
use LayerKind::*;
|
||||
|
||||
match self.downloaded.get(&self.owner.0, ctx).await? {
|
||||
Delta(_) => anyhow::bail!(format!("cannot filter() on a delta layer {self}")),
|
||||
Image(i) => i.filter(shard_identity, writer, ctx).await,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the amount of keys and values written to the writer.
|
||||
pub(crate) async fn copy_delta_prefix(
|
||||
&self,
|
||||
|
||||
@@ -41,7 +41,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::{
|
||||
bin_ser::BeSer,
|
||||
fs_ext,
|
||||
sync::gate::{Gate, GateGuard},
|
||||
vec_map::VecMap,
|
||||
};
|
||||
@@ -61,7 +60,6 @@ use std::{
|
||||
ops::ControlFlow,
|
||||
};
|
||||
|
||||
use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
|
||||
use crate::{
|
||||
aux_file::AuxFileSizeEstimator,
|
||||
tenant::{
|
||||
@@ -90,6 +88,9 @@ use crate::{
|
||||
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
|
||||
};
|
||||
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
|
||||
use crate::{
|
||||
pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::timeline::init::LocalLayerFileMetadata,
|
||||
};
|
||||
use crate::{
|
||||
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
|
||||
virtual_file::{MaybeFatalIo, VirtualFile},
|
||||
@@ -1423,7 +1424,7 @@ impl Timeline {
|
||||
let layer_map = guard.layer_map();
|
||||
let mut size = 0;
|
||||
for l in layer_map.iter_historic_layers() {
|
||||
size += l.file_size;
|
||||
size += l.file_size();
|
||||
}
|
||||
size
|
||||
}
|
||||
@@ -2453,6 +2454,8 @@ impl Timeline {
|
||||
let span = tracing::Span::current();
|
||||
|
||||
// Copy to move into the task we're about to spawn
|
||||
let generation = self.generation;
|
||||
let shard = self.get_shard_index();
|
||||
let this = self.myself.upgrade().expect("&self method holds the arc");
|
||||
|
||||
let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
|
||||
@@ -2466,14 +2469,11 @@ impl Timeline {
|
||||
|
||||
for discovered in discovered {
|
||||
let (name, kind) = match discovered {
|
||||
Discovered::Layer(layer_file_name, local_metadata) => {
|
||||
discovered_layers.push((layer_file_name, local_metadata));
|
||||
Discovered::Layer(layer_file_name, local_path, file_size) => {
|
||||
discovered_layers.push((layer_file_name, local_path, file_size));
|
||||
continue;
|
||||
}
|
||||
Discovered::IgnoredBackup(path) => {
|
||||
std::fs::remove_file(path)
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.fatal_err("Removing .old file");
|
||||
Discovered::IgnoredBackup => {
|
||||
continue;
|
||||
}
|
||||
Discovered::Unknown(file_name) => {
|
||||
@@ -2499,8 +2499,13 @@ impl Timeline {
|
||||
);
|
||||
}
|
||||
|
||||
let decided =
|
||||
init::reconcile(discovered_layers, index_part.as_ref(), disk_consistent_lsn);
|
||||
let decided = init::reconcile(
|
||||
discovered_layers,
|
||||
index_part.as_ref(),
|
||||
disk_consistent_lsn,
|
||||
generation,
|
||||
shard,
|
||||
);
|
||||
|
||||
let mut loaded_layers = Vec::new();
|
||||
let mut needs_cleanup = Vec::new();
|
||||
@@ -2508,6 +2513,21 @@ impl Timeline {
|
||||
|
||||
for (name, decision) in decided {
|
||||
let decision = match decision {
|
||||
Ok(UseRemote { local, remote }) => {
|
||||
// Remote is authoritative, but we may still choose to retain
|
||||
// the local file if the contents appear to match
|
||||
if local.metadata.file_size() == remote.file_size() {
|
||||
// Use the local file, but take the remote metadata so that we pick up
|
||||
// the correct generation.
|
||||
UseLocal(LocalLayerFileMetadata {
|
||||
metadata: remote,
|
||||
local_path: local.local_path,
|
||||
})
|
||||
} else {
|
||||
init::cleanup_local_file_for_remote(&local, &remote)?;
|
||||
UseRemote { local, remote }
|
||||
}
|
||||
}
|
||||
Ok(decision) => decision,
|
||||
Err(DismissedLayer::Future { local }) => {
|
||||
if let Some(local) = local {
|
||||
@@ -2525,11 +2545,6 @@ impl Timeline {
|
||||
// this file never existed remotely, we will have to do rework
|
||||
continue;
|
||||
}
|
||||
Err(DismissedLayer::BadMetadata(local)) => {
|
||||
init::cleanup_local_file_for_remote(&local)?;
|
||||
// this file never existed remotely, we will have to do rework
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match &name {
|
||||
@@ -2540,12 +2555,14 @@ impl Timeline {
|
||||
tracing::debug!(layer=%name, ?decision, "applied");
|
||||
|
||||
let layer = match decision {
|
||||
Resident { local, remote } => {
|
||||
total_physical_size += local.file_size;
|
||||
Layer::for_resident(conf, &this, local.local_path, name, remote)
|
||||
UseLocal(local) => {
|
||||
total_physical_size += local.metadata.file_size();
|
||||
Layer::for_resident(conf, &this, local.local_path, name, local.metadata)
|
||||
.drop_eviction_guard()
|
||||
}
|
||||
Evicted(remote) => Layer::for_evicted(conf, &this, name, remote),
|
||||
Evicted(remote) | UseRemote { remote, .. } => {
|
||||
Layer::for_evicted(conf, &this, name, remote)
|
||||
}
|
||||
};
|
||||
|
||||
loaded_layers.push(layer);
|
||||
@@ -3054,7 +3071,7 @@ impl Timeline {
|
||||
|
||||
HeatMapLayer::new(
|
||||
layer.layer_desc().layer_name(),
|
||||
layer.metadata(),
|
||||
(&layer.metadata()).into(),
|
||||
last_activity_ts,
|
||||
)
|
||||
});
|
||||
@@ -3865,7 +3882,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let (layers_to_upload, delta_layer_to_add) = if create_image_layer {
|
||||
let (layers_to_upload, delta_layers_to_add) = if create_image_layer {
|
||||
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
|
||||
// require downloading anything during initial import.
|
||||
let ((rel_partition, metadata_partition), _lsn) = self
|
||||
@@ -3882,26 +3899,23 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// For metadata, always create delta layers.
|
||||
let delta_layer = if !metadata_partition.parts.is_empty() {
|
||||
assert_eq!(
|
||||
metadata_partition.parts.len(),
|
||||
1,
|
||||
"currently sparse keyspace should only contain a single aux file keyspace"
|
||||
);
|
||||
let metadata_keyspace = &metadata_partition.parts[0];
|
||||
assert_eq!(
|
||||
metadata_keyspace.0.ranges.len(),
|
||||
1,
|
||||
"aux file keyspace should be a single range"
|
||||
);
|
||||
self.create_delta_layer(
|
||||
&frozen_layer,
|
||||
Some(metadata_keyspace.0.ranges[0].clone()),
|
||||
ctx,
|
||||
)
|
||||
.await?
|
||||
let delta_layers = if !metadata_partition.parts.is_empty() {
|
||||
// In the current implementation, the metadata partition will only have one part, and the part will only have
|
||||
// one single key range. This might change in the future.
|
||||
let mut delta_layers_created = Vec::new();
|
||||
for ks in &metadata_partition.parts {
|
||||
for range in &ks.0.ranges {
|
||||
let layer = self
|
||||
.create_delta_layer(&frozen_layer, Some(range.clone()), ctx)
|
||||
.await?;
|
||||
if let Some(layer) = layer {
|
||||
delta_layers_created.push(layer);
|
||||
}
|
||||
}
|
||||
}
|
||||
delta_layers_created
|
||||
} else {
|
||||
None
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
// For image layers, we add them immediately into the layer map.
|
||||
@@ -3916,12 +3930,8 @@ impl Timeline {
|
||||
.await?,
|
||||
);
|
||||
|
||||
if let Some(delta_layer) = delta_layer {
|
||||
layers_to_upload.push(delta_layer.clone());
|
||||
(layers_to_upload, Some(delta_layer))
|
||||
} else {
|
||||
(layers_to_upload, None)
|
||||
}
|
||||
layers_to_upload.extend(delta_layers.iter().cloned());
|
||||
(layers_to_upload, delta_layers)
|
||||
} else {
|
||||
// Normal case, write out a L0 delta layer file.
|
||||
// `create_delta_layer` will not modify the layer map.
|
||||
@@ -3929,12 +3939,7 @@ impl Timeline {
|
||||
let Some(layer) = self.create_delta_layer(&frozen_layer, None, ctx).await? else {
|
||||
panic!("delta layer cannot be empty if no filter is applied");
|
||||
};
|
||||
(
|
||||
// FIXME: even though we have a single image and single delta layer assumption
|
||||
// we push them to vec
|
||||
vec![layer.clone()],
|
||||
Some(layer),
|
||||
)
|
||||
(vec![layer.clone()], vec![layer])
|
||||
};
|
||||
|
||||
pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
|
||||
@@ -3955,7 +3960,7 @@ impl Timeline {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
|
||||
guard.finish_flush_l0_layer(&delta_layers_to_add, &frozen_layer, &self.metrics);
|
||||
|
||||
if self.set_disk_consistent_lsn(disk_consistent_lsn) {
|
||||
// Schedule remote uploads that will reflect our new disk_consistent_lsn
|
||||
@@ -4708,16 +4713,11 @@ impl Timeline {
|
||||
|
||||
async fn rewrite_layers(
|
||||
self: &Arc<Self>,
|
||||
mut replace_layers: Vec<(Layer, ResidentLayer)>,
|
||||
mut drop_layers: Vec<Layer>,
|
||||
replace_layers: Vec<(Layer, ResidentLayer)>,
|
||||
drop_layers: Vec<Layer>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
// Trim our lists in case our caller (compaction) raced with someone else (GC) removing layers: we want
|
||||
// to avoid double-removing, and avoid rewriting something that was removed.
|
||||
replace_layers.retain(|(l, _)| guard.contains(l));
|
||||
drop_layers.retain(|l| guard.contains(l));
|
||||
|
||||
guard.rewrite_layers(&replace_layers, &drop_layers, &self.metrics);
|
||||
|
||||
let upload_layers: Vec<_> = replace_layers.into_iter().map(|r| r.1).collect();
|
||||
@@ -5592,6 +5592,26 @@ fn is_send() {
|
||||
_assert_send::<TimelineWriter<'_>>();
|
||||
}
|
||||
|
||||
/// Add a suffix to a layer file's name: .{num}.old
|
||||
/// Uses the first available num (starts at 0)
|
||||
fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
|
||||
let filename = path
|
||||
.file_name()
|
||||
.ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
|
||||
let mut new_path = path.to_owned();
|
||||
|
||||
for i in 0u32.. {
|
||||
new_path.set_file_name(format!("{filename}.{i}.old"));
|
||||
if !new_path.exists() {
|
||||
std::fs::rename(path, &new_path)
|
||||
.with_context(|| format!("rename {path:?} to {new_path:?}"))?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
bail!("couldn't find an unused backup number for {:?}", path)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
|
||||
@@ -176,24 +176,13 @@ impl Timeline {
|
||||
async fn compact_shard_ancestors(
|
||||
self: &Arc<Self>,
|
||||
rewrite_max: usize,
|
||||
ctx: &RequestContext,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut drop_layers = Vec::new();
|
||||
let mut layers_to_rewrite: Vec<Layer> = Vec::new();
|
||||
let layers_to_rewrite: Vec<Layer> = Vec::new();
|
||||
|
||||
// We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
|
||||
// layer is behind this Lsn, it indicates that the layer is being retained beyond the
|
||||
// pitr_interval, for example because a branchpoint references it.
|
||||
//
|
||||
// Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
|
||||
// are rewriting layers.
|
||||
let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
|
||||
|
||||
tracing::info!(
|
||||
"latest_gc_cutoff: {}, pitr cutoff {}",
|
||||
*latest_gc_cutoff,
|
||||
self.gc_info.read().unwrap().cutoffs.pitr
|
||||
);
|
||||
// We will use the PITR cutoff as a condition for rewriting layers.
|
||||
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.pitr;
|
||||
|
||||
let layers = self.layers.read().await;
|
||||
for layer_desc in layers.layer_map().iter_historic_layers() {
|
||||
@@ -252,9 +241,9 @@ impl Timeline {
|
||||
|
||||
// Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
|
||||
// without incurring the I/O cost of a rewrite.
|
||||
if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
|
||||
debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
|
||||
layer_desc.get_lsn_range().end, *latest_gc_cutoff);
|
||||
if layer_desc.get_lsn_range().end >= pitr_cutoff {
|
||||
debug!(%layer, "Skipping rewrite of layer still in PITR window ({} >= {})",
|
||||
layer_desc.get_lsn_range().end, pitr_cutoff);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -264,10 +253,13 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Only rewrite layers if their generations differ. This guarantees:
|
||||
// - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
|
||||
// - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
|
||||
if layer.metadata().generation == self.generation {
|
||||
// Only rewrite layers if they would have different remote paths: either they belong to this
|
||||
// shard but an old generation, or they belonged to another shard. This also implicitly
|
||||
// guarantees that the layer is persistent in remote storage (as only remote persistent
|
||||
// layers are carried across shard splits, any local-only layer would be in the current generation)
|
||||
if layer.metadata().generation == self.generation
|
||||
&& layer.metadata().shard.shard_count == self.shard_identity.count
|
||||
{
|
||||
debug!(%layer, "Skipping rewrite, is not from old generation");
|
||||
continue;
|
||||
}
|
||||
@@ -280,69 +272,18 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Fall through: all our conditions for doing a rewrite passed.
|
||||
layers_to_rewrite.push(layer);
|
||||
// TODO: implement rewriting
|
||||
tracing::debug!(%layer, "Would rewrite layer");
|
||||
}
|
||||
|
||||
// Drop read lock on layer map before we start doing time-consuming I/O
|
||||
// Drop the layers read lock: we will acquire it for write in [`Self::rewrite_layers`]
|
||||
drop(layers);
|
||||
|
||||
let mut replace_image_layers = Vec::new();
|
||||
|
||||
for layer in layers_to_rewrite {
|
||||
tracing::info!(layer=%layer, "Rewriting layer after shard split...");
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
&layer.layer_desc().key_range,
|
||||
layer.layer_desc().image_layer_lsn(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Safety of layer rewrites:
|
||||
// - We are writing to a different local file path than we are reading from, so the old Layer
|
||||
// cannot interfere with the new one.
|
||||
// - In the page cache, contents for a particular VirtualFile are stored with a file_id that
|
||||
// is different for two layers with the same name (in `ImageLayerInner::new` we always
|
||||
// acquire a fresh id from [`crate::page_cache::next_file_id`]. So readers do not risk
|
||||
// reading the index from one layer file, and then data blocks from the rewritten layer file.
|
||||
// - Any readers that have a reference to the old layer will keep it alive until they are done
|
||||
// with it. If they are trying to promote from remote storage, that will fail, but this is the same
|
||||
// as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
|
||||
// - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
|
||||
// - GC, which at worst witnesses us "undelete" a layer that they just deleted.
|
||||
// - ingestion, which only inserts layers, therefore cannot collide with us.
|
||||
let resident = layer.download_and_keep_resident().await?;
|
||||
|
||||
let keys_written = resident
|
||||
.filter(&self.shard_identity, &mut image_layer_writer, ctx)
|
||||
.await?;
|
||||
|
||||
if keys_written > 0 {
|
||||
let new_layer = image_layer_writer.finish(self, ctx).await?;
|
||||
tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
|
||||
layer.metadata().file_size,
|
||||
new_layer.metadata().file_size);
|
||||
|
||||
replace_image_layers.push((layer, new_layer));
|
||||
} else {
|
||||
// Drop the old layer. Usually for this case we would already have noticed that
|
||||
// the layer has no data for us with the ShardedRange check above, but
|
||||
drop_layers.push(layer);
|
||||
}
|
||||
}
|
||||
|
||||
// At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
|
||||
// metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
|
||||
// to remote index) and be removed. This is inefficient but safe.
|
||||
fail::fail_point!("compact-shard-ancestors-localonly");
|
||||
// TODO: collect layers to rewrite
|
||||
let replace_layers = Vec::new();
|
||||
|
||||
// Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
|
||||
self.rewrite_layers(replace_image_layers, drop_layers)
|
||||
.await?;
|
||||
|
||||
fail::fail_point!("compact-shard-ancestors-enqueued");
|
||||
self.rewrite_layers(replace_layers, drop_layers).await?;
|
||||
|
||||
// We wait for all uploads to complete before finishing this compaction stage. This is not
|
||||
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
|
||||
@@ -350,8 +291,6 @@ impl Timeline {
|
||||
// load.
|
||||
self.remote_client.wait_completion().await?;
|
||||
|
||||
fail::fail_point!("compact-shard-ancestors-persistent");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -7,20 +7,19 @@ use crate::{
|
||||
index::{IndexPart, LayerFileMetadata},
|
||||
},
|
||||
storage_layer::LayerName,
|
||||
Generation,
|
||||
},
|
||||
};
|
||||
use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use std::{
|
||||
collections::{hash_map, HashMap},
|
||||
str::FromStr,
|
||||
};
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
/// Identified files in the timeline directory.
|
||||
pub(super) enum Discovered {
|
||||
/// The only one we care about
|
||||
Layer(LayerName, LocalLayerFileMetadata),
|
||||
Layer(LayerName, Utf8PathBuf, u64),
|
||||
/// Old ephmeral files from previous launches, should be removed
|
||||
Ephemeral(String),
|
||||
/// Old temporary timeline files, unsure what these really are, should be removed
|
||||
@@ -28,7 +27,7 @@ pub(super) enum Discovered {
|
||||
/// Temporary on-demand download files, should be removed
|
||||
TemporaryDownload(String),
|
||||
/// Backup file from previously future layers
|
||||
IgnoredBackup(Utf8PathBuf),
|
||||
IgnoredBackup,
|
||||
/// Unrecognized, warn about these
|
||||
Unknown(String),
|
||||
}
|
||||
@@ -44,15 +43,12 @@ pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result<Vec<Discovere
|
||||
let discovered = match LayerName::from_str(&file_name) {
|
||||
Ok(file_name) => {
|
||||
let file_size = direntry.metadata()?.len();
|
||||
Discovered::Layer(
|
||||
file_name,
|
||||
LocalLayerFileMetadata::new(direntry.path().to_owned(), file_size),
|
||||
)
|
||||
Discovered::Layer(file_name, direntry.path().to_owned(), file_size)
|
||||
}
|
||||
Err(_) => {
|
||||
if file_name.ends_with(".old") {
|
||||
// ignore these
|
||||
Discovered::IgnoredBackup(direntry.path().to_owned())
|
||||
Discovered::IgnoredBackup
|
||||
} else if remote_timeline_client::is_temp_download_file(direntry.path()) {
|
||||
Discovered::TemporaryDownload(file_name)
|
||||
} else if is_ephemeral_file(&file_name) {
|
||||
@@ -75,32 +71,37 @@ pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result<Vec<Discovere
|
||||
/// this structure extends it with metadata describing the layer's presence in local storage.
|
||||
#[derive(Clone, Debug)]
|
||||
pub(super) struct LocalLayerFileMetadata {
|
||||
pub(super) file_size: u64,
|
||||
pub(super) metadata: LayerFileMetadata,
|
||||
pub(super) local_path: Utf8PathBuf,
|
||||
}
|
||||
|
||||
impl LocalLayerFileMetadata {
|
||||
pub fn new(local_path: Utf8PathBuf, file_size: u64) -> Self {
|
||||
pub fn new(
|
||||
local_path: Utf8PathBuf,
|
||||
file_size: u64,
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
) -> Self {
|
||||
Self {
|
||||
local_path,
|
||||
file_size,
|
||||
metadata: LayerFileMetadata::new(file_size, generation, shard),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// For a layer that is present in remote metadata, this type describes how to handle
|
||||
/// it during startup: it is either Resident (and we have some metadata about a local file),
|
||||
/// or it is Evicted (and we only have remote metadata).
|
||||
/// Decision on what to do with a layer file after considering its local and remote metadata.
|
||||
#[derive(Clone, Debug)]
|
||||
pub(super) enum Decision {
|
||||
/// The layer is not present locally.
|
||||
Evicted(LayerFileMetadata),
|
||||
/// The layer is present locally, and metadata matches: we may hook up this layer to the
|
||||
/// existing file in local storage.
|
||||
Resident {
|
||||
/// The layer is present locally, but local metadata does not match remote; we must
|
||||
/// delete it and treat it as evicted.
|
||||
UseRemote {
|
||||
local: LocalLayerFileMetadata,
|
||||
remote: LayerFileMetadata,
|
||||
},
|
||||
/// The layer is present locally, and metadata matches.
|
||||
UseLocal(LocalLayerFileMetadata),
|
||||
}
|
||||
|
||||
/// A layer needs to be left out of the layer map.
|
||||
@@ -116,81 +117,77 @@ pub(super) enum DismissedLayer {
|
||||
/// In order to make crash safe updates to layer map, we must dismiss layers which are only
|
||||
/// found locally or not yet included in the remote `index_part.json`.
|
||||
LocalOnly(LocalLayerFileMetadata),
|
||||
|
||||
/// The layer exists in remote storage but the local layer's metadata (e.g. file size)
|
||||
/// does not match it
|
||||
BadMetadata(LocalLayerFileMetadata),
|
||||
}
|
||||
|
||||
/// Merges local discoveries and remote [`IndexPart`] to a collection of decisions.
|
||||
pub(super) fn reconcile(
|
||||
local_layers: Vec<(LayerName, LocalLayerFileMetadata)>,
|
||||
discovered: Vec<(LayerName, Utf8PathBuf, u64)>,
|
||||
index_part: Option<&IndexPart>,
|
||||
disk_consistent_lsn: Lsn,
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
) -> Vec<(LayerName, Result<Decision, DismissedLayer>)> {
|
||||
let Some(index_part) = index_part else {
|
||||
// If we have no remote metadata, no local layer files are considered valid to load
|
||||
return local_layers
|
||||
.into_iter()
|
||||
.map(|(layer_name, local_metadata)| {
|
||||
(layer_name, Err(DismissedLayer::LocalOnly(local_metadata)))
|
||||
})
|
||||
.collect();
|
||||
};
|
||||
use Decision::*;
|
||||
|
||||
let mut result = Vec::new();
|
||||
// name => (local_metadata, remote_metadata)
|
||||
type Collected =
|
||||
HashMap<LayerName, (Option<LocalLayerFileMetadata>, Option<LayerFileMetadata>)>;
|
||||
|
||||
let mut remote_layers = HashMap::new();
|
||||
let mut discovered = discovered
|
||||
.into_iter()
|
||||
.map(|(layer_name, local_path, file_size)| {
|
||||
(
|
||||
layer_name,
|
||||
// The generation and shard here will be corrected to match IndexPart in the merge below, unless
|
||||
// it is not in IndexPart, in which case using our current generation makes sense
|
||||
// because it will be uploaded in this generation.
|
||||
(
|
||||
Some(LocalLayerFileMetadata::new(
|
||||
local_path, file_size, generation, shard,
|
||||
)),
|
||||
None,
|
||||
),
|
||||
)
|
||||
})
|
||||
.collect::<Collected>();
|
||||
|
||||
// Construct Decisions for layers that are found locally, if they're in remote metadata. Otherwise
|
||||
// construct DismissedLayers to get rid of them.
|
||||
for (layer_name, local_metadata) in local_layers {
|
||||
let Some(remote_metadata) = index_part.layer_metadata.get(&layer_name) else {
|
||||
result.push((layer_name, Err(DismissedLayer::LocalOnly(local_metadata))));
|
||||
continue;
|
||||
};
|
||||
|
||||
if remote_metadata.file_size != local_metadata.file_size {
|
||||
result.push((layer_name, Err(DismissedLayer::BadMetadata(local_metadata))));
|
||||
continue;
|
||||
}
|
||||
|
||||
remote_layers.insert(
|
||||
layer_name,
|
||||
Decision::Resident {
|
||||
local: local_metadata,
|
||||
remote: remote_metadata.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Construct Decision for layers that were not found locally
|
||||
// merge any index_part information, when available
|
||||
index_part
|
||||
.layer_metadata
|
||||
.iter()
|
||||
.as_ref()
|
||||
.map(|ip| ip.layer_metadata.iter())
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.map(|(name, metadata)| (name, LayerFileMetadata::from(metadata)))
|
||||
.for_each(|(name, metadata)| {
|
||||
if let hash_map::Entry::Vacant(entry) = remote_layers.entry(name.clone()) {
|
||||
entry.insert(Decision::Evicted(metadata.clone()));
|
||||
if let Some(existing) = discovered.get_mut(name) {
|
||||
existing.1 = Some(metadata);
|
||||
} else {
|
||||
discovered.insert(name.to_owned(), (None, Some(metadata)));
|
||||
}
|
||||
});
|
||||
|
||||
// For layers that were found in authoritative remote metadata, apply a final check that they are within
|
||||
// the disk_consistent_lsn.
|
||||
result.extend(remote_layers.into_iter().map(|(name, decision)| {
|
||||
if name.is_in_future(disk_consistent_lsn) {
|
||||
match decision {
|
||||
Decision::Evicted(_remote) => (name, Err(DismissedLayer::Future { local: None })),
|
||||
Decision::Resident {
|
||||
local,
|
||||
remote: _remote,
|
||||
} => (name, Err(DismissedLayer::Future { local: Some(local) })),
|
||||
}
|
||||
} else {
|
||||
(name, Ok(decision))
|
||||
}
|
||||
}));
|
||||
discovered
|
||||
.into_iter()
|
||||
.map(|(name, (local, remote))| {
|
||||
let decision = if name.is_in_future(disk_consistent_lsn) {
|
||||
Err(DismissedLayer::Future { local })
|
||||
} else {
|
||||
match (local, remote) {
|
||||
(Some(local), Some(remote)) if local.metadata != remote => {
|
||||
Ok(UseRemote { local, remote })
|
||||
}
|
||||
(Some(x), Some(_)) => Ok(UseLocal(x)),
|
||||
(None, Some(x)) => Ok(Evicted(x)),
|
||||
(Some(x), None) => Err(DismissedLayer::LocalOnly(x)),
|
||||
(None, None) => {
|
||||
unreachable!("there must not be any non-local non-remote files")
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
result
|
||||
(name, decision)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
pub(super) fn cleanup(path: &Utf8Path, kind: &str) -> anyhow::Result<()> {
|
||||
@@ -199,15 +196,25 @@ pub(super) fn cleanup(path: &Utf8Path, kind: &str) -> anyhow::Result<()> {
|
||||
std::fs::remove_file(path).with_context(|| format!("failed to remove {kind} at {path}"))
|
||||
}
|
||||
|
||||
pub(super) fn cleanup_local_file_for_remote(local: &LocalLayerFileMetadata) -> anyhow::Result<()> {
|
||||
let local_size = local.file_size;
|
||||
pub(super) fn cleanup_local_file_for_remote(
|
||||
local: &LocalLayerFileMetadata,
|
||||
remote: &LayerFileMetadata,
|
||||
) -> anyhow::Result<()> {
|
||||
let local_size = local.metadata.file_size();
|
||||
let remote_size = remote.file_size();
|
||||
let path = &local.local_path;
|
||||
let file_name = path.file_name().expect("must be file path");
|
||||
tracing::warn!(
|
||||
"removing local file {file_name:?} because it has unexpected length {local_size};"
|
||||
);
|
||||
|
||||
std::fs::remove_file(path).with_context(|| format!("failed to remove layer at {path}"))
|
||||
let file_name = path.file_name().expect("must be file path");
|
||||
tracing::warn!("removing local file {file_name:?} because it has unexpected length {local_size}; length in remote index is {remote_size}");
|
||||
if let Err(err) = crate::tenant::timeline::rename_to_backup(path) {
|
||||
assert!(
|
||||
path.exists(),
|
||||
"we would leave the local_layer without a file if this does not hold: {path}",
|
||||
);
|
||||
Err(err)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn cleanup_future_layer(
|
||||
@@ -229,8 +236,8 @@ pub(super) fn cleanup_local_only_file(
|
||||
) -> anyhow::Result<()> {
|
||||
let kind = name.kind();
|
||||
tracing::info!(
|
||||
"found local-only {kind} layer {name} size {}",
|
||||
local.file_size
|
||||
"found local-only {kind} layer {name}, metadata {:?}",
|
||||
local.metadata
|
||||
);
|
||||
std::fs::remove_file(&local.local_path)?;
|
||||
Ok(())
|
||||
|
||||
@@ -166,7 +166,7 @@ impl LayerManager {
|
||||
/// Flush a frozen layer and add the written delta layer to the layer map.
|
||||
pub(crate) fn finish_flush_l0_layer(
|
||||
&mut self,
|
||||
delta_layer: Option<&ResidentLayer>,
|
||||
delta_layers: &[ResidentLayer],
|
||||
frozen_layer_for_check: &Arc<InMemoryLayer>,
|
||||
metrics: &TimelineMetrics,
|
||||
) {
|
||||
@@ -181,10 +181,12 @@ impl LayerManager {
|
||||
// layer to disk at the same time, that would not work.
|
||||
assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
|
||||
|
||||
if let Some(l) = delta_layer {
|
||||
if !delta_layers.is_empty() {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
|
||||
metrics.record_new_file_metrics(l.layer_desc().file_size);
|
||||
for l in delta_layers {
|
||||
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
|
||||
metrics.record_new_file_metrics(l.layer_desc().file_size);
|
||||
}
|
||||
updates.flush();
|
||||
}
|
||||
}
|
||||
@@ -212,34 +214,13 @@ impl LayerManager {
|
||||
&mut self,
|
||||
rewrite_layers: &[(Layer, ResidentLayer)],
|
||||
drop_layers: &[Layer],
|
||||
metrics: &TimelineMetrics,
|
||||
_metrics: &TimelineMetrics,
|
||||
) {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
for (old_layer, new_layer) in rewrite_layers {
|
||||
debug_assert_eq!(
|
||||
old_layer.layer_desc().key_range,
|
||||
new_layer.layer_desc().key_range
|
||||
);
|
||||
debug_assert_eq!(
|
||||
old_layer.layer_desc().lsn_range,
|
||||
new_layer.layer_desc().lsn_range
|
||||
);
|
||||
|
||||
// Safety: we may never rewrite the same file in-place. Callers are responsible
|
||||
// for ensuring that they only rewrite layers after something changes the path,
|
||||
// such as an increment in the generation number.
|
||||
assert_ne!(old_layer.local_path(), new_layer.local_path());
|
||||
// TODO: implement rewrites (currently this code path only used for drops)
|
||||
assert!(rewrite_layers.is_empty());
|
||||
|
||||
Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);
|
||||
|
||||
Self::insert_historic_layer(
|
||||
new_layer.as_ref().clone(),
|
||||
&mut updates,
|
||||
&mut self.layer_fmgr,
|
||||
);
|
||||
|
||||
metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
|
||||
}
|
||||
for l in drop_layers {
|
||||
Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
|
||||
}
|
||||
|
||||
@@ -213,7 +213,10 @@ impl UploadQueue {
|
||||
|
||||
let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
|
||||
for (layer_name, layer_metadata) in &index_part.layer_metadata {
|
||||
files.insert(layer_name.to_owned(), layer_metadata.clone());
|
||||
files.insert(
|
||||
layer_name.to_owned(),
|
||||
LayerFileMetadata::from(layer_metadata),
|
||||
);
|
||||
}
|
||||
|
||||
info!(
|
||||
@@ -319,7 +322,9 @@ impl std::fmt::Display for UploadOp {
|
||||
write!(
|
||||
f,
|
||||
"UploadLayer({}, size={:?}, gen={:?})",
|
||||
layer, metadata.file_size, metadata.generation
|
||||
layer,
|
||||
metadata.file_size(),
|
||||
metadata.generation
|
||||
)
|
||||
}
|
||||
UploadOp::UploadMetadata(_, lsn) => {
|
||||
|
||||
@@ -51,6 +51,7 @@ int flush_every_n_requests = 8;
|
||||
|
||||
int neon_protocol_version = 2;
|
||||
|
||||
static int n_reconnect_attempts = 0;
|
||||
static int max_reconnect_attempts = 60;
|
||||
static int stripe_size;
|
||||
|
||||
@@ -94,44 +95,18 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
static PagestoreShmemState *pagestore_shared;
|
||||
static uint64 pagestore_local_counter = 0;
|
||||
|
||||
typedef enum PSConnectionState {
|
||||
PS_Disconnected, /* no connection yet */
|
||||
PS_Connecting_Startup, /* connection starting up */
|
||||
PS_Connecting_PageStream, /* negotiating pagestream */
|
||||
PS_Connected, /* connected, pagestream established */
|
||||
} PSConnectionState;
|
||||
|
||||
/* This backend's per-shard connections */
|
||||
typedef struct
|
||||
{
|
||||
TimestampTz last_connect_time; /* read-only debug value */
|
||||
TimestampTz last_reconnect_time;
|
||||
uint32 delay_us;
|
||||
int n_reconnect_attempts;
|
||||
PGconn *conn;
|
||||
|
||||
/*---
|
||||
* Pageserver connection state, i.e.
|
||||
* disconnected: conn == NULL, wes == NULL;
|
||||
* conn_startup: connection initiated, waiting for connection establishing
|
||||
* conn_ps: PageStream query sent, waiting for confirmation
|
||||
* connected: PageStream established
|
||||
*/
|
||||
PSConnectionState state;
|
||||
PGconn *conn;
|
||||
/*---
|
||||
* WaitEventSet containing:
|
||||
* - WL_SOCKET_READABLE on 'conn'
|
||||
* - WL_LATCH_SET on MyLatch, and
|
||||
* - WL_EXIT_ON_PM_DEATH.
|
||||
* - WL_SOCKET_READABLE on 'conn'
|
||||
* - WL_LATCH_SET on MyLatch, and
|
||||
* - WL_EXIT_ON_PM_DEATH.
|
||||
*/
|
||||
WaitEventSet *wes_read;
|
||||
/*---
|
||||
* WaitEventSet containing:
|
||||
* - WL_SOCKET_WRITABLE on 'conn'
|
||||
* - WL_LATCH_SET on MyLatch, and
|
||||
* - WL_EXIT_ON_PM_DEATH.
|
||||
*/
|
||||
WaitEventSet *wes_write;
|
||||
WaitEventSet *wes;
|
||||
} PageServer;
|
||||
|
||||
static PageServer page_servers[MAX_SHARDS];
|
||||
@@ -328,269 +303,119 @@ get_shard_number(BufferTag *tag)
|
||||
return hash % n_shards;
|
||||
}
|
||||
|
||||
static inline void
|
||||
CLEANUP_AND_DISCONNECT(PageServer *shard)
|
||||
{
|
||||
if (shard->wes_read)
|
||||
{
|
||||
FreeWaitEventSet(shard->wes_read);
|
||||
shard->wes_read = NULL;
|
||||
}
|
||||
if (shard->wes_write)
|
||||
{
|
||||
FreeWaitEventSet(shard->wes_write);
|
||||
shard->wes_write = NULL;
|
||||
}
|
||||
if (shard->conn)
|
||||
{
|
||||
PQfinish(shard->conn);
|
||||
shard->conn = NULL;
|
||||
}
|
||||
|
||||
shard->state = PS_Disconnected;
|
||||
}
|
||||
|
||||
/*
|
||||
* Connect to a pageserver, or continue to try to connect if we're yet to
|
||||
* complete the connection (e.g. due to receiving an earlier cancellation
|
||||
* during connection start).
|
||||
* Returns true if successfully connected; false if the connection failed.
|
||||
*
|
||||
* Throws errors in unrecoverable situations, or when this backend's query
|
||||
* is canceled.
|
||||
*/
|
||||
static bool
|
||||
pageserver_connect(shardno_t shard_no, int elevel)
|
||||
{
|
||||
PageServer *shard = &page_servers[shard_no];
|
||||
char *query;
|
||||
int ret;
|
||||
const char *keywords[3];
|
||||
const char *values[3];
|
||||
int n;
|
||||
PGconn *conn;
|
||||
WaitEventSet *wes;
|
||||
char connstr[MAX_PAGESERVER_CONNSTRING_SIZE];
|
||||
|
||||
static TimestampTz last_connect_time = 0;
|
||||
static uint64_t delay_us = MIN_RECONNECT_INTERVAL_USEC;
|
||||
TimestampTz now;
|
||||
uint64_t us_since_last_connect;
|
||||
bool broke_from_loop = false;
|
||||
|
||||
Assert(page_servers[shard_no].conn == NULL);
|
||||
|
||||
/*
|
||||
* Get the connection string for this shard. If the shard map has been
|
||||
* updated since we last looked, this will also disconnect any existing
|
||||
* pageserver connections as a side effect.
|
||||
* Note that connstr is used both during connection start, and when we
|
||||
* log the successful connection.
|
||||
*/
|
||||
load_shard_map(shard_no, connstr, NULL);
|
||||
|
||||
switch (shard->state)
|
||||
now = GetCurrentTimestamp();
|
||||
us_since_last_connect = now - last_connect_time;
|
||||
if (us_since_last_connect < MAX_RECONNECT_INTERVAL_USEC)
|
||||
{
|
||||
case PS_Disconnected:
|
||||
{
|
||||
const char *keywords[3];
|
||||
const char *values[3];
|
||||
int n_pgsql_params;
|
||||
TimestampTz now;
|
||||
int64 us_since_last_attempt;
|
||||
|
||||
/* Make sure we start with a clean slate */
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
|
||||
neon_shard_log(shard_no, DEBUG5, "Connection state: Disconnected");
|
||||
|
||||
now = GetCurrentTimestamp();
|
||||
us_since_last_attempt = (int64) (now - shard->last_reconnect_time);
|
||||
shard->last_reconnect_time = now;
|
||||
|
||||
/*
|
||||
* If we did other tasks between reconnect attempts, then we won't
|
||||
* need to wait as long as a full delay.
|
||||
*/
|
||||
if (us_since_last_attempt < shard->delay_us)
|
||||
{
|
||||
pg_usleep(shard->delay_us - us_since_last_attempt);
|
||||
}
|
||||
|
||||
/* update the delay metric */
|
||||
shard->delay_us = Min(shard->delay_us * 2, MAX_RECONNECT_INTERVAL_USEC);
|
||||
|
||||
/*
|
||||
* Connect using the connection string we got from the
|
||||
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
|
||||
* variable was set, use that as the password.
|
||||
*
|
||||
* The connection options are parsed in the order they're given, so when
|
||||
* we set the password before the connection string, the connection string
|
||||
* can override the password from the env variable. Seems useful, although
|
||||
* we don't currently use that capability anywhere.
|
||||
*/
|
||||
keywords[0] = "dbname";
|
||||
values[0] = connstr;
|
||||
n_pgsql_params = 1;
|
||||
|
||||
if (neon_auth_token)
|
||||
{
|
||||
keywords[1] = "password";
|
||||
values[1] = neon_auth_token;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
keywords[n_pgsql_params] = NULL;
|
||||
values[n_pgsql_params] = NULL;
|
||||
|
||||
shard->conn = PQconnectStartParams(keywords, values, 1);
|
||||
if (!shard->conn)
|
||||
{
|
||||
neon_shard_log(shard_no, elevel, "Failed to connect to pageserver: out of memory");
|
||||
return false;
|
||||
}
|
||||
|
||||
shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
|
||||
AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
|
||||
MyLatch, NULL);
|
||||
AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
|
||||
NULL, NULL);
|
||||
AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL);
|
||||
|
||||
shard->wes_write = CreateWaitEventSet(TopMemoryContext, 3);
|
||||
AddWaitEventToSet(shard->wes_write, WL_LATCH_SET, PGINVALID_SOCKET,
|
||||
MyLatch, NULL);
|
||||
AddWaitEventToSet(shard->wes_write, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
|
||||
NULL, NULL);
|
||||
AddWaitEventToSet(shard->wes_write, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE,
|
||||
PQsocket(shard->conn),
|
||||
NULL, NULL);
|
||||
|
||||
shard->state = PS_Connecting_Startup;
|
||||
/* fallthrough */
|
||||
pg_usleep(delay_us);
|
||||
delay_us *= 2;
|
||||
}
|
||||
case PS_Connecting_Startup:
|
||||
else
|
||||
{
|
||||
char *pagestream_query;
|
||||
int ps_send_query_ret;
|
||||
bool connected = false;
|
||||
delay_us = MIN_RECONNECT_INTERVAL_USEC;
|
||||
}
|
||||
|
||||
neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_Startup");
|
||||
/*
|
||||
* Connect using the connection string we got from the
|
||||
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
|
||||
* variable was set, use that as the password.
|
||||
*
|
||||
* The connection options are parsed in the order they're given, so when
|
||||
* we set the password before the connection string, the connection string
|
||||
* can override the password from the env variable. Seems useful, although
|
||||
* we don't currently use that capability anywhere.
|
||||
*/
|
||||
n = 0;
|
||||
if (neon_auth_token)
|
||||
{
|
||||
keywords[n] = "password";
|
||||
values[n] = neon_auth_token;
|
||||
n++;
|
||||
}
|
||||
keywords[n] = "dbname";
|
||||
values[n] = connstr;
|
||||
n++;
|
||||
keywords[n] = NULL;
|
||||
values[n] = NULL;
|
||||
n++;
|
||||
conn = PQconnectdbParams(keywords, values, 1);
|
||||
last_connect_time = GetCurrentTimestamp();
|
||||
|
||||
do
|
||||
{
|
||||
WaitEvent event;
|
||||
int poll_result = PQconnectPoll(shard->conn);
|
||||
if (PQstatus(conn) == CONNECTION_BAD)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(conn));
|
||||
|
||||
switch (poll_result)
|
||||
{
|
||||
default: /* unknown/unused states are handled as a failed connection */
|
||||
case PGRES_POLLING_FAILED:
|
||||
{
|
||||
char *pqerr = PQerrorMessage(shard->conn);
|
||||
char *msg = NULL;
|
||||
neon_shard_log(shard_no, DEBUG5, "POLLING_FAILED");
|
||||
PQfinish(conn);
|
||||
|
||||
if (pqerr)
|
||||
msg = pchomp(pqerr);
|
||||
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
|
||||
if (msg)
|
||||
{
|
||||
neon_shard_log(shard_no, elevel,
|
||||
"could not connect to pageserver: %s",
|
||||
msg);
|
||||
pfree(msg);
|
||||
}
|
||||
else
|
||||
neon_shard_log(shard_no, elevel,
|
||||
"could not connect to pageserver");
|
||||
|
||||
return false;
|
||||
}
|
||||
case PGRES_POLLING_READING:
|
||||
/* Sleep until there's something to do */
|
||||
(void) WaitEventSetWait(shard->wes_read, -1L, &event, 1,
|
||||
PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
/* query cancellation, backend shutdown */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* PQconnectPoll() handles the socket polling state updates */
|
||||
|
||||
break;
|
||||
case PGRES_POLLING_WRITING:
|
||||
/* Sleep until there's something to do */
|
||||
(void) WaitEventSetWait(shard->wes_write, -1L, &event, 1,
|
||||
PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
/* query cancellation, backend shutdown */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* PQconnectPoll() handles the socket polling state updates */
|
||||
|
||||
break;
|
||||
case PGRES_POLLING_OK:
|
||||
neon_shard_log(shard_no, DEBUG5, "POLLING_OK");
|
||||
connected = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
while (!connected);
|
||||
|
||||
/* No more polling needed; connection succeeded */
|
||||
shard->last_connect_time = GetCurrentTimestamp();
|
||||
|
||||
switch (neon_protocol_version)
|
||||
{
|
||||
ereport(elevel,
|
||||
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
|
||||
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
|
||||
errdetail_internal("%s", msg)));
|
||||
pfree(msg);
|
||||
return false;
|
||||
}
|
||||
switch (neon_protocol_version)
|
||||
{
|
||||
case 2:
|
||||
pagestream_query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline);
|
||||
query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline);
|
||||
break;
|
||||
case 1:
|
||||
pagestream_query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
|
||||
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected neon_protocol_version %d", neon_protocol_version);
|
||||
}
|
||||
|
||||
if (PQstatus(shard->conn) == CONNECTION_BAD)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(shard->conn));
|
||||
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
|
||||
ereport(elevel,
|
||||
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
|
||||
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
|
||||
errdetail_internal("%s", msg)));
|
||||
pfree(msg);
|
||||
return false;
|
||||
}
|
||||
|
||||
ps_send_query_ret = PQsendQuery(shard->conn, pagestream_query);
|
||||
pfree(pagestream_query);
|
||||
if (ps_send_query_ret != 1)
|
||||
{
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
|
||||
neon_shard_log(shard_no, elevel, "could not send pagestream command to pageserver");
|
||||
return false;
|
||||
}
|
||||
|
||||
shard->state = PS_Connecting_PageStream;
|
||||
/* fallthrough */
|
||||
}
|
||||
case PS_Connecting_PageStream:
|
||||
ret = PQsendQuery(conn, query);
|
||||
pfree(query);
|
||||
if (ret != 1)
|
||||
{
|
||||
neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_PageStream");
|
||||
PQfinish(conn);
|
||||
neon_shard_log(shard_no, elevel, "could not send pagestream command to pageserver");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (PQstatus(shard->conn) == CONNECTION_BAD)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(shard->conn));
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
ereport(elevel,
|
||||
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
|
||||
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
|
||||
errdetail_internal("%s", msg)));
|
||||
pfree(msg);
|
||||
return false;
|
||||
}
|
||||
wes = CreateWaitEventSet(TopMemoryContext, 3);
|
||||
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET,
|
||||
MyLatch, NULL);
|
||||
AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
|
||||
NULL, NULL);
|
||||
AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL);
|
||||
|
||||
while (PQisBusy(shard->conn))
|
||||
PG_TRY();
|
||||
{
|
||||
while (PQisBusy(conn))
|
||||
{
|
||||
WaitEvent event;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
(void) WaitEventSetWait(shard->wes_read, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
(void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
@@ -598,37 +423,40 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
/* Data available in socket? */
|
||||
if (event.events & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(shard->conn))
|
||||
if (!PQconsumeInput(conn))
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(shard->conn));
|
||||
char *msg = pchomp(PQerrorMessage(conn));
|
||||
|
||||
PQfinish(conn);
|
||||
FreeWaitEventSet(wes);
|
||||
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
neon_shard_log(shard_no, elevel, "could not complete handshake with pageserver: %s",
|
||||
msg);
|
||||
pfree(msg);
|
||||
return false;
|
||||
/* Returning from inside PG_TRY is bad, so we break/return later */
|
||||
broke_from_loop = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
shard->state = PS_Connected;
|
||||
/* fallthrough */
|
||||
}
|
||||
case PS_Connected:
|
||||
/*
|
||||
* We successfully connected. Future connections to this PageServer
|
||||
* will do fast retries again, with exponential backoff.
|
||||
*/
|
||||
shard->delay_us = MIN_RECONNECT_INTERVAL_USEC;
|
||||
|
||||
neon_shard_log(shard_no, DEBUG5, "Connection state: Connected");
|
||||
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version);
|
||||
return true;
|
||||
default:
|
||||
neon_shard_log(shard_no, ERROR, "libpagestore: invalid connection state %d", shard->state);
|
||||
PG_CATCH();
|
||||
{
|
||||
PQfinish(conn);
|
||||
FreeWaitEventSet(wes);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
/* This shouldn't be hit */
|
||||
Assert(false);
|
||||
PG_END_TRY();
|
||||
|
||||
if (broke_from_loop)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version);
|
||||
page_servers[shard_no].conn = conn;
|
||||
page_servers[shard_no].wes = wes;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -648,7 +476,7 @@ retry:
|
||||
WaitEvent event;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
(void) WaitEventSetWait(page_servers[shard_no].wes_read, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
(void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
@@ -674,8 +502,7 @@ retry:
|
||||
|
||||
/*
|
||||
* Reset prefetch and drop connection to the shard.
|
||||
* It also drops connection to all other shards involved in prefetch, through
|
||||
* prefetch_on_ps_disconnect().
|
||||
* It also drops connection to all other shards involved in prefetch.
|
||||
*/
|
||||
static void
|
||||
pageserver_disconnect(shardno_t shard_no)
|
||||
@@ -685,6 +512,9 @@ pageserver_disconnect(shardno_t shard_no)
|
||||
* whole prefetch queue, even for other pageservers. It should not
|
||||
* cause big problems, because connection loss is supposed to be a
|
||||
* rare event.
|
||||
*
|
||||
* Prefetch state should be reset even if page_servers[shard_no].conn == NULL,
|
||||
* because prefetch request may be registered before connection is established.
|
||||
*/
|
||||
prefetch_on_ps_disconnect();
|
||||
|
||||
@@ -697,36 +527,37 @@ pageserver_disconnect(shardno_t shard_no)
|
||||
static void
|
||||
pageserver_disconnect_shard(shardno_t shard_no)
|
||||
{
|
||||
PageServer *shard = &page_servers[shard_no];
|
||||
/*
|
||||
* If anything goes wrong while we were sending a request, it's not clear
|
||||
* what state the connection is in. For example, if we sent the request
|
||||
* but didn't receive a response yet, we might receive the response some
|
||||
* time later after we have already sent a new unrelated request. Close
|
||||
* the connection to avoid getting confused.
|
||||
* Similarly, even when we're in PS_DISCONNECTED, we may have junk to
|
||||
* clean up: It is possible that we encountered an error allocating any
|
||||
* of the wait event sets or the psql connection, or failed when we tried
|
||||
* to attach wait events to the WaitEventSets.
|
||||
*/
|
||||
CLEANUP_AND_DISCONNECT(shard);
|
||||
|
||||
shard->state = PS_Disconnected;
|
||||
if (page_servers[shard_no].conn)
|
||||
{
|
||||
neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
|
||||
PQfinish(page_servers[shard_no].conn);
|
||||
page_servers[shard_no].conn = NULL;
|
||||
}
|
||||
if (page_servers[shard_no].wes != NULL)
|
||||
{
|
||||
FreeWaitEventSet(page_servers[shard_no].wes);
|
||||
page_servers[shard_no].wes = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static bool
|
||||
pageserver_send(shardno_t shard_no, NeonRequest *request)
|
||||
{
|
||||
StringInfoData req_buff;
|
||||
PageServer *shard = &page_servers[shard_no];
|
||||
PGconn *pageserver_conn;
|
||||
PGconn *pageserver_conn = page_servers[shard_no].conn;
|
||||
|
||||
/* If the connection was lost for some reason, reconnect */
|
||||
if (shard->state == PS_Connected && PQstatus(shard->conn) == CONNECTION_BAD)
|
||||
if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
{
|
||||
neon_shard_log(shard_no, LOG, "pageserver_send disconnect bad connection");
|
||||
pageserver_disconnect(shard_no);
|
||||
pageserver_conn = NULL;
|
||||
}
|
||||
|
||||
req_buff = nm_pack_request(request);
|
||||
@@ -740,19 +571,17 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
|
||||
* https://github.com/neondatabase/neon/issues/1138 So try to reestablish
|
||||
* connection in case of failure.
|
||||
*/
|
||||
if (shard->state != PS_Connected)
|
||||
if (!page_servers[shard_no].conn)
|
||||
{
|
||||
while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
|
||||
while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
|
||||
{
|
||||
HandleMainLoopInterrupts();
|
||||
shard->n_reconnect_attempts += 1;
|
||||
n_reconnect_attempts += 1;
|
||||
}
|
||||
shard->n_reconnect_attempts = 0;
|
||||
} else {
|
||||
Assert(shard->conn != NULL);
|
||||
n_reconnect_attempts = 0;
|
||||
}
|
||||
|
||||
pageserver_conn = shard->conn;
|
||||
pageserver_conn = page_servers[shard_no].conn;
|
||||
|
||||
/*
|
||||
* Send request.
|
||||
@@ -761,17 +590,13 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
|
||||
* should use async mode and check for interrupts while waiting. In
|
||||
* practice, our requests are small enough to always fit in the output and
|
||||
* TCP buffer.
|
||||
*
|
||||
* Note that this also will fail when the connection is in the
|
||||
* PGRES_POLLING_WRITING state. It's kinda dirty to disconnect at this
|
||||
* point, but on the grand scheme of things it's only a small issue.
|
||||
*/
|
||||
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_shard_log(shard_no, LOG, "pageserver_send disconnected: failed to send page request (try to reconnect): %s", msg);
|
||||
neon_shard_log(shard_no, LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
|
||||
pfree(msg);
|
||||
pfree(req_buff.data);
|
||||
return false;
|
||||
@@ -786,7 +611,6 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
|
||||
neon_shard_log(shard_no, PageStoreTrace, "sent request: %s", msg);
|
||||
pfree(msg);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -795,68 +619,58 @@ pageserver_receive(shardno_t shard_no)
|
||||
{
|
||||
StringInfoData resp_buff;
|
||||
NeonResponse *resp;
|
||||
PageServer *shard = &page_servers[shard_no];
|
||||
PGconn *pageserver_conn = shard->conn;
|
||||
/* read response */
|
||||
int rc;
|
||||
PGconn *pageserver_conn = page_servers[shard_no].conn;
|
||||
|
||||
if (shard->state != PS_Connected)
|
||||
{
|
||||
neon_shard_log(shard_no, LOG,
|
||||
"pageserver_receive: returning NULL for non-connected pageserver connection: 0x%02x",
|
||||
shard->state);
|
||||
if (!pageserver_conn)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Assert(pageserver_conn);
|
||||
|
||||
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
|
||||
if (rc >= 0)
|
||||
PG_TRY();
|
||||
{
|
||||
/* call_PQgetCopyData handles rc == 0 */
|
||||
Assert(rc > 0);
|
||||
/* read response */
|
||||
int rc;
|
||||
|
||||
PG_TRY();
|
||||
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
|
||||
if (rc >= 0)
|
||||
{
|
||||
resp_buff.len = rc;
|
||||
resp_buff.cursor = 0;
|
||||
resp = nm_unpack_response(&resp_buff);
|
||||
PQfreemem(resp_buff.data);
|
||||
|
||||
if (message_level_is_interesting(PageStoreTrace))
|
||||
{
|
||||
char *msg = nm_to_string((NeonMessage *) resp);
|
||||
|
||||
neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg);
|
||||
pfree(msg);
|
||||
}
|
||||
}
|
||||
PG_CATCH();
|
||||
else if (rc == -1)
|
||||
{
|
||||
neon_shard_log(shard_no, LOG, "pageserver_receive: disconnect due malformatted response");
|
||||
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
|
||||
pageserver_disconnect(shard_no);
|
||||
PG_RE_THROW();
|
||||
resp = NULL;
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
if (message_level_is_interesting(PageStoreTrace))
|
||||
else if (rc == -2)
|
||||
{
|
||||
char *msg = nm_to_string((NeonMessage *) resp);
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg);
|
||||
pfree(msg);
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
|
||||
}
|
||||
else
|
||||
{
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
|
||||
}
|
||||
}
|
||||
else if (rc == -1)
|
||||
PG_CATCH();
|
||||
{
|
||||
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", pchomp(PQerrorMessage(pageserver_conn)));
|
||||
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect due to caught exception");
|
||||
pageserver_disconnect(shard_no);
|
||||
resp = NULL;
|
||||
}
|
||||
else if (rc == -2)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg);
|
||||
}
|
||||
else
|
||||
{
|
||||
pageserver_disconnect(shard_no);
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
return (NeonResponse *) resp;
|
||||
}
|
||||
@@ -867,7 +681,7 @@ pageserver_flush(shardno_t shard_no)
|
||||
{
|
||||
PGconn *pageserver_conn = page_servers[shard_no].conn;
|
||||
|
||||
if (page_servers[shard_no].state != PS_Connected)
|
||||
if (!pageserver_conn)
|
||||
{
|
||||
neon_shard_log(shard_no, WARNING, "Tried to flush while disconnected");
|
||||
}
|
||||
@@ -883,7 +697,6 @@ pageserver_flush(shardno_t shard_no)
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -1078,7 +891,5 @@ pg_init_libpagestore(void)
|
||||
dbsize_hook = neon_dbsize;
|
||||
}
|
||||
|
||||
memset(page_servers, 0, sizeof(page_servers));
|
||||
|
||||
lfc_init();
|
||||
}
|
||||
|
||||
@@ -94,10 +94,6 @@ static char *hexdump_page(char *page);
|
||||
|
||||
const int SmgrTrace = DEBUG5;
|
||||
|
||||
#define NEON_PANIC_CONNECTION_STATE(shard_no, elvl, message, ...) \
|
||||
neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
|
||||
##__VA_ARGS__)
|
||||
|
||||
page_server_api *page_server;
|
||||
|
||||
/* unlogged relation build states */
|
||||
@@ -530,8 +526,6 @@ prefetch_flush_requests(void)
|
||||
*
|
||||
* NOTE: this function may indirectly update MyPState->pfs_hash; which
|
||||
* invalidates any active pointers into the hash table.
|
||||
* NOTE: callers should make sure they can handle query cancellations in this
|
||||
* function's call path.
|
||||
*/
|
||||
static bool
|
||||
prefetch_wait_for(uint64 ring_index)
|
||||
@@ -567,8 +561,6 @@ prefetch_wait_for(uint64 ring_index)
|
||||
*
|
||||
* NOTE: this function may indirectly update MyPState->pfs_hash; which
|
||||
* invalidates any active pointers into the hash table.
|
||||
*
|
||||
* NOTE: this does IO, and can get canceled out-of-line.
|
||||
*/
|
||||
static bool
|
||||
prefetch_read(PrefetchRequest *slot)
|
||||
@@ -580,14 +572,6 @@ prefetch_read(PrefetchRequest *slot)
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_receive);
|
||||
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
slot->my_ring_index != MyPState->ring_receive)
|
||||
neon_shard_log(slot->shard_no, ERROR,
|
||||
"Incorrect prefetch read: status=%d response=%llx my=%llu receive=%llu",
|
||||
slot->status, (size_t) (void *) slot->response,
|
||||
slot->my_ring_index, MyPState->ring_receive);
|
||||
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive(slot->shard_no);
|
||||
MemoryContextSwitchTo(old);
|
||||
@@ -605,11 +589,6 @@ prefetch_read(PrefetchRequest *slot)
|
||||
}
|
||||
else
|
||||
{
|
||||
neon_shard_log(slot->shard_no, WARNING,
|
||||
"No response from reading prefetch entry %llu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
|
||||
slot->my_ring_index,
|
||||
RelFileInfoFmt(BufTagGetNRelFileInfo(slot->buftag)),
|
||||
slot->buftag.forkNum, slot->buftag.blockNum);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -624,7 +603,6 @@ void
|
||||
prefetch_on_ps_disconnect(void)
|
||||
{
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
|
||||
while (MyPState->ring_receive < MyPState->ring_unused)
|
||||
{
|
||||
PrefetchRequest *slot;
|
||||
@@ -647,7 +625,6 @@ prefetch_on_ps_disconnect(void)
|
||||
slot->status = PRFS_TAG_REMAINS;
|
||||
MyPState->n_requests_inflight -= 1;
|
||||
MyPState->ring_receive += 1;
|
||||
|
||||
prefetch_set_unused(ring_index);
|
||||
}
|
||||
}
|
||||
@@ -714,8 +691,6 @@ static void
|
||||
prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns)
|
||||
{
|
||||
bool found;
|
||||
uint64 mySlotNo = slot->my_ring_index;
|
||||
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
/* lsn and not_modified_since are filled in below */
|
||||
@@ -724,8 +699,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
.blkno = slot->buftag.blockNum,
|
||||
};
|
||||
|
||||
Assert(mySlotNo == MyPState->ring_unused);
|
||||
|
||||
if (force_request_lsns)
|
||||
slot->request_lsns = *force_request_lsns;
|
||||
else
|
||||
@@ -738,11 +711,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_unused);
|
||||
|
||||
while (!page_server->send(slot->shard_no, (NeonRequest *) &request))
|
||||
{
|
||||
Assert(mySlotNo == MyPState->ring_unused);
|
||||
/* loop */
|
||||
}
|
||||
while (!page_server->send(slot->shard_no, (NeonRequest *) &request));
|
||||
|
||||
/* update prefetch state */
|
||||
MyPState->n_requests_inflight += 1;
|
||||
@@ -753,6 +722,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
|
||||
/* update slot state */
|
||||
slot->status = PRFS_REQUESTED;
|
||||
|
||||
prfh_insert(MyPState->prf_hash, slot, &found);
|
||||
Assert(!found);
|
||||
}
|
||||
@@ -924,10 +894,6 @@ Retry:
|
||||
return ring_index;
|
||||
}
|
||||
|
||||
/*
|
||||
* Note: this function can get canceled and use a long jump to the next catch
|
||||
* context. Take care.
|
||||
*/
|
||||
static NeonResponse *
|
||||
page_server_request(void const *req)
|
||||
{
|
||||
@@ -959,38 +925,19 @@ page_server_request(void const *req)
|
||||
* Current sharding model assumes that all metadata is present only at shard 0.
|
||||
* We still need to call get_shard_no() to check if shard map is up-to-date.
|
||||
*/
|
||||
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest ||
|
||||
((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
|
||||
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
|
||||
{
|
||||
shard_no = 0;
|
||||
}
|
||||
|
||||
do
|
||||
{
|
||||
PG_TRY();
|
||||
{
|
||||
while (!page_server->send(shard_no, (NeonRequest *) req)
|
||||
|| !page_server->flush(shard_no))
|
||||
{
|
||||
/* do nothing */
|
||||
}
|
||||
consume_prefetch_responses();
|
||||
resp = page_server->receive(shard_no);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
/*
|
||||
* Cancellation in this code needs to be handled better at some
|
||||
* point, but this currently seems fine for now.
|
||||
*/
|
||||
page_server->disconnect(shard_no);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no));
|
||||
consume_prefetch_responses();
|
||||
resp = page_server->receive(shard_no);
|
||||
} while (resp == NULL);
|
||||
|
||||
return resp;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -1958,9 +1905,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
break;
|
||||
|
||||
default:
|
||||
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
|
||||
"Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x",
|
||||
T_NeonExistsResponse, T_NeonErrorResponse, resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_exists", resp->tag);
|
||||
}
|
||||
pfree(resp);
|
||||
return exists;
|
||||
@@ -2412,7 +2357,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
/*
|
||||
* Try to find prefetched page in the list of received pages.
|
||||
*/
|
||||
Retry:
|
||||
Retry:
|
||||
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &buftag);
|
||||
|
||||
if (entry != NULL)
|
||||
@@ -2498,9 +2443,7 @@ Retry:
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
default:
|
||||
NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC,
|
||||
"Expected GetPage (0x%02x) or Error (0x%02x) response to GetPageRequest, but got 0x%02x",
|
||||
T_NeonGetPageResponse, T_NeonErrorResponse, resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_read_at_lsn", resp->tag);
|
||||
}
|
||||
|
||||
/* buffer was used, clean up for later reuse */
|
||||
@@ -2771,9 +2714,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
break;
|
||||
|
||||
default:
|
||||
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
|
||||
"Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x",
|
||||
T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_nblocks", resp->tag);
|
||||
}
|
||||
update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks);
|
||||
|
||||
@@ -2826,9 +2767,7 @@ neon_dbsize(Oid dbNode)
|
||||
break;
|
||||
|
||||
default:
|
||||
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
|
||||
"Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x",
|
||||
T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_dbsize", resp->tag);
|
||||
}
|
||||
|
||||
neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
|
||||
@@ -3167,9 +3106,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
|
||||
break;
|
||||
|
||||
default:
|
||||
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
|
||||
"Expected GetSlruSegment (0x%02x) or Error (0x%02x) response to GetSlruSegmentRequest, but got 0x%02x",
|
||||
T_NeonGetSlruSegmentResponse, T_NeonErrorResponse, resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_read_slru_segment", resp->tag);
|
||||
}
|
||||
pfree(resp);
|
||||
|
||||
|
||||
@@ -82,7 +82,6 @@ thiserror.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
|
||||
tokio-postgres.workspace = true
|
||||
tokio-postgres-rustls.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tokio = { workspace = true, features = ["signal"] }
|
||||
@@ -97,6 +96,8 @@ utils.workspace = true
|
||||
uuid.workspace = true
|
||||
webpki-roots.workspace = true
|
||||
x509-parser.workspace = true
|
||||
native-tls.workspace = true
|
||||
postgres-native-tls.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
redis.workspace = true
|
||||
|
||||
|
||||
@@ -11,12 +11,10 @@ use crate::{
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use itertools::Itertools;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use rustls::{client::danger::ServerCertVerifier, pki_types::InvalidDnsNameError};
|
||||
use std::{io, net::SocketAddr, sync::Arc, time::Duration};
|
||||
use std::{io, net::SocketAddr, time::Duration};
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_postgres::tls::MakeTlsConnect;
|
||||
use tokio_postgres_rustls::MakeRustlsConnect;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
|
||||
@@ -32,7 +30,7 @@ pub enum ConnectionError {
|
||||
CouldNotConnect(#[from] io::Error),
|
||||
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
TlsError(#[from] InvalidDnsNameError),
|
||||
TlsError(#[from] native_tls::Error),
|
||||
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
WakeComputeError(#[from] WakeComputeError),
|
||||
@@ -259,7 +257,7 @@ pub struct PostgresConnection {
|
||||
/// Socket connected to a compute node.
|
||||
pub stream: tokio_postgres::maybe_tls_stream::MaybeTlsStream<
|
||||
tokio::net::TcpStream,
|
||||
tokio_postgres_rustls::RustlsStream<tokio::net::TcpStream>,
|
||||
postgres_native_tls::TlsStream<tokio::net::TcpStream>,
|
||||
>,
|
||||
/// PostgreSQL connection parameters.
|
||||
pub params: std::collections::HashMap<String, String>,
|
||||
@@ -284,24 +282,12 @@ impl ConnCfg {
|
||||
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
|
||||
drop(pause);
|
||||
|
||||
let client_config = if allow_self_signed_compute {
|
||||
let verifier = Arc::new(AcceptEverythingVerifier) as Arc<dyn ServerCertVerifier>;
|
||||
rustls::ClientConfig::builder()
|
||||
.dangerous()
|
||||
.with_custom_certificate_verifier(verifier)
|
||||
} else {
|
||||
let root_store = rustls::RootCertStore {
|
||||
roots: webpki_roots::TLS_SERVER_ROOTS.to_vec(),
|
||||
};
|
||||
rustls::ClientConfig::builder().with_root_certificates(root_store)
|
||||
};
|
||||
let client_config = client_config.with_no_client_auth();
|
||||
|
||||
let mut mk_tls = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
|
||||
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
|
||||
&mut mk_tls,
|
||||
host,
|
||||
)?;
|
||||
let tls_connector = native_tls::TlsConnector::builder()
|
||||
.danger_accept_invalid_certs(allow_self_signed_compute)
|
||||
.build()
|
||||
.unwrap();
|
||||
let mut mk_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector);
|
||||
let tls = MakeTlsConnect::<tokio::net::TcpStream>::make_tls_connect(&mut mk_tls, host)?;
|
||||
|
||||
// connect_raw() will not use TLS if sslmode is "disable"
|
||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
|
||||
@@ -354,50 +340,6 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
|
||||
Some(options)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AcceptEverythingVerifier;
|
||||
impl ServerCertVerifier for AcceptEverythingVerifier {
|
||||
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
|
||||
use rustls::SignatureScheme::*;
|
||||
// The schemes for which `SignatureScheme::supported_in_tls13` returns true.
|
||||
vec![
|
||||
ECDSA_NISTP521_SHA512,
|
||||
ECDSA_NISTP384_SHA384,
|
||||
ECDSA_NISTP256_SHA256,
|
||||
RSA_PSS_SHA512,
|
||||
RSA_PSS_SHA384,
|
||||
RSA_PSS_SHA256,
|
||||
ED25519,
|
||||
]
|
||||
}
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
_end_entity: &rustls::pki_types::CertificateDer<'_>,
|
||||
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
|
||||
_server_name: &rustls::pki_types::ServerName<'_>,
|
||||
_ocsp_response: &[u8],
|
||||
_now: rustls::pki_types::UnixTime,
|
||||
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
|
||||
Ok(rustls::client::danger::ServerCertVerified::assertion())
|
||||
}
|
||||
fn verify_tls12_signature(
|
||||
&self,
|
||||
_message: &[u8],
|
||||
_cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
_dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
|
||||
}
|
||||
fn verify_tls13_signature(
|
||||
&self,
|
||||
_message: &[u8],
|
||||
_cert: &rustls::pki_types::CertificateDer<'_>,
|
||||
_dss: &rustls::DigitallySignedStruct,
|
||||
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
|
||||
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -22,7 +22,8 @@ serde_with.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
utils.workspace = true
|
||||
async-stream.workspace = true
|
||||
tokio-postgres-rustls.workspace = true
|
||||
native-tls.workspace = true
|
||||
postgres-native-tls.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
@@ -30,8 +31,6 @@ tokio-util = { workspace = true }
|
||||
futures-util.workspace = true
|
||||
itertools.workspace = true
|
||||
camino.workspace = true
|
||||
rustls.workspace = true
|
||||
webpki-roots.workspace = true
|
||||
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_sdk_s3::{types::ObjectIdentifier, Client};
|
||||
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use tracing::{error, info, warn};
|
||||
use utils::generation::Generation;
|
||||
@@ -208,7 +208,7 @@ impl TenantObjectListing {
|
||||
&mut self,
|
||||
timeline_id: TimelineId,
|
||||
layer_file: &LayerName,
|
||||
metadata: &LayerFileMetadata,
|
||||
metadata: &IndexLayerMetadata,
|
||||
) -> bool {
|
||||
let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
|
||||
return false;
|
||||
|
||||
@@ -71,13 +71,8 @@ pub async fn scan_safekeeper_metadata(
|
||||
bucket_config.bucket, bucket_config.region, dump_db_table
|
||||
);
|
||||
// Use the native TLS implementation (Neon requires TLS)
|
||||
let root_store = rustls::RootCertStore {
|
||||
roots: webpki_roots::TLS_SERVER_ROOTS.to_vec(),
|
||||
};
|
||||
let client_config = rustls::ClientConfig::builder()
|
||||
.with_root_certificates(root_store)
|
||||
.with_no_client_auth();
|
||||
let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
|
||||
let tls_connector =
|
||||
postgres_native_tls::MakeTlsConnector::new(native_tls::TlsConnector::new().unwrap());
|
||||
let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
|
||||
// The connection object performs the actual communication with the database,
|
||||
// so spawn it off to run on its own.
|
||||
|
||||
@@ -11,7 +11,7 @@ use async_stream::stream;
|
||||
use aws_sdk_s3::Client;
|
||||
use camino::Utf8PathBuf;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
|
||||
use pageserver::tenant::storage_layer::LayerName;
|
||||
use pageserver::tenant::IndexPart;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -49,8 +49,8 @@ impl SnapshotDownloader {
|
||||
&self,
|
||||
ttid: TenantShardTimelineId,
|
||||
layer_name: LayerName,
|
||||
layer_metadata: LayerFileMetadata,
|
||||
) -> anyhow::Result<(LayerName, LayerFileMetadata)> {
|
||||
layer_metadata: IndexLayerMetadata,
|
||||
) -> anyhow::Result<(LayerName, IndexLayerMetadata)> {
|
||||
// Note this is local as in a local copy of S3 data, not local as in the pageserver's local format. They use
|
||||
// different layer names (remote-style has the generation suffix)
|
||||
let local_path = self.output_path.join(format!(
|
||||
@@ -110,7 +110,7 @@ impl SnapshotDownloader {
|
||||
async fn download_layers(
|
||||
&self,
|
||||
ttid: TenantShardTimelineId,
|
||||
layers: Vec<(LayerName, LayerFileMetadata)>,
|
||||
layers: Vec<(LayerName, IndexLayerMetadata)>,
|
||||
) -> anyhow::Result<()> {
|
||||
let layer_count = layers.len();
|
||||
tracing::info!("Downloading {} layers for timeline {ttid}...", layer_count);
|
||||
@@ -161,7 +161,10 @@ impl SnapshotDownloader {
|
||||
ttid: TenantShardTimelineId,
|
||||
index_part: Box<IndexPart>,
|
||||
index_part_generation: Generation,
|
||||
ancestor_layers: &mut HashMap<TenantShardTimelineId, HashMap<LayerName, LayerFileMetadata>>,
|
||||
ancestor_layers: &mut HashMap<
|
||||
TenantShardTimelineId,
|
||||
HashMap<LayerName, IndexLayerMetadata>,
|
||||
>,
|
||||
) -> anyhow::Result<()> {
|
||||
let index_bytes = serde_json::to_string(&index_part).unwrap();
|
||||
|
||||
@@ -231,7 +234,7 @@ impl SnapshotDownloader {
|
||||
// happen if this tenant has been split at some point)
|
||||
let mut ancestor_layers: HashMap<
|
||||
TenantShardTimelineId,
|
||||
HashMap<LayerName, LayerFileMetadata>,
|
||||
HashMap<LayerName, IndexLayerMetadata>,
|
||||
> = Default::default();
|
||||
|
||||
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
|
||||
|
||||
@@ -36,7 +36,7 @@ postgres-protocol.workspace = true
|
||||
rand.workspace = true
|
||||
regex.workspace = true
|
||||
scopeguard.workspace = true
|
||||
reqwest = { workspace = true, features = ["rustls-tls", "json"] }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_with.workspace = true
|
||||
|
||||
@@ -31,7 +31,7 @@ once_cell.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
reqwest = { workspace = true, features = ["rustls-tls", "stream"] }
|
||||
reqwest = { workspace = true, features = ["stream"] }
|
||||
routerify.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -2667,9 +2667,7 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
tenant_id, generation=self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
|
||||
)
|
||||
|
||||
def list_layers(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> list[Path]:
|
||||
def list_layers(self, tenant_id: TenantId, timeline_id: TimelineId) -> list[Path]:
|
||||
"""
|
||||
Inspect local storage on a pageserver to discover which layer files are present.
|
||||
|
||||
|
||||
@@ -1,282 +0,0 @@
|
||||
from contextlib import closing
|
||||
from typing import Set
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonPageserver
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from psycopg2.errors import QueryCanceled
|
||||
|
||||
CRITICAL_PG_PS_WAIT_FAILPOINTS: Set[str] = {
|
||||
"ps::connection-start::pre-login",
|
||||
"ps::connection-start::startup-packet",
|
||||
"ps::connection-start::process-query",
|
||||
"ps::handle-pagerequest-message::exists",
|
||||
"ps::handle-pagerequest-message::nblocks",
|
||||
"ps::handle-pagerequest-message::getpage",
|
||||
"ps::handle-pagerequest-message::dbsize",
|
||||
# We don't yet have a good way to on-demand guarantee the download of an
|
||||
# SLRU segment, so that's disabled for now.
|
||||
# "ps::handle-pagerequest-message::slrusegment",
|
||||
}
|
||||
|
||||
PG_PS_START_FAILPOINTS = {
|
||||
"ps::connection-start::pre-login",
|
||||
"ps::connection-start::startup-packet",
|
||||
"ps::connection-start::process-query",
|
||||
}
|
||||
SMGR_EXISTS = "ps::handle-pagerequest-message::exists"
|
||||
SMGR_NBLOCKS = "ps::handle-pagerequest-message::nblocks"
|
||||
SMGR_GETPAGE = "ps::handle-pagerequest-message::getpage"
|
||||
SMGR_DBSIZE = "ps::handle-pagerequest-message::dbsize"
|
||||
|
||||
"""
|
||||
Test that we can handle connection delays and cancellations at various
|
||||
unfortunate connection startup and request states.
|
||||
"""
|
||||
|
||||
|
||||
def test_cancellations(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
ps = env.pageserver
|
||||
ps_http = ps.http_client()
|
||||
ps_http.is_testing_enabled_or_skip()
|
||||
|
||||
env.neon_cli.create_branch("test_config", "empty")
|
||||
|
||||
# We don't want to have any racy behaviour with autovacuum IOs
|
||||
ep = env.endpoints.create_start(
|
||||
"test_config",
|
||||
config_lines=[
|
||||
"autovacuum = off",
|
||||
"shared_buffers = 128MB",
|
||||
],
|
||||
)
|
||||
|
||||
with closing(ep.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE TABLE test1 AS
|
||||
SELECT id, sha256(id::text::bytea) payload
|
||||
FROM generate_series(1, 1024::bigint) p(id);
|
||||
"""
|
||||
)
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE TABLE test2 AS
|
||||
SELECT id, sha256(id::text::bytea) payload
|
||||
FROM generate_series(1025, 2048::bigint) p(id);
|
||||
"""
|
||||
)
|
||||
cur.execute(
|
||||
"""
|
||||
VACUUM (ANALYZE, FREEZE) test1, test2;
|
||||
"""
|
||||
)
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE EXTENSION pg_buffercache;
|
||||
"""
|
||||
)
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE EXTENSION pg_prewarm;
|
||||
"""
|
||||
)
|
||||
|
||||
# data preparation is now complete, with 2 disjoint tables that aren't
|
||||
# preloaded into any caches.
|
||||
|
||||
ep.stop()
|
||||
|
||||
for failpoint in CRITICAL_PG_PS_WAIT_FAILPOINTS:
|
||||
connect_works_correctly(failpoint, ep, ps, ps_http)
|
||||
|
||||
|
||||
ENABLED_FAILPOINTS: Set[str] = set()
|
||||
|
||||
|
||||
def connect_works_correctly(
|
||||
failpoint: str, ep: Endpoint, ps: NeonPageserver, ps_http: PageserverHttpClient
|
||||
):
|
||||
log.debug("Starting work on %s", failpoint)
|
||||
# All queries we use should finish (incl. IO) within 500ms,
|
||||
# including all their IO.
|
||||
# This allows us to use `SET statement_timeout` to let the query
|
||||
# timeout system cancel queries, rather than us having to go
|
||||
# through the most annoying effort of manual query cancellation
|
||||
# in psycopg2.
|
||||
options = "-cstatement_timeout=500ms -ceffective_io_concurrency=1"
|
||||
|
||||
ep.start()
|
||||
|
||||
def fp_enable():
|
||||
global ENABLED_FAILPOINTS
|
||||
ps_http.configure_failpoints(
|
||||
[
|
||||
(failpoint, "pause"),
|
||||
]
|
||||
)
|
||||
ENABLED_FAILPOINTS = ENABLED_FAILPOINTS | {failpoint}
|
||||
log.info(
|
||||
'Enabled failpoint "%s", current_active=%s', failpoint, ENABLED_FAILPOINTS, stacklevel=2
|
||||
)
|
||||
|
||||
def fp_disable():
|
||||
global ENABLED_FAILPOINTS
|
||||
ps_http.configure_failpoints(
|
||||
[
|
||||
(failpoint, "off"),
|
||||
]
|
||||
)
|
||||
ENABLED_FAILPOINTS = ENABLED_FAILPOINTS - {failpoint}
|
||||
log.info(
|
||||
'Disabled failpoint "%s", current_active=%s',
|
||||
failpoint,
|
||||
ENABLED_FAILPOINTS,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
def check_buffers(cur):
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT n.nspname AS nspname
|
||||
, c.relname AS relname
|
||||
, count(*) AS count
|
||||
FROM pg_buffercache b
|
||||
JOIN pg_class c
|
||||
ON b.relfilenode = pg_relation_filenode(c.oid) AND
|
||||
b.reldatabase = (SELECT oid FROM pg_database WHERE datname = current_database())
|
||||
JOIN pg_namespace n ON n.oid = c.relnamespace
|
||||
WHERE c.oid IN ('test1'::regclass::oid, 'test2'::regclass::oid)
|
||||
GROUP BY n.nspname, c.relname
|
||||
ORDER BY 3 DESC
|
||||
LIMIT 10
|
||||
"""
|
||||
)
|
||||
return cur.fetchone()
|
||||
|
||||
def exec_may_cancel(query, cursor, result, cancels):
|
||||
if cancels:
|
||||
with pytest.raises(QueryCanceled):
|
||||
cursor.execute(query)
|
||||
assert cursor.fetchone() == result
|
||||
else:
|
||||
cursor.execute(query)
|
||||
assert cursor.fetchone() == result
|
||||
|
||||
fp_disable()
|
||||
|
||||
# Warm caches required for new connections, so that they can run without
|
||||
# requiring catalog reads.
|
||||
with closing(ep.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT 1;
|
||||
"""
|
||||
)
|
||||
assert cur.fetchone() == (1,)
|
||||
|
||||
assert check_buffers(cur) is None
|
||||
# Ensure all caches required for connection start are correctly
|
||||
# filled, so that we don't have any "accidents" in this test run
|
||||
# caused by changes in connection startup plans that require
|
||||
# requests to the PageServer.
|
||||
cur.execute(
|
||||
"""
|
||||
select array_agg(distinct (pg_prewarm(c.oid::regclass, 'buffer') >= 0))
|
||||
from pg_class c
|
||||
where c.oid < 16384 AND c.relkind IN ('i', 'r');
|
||||
"""
|
||||
)
|
||||
assert cur.fetchone() == ([True],)
|
||||
|
||||
# Enable failpoint
|
||||
fp_enable()
|
||||
|
||||
with closing(ep.connect(options=options, autocommit=True)) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SHOW statement_timeout;")
|
||||
assert cur.fetchone() == ("500ms",)
|
||||
assert check_buffers(cur) is None
|
||||
exec_may_cancel(
|
||||
"""
|
||||
SELECT min(id) FROM test1;
|
||||
""",
|
||||
cur,
|
||||
(1,),
|
||||
failpoint in (CRITICAL_PG_PS_WAIT_FAILPOINTS - {SMGR_EXISTS, SMGR_DBSIZE}),
|
||||
)
|
||||
|
||||
fp_disable()
|
||||
|
||||
with closing(ep.connect(options=options, autocommit=True)) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Do a select on the data, putting some buffers into the prefetch
|
||||
# queue.
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT count(id) FROM (select * from test1 LIMIT 256) a;
|
||||
"""
|
||||
)
|
||||
assert cur.fetchone() == (256,)
|
||||
|
||||
ps.stop()
|
||||
ps.start()
|
||||
fp_enable()
|
||||
|
||||
exec_may_cancel(
|
||||
"""
|
||||
SELECT COUNT(id) FROM test1;
|
||||
""",
|
||||
cur,
|
||||
(1024,),
|
||||
failpoint
|
||||
in (CRITICAL_PG_PS_WAIT_FAILPOINTS - {SMGR_EXISTS, SMGR_NBLOCKS, SMGR_DBSIZE}),
|
||||
)
|
||||
|
||||
with closing(ep.connect(options=options, autocommit=True)) as conn:
|
||||
with conn.cursor() as cur:
|
||||
exec_may_cancel(
|
||||
"""
|
||||
SELECT COUNT(id) FROM test2;
|
||||
""",
|
||||
cur,
|
||||
(1024,),
|
||||
failpoint in (CRITICAL_PG_PS_WAIT_FAILPOINTS - {SMGR_EXISTS, SMGR_DBSIZE}),
|
||||
)
|
||||
|
||||
fp_disable()
|
||||
fp_enable()
|
||||
|
||||
exec_may_cancel(
|
||||
"""
|
||||
SELECT 0 < pg_database_size(CURRENT_DATABASE());
|
||||
""",
|
||||
cur,
|
||||
(True,),
|
||||
failpoint
|
||||
in (CRITICAL_PG_PS_WAIT_FAILPOINTS - {SMGR_EXISTS, SMGR_GETPAGE, SMGR_NBLOCKS}),
|
||||
)
|
||||
|
||||
fp_disable()
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT count(id), count(distinct payload), min(id), max(id), sum(id) FROM test2;
|
||||
"""
|
||||
)
|
||||
|
||||
assert cur.fetchone() == (1024, 1024, 1025, 2048, 1573376)
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT count(id), count(distinct payload), min(id), max(id), sum(id) FROM test1;
|
||||
"""
|
||||
)
|
||||
|
||||
assert cur.fetchone() == (1024, 1024, 1, 1024, 524800)
|
||||
|
||||
ep.stop()
|
||||
@@ -177,16 +177,7 @@ def test_sharding_split_unsharded(
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"failpoint",
|
||||
[
|
||||
None,
|
||||
"compact-shard-ancestors-localonly",
|
||||
"compact-shard-ancestors-enqueued",
|
||||
"compact-shard-ancestors-persistent",
|
||||
],
|
||||
)
|
||||
def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint: Optional[str]):
|
||||
def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test that after a split, we clean up parent layer data in the child shards via compaction.
|
||||
"""
|
||||
@@ -205,11 +196,6 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
|
||||
"image_layer_creation_check_threshold": "0",
|
||||
}
|
||||
|
||||
neon_env_builder.storage_controller_config = {
|
||||
# Default neon_local uses a small timeout: use a longer one to tolerate longer pageserver restarts.
|
||||
"max_unavailable": "300s"
|
||||
}
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
@@ -227,10 +213,6 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
|
||||
# Split one shard into two
|
||||
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=2)
|
||||
|
||||
# Let all shards move into their stable locations, so that during subsequent steps we
|
||||
# don't have reconciles in progress (simpler to reason about what messages we expect in logs)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
# Check we got the shard IDs we expected
|
||||
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 2)) is not None
|
||||
assert env.storage_controller.inspect(TenantShardId(tenant_id, 1, 2)) is not None
|
||||
@@ -255,90 +237,6 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
|
||||
# Compaction shouldn't make anything unreadable
|
||||
workload.validate()
|
||||
|
||||
# Force a generation increase: layer rewrites are a long-term thing and only happen after
|
||||
# the generation has increased.
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
# Cleanup part 2: once layers are outside the PITR window, they will be rewritten if they are partially redundant
|
||||
env.storage_controller.pageserver_api().set_tenant_config(tenant_id, {"pitr_interval": "0s"})
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
for shard in shards:
|
||||
ps = env.get_tenant_pageserver(shard)
|
||||
|
||||
# Apply failpoints for the layer-rewriting phase: this is the area of code that has sensitive behavior
|
||||
# across restarts, as we will have local layer files that temporarily disagree with the remote metadata
|
||||
# for the same local layer file name.
|
||||
if failpoint is not None:
|
||||
ps.http_client().configure_failpoints((failpoint, "exit"))
|
||||
|
||||
# Do a GC to update gc_info (compaction uses this to decide whether a layer is to be rewritten)
|
||||
# Set gc_horizon=0 to let PITR horizon control GC cutoff exclusively.
|
||||
ps.http_client().timeline_gc(shard, timeline_id, gc_horizon=0)
|
||||
|
||||
# We will compare stats before + after compaction
|
||||
detail_before = ps.http_client().timeline_detail(shard, timeline_id)
|
||||
|
||||
# Invoke compaction: this should rewrite layers that are behind the pitr horizon
|
||||
try:
|
||||
ps.http_client().timeline_compact(shard, timeline_id)
|
||||
except requests.ConnectionError as e:
|
||||
if failpoint is None:
|
||||
raise e
|
||||
else:
|
||||
log.info(f"Compaction failed (failpoint={failpoint}): {e}")
|
||||
|
||||
if failpoint in (
|
||||
"compact-shard-ancestors-localonly",
|
||||
"compact-shard-ancestors-enqueued",
|
||||
):
|
||||
# If we left local files that don't match remote metadata, we expect warnings on next startup
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*removing local file .+ because it has unexpected length.*"
|
||||
)
|
||||
|
||||
# Post-failpoint: we check that the pageserver comes back online happily.
|
||||
env.pageserver.running = False
|
||||
env.pageserver.start()
|
||||
else:
|
||||
assert failpoint is None # We shouldn't reach success path if a failpoint was set
|
||||
|
||||
detail_after = ps.http_client().timeline_detail(shard, timeline_id)
|
||||
|
||||
# Physical size should shrink because layers are smaller
|
||||
assert detail_after["current_physical_size"] < detail_before["current_physical_size"]
|
||||
|
||||
# Validate size statistics
|
||||
for shard in shards:
|
||||
ps = env.get_tenant_pageserver(shard)
|
||||
timeline_info = ps.http_client().timeline_detail(shard, timeline_id)
|
||||
reported_size = timeline_info["current_physical_size"]
|
||||
layer_paths = ps.list_layers(shard, timeline_id)
|
||||
measured_size = 0
|
||||
for p in layer_paths:
|
||||
abs_path = ps.timeline_dir(shard, timeline_id) / p
|
||||
measured_size += os.stat(abs_path).st_size
|
||||
|
||||
log.info(
|
||||
f"shard {shard} reported size {reported_size}, measured size {measured_size} ({len(layer_paths)} layers)"
|
||||
)
|
||||
|
||||
if failpoint in (
|
||||
"compact-shard-ancestors-localonly",
|
||||
"compact-shard-ancestors-enqueued",
|
||||
):
|
||||
# If we injected a failure between local rewrite and remote upload, then after
|
||||
# restart we may end up with neither version of the file on local disk (the new file
|
||||
# is cleaned up because it doesn't matchc remote metadata). So local size isn't
|
||||
# necessarily going to match remote physical size.
|
||||
continue
|
||||
|
||||
assert measured_size == reported_size
|
||||
|
||||
# Compaction shouldn't make anything unreadable
|
||||
workload.validate()
|
||||
|
||||
|
||||
def test_sharding_split_smoke(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
|
||||
@@ -194,7 +194,7 @@ files:
|
||||
|
||||
- metric_name: pg_stats_userdb
|
||||
type: gauge
|
||||
help: 'Stats for several oldest non-system dbs'
|
||||
help: 'Stats for the oldest non-system db'
|
||||
key_labels:
|
||||
- datname
|
||||
value_label: kind
|
||||
@@ -205,8 +205,9 @@ files:
|
||||
- inserted
|
||||
- updated
|
||||
- deleted
|
||||
# We export stats for 10 non-system database. Without this limit
|
||||
# We export stats for only one non-system database. Without this limit
|
||||
# it is too easy to abuse the system by creating lots of databases.
|
||||
# We can try lifting this limit in the future after we understand the needs better.
|
||||
query: |
|
||||
select pg_database_size(datname) as db_size, deadlocks,
|
||||
tup_inserted as inserted, tup_updated as updated, tup_deleted as deleted,
|
||||
@@ -217,7 +218,7 @@ files:
|
||||
from pg_database
|
||||
where datname <> 'postgres' and not datistemplate
|
||||
order by oid
|
||||
limit 10
|
||||
limit 1
|
||||
);
|
||||
|
||||
- metric_name: max_cluster_size
|
||||
@@ -319,7 +320,7 @@ files:
|
||||
|
||||
- metric_name: wal_is_lost
|
||||
type: gauge
|
||||
help: 'Whether or not the replication slot wal_status is lost'
|
||||
help: 'Whether or not the replication slot\'s wal_status is lost'
|
||||
key_labels:
|
||||
- slot_name
|
||||
values: [wal_status_is_lost]
|
||||
|
||||
@@ -59,7 +59,7 @@ regex = { version = "1" }
|
||||
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
|
||||
regex-syntax = { version = "0.8" }
|
||||
reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] }
|
||||
reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11", default-features = false, features = ["blocking", "rustls-tls", "stream"] }
|
||||
reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11", default-features = false, features = ["blocking", "default-tls", "stream"] }
|
||||
rustls = { version = "0.21", features = ["dangerous_configuration"] }
|
||||
scopeguard = { version = "1" }
|
||||
serde = { version = "1", features = ["alloc", "derive"] }
|
||||
|
||||
Reference in New Issue
Block a user