Compare commits

..

14 Commits

Author SHA1 Message Date
Arpad Müller
85593264b5 Merge branch 'arpad/rustls_everywhere' into arpad/musl_libc 2024-05-25 01:52:16 +02:00
Arpad Müller
5e4111699a wip 2024-05-25 01:52:11 +02:00
Arpad Müller
a356cf2a56 wip 2024-05-25 01:48:49 +02:00
Arpad Müller
4eb9c4eff3 Remove all direct dependencies of native-tls 2024-05-25 00:29:02 +02:00
Arpad Müller
a2026dd5a1 Drop postgres-native-tls in favour of tokio-postgres-rustls 2024-05-25 00:26:51 +02:00
Arpad Müller
a5003bdee9 wip 2024-05-24 22:36:12 +02:00
Alexander Bayandin
71a7fd983e CI(release): tune Storage & Compute release PR title (#7870)
## Problem

A title for automatic proxy release PRs is `Proxy release`, and for
storage & compute, it's just `Release`

## Summary of changes
- Amend PR title for Storage & Compute releases to "Storage & Compute
release"
2024-05-24 14:11:51 +01:00
Joonas Koivunen
a3f5b83677 chore: lower gate guard drop logging threshold to 100ms (#7862)
We have some 1001ms cases, which do not yield gate guard context.
2024-05-24 14:07:58 +01:00
John Spray
1455f5a261 pageserver: revert concurrent secondary downloads, make DownloadStream always yield Err after cancel (#7866)
## Problem

Ongoing hunt for secondary location shutdown hang issues.

## Summary of changes

- Revert the functional changes from #7675 
- Tweak a log in secondary downloads to make it more apparent when we
drop out on cancellation
- Modify DownloadStream's behavior to always return an Err after it has
been cancelled. This _should_ not impact anything, but it makes the
behavior simpler to reason about (e.g. even if the poll function somehow
got called again, it could never end up in an un-cancellable state)

Related #https://github.com/neondatabase/cloud/issues/13576
2024-05-24 11:45:34 +03:00
John Spray
3860bc9c6c pageserver: post-shard-split layer rewrites (2/2) (#7531)
## Problem

- After a shard split of a large existing tenant, child tenants can end
up with oversized historic layers indefinitely, if those layers are
prevented from being GC'd by branchpoints.

This PR follows https://github.com/neondatabase/neon/pull/7531, and adds
rewriting of layers that contain a mixture of needed & un-needed
contents, in addition to dropping un-needed layers.

Closes: https://github.com/neondatabase/neon/issues/7504

## Summary of changes

- Add methods to ImageLayer for reading back existing layers
- Extend `compact_shard_ancestors` to rewrite layer files that contain a
mixture of keys that we want and keys we do not, if unwanted keys are
the majority of those in the file.
- Amend initialization code to handle multiple layers with the same
LayerName properly
- Get rid of of renaming bad layer files to `.old` since that's now
expected on restarts during rewrites.
2024-05-24 08:33:19 +00:00
Roman Zaynetdinov
c1f4028fc0 Export db size metrics for 10 user databases (#7857)
## Problem

One database is too limiting. We have agreed to raise this limit to 10.

## Checklist before requesting a review

- [x] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist
2024-05-24 09:05:20 +01:00
MMeent
0e4f182680 Rework PageStream connection state handling: (#7611)
* Make PS connection startup use async APIs
   This allows for improved query cancellation when we start connections
 * Make PS connections have per-shard connection retry state.
   Previously they shared global backoff state, which is bad for quickly
   getting all connections started and/or back online.
 * Make sure we clean up most connection state on failed connections.
   Previously, we could technically leak some resources that we'd otherwise
   clean up. Now, the resources are correctly cleaned up.
 * pagestore_smgr.c now PANICs on unexpected response message types.
   Unexpected responses are likely a symptom of having a desynchronized
   view of the connection state. As a desynchronized connection state can
   cause corruption, we PANIC, as we don't know what data may have been
   written to buffers: the only solution is to fail fast & hope we didn't
   write wrong data.
 * Catch errors in sync pagestream request handling.
   Previously, if a query was cancelled after a message was sent to
   the pageserver, but before the data was received, the backend
   could forget that it sent the synchronous request, and let others
   deal with the repercussions. This could then lead to incorrect
   responses, or errors such as "unexpected response from page
   server with tag 0x68"
2024-05-23 23:26:42 +02:00
Sasha Krassovsky
ea2e830707 Remove apostrophe (#7868)
## Problem

## Summary of changes

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist
2024-05-23 20:35:59 +00:00
Joonas Koivunen
7cf726e36e refactor(rtc): remove the duplicate IndexLayerMetadata (#7860)
Once upon a time, we used to have duplicated types for runtime IndexPart
and whatever we stored. Because of the serde fixes in #5335 we have no
need for duplicated IndexPart type anymore, but the `IndexLayerMetadata`
stayed.

- remove the type
- remove LayerFileMetadata::file_size() in favor of direct field access

Split off from #7833. Cc: #3072.
2024-05-23 23:24:31 +03:00
45 changed files with 1519 additions and 864 deletions

View File

@@ -53,7 +53,7 @@ jobs:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
cat << EOF > body.md
## Release ${RELEASE_DATE}
## Storage & Compute release ${RELEASE_DATE}
**Please merge this Pull Request using 'Create a merge commit' button**
EOF

167
Cargo.lock generated
View File

@@ -734,8 +734,6 @@ dependencies = [
[[package]]
name = "azure_core"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70fd680c0d0424a518229b1150922f92653ba2ac933aa000abc8bf1ca08105f7"
dependencies = [
"async-trait",
"base64 0.21.1",
@@ -764,8 +762,6 @@ dependencies = [
[[package]]
name = "azure_identity"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6d2060f5b2e1c664026ca4edd561306c473be887c1f7a81f10bf06f9b71c63f"
dependencies = [
"async-lock",
"async-trait",
@@ -784,8 +780,6 @@ dependencies = [
[[package]]
name = "azure_storage"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15d3da73bfa09350e1bd6ae2a260806fcf90048c7e78cd2d8f88be60b19a7266"
dependencies = [
"RustyXML",
"async-lock",
@@ -803,8 +797,6 @@ dependencies = [
[[package]]
name = "azure_storage_blobs"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "149c21834a4105d761e3dd33d91c2a3064acc05a3c978848ea8089102ae45c94"
dependencies = [
"RustyXML",
"azure_core",
@@ -824,8 +816,6 @@ dependencies = [
[[package]]
name = "azure_svc_blobstorage"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88c888b7bf522d5405218b8613bf0fae7ddaae6ef3bf4ad42ae005993c96ab8b"
dependencies = [
"azure_core",
"bytes",
@@ -1976,21 +1966,6 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.1.0"
@@ -2620,19 +2595,6 @@ dependencies = [
"tokio-io-timeout",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper 0.14.26",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "hyper-util"
version = "0.1.3"
@@ -3168,24 +3130,6 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "native-tls"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "nix"
version = "0.25.1"
@@ -3414,55 +3358,15 @@ version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "openssl"
version = "0.10.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800"
dependencies = [
"bitflags 2.4.1",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.52",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "opentelemetry"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9591d937bc0e6d2feb6f71a559540ab300ea49955229c347a517a28d27784c54"
dependencies = [
"opentelemetry_api",
"opentelemetry_sdk",
@@ -3471,8 +3375,6 @@ dependencies = [
[[package]]
name = "opentelemetry-http"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7594ec0e11d8e33faf03530a4c49af7064ebba81c1480e01be67d90b356508b"
dependencies = [
"async-trait",
"bytes",
@@ -3484,8 +3386,6 @@ dependencies = [
[[package]]
name = "opentelemetry-otlp"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e5e5a5c4135864099f3faafbe939eb4d7f9b80ebf68a8448da961b32a7c1275"
dependencies = [
"async-trait",
"futures-core",
@@ -3505,8 +3405,6 @@ dependencies = [
[[package]]
name = "opentelemetry-proto"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1e3f814aa9f8c905d0ee4bde026afd3b2577a97c10e1699912e3e44f0c4cbeb"
dependencies = [
"opentelemetry_api",
"opentelemetry_sdk",
@@ -3517,8 +3415,6 @@ dependencies = [
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73c9f9340ad135068800e7f1b24e9e09ed9e7143f5bf8518ded3d3ec69789269"
dependencies = [
"opentelemetry",
]
@@ -3526,8 +3422,6 @@ dependencies = [
[[package]]
name = "opentelemetry_api"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a81f725323db1b1206ca3da8bb19874bbd3f57c3bcd59471bfb04525b265b9b"
dependencies = [
"futures-channel",
"futures-util",
@@ -3542,8 +3436,6 @@ dependencies = [
[[package]]
name = "opentelemetry_sdk"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa8e705a0612d48139799fcbaba0d4a90f06277153e43dd2bdc16c6f0edd8026"
dependencies = [
"async-trait",
"crossbeam-channel",
@@ -4105,17 +3997,6 @@ dependencies = [
"tokio-postgres",
]
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"native-tls",
"tokio",
"tokio-native-tls",
"tokio-postgres",
]
[[package]]
name = "postgres-protocol"
version = "0.6.4"
@@ -4423,7 +4304,6 @@ dependencies = [
"md5",
"measured",
"metrics",
"native-tls",
"once_cell",
"opentelemetry",
"parking_lot 0.12.1",
@@ -4431,7 +4311,6 @@ dependencies = [
"parquet_derive",
"pbkdf2",
"pin-project-lite",
"postgres-native-tls",
"postgres-protocol",
"postgres_backend",
"pq_proto",
@@ -4479,7 +4358,7 @@ dependencies = [
"utils",
"uuid",
"walkdir",
"webpki-roots 0.25.2",
"webpki-roots 0.26.1",
"workspace_hack",
"x509-parser",
]
@@ -4786,20 +4665,21 @@ dependencies = [
"http 0.2.9",
"http-body 0.4.5",
"hyper 0.14.26",
"hyper-tls",
"hyper-rustls 0.24.0",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls 0.21.11",
"rustls-pemfile 1.0.2",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-native-tls",
"tokio-rustls 0.24.0",
"tokio-util",
"tower-service",
"url",
@@ -4807,6 +4687,7 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams 0.3.0",
"web-sys",
"webpki-roots 0.25.2",
"winreg 0.50.0",
]
@@ -5232,20 +5113,20 @@ dependencies = [
"hex",
"histogram",
"itertools",
"native-tls",
"pageserver",
"pageserver_api",
"postgres-native-tls",
"postgres_ffi",
"rand 0.8.5",
"remote_storage",
"reqwest 0.12.4",
"rustls 0.22.4",
"serde",
"serde_json",
"serde_with",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-rustls 0.25.0",
"tokio-stream",
"tokio-util",
@@ -5253,6 +5134,7 @@ dependencies = [
"tracing-appender",
"tracing-subscriber",
"utils",
"webpki-roots 0.26.1",
"workspace_hack",
]
@@ -5843,6 +5725,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "statx-sys"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69c325f46f705b7a66fb87f0ebb999524a7363f30f05d373277b4ef7f409fe87"
dependencies = [
"libc",
]
[[package]]
name = "storage_broker"
version = "0.1.0"
@@ -6266,7 +6157,7 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#342ddd197a060a8354e8f11f4d12994419fff939"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=arpad/statx_sys#ca8446b8edb5e0aef88520f2fc209a13a834fd25"
dependencies = [
"futures",
"nix 0.26.4",
@@ -6300,16 +6191,6 @@ dependencies = [
"syn 2.0.52",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-postgres"
version = "0.7.7"
@@ -6336,10 +6217,7 @@ dependencies = [
[[package]]
name = "tokio-postgres-rustls"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea13f22eda7127c827983bdaf0d7fff9df21c8817bab02815ac277a21143677"
dependencies = [
"futures",
"ring 0.17.6",
"rustls 0.22.4",
"tokio",
@@ -6797,11 +6675,12 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#342ddd197a060a8354e8f11f4d12994419fff939"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=arpad/statx_sys#ca8446b8edb5e0aef88520f2fc209a13a834fd25"
dependencies = [
"bytes",
"io-uring",
"libc",
"statx-sys",
]
[[package]]
@@ -7629,9 +7508,9 @@ dependencies = [
[[package]]
name = "zeroize"
version = "1.6.0"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"
checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d"
dependencies = [
"zeroize_derive",
]

View File

@@ -46,10 +46,10 @@ anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
azure_core = "0.19"
azure_identity = "0.19"
azure_storage = "0.19"
azure_storage_blobs = "0.19"
azure_core = { path = "../azure-sdk-for-rust/sdk/core" } # "0.19"
azure_identity = { path = "../azure-sdk-for-rust/sdk/identity" } # "0.19"
azure_storage = { path = "../azure-sdk-for-rust/sdk/storage" } # "0.19"
azure_storage_blobs = { path = "../azure-sdk-for-rust/sdk/storage_blobs" } # "0.19"
flate2 = "1.0.26"
async-stream = "0.3"
async-trait = "0.1"
@@ -114,14 +114,13 @@ md5 = "0.7.0"
measured = { version = "0.0.21", features=["lasso"] }
measured-process = { version = "0.0.21" }
memoffset = "0.8"
native-tls = "0.2"
nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] }
notify = "6.0.0"
num_cpus = "1.15"
num-traits = "0.2.15"
once_cell = "1.13"
opentelemetry = "0.20.0"
opentelemetry-otlp = { version = "0.13.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-otlp = { path="../opentelemetry-rust/opentelemetry-otlp", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.12.0"
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
@@ -171,9 +170,9 @@ thiserror = "1.0"
tikv-jemallocator = "0.5"
tikv-jemalloc-ctl = "0.5"
tokio = { version = "1.17", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "arpad/statx_sys" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.11.0"
#tokio-postgres-rustls = "0.11.0"
tokio-rustls = "0.25"
tokio-stream = "0.1"
tokio-tar = "0.3"
@@ -191,7 +190,7 @@ url = "2.2"
urlencoding = "2.1"
uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"
webpki-roots = "0.25"
webpki-roots = "0.26"
x509-parser = "0.15"
## TODO replace this with tracing
@@ -200,7 +199,7 @@ log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres-rustls = {path = "../tokio-postgres-rustls"}
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
@@ -241,6 +240,11 @@ tonic-build = "0.9"
[patch.crates-io]
opentelemetry_api = { path = "../opentelemetry-rust/opentelemetry-api" }
opentelemetry_sdk = { path = "../opentelemetry-rust/opentelemetry-sdk" }
opentelemetry-semantic-conventions = { path = "../opentelemetry-rust/opentelemetry-semantic-conventions" }
opentelemetry = { path = "../opentelemetry-rust/opentelemetry" }
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }

View File

@@ -23,7 +23,7 @@ serde.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
tar.workspace = true
reqwest = { workspace = true, features = ["json"] }
reqwest = { workspace = true, features = ["json", "rustls-tls"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tokio-postgres.workspace = true
tokio-util.workspace = true

View File

@@ -20,7 +20,7 @@ hex.workspace = true
humantime-serde.workspace = true
hyper.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["blocking", "json"] }
reqwest = { workspace = true, features = ["blocking", "json", "rustls-tls"] }
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -12,7 +12,7 @@ comfy-table.workspace = true
hyper.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
reqwest.workspace = true
reqwest = { workspace = true }
serde.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
thiserror.workspace = true

View File

@@ -29,7 +29,6 @@ use http_types::{StatusCode, Url};
use tokio_util::sync::CancellationToken;
use tracing::debug;
use crate::RemoteStorageActivity;
use crate::{
error::Cancelled, s3_bucket::RequestKind, AzureConfig, ConcurrencyLimiter, Download,
DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata,
@@ -526,10 +525,6 @@ impl RemoteStorage for AzureBlobStorage {
// https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
Err(TimeTravelError::Unimplemented)
}
fn activity(&self) -> RemoteStorageActivity {
self.concurrency_limiter.activity()
}
}
pin_project_lite::pin_project! {

View File

@@ -263,17 +263,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
done_if_after: SystemTime,
cancel: &CancellationToken,
) -> Result<(), TimeTravelError>;
/// Query how busy we currently are: may be used by callers which wish to politely
/// back off if there are already a lot of operations underway.
fn activity(&self) -> RemoteStorageActivity;
}
pub struct RemoteStorageActivity {
pub read_available: usize,
pub read_total: usize,
pub write_available: usize,
pub write_total: usize,
}
/// DownloadStream is sensitive to the timeout and cancellation used with the original
@@ -455,15 +444,6 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
}
pub fn activity(&self) -> RemoteStorageActivity {
match self {
Self::LocalFs(s) => s.activity(),
Self::AwsS3(s) => s.activity(),
Self::AzureBlob(s) => s.activity(),
Self::Unreliable(s) => s.activity(),
}
}
}
impl GenericRemoteStorage {
@@ -794,9 +774,6 @@ struct ConcurrencyLimiter {
// The helps to ensure we don't exceed the thresholds.
write: Arc<Semaphore>,
read: Arc<Semaphore>,
write_total: usize,
read_total: usize,
}
impl ConcurrencyLimiter {
@@ -825,21 +802,10 @@ impl ConcurrencyLimiter {
Arc::clone(self.for_kind(kind)).acquire_owned().await
}
fn activity(&self) -> RemoteStorageActivity {
RemoteStorageActivity {
read_available: self.read.available_permits(),
read_total: self.read_total,
write_available: self.write.available_permits(),
write_total: self.write_total,
}
}
fn new(limit: usize) -> ConcurrencyLimiter {
Self {
read: Arc::new(Semaphore::new(limit)),
write: Arc::new(Semaphore::new(limit)),
read_total: limit,
write_total: limit,
}
}
}

View File

@@ -23,8 +23,8 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken};
use utils::crashsafe::path_with_suffix_extension;
use crate::{
Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorageActivity,
TimeTravelError, TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel,
REMOTE_STORAGE_PREFIX_SEPARATOR,
};
use super::{RemoteStorage, StorageMetadata};
@@ -605,16 +605,6 @@ impl RemoteStorage for LocalFs {
) -> Result<(), TimeTravelError> {
Err(TimeTravelError::Unimplemented)
}
fn activity(&self) -> RemoteStorageActivity {
// LocalFS has no concurrency limiting: give callers the impression that plenty of units are available
RemoteStorageActivity {
read_available: 16,
read_total: 16,
write_available: 16,
write_total: 16,
}
}
}
fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {

View File

@@ -47,8 +47,8 @@ use utils::backoff;
use super::StorageMetadata;
use crate::{
error::Cancelled, support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError,
Listing, ListingMode, RemotePath, RemoteStorage, RemoteStorageActivity, S3Config,
TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
Listing, ListingMode, RemotePath, RemoteStorage, S3Config, TimeTravelError, TimeoutOrCancel,
MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
pub(super) mod metrics;
@@ -975,10 +975,6 @@ impl RemoteStorage for S3Bucket {
}
Ok(())
}
fn activity(&self) -> RemoteStorageActivity {
self.concurrency_limiter.activity()
}
}
/// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].

View File

@@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken;
use crate::{
Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
RemoteStorageActivity, StorageMetadata, TimeTravelError,
StorageMetadata, TimeTravelError,
};
pub struct UnreliableWrapper {
@@ -213,8 +213,4 @@ impl RemoteStorage for UnreliableWrapper {
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
fn activity(&self) -> RemoteStorageActivity {
self.inner.activity()
}
}

View File

@@ -135,7 +135,8 @@ impl Gate {
let started_at = std::time::Instant::now();
let mut do_close = std::pin::pin!(self.do_close());
let nag_after = Duration::from_secs(1);
// with 1s we rarely saw anything, let's try if we get more gate closing reasons with 100ms
let nag_after = Duration::from_millis(100);
let Err(_timeout) = tokio::time::timeout(nag_after, &mut do_close).await else {
return;

View File

@@ -84,7 +84,7 @@ storage_broker.workspace = true
tenant_size_model.workspace = true
utils.workspace = true
workspace_hack.workspace = true
reqwest.workspace = true
reqwest = { workspace = true }
rpds.workspace = true
enum-map.workspace = true
enumset = { workspace = true, features = ["serde"]}

View File

@@ -8,7 +8,7 @@ license.workspace = true
pageserver_api.workspace = true
thiserror.workspace = true
async-trait.workspace = true
reqwest.workspace = true
reqwest = { workspace = true }
utils.workspace = true
serde.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -380,8 +380,8 @@ impl interface::CompactionLayer<Key> for MockLayer {
}
fn file_size(&self) -> u64 {
match self {
MockLayer::Delta(this) => this.file_size(),
MockLayer::Image(this) => this.file_size(),
MockLayer::Delta(this) => this.file_size,
MockLayer::Image(this) => this.file_size,
}
}
fn short_id(&self) -> String {

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::{metadata::TimelineMetadata, IndexPart};
use utils::lsn::Lsn;
@@ -19,7 +19,7 @@ pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
let des: IndexPart = IndexPart::from_s3_bytes(&bytes).context("deserialize")?;
#[derive(serde::Serialize)]
struct Output<'a> {
layer_metadata: &'a HashMap<LayerName, IndexLayerMetadata>,
layer_metadata: &'a HashMap<LayerName, LayerFileMetadata>,
disk_consistent_lsn: Lsn,
timeline_metadata: &'a TimelineMetadata,
}

View File

@@ -534,7 +534,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
});
}
EvictionLayer::Secondary(layer) => {
let file_size = layer.metadata.file_size();
let file_size = layer.metadata.file_size;
js.spawn(async move {
layer
@@ -641,7 +641,7 @@ impl EvictionLayer {
pub(crate) fn get_file_size(&self) -> u64 {
match self {
Self::Attached(l) => l.layer_desc().file_size,
Self::Secondary(sl) => sl.metadata.file_size(),
Self::Secondary(sl) => sl.metadata.file_size,
}
}
}

View File

@@ -260,6 +260,8 @@ async fn page_service_conn_main(
socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
let socket = std::pin::pin!(socket);
fail::fail_point!("ps::connection-start::pre-login");
// XXX: pgbackend.run() should take the connection_ctx,
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
@@ -603,6 +605,7 @@ impl PageServerHandler {
};
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
// Trace request if needed
if let Some(t) = tracer.as_mut() {
@@ -617,6 +620,7 @@ impl PageServerHandler {
let (response, span) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
@@ -626,6 +630,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::Nblocks(req) => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
@@ -635,6 +640,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::GetPage(req) => {
fail::fail_point!("ps::handle-pagerequest-message::getpage");
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
(
@@ -645,6 +651,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::DbSize(req) => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
(
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
@@ -654,6 +661,7 @@ impl PageServerHandler {
)
}
PagestreamFeMessage::GetSlruSegment(req) => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
(
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
@@ -1505,6 +1513,7 @@ where
_pgb: &mut PostgresBackend<IO>,
_sm: &FeStartupPacket,
) -> Result<(), QueryError> {
fail::fail_point!("ps::connection-start::startup-packet");
Ok(())
}
@@ -1519,6 +1528,8 @@ where
Err(QueryError::SimulatedConnectionError)
});
fail::fail_point!("ps::connection-start::process-query");
let ctx = self.connection_ctx.attached_child();
debug!("process query {query_string:?}");
let parts = query_string.split_whitespace().collect::<Vec<_>>();

View File

@@ -1192,7 +1192,7 @@ impl RemoteTimelineClient {
&self.storage_impl,
uploaded.local_path(),
&remote_path,
uploaded.metadata().file_size(),
uploaded.metadata().file_size,
cancel,
)
.await
@@ -1573,7 +1573,7 @@ impl RemoteTimelineClient {
&self.storage_impl,
local_path,
&remote_path,
layer_metadata.file_size(),
layer_metadata.file_size,
&self.cancel,
)
.measure_remote_op(
@@ -1768,7 +1768,7 @@ impl RemoteTimelineClient {
UploadOp::UploadLayer(_, m) => (
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
),
UploadOp::UploadMetadata(_, _) => (
RemoteOpFileKind::Index,

View File

@@ -84,7 +84,7 @@ pub async fn download_layer_file<'a>(
)
.await?;
let expected = layer_metadata.file_size();
let expected = layer_metadata.file_size;
if expected != bytes_amount {
return Err(DownloadError::Other(anyhow!(
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",

View File

@@ -17,46 +17,6 @@ use pageserver_api::shard::ShardIndex;
use utils::lsn::Lsn;
/// Metadata gathered for each of the layer files.
///
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
/// might have less or more metadata depending if upgrading or rolling back an upgrade.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
//#[cfg_attr(test, derive(Default))]
pub struct LayerFileMetadata {
file_size: u64,
pub(crate) generation: Generation,
pub(crate) shard: ShardIndex,
}
impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
fn from(other: &IndexLayerMetadata) -> Self {
LayerFileMetadata {
file_size: other.file_size,
generation: other.generation,
shard: other.shard,
}
}
}
impl LayerFileMetadata {
pub fn new(file_size: u64, generation: Generation, shard: ShardIndex) -> Self {
LayerFileMetadata {
file_size,
generation,
shard,
}
}
pub fn file_size(&self) -> u64 {
self.file_size
}
}
// TODO seems like another part of the remote storage file format
// compatibility issue, see https://github.com/neondatabase/neon/issues/3072
/// In-memory representation of an `index_part.json` file
///
/// Contains the data about all files in the timeline, present remotely and its metadata.
@@ -77,7 +37,7 @@ pub struct IndexPart {
///
/// Older versions of `IndexPart` will not have this property or have only a part of metadata
/// that latest version stores.
pub layer_metadata: HashMap<LayerName, IndexLayerMetadata>,
pub layer_metadata: HashMap<LayerName, LayerFileMetadata>,
// 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata.
// It's duplicated for convenience when reading the serialized structure, but is
@@ -127,10 +87,7 @@ impl IndexPart {
lineage: Lineage,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> Self {
let layer_metadata = layers_and_metadata
.iter()
.map(|(k, v)| (k.to_owned(), IndexLayerMetadata::from(v)))
.collect();
let layer_metadata = layers_and_metadata.clone();
Self {
version: Self::LATEST_VERSION,
@@ -194,9 +151,12 @@ impl From<&UploadQueueInitialized> for IndexPart {
}
}
/// Serialized form of [`LayerFileMetadata`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct IndexLayerMetadata {
/// Metadata gathered for each of the layer files.
///
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
/// might have less or more metadata depending if upgrading or rolling back an upgrade.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct LayerFileMetadata {
pub file_size: u64,
#[serde(default = "Generation::none")]
@@ -208,12 +168,12 @@ pub struct IndexLayerMetadata {
pub shard: ShardIndex,
}
impl From<&LayerFileMetadata> for IndexLayerMetadata {
fn from(other: &LayerFileMetadata) -> Self {
IndexLayerMetadata {
file_size: other.file_size,
generation: other.generation,
shard: other.shard,
impl LayerFileMetadata {
pub fn new(file_size: u64, generation: Generation, shard: ShardIndex) -> Self {
LayerFileMetadata {
file_size,
generation,
shard,
}
}
}
@@ -307,12 +267,12 @@ mod tests {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 1,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
@@ -349,12 +309,12 @@ mod tests {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 1,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
@@ -392,12 +352,12 @@ mod tests {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 2,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
@@ -480,12 +440,12 @@ mod tests {
let expected = IndexPart {
version: 4,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
@@ -522,12 +482,12 @@ mod tests {
let expected = IndexPart {
version: 5,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF420-00000000014EF499".parse().unwrap(), IndexLayerMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF420-00000000014EF499".parse().unwrap(), LayerFileMetadata {
file_size: 23289856,
generation: Generation::new(1),
shard: ShardIndex::unsharded(),
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619".parse().unwrap(), IndexLayerMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619".parse().unwrap(), LayerFileMetadata {
file_size: 1015808,
generation: Generation::new(1),
shard: ShardIndex::unsharded(),
@@ -569,12 +529,12 @@ mod tests {
let expected = IndexPart {
version: 6,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,

View File

@@ -45,10 +45,10 @@ use crate::tenant::{
use camino::Utf8PathBuf;
use chrono::format::{DelayedFormat, StrftimeItems};
use futures::{Future, StreamExt};
use futures::Future;
use pageserver_api::models::SecondaryProgress;
use pageserver_api::shard::TenantShardId;
use remote_storage::{DownloadError, Etag, GenericRemoteStorage, RemoteStorageActivity};
use remote_storage::{DownloadError, Etag, GenericRemoteStorage};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, warn, Instrument};
@@ -67,12 +67,6 @@ use super::{
/// download, if the uploader populated it.
const DEFAULT_DOWNLOAD_INTERVAL: Duration = Duration::from_millis(60000);
/// Range of concurrency we may use when downloading layers within a timeline. This is independent
/// for each tenant we're downloading: the concurrency of _tenants_ is defined separately in
/// `PageServerConf::secondary_download_concurrency`
const MAX_LAYER_CONCURRENCY: usize = 16;
const MIN_LAYER_CONCURRENCY: usize = 1;
pub(super) async fn downloader_task(
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
@@ -81,15 +75,14 @@ pub(super) async fn downloader_task(
cancel: CancellationToken,
root_ctx: RequestContext,
) {
// How many tenants' secondary download operations we will run concurrently
let tenant_concurrency = tenant_manager.get_conf().secondary_download_concurrency;
let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
let generator = SecondaryDownloader {
tenant_manager,
remote_storage,
root_ctx,
};
let mut scheduler = Scheduler::new(generator, tenant_concurrency);
let mut scheduler = Scheduler::new(generator, concurrency);
scheduler
.run(command_queue, background_jobs_can_start, cancel)
@@ -414,7 +407,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
tracing::warn!("Insufficient space while downloading. Will retry later.");
}
Err(UpdateError::Cancelled) => {
tracing::debug!("Shut down while downloading");
tracing::info!("Shut down while downloading");
},
Err(UpdateError::Deserialize(e)) => {
tracing::error!("Corrupt content while downloading tenant: {e}");
@@ -716,7 +709,7 @@ impl<'a> TenantDownloader<'a> {
let mut layer_byte_count: u64 = timeline_state
.on_disk_layers
.values()
.map(|l| l.metadata.file_size())
.map(|l| l.metadata.file_size)
.sum();
// Remove on-disk layers that are no longer present in heatmap
@@ -727,7 +720,7 @@ impl<'a> TenantDownloader<'a> {
.get(layer_file_name)
.unwrap()
.metadata
.file_size();
.file_size;
let local_path = local_layer_path(
self.conf,
@@ -848,8 +841,6 @@ impl<'a> TenantDownloader<'a> {
tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
let mut download_futs = Vec::new();
// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
@@ -886,9 +877,7 @@ impl<'a> TenantDownloader<'a> {
}
}
if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
|| on_disk.access_time != layer.access_time
{
if on_disk.metadata != layer.metadata || on_disk.access_time != layer.access_time {
// We already have this layer on disk. Update its access time.
tracing::debug!(
"Access time updated for layer {}: {} -> {}",
@@ -924,31 +913,14 @@ impl<'a> TenantDownloader<'a> {
}
}
download_futs.push(self.download_layer(
tenant_shard_id,
&timeline.timeline_id,
layer,
ctx,
));
}
// Break up layer downloads into chunks, so that for each chunk we can re-check how much
// concurrency to use based on activity level of remote storage.
while !download_futs.is_empty() {
let chunk =
download_futs.split_off(download_futs.len().saturating_sub(MAX_LAYER_CONCURRENCY));
let concurrency = Self::layer_concurrency(self.remote_storage.activity());
let mut result_stream = futures::stream::iter(chunk).buffered(concurrency);
let mut result_stream = std::pin::pin!(result_stream);
while let Some(result) = result_stream.next().await {
match result {
Err(e) => return Err(e),
Ok(None) => {
// No error, but we didn't download the layer. Don't mark it touched
}
Ok(Some(layer)) => touched.push(layer),
match self
.download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx)
.await?
{
Some(layer) => touched.push(layer),
None => {
// Not an error but we didn't download it: remote layer is missing. Don't add it to the list of
// things to consider touched.
}
}
}
@@ -979,7 +951,7 @@ impl<'a> TenantDownloader<'a> {
tenant_shard_id,
&timeline.timeline_id,
t.name,
LayerFileMetadata::from(&t.metadata),
t.metadata.clone(),
t.access_time,
local_path,
));
@@ -1024,7 +996,7 @@ impl<'a> TenantDownloader<'a> {
*tenant_shard_id,
*timeline_id,
&layer.name,
&LayerFileMetadata::from(&layer.metadata),
&layer.metadata,
&local_path,
&self.secondary_state.cancel,
ctx,
@@ -1083,19 +1055,6 @@ impl<'a> TenantDownloader<'a> {
Ok(Some(layer))
}
/// Calculate the currently allowed parallelism of layer download tasks, based on activity level of the remote storage
fn layer_concurrency(activity: RemoteStorageActivity) -> usize {
// When less than 75% of units are available, use minimum concurrency. Else, do a linear mapping
// of our concurrency range to the units available within the remaining 25%.
let clamp_at = (activity.read_total * 3) / 4;
if activity.read_available > clamp_at {
(MAX_LAYER_CONCURRENCY * (activity.read_available - clamp_at))
/ (activity.read_total - clamp_at)
} else {
MIN_LAYER_CONCURRENCY
}
}
}
/// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
@@ -1185,7 +1144,7 @@ async fn init_timeline_state(
tenant_shard_id,
&heatmap.timeline_id,
name,
LayerFileMetadata::from(&remote_meta.metadata),
remote_meta.metadata.clone(),
remote_meta.access_time,
file_path,
),
@@ -1219,58 +1178,3 @@ async fn init_timeline_state(
detail
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn layer_concurrency() {
// Totally idle
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 16,
read_total: 16,
write_available: 16,
write_total: 16
}),
MAX_LAYER_CONCURRENCY
);
// Totally busy
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 0,
read_total: 16,
write_available: 16,
write_total: 16
}),
MIN_LAYER_CONCURRENCY
);
// Edge of the range at which we interpolate
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 12,
read_total: 16,
write_available: 16,
write_total: 16
}),
MIN_LAYER_CONCURRENCY
);
// Midpoint of the range in which we interpolate
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 14,
read_total: 16,
write_available: 16,
write_total: 16
}),
MAX_LAYER_CONCURRENCY / 2
);
}
}

View File

@@ -1,6 +1,6 @@
use std::time::SystemTime;
use crate::tenant::{remote_timeline_client::index::IndexLayerMetadata, storage_layer::LayerName};
use crate::tenant::{remote_timeline_client::index::LayerFileMetadata, storage_layer::LayerName};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr, TimestampSeconds};
@@ -38,7 +38,7 @@ pub(crate) struct HeatMapTimeline {
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapLayer {
pub(super) name: LayerName,
pub(super) metadata: IndexLayerMetadata,
pub(super) metadata: LayerFileMetadata,
#[serde_as(as = "TimestampSeconds<i64>")]
pub(super) access_time: SystemTime,
@@ -49,7 +49,7 @@ pub(crate) struct HeatMapLayer {
impl HeatMapLayer {
pub(crate) fn new(
name: LayerName,
metadata: IndexLayerMetadata,
metadata: LayerFileMetadata,
access_time: SystemTime,
) -> Self {
Self {

View File

@@ -47,7 +47,7 @@ use hex;
use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::File;
@@ -473,7 +473,7 @@ impl ImageLayerInner {
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let reads = self
.plan_reads(keyspace, ctx)
.plan_reads(keyspace, None, ctx)
.await
.map_err(GetVectoredError::Other)?;
@@ -485,9 +485,15 @@ impl ImageLayerInner {
Ok(())
}
/// Traverse the layer's index to build read operations on the overlap of the input keyspace
/// and the keys in this layer.
///
/// If shard_identity is provided, it will be used to filter keys down to those stored on
/// this shard.
async fn plan_reads(
&self,
keyspace: KeySpace,
shard_identity: Option<&ShardIdentity>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<VectoredRead>> {
let mut planner = VectoredReadPlanner::new(
@@ -507,7 +513,6 @@ impl ImageLayerInner {
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;
let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
range.start.write_to_byte_slice(&mut search_key);
@@ -520,12 +525,22 @@ impl ImageLayerInner {
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
assert!(key >= range.start);
let flag = if let Some(shard_identity) = shard_identity {
if shard_identity.is_key_disposable(&key) {
BlobFlag::Ignore
} else {
BlobFlag::None
}
} else {
BlobFlag::None
};
if key >= range.end {
planner.handle_range_end(offset);
range_end_handled = true;
break;
} else {
planner.handle(key, self.lsn, offset, BlobFlag::None);
planner.handle(key, self.lsn, offset, flag);
}
}
@@ -538,6 +553,50 @@ impl ImageLayerInner {
Ok(planner.finish())
}
/// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
/// then execute vectored GET operations, passing the results of all read keys into the writer.
pub(super) async fn filter(
&self,
shard_identity: &ShardIdentity,
writer: &mut ImageLayerWriter,
ctx: &RequestContext,
) -> anyhow::Result<usize> {
// Fragment the range into the regions owned by this ShardIdentity
let plan = self
.plan_reads(
KeySpace {
// If asked for the total key space, plan_reads will give us all the keys in the layer
ranges: vec![Key::MIN..Key::MAX],
},
Some(shard_identity),
ctx,
)
.await?;
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let mut key_count = 0;
for read in plan.into_iter() {
let buf_size = read.size();
let buf = BytesMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
let frozen_buf = blobs_buf.buf.freeze();
for meta in blobs_buf.blobs.iter() {
let img_buf = frozen_buf.slice(meta.start..meta.end);
key_count += 1;
writer
.put_image(meta.meta.key, img_buf, ctx)
.await
.context(format!("Storing key {}", meta.meta.key))?;
}
}
Ok(key_count)
}
async fn do_reads_and_update_state(
&self,
reads: Vec<VectoredRead>,
@@ -855,3 +914,136 @@ impl Drop for ImageLayerWriter {
}
}
}
#[cfg(test)]
mod test {
use bytes::Bytes;
use pageserver_api::{
key::Key,
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
};
use utils::{id::TimelineId, lsn::Lsn};
use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
use super::ImageLayerWriter;
#[tokio::test]
async fn image_layer_rewrite() {
let harness = TenantHarness::create("test_image_layer_rewrite").unwrap();
let (tenant, ctx) = harness.load().await;
// The LSN at which we will create an image layer to filter
let lsn = Lsn(0xdeadbeef0000);
let timeline_id = TimelineId::generate();
let timeline = tenant
.create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
// This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
let range = input_start..input_end;
// Build an image layer to filter
let resident = {
let mut writer = ImageLayerWriter::new(
harness.conf,
timeline_id,
harness.tenant_shard_id,
&range,
lsn,
&ctx,
)
.await
.unwrap();
let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
let mut key = range.start;
while key < range.end {
writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
key = key.next();
}
writer.finish(&timeline, &ctx).await.unwrap()
};
let original_size = resident.metadata().file_size;
// Filter for various shards: this exercises cases like values at start of key range, end of key
// range, middle of key range.
for shard_number in 0..4 {
let mut filtered_writer = ImageLayerWriter::new(
harness.conf,
timeline_id,
harness.tenant_shard_id,
&range,
lsn,
&ctx,
)
.await
.unwrap();
// TenantHarness gave us an unsharded tenant, but we'll use a sharded ShardIdentity
// to exercise filter()
let shard_identity = ShardIdentity::new(
ShardNumber(shard_number),
ShardCount::new(4),
ShardStripeSize(0x8000),
)
.unwrap();
let wrote_keys = resident
.filter(&shard_identity, &mut filtered_writer, &ctx)
.await
.unwrap();
let replacement = if wrote_keys > 0 {
Some(filtered_writer.finish(&timeline, &ctx).await.unwrap())
} else {
None
};
// This exact size and those below will need updating as/when the layer encoding changes, but
// should be deterministic for a given version of the format, as we used no randomness generating the input.
assert_eq!(original_size, 1597440);
match shard_number {
0 => {
// We should have written out just one stripe for our shard identity
assert_eq!(wrote_keys, 0x8000);
let replacement = replacement.unwrap();
// We should have dropped some of the data
assert!(replacement.metadata().file_size < original_size);
assert!(replacement.metadata().file_size > 0);
// Assert that we dropped ~3/4 of the data.
assert_eq!(replacement.metadata().file_size, 417792);
}
1 => {
// Shard 1 has no keys in our input range
assert_eq!(wrote_keys, 0x0);
assert!(replacement.is_none());
}
2 => {
// Shard 2 has one stripes in the input range
assert_eq!(wrote_keys, 0x8000);
let replacement = replacement.unwrap();
assert!(replacement.metadata().file_size < original_size);
assert!(replacement.metadata().file_size > 0);
assert_eq!(replacement.metadata().file_size, 417792);
}
3 => {
// Shard 3 has two stripes in the input range
assert_eq!(wrote_keys, 0x10000);
let replacement = replacement.unwrap();
assert!(replacement.metadata().file_size < original_size);
assert!(replacement.metadata().file_size > 0);
assert_eq!(replacement.metadata().file_size, 811008);
}
_ => unreachable!(),
}
}
}
}

View File

@@ -4,7 +4,7 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
};
use pageserver_api::shard::{ShardIndex, TenantShardId};
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
@@ -23,10 +23,10 @@ use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
use super::delta_layer::{self, DeltaEntry};
use super::image_layer;
use super::image_layer::{self};
use super::{
AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerName, PersistentLayerDesc,
ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
PersistentLayerDesc, ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
};
use utils::generation::Generation;
@@ -161,7 +161,7 @@ impl Layer {
timeline.tenant_shard_id,
timeline.timeline_id,
file_name,
metadata.file_size(),
metadata.file_size,
);
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
@@ -194,7 +194,7 @@ impl Layer {
timeline.tenant_shard_id,
timeline.timeline_id,
file_name,
metadata.file_size(),
metadata.file_size,
);
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);
@@ -227,7 +227,7 @@ impl Layer {
timeline
.metrics
.resident_physical_size_add(metadata.file_size());
.resident_physical_size_add(metadata.file_size);
ResidentLayer { downloaded, owner }
}
@@ -1802,16 +1802,15 @@ impl ResidentLayer {
use LayerKind::*;
let owner = &self.owner.0;
match self.downloaded.get(owner, ctx).await? {
Delta(ref d) => {
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
// while it's being held.
owner
.access_stats
.record_access(LayerAccessKind::KeyIter, ctx);
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
// while it's being held.
delta_layer::DeltaLayerInner::load_keys(d, ctx)
.await
.with_context(|| format!("Layer index is corrupted for {self}"))
@@ -1820,6 +1819,23 @@ impl ResidentLayer {
}
}
/// Read all they keys in this layer which match the ShardIdentity, and write them all to
/// the provided writer. Return the number of keys written.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
pub(crate) async fn filter<'a>(
&'a self,
shard_identity: &ShardIdentity,
writer: &mut ImageLayerWriter,
ctx: &RequestContext,
) -> anyhow::Result<usize> {
use LayerKind::*;
match self.downloaded.get(&self.owner.0, ctx).await? {
Delta(_) => anyhow::bail!(format!("cannot filter() on a delta layer {self}")),
Image(i) => i.filter(shard_identity, writer, ctx).await,
}
}
/// Returns the amount of keys and values written to the writer.
pub(crate) async fn copy_delta_prefix(
&self,

View File

@@ -41,6 +41,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{
bin_ser::BeSer,
fs_ext,
sync::gate::{Gate, GateGuard},
vec_map::VecMap,
};
@@ -60,6 +61,7 @@ use std::{
ops::ControlFlow,
};
use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
use crate::{
aux_file::AuxFileSizeEstimator,
tenant::{
@@ -88,9 +90,6 @@ use crate::{
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
};
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
use crate::{
pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::timeline::init::LocalLayerFileMetadata,
};
use crate::{
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
virtual_file::{MaybeFatalIo, VirtualFile},
@@ -1424,7 +1423,7 @@ impl Timeline {
let layer_map = guard.layer_map();
let mut size = 0;
for l in layer_map.iter_historic_layers() {
size += l.file_size();
size += l.file_size;
}
size
}
@@ -2454,8 +2453,6 @@ impl Timeline {
let span = tracing::Span::current();
// Copy to move into the task we're about to spawn
let generation = self.generation;
let shard = self.get_shard_index();
let this = self.myself.upgrade().expect("&self method holds the arc");
let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
@@ -2469,11 +2466,14 @@ impl Timeline {
for discovered in discovered {
let (name, kind) = match discovered {
Discovered::Layer(layer_file_name, local_path, file_size) => {
discovered_layers.push((layer_file_name, local_path, file_size));
Discovered::Layer(layer_file_name, local_metadata) => {
discovered_layers.push((layer_file_name, local_metadata));
continue;
}
Discovered::IgnoredBackup => {
Discovered::IgnoredBackup(path) => {
std::fs::remove_file(path)
.or_else(fs_ext::ignore_not_found)
.fatal_err("Removing .old file");
continue;
}
Discovered::Unknown(file_name) => {
@@ -2499,13 +2499,8 @@ impl Timeline {
);
}
let decided = init::reconcile(
discovered_layers,
index_part.as_ref(),
disk_consistent_lsn,
generation,
shard,
);
let decided =
init::reconcile(discovered_layers, index_part.as_ref(), disk_consistent_lsn);
let mut loaded_layers = Vec::new();
let mut needs_cleanup = Vec::new();
@@ -2513,21 +2508,6 @@ impl Timeline {
for (name, decision) in decided {
let decision = match decision {
Ok(UseRemote { local, remote }) => {
// Remote is authoritative, but we may still choose to retain
// the local file if the contents appear to match
if local.metadata.file_size() == remote.file_size() {
// Use the local file, but take the remote metadata so that we pick up
// the correct generation.
UseLocal(LocalLayerFileMetadata {
metadata: remote,
local_path: local.local_path,
})
} else {
init::cleanup_local_file_for_remote(&local, &remote)?;
UseRemote { local, remote }
}
}
Ok(decision) => decision,
Err(DismissedLayer::Future { local }) => {
if let Some(local) = local {
@@ -2545,6 +2525,11 @@ impl Timeline {
// this file never existed remotely, we will have to do rework
continue;
}
Err(DismissedLayer::BadMetadata(local)) => {
init::cleanup_local_file_for_remote(&local)?;
// this file never existed remotely, we will have to do rework
continue;
}
};
match &name {
@@ -2555,14 +2540,12 @@ impl Timeline {
tracing::debug!(layer=%name, ?decision, "applied");
let layer = match decision {
UseLocal(local) => {
total_physical_size += local.metadata.file_size();
Layer::for_resident(conf, &this, local.local_path, name, local.metadata)
Resident { local, remote } => {
total_physical_size += local.file_size;
Layer::for_resident(conf, &this, local.local_path, name, remote)
.drop_eviction_guard()
}
Evicted(remote) | UseRemote { remote, .. } => {
Layer::for_evicted(conf, &this, name, remote)
}
Evicted(remote) => Layer::for_evicted(conf, &this, name, remote),
};
loaded_layers.push(layer);
@@ -3071,7 +3054,7 @@ impl Timeline {
HeatMapLayer::new(
layer.layer_desc().layer_name(),
(&layer.metadata()).into(),
layer.metadata(),
last_activity_ts,
)
});
@@ -3882,7 +3865,7 @@ impl Timeline {
}
}
let (layers_to_upload, delta_layers_to_add) = if create_image_layer {
let (layers_to_upload, delta_layer_to_add) = if create_image_layer {
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
// require downloading anything during initial import.
let ((rel_partition, metadata_partition), _lsn) = self
@@ -3899,23 +3882,26 @@ impl Timeline {
}
// For metadata, always create delta layers.
let delta_layers = if !metadata_partition.parts.is_empty() {
// In the current implementation, the metadata partition will only have one part, and the part will only have
// one single key range. This might change in the future.
let mut delta_layers_created = Vec::new();
for ks in &metadata_partition.parts {
for range in &ks.0.ranges {
let layer = self
.create_delta_layer(&frozen_layer, Some(range.clone()), ctx)
.await?;
if let Some(layer) = layer {
delta_layers_created.push(layer);
}
}
}
delta_layers_created
let delta_layer = if !metadata_partition.parts.is_empty() {
assert_eq!(
metadata_partition.parts.len(),
1,
"currently sparse keyspace should only contain a single aux file keyspace"
);
let metadata_keyspace = &metadata_partition.parts[0];
assert_eq!(
metadata_keyspace.0.ranges.len(),
1,
"aux file keyspace should be a single range"
);
self.create_delta_layer(
&frozen_layer,
Some(metadata_keyspace.0.ranges[0].clone()),
ctx,
)
.await?
} else {
Vec::new()
None
};
// For image layers, we add them immediately into the layer map.
@@ -3930,8 +3916,12 @@ impl Timeline {
.await?,
);
layers_to_upload.extend(delta_layers.iter().cloned());
(layers_to_upload, delta_layers)
if let Some(delta_layer) = delta_layer {
layers_to_upload.push(delta_layer.clone());
(layers_to_upload, Some(delta_layer))
} else {
(layers_to_upload, None)
}
} else {
// Normal case, write out a L0 delta layer file.
// `create_delta_layer` will not modify the layer map.
@@ -3939,7 +3929,12 @@ impl Timeline {
let Some(layer) = self.create_delta_layer(&frozen_layer, None, ctx).await? else {
panic!("delta layer cannot be empty if no filter is applied");
};
(vec![layer.clone()], vec![layer])
(
// FIXME: even though we have a single image and single delta layer assumption
// we push them to vec
vec![layer.clone()],
Some(layer),
)
};
pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
@@ -3960,7 +3955,7 @@ impl Timeline {
return Err(FlushLayerError::Cancelled);
}
guard.finish_flush_l0_layer(&delta_layers_to_add, &frozen_layer, &self.metrics);
guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
if self.set_disk_consistent_lsn(disk_consistent_lsn) {
// Schedule remote uploads that will reflect our new disk_consistent_lsn
@@ -4713,11 +4708,16 @@ impl Timeline {
async fn rewrite_layers(
self: &Arc<Self>,
replace_layers: Vec<(Layer, ResidentLayer)>,
drop_layers: Vec<Layer>,
mut replace_layers: Vec<(Layer, ResidentLayer)>,
mut drop_layers: Vec<Layer>,
) -> anyhow::Result<()> {
let mut guard = self.layers.write().await;
// Trim our lists in case our caller (compaction) raced with someone else (GC) removing layers: we want
// to avoid double-removing, and avoid rewriting something that was removed.
replace_layers.retain(|(l, _)| guard.contains(l));
drop_layers.retain(|l| guard.contains(l));
guard.rewrite_layers(&replace_layers, &drop_layers, &self.metrics);
let upload_layers: Vec<_> = replace_layers.into_iter().map(|r| r.1).collect();
@@ -5592,26 +5592,6 @@ fn is_send() {
_assert_send::<TimelineWriter<'_>>();
}
/// Add a suffix to a layer file's name: .{num}.old
/// Uses the first available num (starts at 0)
fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
let filename = path
.file_name()
.ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
let mut new_path = path.to_owned();
for i in 0u32.. {
new_path.set_file_name(format!("{filename}.{i}.old"));
if !new_path.exists() {
std::fs::rename(path, &new_path)
.with_context(|| format!("rename {path:?} to {new_path:?}"))?;
return Ok(());
}
}
bail!("couldn't find an unused backup number for {:?}", path)
}
#[cfg(test)]
mod tests {
use utils::{id::TimelineId, lsn::Lsn};

View File

@@ -176,13 +176,24 @@ impl Timeline {
async fn compact_shard_ancestors(
self: &Arc<Self>,
rewrite_max: usize,
_ctx: &RequestContext,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut drop_layers = Vec::new();
let layers_to_rewrite: Vec<Layer> = Vec::new();
let mut layers_to_rewrite: Vec<Layer> = Vec::new();
// We will use the PITR cutoff as a condition for rewriting layers.
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.pitr;
// We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
// layer is behind this Lsn, it indicates that the layer is being retained beyond the
// pitr_interval, for example because a branchpoint references it.
//
// Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
// are rewriting layers.
let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
tracing::info!(
"latest_gc_cutoff: {}, pitr cutoff {}",
*latest_gc_cutoff,
self.gc_info.read().unwrap().cutoffs.pitr
);
let layers = self.layers.read().await;
for layer_desc in layers.layer_map().iter_historic_layers() {
@@ -241,9 +252,9 @@ impl Timeline {
// Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
// without incurring the I/O cost of a rewrite.
if layer_desc.get_lsn_range().end >= pitr_cutoff {
debug!(%layer, "Skipping rewrite of layer still in PITR window ({} >= {})",
layer_desc.get_lsn_range().end, pitr_cutoff);
if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
layer_desc.get_lsn_range().end, *latest_gc_cutoff);
continue;
}
@@ -253,13 +264,10 @@ impl Timeline {
continue;
}
// Only rewrite layers if they would have different remote paths: either they belong to this
// shard but an old generation, or they belonged to another shard. This also implicitly
// guarantees that the layer is persistent in remote storage (as only remote persistent
// layers are carried across shard splits, any local-only layer would be in the current generation)
if layer.metadata().generation == self.generation
&& layer.metadata().shard.shard_count == self.shard_identity.count
{
// Only rewrite layers if their generations differ. This guarantees:
// - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
// - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
if layer.metadata().generation == self.generation {
debug!(%layer, "Skipping rewrite, is not from old generation");
continue;
}
@@ -272,18 +280,69 @@ impl Timeline {
}
// Fall through: all our conditions for doing a rewrite passed.
// TODO: implement rewriting
tracing::debug!(%layer, "Would rewrite layer");
layers_to_rewrite.push(layer);
}
// Drop the layers read lock: we will acquire it for write in [`Self::rewrite_layers`]
// Drop read lock on layer map before we start doing time-consuming I/O
drop(layers);
// TODO: collect layers to rewrite
let replace_layers = Vec::new();
let mut replace_image_layers = Vec::new();
for layer in layers_to_rewrite {
tracing::info!(layer=%layer, "Rewriting layer after shard split...");
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&layer.layer_desc().key_range,
layer.layer_desc().image_layer_lsn(),
ctx,
)
.await?;
// Safety of layer rewrites:
// - We are writing to a different local file path than we are reading from, so the old Layer
// cannot interfere with the new one.
// - In the page cache, contents for a particular VirtualFile are stored with a file_id that
// is different for two layers with the same name (in `ImageLayerInner::new` we always
// acquire a fresh id from [`crate::page_cache::next_file_id`]. So readers do not risk
// reading the index from one layer file, and then data blocks from the rewritten layer file.
// - Any readers that have a reference to the old layer will keep it alive until they are done
// with it. If they are trying to promote from remote storage, that will fail, but this is the same
// as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
// - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
// - GC, which at worst witnesses us "undelete" a layer that they just deleted.
// - ingestion, which only inserts layers, therefore cannot collide with us.
let resident = layer.download_and_keep_resident().await?;
let keys_written = resident
.filter(&self.shard_identity, &mut image_layer_writer, ctx)
.await?;
if keys_written > 0 {
let new_layer = image_layer_writer.finish(self, ctx).await?;
tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
layer.metadata().file_size,
new_layer.metadata().file_size);
replace_image_layers.push((layer, new_layer));
} else {
// Drop the old layer. Usually for this case we would already have noticed that
// the layer has no data for us with the ShardedRange check above, but
drop_layers.push(layer);
}
}
// At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
// metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
// to remote index) and be removed. This is inefficient but safe.
fail::fail_point!("compact-shard-ancestors-localonly");
// Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
self.rewrite_layers(replace_layers, drop_layers).await?;
self.rewrite_layers(replace_image_layers, drop_layers)
.await?;
fail::fail_point!("compact-shard-ancestors-enqueued");
// We wait for all uploads to complete before finishing this compaction stage. This is not
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
@@ -291,6 +350,8 @@ impl Timeline {
// load.
self.remote_client.wait_completion().await?;
fail::fail_point!("compact-shard-ancestors-persistent");
Ok(())
}

View File

@@ -7,19 +7,20 @@ use crate::{
index::{IndexPart, LayerFileMetadata},
},
storage_layer::LayerName,
Generation,
},
};
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::shard::ShardIndex;
use std::{collections::HashMap, str::FromStr};
use std::{
collections::{hash_map, HashMap},
str::FromStr,
};
use utils::lsn::Lsn;
/// Identified files in the timeline directory.
pub(super) enum Discovered {
/// The only one we care about
Layer(LayerName, Utf8PathBuf, u64),
Layer(LayerName, LocalLayerFileMetadata),
/// Old ephmeral files from previous launches, should be removed
Ephemeral(String),
/// Old temporary timeline files, unsure what these really are, should be removed
@@ -27,7 +28,7 @@ pub(super) enum Discovered {
/// Temporary on-demand download files, should be removed
TemporaryDownload(String),
/// Backup file from previously future layers
IgnoredBackup,
IgnoredBackup(Utf8PathBuf),
/// Unrecognized, warn about these
Unknown(String),
}
@@ -43,12 +44,15 @@ pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result<Vec<Discovere
let discovered = match LayerName::from_str(&file_name) {
Ok(file_name) => {
let file_size = direntry.metadata()?.len();
Discovered::Layer(file_name, direntry.path().to_owned(), file_size)
Discovered::Layer(
file_name,
LocalLayerFileMetadata::new(direntry.path().to_owned(), file_size),
)
}
Err(_) => {
if file_name.ends_with(".old") {
// ignore these
Discovered::IgnoredBackup
Discovered::IgnoredBackup(direntry.path().to_owned())
} else if remote_timeline_client::is_temp_download_file(direntry.path()) {
Discovered::TemporaryDownload(file_name)
} else if is_ephemeral_file(&file_name) {
@@ -71,37 +75,32 @@ pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result<Vec<Discovere
/// this structure extends it with metadata describing the layer's presence in local storage.
#[derive(Clone, Debug)]
pub(super) struct LocalLayerFileMetadata {
pub(super) metadata: LayerFileMetadata,
pub(super) file_size: u64,
pub(super) local_path: Utf8PathBuf,
}
impl LocalLayerFileMetadata {
pub fn new(
local_path: Utf8PathBuf,
file_size: u64,
generation: Generation,
shard: ShardIndex,
) -> Self {
pub fn new(local_path: Utf8PathBuf, file_size: u64) -> Self {
Self {
local_path,
metadata: LayerFileMetadata::new(file_size, generation, shard),
file_size,
}
}
}
/// Decision on what to do with a layer file after considering its local and remote metadata.
/// For a layer that is present in remote metadata, this type describes how to handle
/// it during startup: it is either Resident (and we have some metadata about a local file),
/// or it is Evicted (and we only have remote metadata).
#[derive(Clone, Debug)]
pub(super) enum Decision {
/// The layer is not present locally.
Evicted(LayerFileMetadata),
/// The layer is present locally, but local metadata does not match remote; we must
/// delete it and treat it as evicted.
UseRemote {
/// The layer is present locally, and metadata matches: we may hook up this layer to the
/// existing file in local storage.
Resident {
local: LocalLayerFileMetadata,
remote: LayerFileMetadata,
},
/// The layer is present locally, and metadata matches.
UseLocal(LocalLayerFileMetadata),
}
/// A layer needs to be left out of the layer map.
@@ -117,77 +116,81 @@ pub(super) enum DismissedLayer {
/// In order to make crash safe updates to layer map, we must dismiss layers which are only
/// found locally or not yet included in the remote `index_part.json`.
LocalOnly(LocalLayerFileMetadata),
/// The layer exists in remote storage but the local layer's metadata (e.g. file size)
/// does not match it
BadMetadata(LocalLayerFileMetadata),
}
/// Merges local discoveries and remote [`IndexPart`] to a collection of decisions.
pub(super) fn reconcile(
discovered: Vec<(LayerName, Utf8PathBuf, u64)>,
local_layers: Vec<(LayerName, LocalLayerFileMetadata)>,
index_part: Option<&IndexPart>,
disk_consistent_lsn: Lsn,
generation: Generation,
shard: ShardIndex,
) -> Vec<(LayerName, Result<Decision, DismissedLayer>)> {
use Decision::*;
let Some(index_part) = index_part else {
// If we have no remote metadata, no local layer files are considered valid to load
return local_layers
.into_iter()
.map(|(layer_name, local_metadata)| {
(layer_name, Err(DismissedLayer::LocalOnly(local_metadata)))
})
.collect();
};
// name => (local_metadata, remote_metadata)
type Collected =
HashMap<LayerName, (Option<LocalLayerFileMetadata>, Option<LayerFileMetadata>)>;
let mut result = Vec::new();
let mut discovered = discovered
.into_iter()
.map(|(layer_name, local_path, file_size)| {
(
layer_name,
// The generation and shard here will be corrected to match IndexPart in the merge below, unless
// it is not in IndexPart, in which case using our current generation makes sense
// because it will be uploaded in this generation.
(
Some(LocalLayerFileMetadata::new(
local_path, file_size, generation, shard,
)),
None,
),
)
})
.collect::<Collected>();
let mut remote_layers = HashMap::new();
// merge any index_part information, when available
// Construct Decisions for layers that are found locally, if they're in remote metadata. Otherwise
// construct DismissedLayers to get rid of them.
for (layer_name, local_metadata) in local_layers {
let Some(remote_metadata) = index_part.layer_metadata.get(&layer_name) else {
result.push((layer_name, Err(DismissedLayer::LocalOnly(local_metadata))));
continue;
};
if remote_metadata.file_size != local_metadata.file_size {
result.push((layer_name, Err(DismissedLayer::BadMetadata(local_metadata))));
continue;
}
remote_layers.insert(
layer_name,
Decision::Resident {
local: local_metadata,
remote: remote_metadata.clone(),
},
);
}
// Construct Decision for layers that were not found locally
index_part
.as_ref()
.map(|ip| ip.layer_metadata.iter())
.into_iter()
.flatten()
.map(|(name, metadata)| (name, LayerFileMetadata::from(metadata)))
.layer_metadata
.iter()
.for_each(|(name, metadata)| {
if let Some(existing) = discovered.get_mut(name) {
existing.1 = Some(metadata);
} else {
discovered.insert(name.to_owned(), (None, Some(metadata)));
if let hash_map::Entry::Vacant(entry) = remote_layers.entry(name.clone()) {
entry.insert(Decision::Evicted(metadata.clone()));
}
});
discovered
.into_iter()
.map(|(name, (local, remote))| {
let decision = if name.is_in_future(disk_consistent_lsn) {
Err(DismissedLayer::Future { local })
} else {
match (local, remote) {
(Some(local), Some(remote)) if local.metadata != remote => {
Ok(UseRemote { local, remote })
}
(Some(x), Some(_)) => Ok(UseLocal(x)),
(None, Some(x)) => Ok(Evicted(x)),
(Some(x), None) => Err(DismissedLayer::LocalOnly(x)),
(None, None) => {
unreachable!("there must not be any non-local non-remote files")
}
}
};
// For layers that were found in authoritative remote metadata, apply a final check that they are within
// the disk_consistent_lsn.
result.extend(remote_layers.into_iter().map(|(name, decision)| {
if name.is_in_future(disk_consistent_lsn) {
match decision {
Decision::Evicted(_remote) => (name, Err(DismissedLayer::Future { local: None })),
Decision::Resident {
local,
remote: _remote,
} => (name, Err(DismissedLayer::Future { local: Some(local) })),
}
} else {
(name, Ok(decision))
}
}));
(name, decision)
})
.collect::<Vec<_>>()
result
}
pub(super) fn cleanup(path: &Utf8Path, kind: &str) -> anyhow::Result<()> {
@@ -196,25 +199,15 @@ pub(super) fn cleanup(path: &Utf8Path, kind: &str) -> anyhow::Result<()> {
std::fs::remove_file(path).with_context(|| format!("failed to remove {kind} at {path}"))
}
pub(super) fn cleanup_local_file_for_remote(
local: &LocalLayerFileMetadata,
remote: &LayerFileMetadata,
) -> anyhow::Result<()> {
let local_size = local.metadata.file_size();
let remote_size = remote.file_size();
pub(super) fn cleanup_local_file_for_remote(local: &LocalLayerFileMetadata) -> anyhow::Result<()> {
let local_size = local.file_size;
let path = &local.local_path;
let file_name = path.file_name().expect("must be file path");
tracing::warn!("removing local file {file_name:?} because it has unexpected length {local_size}; length in remote index is {remote_size}");
if let Err(err) = crate::tenant::timeline::rename_to_backup(path) {
assert!(
path.exists(),
"we would leave the local_layer without a file if this does not hold: {path}",
);
Err(err)
} else {
Ok(())
}
tracing::warn!(
"removing local file {file_name:?} because it has unexpected length {local_size};"
);
std::fs::remove_file(path).with_context(|| format!("failed to remove layer at {path}"))
}
pub(super) fn cleanup_future_layer(
@@ -236,8 +229,8 @@ pub(super) fn cleanup_local_only_file(
) -> anyhow::Result<()> {
let kind = name.kind();
tracing::info!(
"found local-only {kind} layer {name}, metadata {:?}",
local.metadata
"found local-only {kind} layer {name} size {}",
local.file_size
);
std::fs::remove_file(&local.local_path)?;
Ok(())

View File

@@ -166,7 +166,7 @@ impl LayerManager {
/// Flush a frozen layer and add the written delta layer to the layer map.
pub(crate) fn finish_flush_l0_layer(
&mut self,
delta_layers: &[ResidentLayer],
delta_layer: Option<&ResidentLayer>,
frozen_layer_for_check: &Arc<InMemoryLayer>,
metrics: &TimelineMetrics,
) {
@@ -181,12 +181,10 @@ impl LayerManager {
// layer to disk at the same time, that would not work.
assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
if !delta_layers.is_empty() {
if let Some(l) = delta_layer {
let mut updates = self.layer_map.batch_update();
for l in delta_layers {
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
metrics.record_new_file_metrics(l.layer_desc().file_size);
}
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
metrics.record_new_file_metrics(l.layer_desc().file_size);
updates.flush();
}
}
@@ -214,13 +212,34 @@ impl LayerManager {
&mut self,
rewrite_layers: &[(Layer, ResidentLayer)],
drop_layers: &[Layer],
_metrics: &TimelineMetrics,
metrics: &TimelineMetrics,
) {
let mut updates = self.layer_map.batch_update();
for (old_layer, new_layer) in rewrite_layers {
debug_assert_eq!(
old_layer.layer_desc().key_range,
new_layer.layer_desc().key_range
);
debug_assert_eq!(
old_layer.layer_desc().lsn_range,
new_layer.layer_desc().lsn_range
);
// TODO: implement rewrites (currently this code path only used for drops)
assert!(rewrite_layers.is_empty());
// Safety: we may never rewrite the same file in-place. Callers are responsible
// for ensuring that they only rewrite layers after something changes the path,
// such as an increment in the generation number.
assert_ne!(old_layer.local_path(), new_layer.local_path());
Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);
Self::insert_historic_layer(
new_layer.as_ref().clone(),
&mut updates,
&mut self.layer_fmgr,
);
metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
}
for l in drop_layers {
Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
}

View File

@@ -213,10 +213,7 @@ impl UploadQueue {
let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
for (layer_name, layer_metadata) in &index_part.layer_metadata {
files.insert(
layer_name.to_owned(),
LayerFileMetadata::from(layer_metadata),
);
files.insert(layer_name.to_owned(), layer_metadata.clone());
}
info!(
@@ -322,9 +319,7 @@ impl std::fmt::Display for UploadOp {
write!(
f,
"UploadLayer({}, size={:?}, gen={:?})",
layer,
metadata.file_size(),
metadata.generation
layer, metadata.file_size, metadata.generation
)
}
UploadOp::UploadMetadata(_, lsn) => {

View File

@@ -51,7 +51,6 @@ int flush_every_n_requests = 8;
int neon_protocol_version = 2;
static int n_reconnect_attempts = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
@@ -95,18 +94,44 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
typedef enum PSConnectionState {
PS_Disconnected, /* no connection yet */
PS_Connecting_Startup, /* connection starting up */
PS_Connecting_PageStream, /* negotiating pagestream */
PS_Connected, /* connected, pagestream established */
} PSConnectionState;
/* This backend's per-shard connections */
typedef struct
{
PGconn *conn;
TimestampTz last_connect_time; /* read-only debug value */
TimestampTz last_reconnect_time;
uint32 delay_us;
int n_reconnect_attempts;
/*---
* WaitEventSet containing:
* - WL_SOCKET_READABLE on 'conn'
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
* Pageserver connection state, i.e.
* disconnected: conn == NULL, wes == NULL;
* conn_startup: connection initiated, waiting for connection establishing
* conn_ps: PageStream query sent, waiting for confirmation
* connected: PageStream established
*/
WaitEventSet *wes;
PSConnectionState state;
PGconn *conn;
/*---
* WaitEventSet containing:
* - WL_SOCKET_READABLE on 'conn'
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes_read;
/*---
* WaitEventSet containing:
* - WL_SOCKET_WRITABLE on 'conn'
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes_write;
} PageServer;
static PageServer page_servers[MAX_SHARDS];
@@ -303,119 +328,269 @@ get_shard_number(BufferTag *tag)
return hash % n_shards;
}
static inline void
CLEANUP_AND_DISCONNECT(PageServer *shard)
{
if (shard->wes_read)
{
FreeWaitEventSet(shard->wes_read);
shard->wes_read = NULL;
}
if (shard->wes_write)
{
FreeWaitEventSet(shard->wes_write);
shard->wes_write = NULL;
}
if (shard->conn)
{
PQfinish(shard->conn);
shard->conn = NULL;
}
shard->state = PS_Disconnected;
}
/*
* Connect to a pageserver, or continue to try to connect if we're yet to
* complete the connection (e.g. due to receiving an earlier cancellation
* during connection start).
* Returns true if successfully connected; false if the connection failed.
*
* Throws errors in unrecoverable situations, or when this backend's query
* is canceled.
*/
static bool
pageserver_connect(shardno_t shard_no, int elevel)
{
char *query;
int ret;
const char *keywords[3];
const char *values[3];
int n;
PGconn *conn;
WaitEventSet *wes;
PageServer *shard = &page_servers[shard_no];
char connstr[MAX_PAGESERVER_CONNSTRING_SIZE];
static TimestampTz last_connect_time = 0;
static uint64_t delay_us = MIN_RECONNECT_INTERVAL_USEC;
TimestampTz now;
uint64_t us_since_last_connect;
bool broke_from_loop = false;
Assert(page_servers[shard_no].conn == NULL);
/*
* Get the connection string for this shard. If the shard map has been
* updated since we last looked, this will also disconnect any existing
* pageserver connections as a side effect.
* Note that connstr is used both during connection start, and when we
* log the successful connection.
*/
load_shard_map(shard_no, connstr, NULL);
now = GetCurrentTimestamp();
us_since_last_connect = now - last_connect_time;
if (us_since_last_connect < MAX_RECONNECT_INTERVAL_USEC)
switch (shard->state)
{
pg_usleep(delay_us);
delay_us *= 2;
}
else
case PS_Disconnected:
{
delay_us = MIN_RECONNECT_INTERVAL_USEC;
}
const char *keywords[3];
const char *values[3];
int n_pgsql_params;
TimestampTz now;
int64 us_since_last_attempt;
/*
* Connect using the connection string we got from the
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
* variable was set, use that as the password.
*
* The connection options are parsed in the order they're given, so when
* we set the password before the connection string, the connection string
* can override the password from the env variable. Seems useful, although
* we don't currently use that capability anywhere.
*/
n = 0;
if (neon_auth_token)
{
keywords[n] = "password";
values[n] = neon_auth_token;
n++;
/* Make sure we start with a clean slate */
CLEANUP_AND_DISCONNECT(shard);
neon_shard_log(shard_no, DEBUG5, "Connection state: Disconnected");
now = GetCurrentTimestamp();
us_since_last_attempt = (int64) (now - shard->last_reconnect_time);
shard->last_reconnect_time = now;
/*
* If we did other tasks between reconnect attempts, then we won't
* need to wait as long as a full delay.
*/
if (us_since_last_attempt < shard->delay_us)
{
pg_usleep(shard->delay_us - us_since_last_attempt);
}
/* update the delay metric */
shard->delay_us = Min(shard->delay_us * 2, MAX_RECONNECT_INTERVAL_USEC);
/*
* Connect using the connection string we got from the
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
* variable was set, use that as the password.
*
* The connection options are parsed in the order they're given, so when
* we set the password before the connection string, the connection string
* can override the password from the env variable. Seems useful, although
* we don't currently use that capability anywhere.
*/
keywords[0] = "dbname";
values[0] = connstr;
n_pgsql_params = 1;
if (neon_auth_token)
{
keywords[1] = "password";
values[1] = neon_auth_token;
n_pgsql_params++;
}
keywords[n_pgsql_params] = NULL;
values[n_pgsql_params] = NULL;
shard->conn = PQconnectStartParams(keywords, values, 1);
if (!shard->conn)
{
neon_shard_log(shard_no, elevel, "Failed to connect to pageserver: out of memory");
return false;
}
shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL);
shard->wes_write = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(shard->wes_write, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(shard->wes_write, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(shard->wes_write, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE,
PQsocket(shard->conn),
NULL, NULL);
shard->state = PS_Connecting_Startup;
/* fallthrough */
}
keywords[n] = "dbname";
values[n] = connstr;
n++;
keywords[n] = NULL;
values[n] = NULL;
n++;
conn = PQconnectdbParams(keywords, values, 1);
last_connect_time = GetCurrentTimestamp();
if (PQstatus(conn) == CONNECTION_BAD)
case PS_Connecting_Startup:
{
char *msg = pchomp(PQerrorMessage(conn));
char *pagestream_query;
int ps_send_query_ret;
bool connected = false;
PQfinish(conn);
neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_Startup");
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
errdetail_internal("%s", msg)));
pfree(msg);
return false;
}
switch (neon_protocol_version)
{
do
{
WaitEvent event;
int poll_result = PQconnectPoll(shard->conn);
switch (poll_result)
{
default: /* unknown/unused states are handled as a failed connection */
case PGRES_POLLING_FAILED:
{
char *pqerr = PQerrorMessage(shard->conn);
char *msg = NULL;
neon_shard_log(shard_no, DEBUG5, "POLLING_FAILED");
if (pqerr)
msg = pchomp(pqerr);
CLEANUP_AND_DISCONNECT(shard);
if (msg)
{
neon_shard_log(shard_no, elevel,
"could not connect to pageserver: %s",
msg);
pfree(msg);
}
else
neon_shard_log(shard_no, elevel,
"could not connect to pageserver");
return false;
}
case PGRES_POLLING_READING:
/* Sleep until there's something to do */
(void) WaitEventSetWait(shard->wes_read, -1L, &event, 1,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
/* PQconnectPoll() handles the socket polling state updates */
break;
case PGRES_POLLING_WRITING:
/* Sleep until there's something to do */
(void) WaitEventSetWait(shard->wes_write, -1L, &event, 1,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
/* PQconnectPoll() handles the socket polling state updates */
break;
case PGRES_POLLING_OK:
neon_shard_log(shard_no, DEBUG5, "POLLING_OK");
connected = true;
break;
}
}
while (!connected);
/* No more polling needed; connection succeeded */
shard->last_connect_time = GetCurrentTimestamp();
switch (neon_protocol_version)
{
case 2:
query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline);
pagestream_query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline);
break;
case 1:
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
pagestream_query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
break;
default:
elog(ERROR, "unexpected neon_protocol_version %d", neon_protocol_version);
}
ret = PQsendQuery(conn, query);
pfree(query);
if (ret != 1)
{
PQfinish(conn);
neon_shard_log(shard_no, elevel, "could not send pagestream command to pageserver");
return false;
}
}
wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL);
if (PQstatus(shard->conn) == CONNECTION_BAD)
{
char *msg = pchomp(PQerrorMessage(shard->conn));
PG_TRY();
CLEANUP_AND_DISCONNECT(shard);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
errdetail_internal("%s", msg)));
pfree(msg);
return false;
}
ps_send_query_ret = PQsendQuery(shard->conn, pagestream_query);
pfree(pagestream_query);
if (ps_send_query_ret != 1)
{
CLEANUP_AND_DISCONNECT(shard);
neon_shard_log(shard_no, elevel, "could not send pagestream command to pageserver");
return false;
}
shard->state = PS_Connecting_PageStream;
/* fallthrough */
}
case PS_Connecting_PageStream:
{
while (PQisBusy(conn))
neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_PageStream");
if (PQstatus(shard->conn) == CONNECTION_BAD)
{
char *msg = pchomp(PQerrorMessage(shard->conn));
CLEANUP_AND_DISCONNECT(shard);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
errdetail_internal("%s", msg)));
pfree(msg);
return false;
}
while (PQisBusy(shard->conn))
{
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION);
(void) WaitEventSetWait(shard->wes_read, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@@ -423,40 +598,37 @@ pageserver_connect(shardno_t shard_no, int elevel)
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
if (!PQconsumeInput(shard->conn))
{
char *msg = pchomp(PQerrorMessage(conn));
PQfinish(conn);
FreeWaitEventSet(wes);
char *msg = pchomp(PQerrorMessage(shard->conn));
CLEANUP_AND_DISCONNECT(shard);
neon_shard_log(shard_no, elevel, "could not complete handshake with pageserver: %s",
msg);
/* Returning from inside PG_TRY is bad, so we break/return later */
broke_from_loop = true;
break;
pfree(msg);
return false;
}
}
}
}
PG_CATCH();
{
PQfinish(conn);
FreeWaitEventSet(wes);
PG_RE_THROW();
}
PG_END_TRY();
if (broke_from_loop)
{
return false;
shard->state = PS_Connected;
/* fallthrough */
}
case PS_Connected:
/*
* We successfully connected. Future connections to this PageServer
* will do fast retries again, with exponential backoff.
*/
shard->delay_us = MIN_RECONNECT_INTERVAL_USEC;
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version);
page_servers[shard_no].conn = conn;
page_servers[shard_no].wes = wes;
return true;
neon_shard_log(shard_no, DEBUG5, "Connection state: Connected");
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version);
return true;
default:
neon_shard_log(shard_no, ERROR, "libpagestore: invalid connection state %d", shard->state);
}
/* This shouldn't be hit */
Assert(false);
}
/*
@@ -476,7 +648,7 @@ retry:
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION);
(void) WaitEventSetWait(page_servers[shard_no].wes_read, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@@ -502,7 +674,8 @@ retry:
/*
* Reset prefetch and drop connection to the shard.
* It also drops connection to all other shards involved in prefetch.
* It also drops connection to all other shards involved in prefetch, through
* prefetch_on_ps_disconnect().
*/
static void
pageserver_disconnect(shardno_t shard_no)
@@ -512,9 +685,6 @@ pageserver_disconnect(shardno_t shard_no)
* whole prefetch queue, even for other pageservers. It should not
* cause big problems, because connection loss is supposed to be a
* rare event.
*
* Prefetch state should be reset even if page_servers[shard_no].conn == NULL,
* because prefetch request may be registered before connection is established.
*/
prefetch_on_ps_disconnect();
@@ -527,37 +697,36 @@ pageserver_disconnect(shardno_t shard_no)
static void
pageserver_disconnect_shard(shardno_t shard_no)
{
PageServer *shard = &page_servers[shard_no];
/*
* If anything goes wrong while we were sending a request, it's not clear
* what state the connection is in. For example, if we sent the request
* but didn't receive a response yet, we might receive the response some
* time later after we have already sent a new unrelated request. Close
* the connection to avoid getting confused.
* Similarly, even when we're in PS_DISCONNECTED, we may have junk to
* clean up: It is possible that we encountered an error allocating any
* of the wait event sets or the psql connection, or failed when we tried
* to attach wait events to the WaitEventSets.
*/
if (page_servers[shard_no].conn)
{
neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
PQfinish(page_servers[shard_no].conn);
page_servers[shard_no].conn = NULL;
}
if (page_servers[shard_no].wes != NULL)
{
FreeWaitEventSet(page_servers[shard_no].wes);
page_servers[shard_no].wes = NULL;
}
CLEANUP_AND_DISCONNECT(shard);
shard->state = PS_Disconnected;
}
static bool
pageserver_send(shardno_t shard_no, NeonRequest *request)
{
StringInfoData req_buff;
PGconn *pageserver_conn = page_servers[shard_no].conn;
PageServer *shard = &page_servers[shard_no];
PGconn *pageserver_conn;
/* If the connection was lost for some reason, reconnect */
if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD)
if (shard->state == PS_Connected && PQstatus(shard->conn) == CONNECTION_BAD)
{
neon_shard_log(shard_no, LOG, "pageserver_send disconnect bad connection");
pageserver_disconnect(shard_no);
pageserver_conn = NULL;
}
req_buff = nm_pack_request(request);
@@ -571,17 +740,19 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
* https://github.com/neondatabase/neon/issues/1138 So try to reestablish
* connection in case of failure.
*/
if (!page_servers[shard_no].conn)
if (shard->state != PS_Connected)
{
while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
HandleMainLoopInterrupts();
n_reconnect_attempts += 1;
shard->n_reconnect_attempts += 1;
}
n_reconnect_attempts = 0;
shard->n_reconnect_attempts = 0;
} else {
Assert(shard->conn != NULL);
}
pageserver_conn = page_servers[shard_no].conn;
pageserver_conn = shard->conn;
/*
* Send request.
@@ -590,13 +761,17 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
* should use async mode and check for interrupts while waiting. In
* practice, our requests are small enough to always fit in the output and
* TCP buffer.
*
* Note that this also will fail when the connection is in the
* PGRES_POLLING_WRITING state. It's kinda dirty to disconnect at this
* point, but on the grand scheme of things it's only a small issue.
*/
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
neon_shard_log(shard_no, LOG, "pageserver_send disconnected: failed to send page request (try to reconnect): %s", msg);
pfree(msg);
pfree(req_buff.data);
return false;
@@ -611,6 +786,7 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
neon_shard_log(shard_no, PageStoreTrace, "sent request: %s", msg);
pfree(msg);
}
return true;
}
@@ -619,58 +795,68 @@ pageserver_receive(shardno_t shard_no)
{
StringInfoData resp_buff;
NeonResponse *resp;
PGconn *pageserver_conn = page_servers[shard_no].conn;
PageServer *shard = &page_servers[shard_no];
PGconn *pageserver_conn = shard->conn;
/* read response */
int rc;
if (!pageserver_conn)
return NULL;
PG_TRY();
if (shard->state != PS_Connected)
{
/* read response */
int rc;
neon_shard_log(shard_no, LOG,
"pageserver_receive: returning NULL for non-connected pageserver connection: 0x%02x",
shard->state);
return NULL;
}
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
if (rc >= 0)
Assert(pageserver_conn);
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
if (rc >= 0)
{
/* call_PQgetCopyData handles rc == 0 */
Assert(rc > 0);
PG_TRY();
{
resp_buff.len = rc;
resp_buff.cursor = 0;
resp = nm_unpack_response(&resp_buff);
PQfreemem(resp_buff.data);
if (message_level_is_interesting(PageStoreTrace))
{
char *msg = nm_to_string((NeonMessage *) resp);
neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg);
pfree(msg);
}
}
else if (rc == -1)
PG_CATCH();
{
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
neon_shard_log(shard_no, LOG, "pageserver_receive: disconnect due malformatted response");
pageserver_disconnect(shard_no);
resp = NULL;
PG_RE_THROW();
}
else if (rc == -2)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
PG_END_TRY();
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
}
else
if (message_level_is_interesting(PageStoreTrace))
{
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
char *msg = nm_to_string((NeonMessage *) resp);
neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg);
pfree(msg);
}
}
PG_CATCH();
else if (rc == -1)
{
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect due to caught exception");
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", pchomp(PQerrorMessage(pageserver_conn)));
pageserver_disconnect(shard_no);
PG_RE_THROW();
resp = NULL;
}
else if (rc == -2)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg);
}
else
{
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
}
PG_END_TRY();
return (NeonResponse *) resp;
}
@@ -681,7 +867,7 @@ pageserver_flush(shardno_t shard_no)
{
PGconn *pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
if (page_servers[shard_no].state != PS_Connected)
{
neon_shard_log(shard_no, WARNING, "Tried to flush while disconnected");
}
@@ -697,6 +883,7 @@ pageserver_flush(shardno_t shard_no)
return false;
}
}
return true;
}
@@ -891,5 +1078,7 @@ pg_init_libpagestore(void)
dbsize_hook = neon_dbsize;
}
memset(page_servers, 0, sizeof(page_servers));
lfc_init();
}

View File

@@ -94,6 +94,10 @@ static char *hexdump_page(char *page);
const int SmgrTrace = DEBUG5;
#define NEON_PANIC_CONNECTION_STATE(shard_no, elvl, message, ...) \
neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
##__VA_ARGS__)
page_server_api *page_server;
/* unlogged relation build states */
@@ -526,6 +530,8 @@ prefetch_flush_requests(void)
*
* NOTE: this function may indirectly update MyPState->pfs_hash; which
* invalidates any active pointers into the hash table.
* NOTE: callers should make sure they can handle query cancellations in this
* function's call path.
*/
static bool
prefetch_wait_for(uint64 ring_index)
@@ -561,6 +567,8 @@ prefetch_wait_for(uint64 ring_index)
*
* NOTE: this function may indirectly update MyPState->pfs_hash; which
* invalidates any active pointers into the hash table.
*
* NOTE: this does IO, and can get canceled out-of-line.
*/
static bool
prefetch_read(PrefetchRequest *slot)
@@ -572,6 +580,14 @@ prefetch_read(PrefetchRequest *slot)
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_receive);
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(slot->shard_no, ERROR,
"Incorrect prefetch read: status=%d response=%llx my=%llu receive=%llu",
slot->status, (size_t) (void *) slot->response,
slot->my_ring_index, MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(slot->shard_no);
MemoryContextSwitchTo(old);
@@ -589,6 +605,11 @@ prefetch_read(PrefetchRequest *slot)
}
else
{
neon_shard_log(slot->shard_no, WARNING,
"No response from reading prefetch entry %llu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
slot->my_ring_index,
RelFileInfoFmt(BufTagGetNRelFileInfo(slot->buftag)),
slot->buftag.forkNum, slot->buftag.blockNum);
return false;
}
}
@@ -603,6 +624,7 @@ void
prefetch_on_ps_disconnect(void)
{
MyPState->ring_flush = MyPState->ring_unused;
while (MyPState->ring_receive < MyPState->ring_unused)
{
PrefetchRequest *slot;
@@ -625,6 +647,7 @@ prefetch_on_ps_disconnect(void)
slot->status = PRFS_TAG_REMAINS;
MyPState->n_requests_inflight -= 1;
MyPState->ring_receive += 1;
prefetch_set_unused(ring_index);
}
}
@@ -691,6 +714,8 @@ static void
prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns)
{
bool found;
uint64 mySlotNo = slot->my_ring_index;
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
/* lsn and not_modified_since are filled in below */
@@ -699,6 +724,8 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
.blkno = slot->buftag.blockNum,
};
Assert(mySlotNo == MyPState->ring_unused);
if (force_request_lsns)
slot->request_lsns = *force_request_lsns;
else
@@ -711,7 +738,11 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
while (!page_server->send(slot->shard_no, (NeonRequest *) &request));
while (!page_server->send(slot->shard_no, (NeonRequest *) &request))
{
Assert(mySlotNo == MyPState->ring_unused);
/* loop */
}
/* update prefetch state */
MyPState->n_requests_inflight += 1;
@@ -722,7 +753,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
/* update slot state */
slot->status = PRFS_REQUESTED;
prfh_insert(MyPState->prf_hash, slot, &found);
Assert(!found);
}
@@ -894,6 +924,10 @@ Retry:
return ring_index;
}
/*
* Note: this function can get canceled and use a long jump to the next catch
* context. Take care.
*/
static NeonResponse *
page_server_request(void const *req)
{
@@ -925,19 +959,38 @@ page_server_request(void const *req)
* Current sharding model assumes that all metadata is present only at shard 0.
* We still need to call get_shard_no() to check if shard map is up-to-date.
*/
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest || ((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest ||
((NeonGetPageRequest *) req)->forknum != MAIN_FORKNUM)
{
shard_no = 0;
}
do
{
while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no));
consume_prefetch_responses();
resp = page_server->receive(shard_no);
} while (resp == NULL);
return resp;
PG_TRY();
{
while (!page_server->send(shard_no, (NeonRequest *) req)
|| !page_server->flush(shard_no))
{
/* do nothing */
}
consume_prefetch_responses();
resp = page_server->receive(shard_no);
}
PG_CATCH();
{
/*
* Cancellation in this code needs to be handled better at some
* point, but this currently seems fine for now.
*/
page_server->disconnect(shard_no);
PG_RE_THROW();
}
PG_END_TRY();
} while (resp == NULL);
return resp;
}
@@ -1905,7 +1958,9 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
break;
default:
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_exists", resp->tag);
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x",
T_NeonExistsResponse, T_NeonErrorResponse, resp->tag);
}
pfree(resp);
return exists;
@@ -2357,7 +2412,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/*
* Try to find prefetched page in the list of received pages.
*/
Retry:
Retry:
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &buftag);
if (entry != NULL)
@@ -2443,7 +2498,9 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
((NeonErrorResponse *) resp)->message)));
break;
default:
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_read_at_lsn", resp->tag);
NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC,
"Expected GetPage (0x%02x) or Error (0x%02x) response to GetPageRequest, but got 0x%02x",
T_NeonGetPageResponse, T_NeonErrorResponse, resp->tag);
}
/* buffer was used, clean up for later reuse */
@@ -2714,7 +2771,9 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
break;
default:
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_nblocks", resp->tag);
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x",
T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag);
}
update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks);
@@ -2767,7 +2826,9 @@ neon_dbsize(Oid dbNode)
break;
default:
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_dbsize", resp->tag);
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x",
T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag);
}
neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
@@ -3106,7 +3167,9 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
break;
default:
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_read_slru_segment", resp->tag);
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Expected GetSlruSegment (0x%02x) or Error (0x%02x) response to GetSlruSegmentRequest, but got 0x%02x",
T_NeonGetSlruSegmentResponse, T_NeonErrorResponse, resp->tag);
}
pfree(resp);

View File

@@ -82,6 +82,7 @@ thiserror.workspace = true
tikv-jemallocator.workspace = true
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
tokio-postgres.workspace = true
tokio-postgres-rustls.workspace = true
tokio-rustls.workspace = true
tokio-util.workspace = true
tokio = { workspace = true, features = ["signal"] }
@@ -96,8 +97,6 @@ utils.workspace = true
uuid.workspace = true
webpki-roots.workspace = true
x509-parser.workspace = true
native-tls.workspace = true
postgres-native-tls.workspace = true
postgres-protocol.workspace = true
redis.workspace = true

View File

@@ -11,10 +11,12 @@ use crate::{
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::{io, net::SocketAddr, time::Duration};
use rustls::{client::danger::ServerCertVerifier, pki_types::InvalidDnsNameError};
use std::{io, net::SocketAddr, sync::Arc, time::Duration};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::tls::MakeTlsConnect;
use tokio_postgres_rustls::MakeRustlsConnect;
use tracing::{error, info, warn};
const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
@@ -30,7 +32,7 @@ pub enum ConnectionError {
CouldNotConnect(#[from] io::Error),
#[error("{COULD_NOT_CONNECT}: {0}")]
TlsError(#[from] native_tls::Error),
TlsError(#[from] InvalidDnsNameError),
#[error("{COULD_NOT_CONNECT}: {0}")]
WakeComputeError(#[from] WakeComputeError),
@@ -257,7 +259,7 @@ pub struct PostgresConnection {
/// Socket connected to a compute node.
pub stream: tokio_postgres::maybe_tls_stream::MaybeTlsStream<
tokio::net::TcpStream,
postgres_native_tls::TlsStream<tokio::net::TcpStream>,
tokio_postgres_rustls::RustlsStream<tokio::net::TcpStream>,
>,
/// PostgreSQL connection parameters.
pub params: std::collections::HashMap<String, String>,
@@ -282,12 +284,24 @@ impl ConnCfg {
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
drop(pause);
let tls_connector = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(allow_self_signed_compute)
.build()
.unwrap();
let mut mk_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector);
let tls = MakeTlsConnect::<tokio::net::TcpStream>::make_tls_connect(&mut mk_tls, host)?;
let client_config = if allow_self_signed_compute {
let verifier = Arc::new(AcceptEverythingVerifier) as Arc<dyn ServerCertVerifier>;
rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(verifier)
} else {
let root_store = rustls::RootCertStore {
roots: webpki_roots::TLS_SERVER_ROOTS.to_vec(),
};
rustls::ClientConfig::builder().with_root_certificates(root_store)
};
let client_config = client_config.with_no_client_auth();
let mut mk_tls = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
&mut mk_tls,
host,
)?;
// connect_raw() will not use TLS if sslmode is "disable"
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
@@ -340,6 +354,50 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
Some(options)
}
#[derive(Debug)]
struct AcceptEverythingVerifier;
impl ServerCertVerifier for AcceptEverythingVerifier {
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
use rustls::SignatureScheme::*;
// The schemes for which `SignatureScheme::supported_in_tls13` returns true.
vec![
ECDSA_NISTP521_SHA512,
ECDSA_NISTP384_SHA384,
ECDSA_NISTP256_SHA256,
RSA_PSS_SHA512,
RSA_PSS_SHA384,
RSA_PSS_SHA256,
ED25519,
]
}
fn verify_server_cert(
&self,
_end_entity: &rustls::pki_types::CertificateDer<'_>,
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -22,8 +22,7 @@ serde_with.workspace = true
workspace_hack.workspace = true
utils.workspace = true
async-stream.workspace = true
native-tls.workspace = true
postgres-native-tls.workspace = true
tokio-postgres-rustls.workspace = true
postgres_ffi.workspace = true
tokio-stream.workspace = true
tokio-postgres.workspace = true
@@ -31,6 +30,8 @@ tokio-util = { workspace = true }
futures-util.workspace = true
itertools.workspace = true
camino.workspace = true
rustls.workspace = true
webpki-roots.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }

View File

@@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
use anyhow::Context;
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
use tracing::{error, info, warn};
use utils::generation::Generation;
@@ -208,7 +208,7 @@ impl TenantObjectListing {
&mut self,
timeline_id: TimelineId,
layer_file: &LayerName,
metadata: &IndexLayerMetadata,
metadata: &LayerFileMetadata,
) -> bool {
let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
return false;

View File

@@ -71,8 +71,13 @@ pub async fn scan_safekeeper_metadata(
bucket_config.bucket, bucket_config.region, dump_db_table
);
// Use the native TLS implementation (Neon requires TLS)
let tls_connector =
postgres_native_tls::MakeTlsConnector::new(native_tls::TlsConnector::new().unwrap());
let root_store = rustls::RootCertStore {
roots: webpki_roots::TLS_SERVER_ROOTS.to_vec(),
};
let client_config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.

View File

@@ -11,7 +11,7 @@ use async_stream::stream;
use aws_sdk_s3::Client;
use camino::Utf8PathBuf;
use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
@@ -49,8 +49,8 @@ impl SnapshotDownloader {
&self,
ttid: TenantShardTimelineId,
layer_name: LayerName,
layer_metadata: IndexLayerMetadata,
) -> anyhow::Result<(LayerName, IndexLayerMetadata)> {
layer_metadata: LayerFileMetadata,
) -> anyhow::Result<(LayerName, LayerFileMetadata)> {
// Note this is local as in a local copy of S3 data, not local as in the pageserver's local format. They use
// different layer names (remote-style has the generation suffix)
let local_path = self.output_path.join(format!(
@@ -110,7 +110,7 @@ impl SnapshotDownloader {
async fn download_layers(
&self,
ttid: TenantShardTimelineId,
layers: Vec<(LayerName, IndexLayerMetadata)>,
layers: Vec<(LayerName, LayerFileMetadata)>,
) -> anyhow::Result<()> {
let layer_count = layers.len();
tracing::info!("Downloading {} layers for timeline {ttid}...", layer_count);
@@ -161,10 +161,7 @@ impl SnapshotDownloader {
ttid: TenantShardTimelineId,
index_part: Box<IndexPart>,
index_part_generation: Generation,
ancestor_layers: &mut HashMap<
TenantShardTimelineId,
HashMap<LayerName, IndexLayerMetadata>,
>,
ancestor_layers: &mut HashMap<TenantShardTimelineId, HashMap<LayerName, LayerFileMetadata>>,
) -> anyhow::Result<()> {
let index_bytes = serde_json::to_string(&index_part).unwrap();
@@ -234,7 +231,7 @@ impl SnapshotDownloader {
// happen if this tenant has been split at some point)
let mut ancestor_layers: HashMap<
TenantShardTimelineId,
HashMap<LayerName, IndexLayerMetadata>,
HashMap<LayerName, LayerFileMetadata>,
> = Default::default();
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {

View File

@@ -36,7 +36,7 @@ postgres-protocol.workspace = true
rand.workspace = true
regex.workspace = true
scopeguard.workspace = true
reqwest = { workspace = true, features = ["json"] }
reqwest = { workspace = true, features = ["rustls-tls", "json"] }
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true

View File

@@ -31,7 +31,7 @@ once_cell.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
postgres_connection.workspace = true
reqwest = { workspace = true, features = ["stream"] }
reqwest = { workspace = true, features = ["rustls-tls", "stream"] }
routerify.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -2667,7 +2667,9 @@ class NeonPageserver(PgProtocol, LogUtils):
tenant_id, generation=self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
)
def list_layers(self, tenant_id: TenantId, timeline_id: TimelineId) -> list[Path]:
def list_layers(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
) -> list[Path]:
"""
Inspect local storage on a pageserver to discover which layer files are present.

View File

@@ -0,0 +1,282 @@
from contextlib import closing
from typing import Set
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonPageserver
from fixtures.pageserver.http import PageserverHttpClient
from psycopg2.errors import QueryCanceled
CRITICAL_PG_PS_WAIT_FAILPOINTS: Set[str] = {
"ps::connection-start::pre-login",
"ps::connection-start::startup-packet",
"ps::connection-start::process-query",
"ps::handle-pagerequest-message::exists",
"ps::handle-pagerequest-message::nblocks",
"ps::handle-pagerequest-message::getpage",
"ps::handle-pagerequest-message::dbsize",
# We don't yet have a good way to on-demand guarantee the download of an
# SLRU segment, so that's disabled for now.
# "ps::handle-pagerequest-message::slrusegment",
}
PG_PS_START_FAILPOINTS = {
"ps::connection-start::pre-login",
"ps::connection-start::startup-packet",
"ps::connection-start::process-query",
}
SMGR_EXISTS = "ps::handle-pagerequest-message::exists"
SMGR_NBLOCKS = "ps::handle-pagerequest-message::nblocks"
SMGR_GETPAGE = "ps::handle-pagerequest-message::getpage"
SMGR_DBSIZE = "ps::handle-pagerequest-message::dbsize"
"""
Test that we can handle connection delays and cancellations at various
unfortunate connection startup and request states.
"""
def test_cancellations(neon_simple_env: NeonEnv):
env = neon_simple_env
ps = env.pageserver
ps_http = ps.http_client()
ps_http.is_testing_enabled_or_skip()
env.neon_cli.create_branch("test_config", "empty")
# We don't want to have any racy behaviour with autovacuum IOs
ep = env.endpoints.create_start(
"test_config",
config_lines=[
"autovacuum = off",
"shared_buffers = 128MB",
],
)
with closing(ep.connect()) as conn:
with conn.cursor() as cur:
cur.execute(
"""
CREATE TABLE test1 AS
SELECT id, sha256(id::text::bytea) payload
FROM generate_series(1, 1024::bigint) p(id);
"""
)
cur.execute(
"""
CREATE TABLE test2 AS
SELECT id, sha256(id::text::bytea) payload
FROM generate_series(1025, 2048::bigint) p(id);
"""
)
cur.execute(
"""
VACUUM (ANALYZE, FREEZE) test1, test2;
"""
)
cur.execute(
"""
CREATE EXTENSION pg_buffercache;
"""
)
cur.execute(
"""
CREATE EXTENSION pg_prewarm;
"""
)
# data preparation is now complete, with 2 disjoint tables that aren't
# preloaded into any caches.
ep.stop()
for failpoint in CRITICAL_PG_PS_WAIT_FAILPOINTS:
connect_works_correctly(failpoint, ep, ps, ps_http)
ENABLED_FAILPOINTS: Set[str] = set()
def connect_works_correctly(
failpoint: str, ep: Endpoint, ps: NeonPageserver, ps_http: PageserverHttpClient
):
log.debug("Starting work on %s", failpoint)
# All queries we use should finish (incl. IO) within 500ms,
# including all their IO.
# This allows us to use `SET statement_timeout` to let the query
# timeout system cancel queries, rather than us having to go
# through the most annoying effort of manual query cancellation
# in psycopg2.
options = "-cstatement_timeout=500ms -ceffective_io_concurrency=1"
ep.start()
def fp_enable():
global ENABLED_FAILPOINTS
ps_http.configure_failpoints(
[
(failpoint, "pause"),
]
)
ENABLED_FAILPOINTS = ENABLED_FAILPOINTS | {failpoint}
log.info(
'Enabled failpoint "%s", current_active=%s', failpoint, ENABLED_FAILPOINTS, stacklevel=2
)
def fp_disable():
global ENABLED_FAILPOINTS
ps_http.configure_failpoints(
[
(failpoint, "off"),
]
)
ENABLED_FAILPOINTS = ENABLED_FAILPOINTS - {failpoint}
log.info(
'Disabled failpoint "%s", current_active=%s',
failpoint,
ENABLED_FAILPOINTS,
stacklevel=2,
)
def check_buffers(cur):
cur.execute(
"""
SELECT n.nspname AS nspname
, c.relname AS relname
, count(*) AS count
FROM pg_buffercache b
JOIN pg_class c
ON b.relfilenode = pg_relation_filenode(c.oid) AND
b.reldatabase = (SELECT oid FROM pg_database WHERE datname = current_database())
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.oid IN ('test1'::regclass::oid, 'test2'::regclass::oid)
GROUP BY n.nspname, c.relname
ORDER BY 3 DESC
LIMIT 10
"""
)
return cur.fetchone()
def exec_may_cancel(query, cursor, result, cancels):
if cancels:
with pytest.raises(QueryCanceled):
cursor.execute(query)
assert cursor.fetchone() == result
else:
cursor.execute(query)
assert cursor.fetchone() == result
fp_disable()
# Warm caches required for new connections, so that they can run without
# requiring catalog reads.
with closing(ep.connect()) as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT 1;
"""
)
assert cur.fetchone() == (1,)
assert check_buffers(cur) is None
# Ensure all caches required for connection start are correctly
# filled, so that we don't have any "accidents" in this test run
# caused by changes in connection startup plans that require
# requests to the PageServer.
cur.execute(
"""
select array_agg(distinct (pg_prewarm(c.oid::regclass, 'buffer') >= 0))
from pg_class c
where c.oid < 16384 AND c.relkind IN ('i', 'r');
"""
)
assert cur.fetchone() == ([True],)
# Enable failpoint
fp_enable()
with closing(ep.connect(options=options, autocommit=True)) as conn:
with conn.cursor() as cur:
cur.execute("SHOW statement_timeout;")
assert cur.fetchone() == ("500ms",)
assert check_buffers(cur) is None
exec_may_cancel(
"""
SELECT min(id) FROM test1;
""",
cur,
(1,),
failpoint in (CRITICAL_PG_PS_WAIT_FAILPOINTS - {SMGR_EXISTS, SMGR_DBSIZE}),
)
fp_disable()
with closing(ep.connect(options=options, autocommit=True)) as conn:
with conn.cursor() as cur:
# Do a select on the data, putting some buffers into the prefetch
# queue.
cur.execute(
"""
SELECT count(id) FROM (select * from test1 LIMIT 256) a;
"""
)
assert cur.fetchone() == (256,)
ps.stop()
ps.start()
fp_enable()
exec_may_cancel(
"""
SELECT COUNT(id) FROM test1;
""",
cur,
(1024,),
failpoint
in (CRITICAL_PG_PS_WAIT_FAILPOINTS - {SMGR_EXISTS, SMGR_NBLOCKS, SMGR_DBSIZE}),
)
with closing(ep.connect(options=options, autocommit=True)) as conn:
with conn.cursor() as cur:
exec_may_cancel(
"""
SELECT COUNT(id) FROM test2;
""",
cur,
(1024,),
failpoint in (CRITICAL_PG_PS_WAIT_FAILPOINTS - {SMGR_EXISTS, SMGR_DBSIZE}),
)
fp_disable()
fp_enable()
exec_may_cancel(
"""
SELECT 0 < pg_database_size(CURRENT_DATABASE());
""",
cur,
(True,),
failpoint
in (CRITICAL_PG_PS_WAIT_FAILPOINTS - {SMGR_EXISTS, SMGR_GETPAGE, SMGR_NBLOCKS}),
)
fp_disable()
cur.execute(
"""
SELECT count(id), count(distinct payload), min(id), max(id), sum(id) FROM test2;
"""
)
assert cur.fetchone() == (1024, 1024, 1025, 2048, 1573376)
cur.execute(
"""
SELECT count(id), count(distinct payload), min(id), max(id), sum(id) FROM test1;
"""
)
assert cur.fetchone() == (1024, 1024, 1, 1024, 524800)
ep.stop()

View File

@@ -177,7 +177,16 @@ def test_sharding_split_unsharded(
env.storage_controller.consistency_check()
def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize(
"failpoint",
[
None,
"compact-shard-ancestors-localonly",
"compact-shard-ancestors-enqueued",
"compact-shard-ancestors-persistent",
],
)
def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint: Optional[str]):
"""
Test that after a split, we clean up parent layer data in the child shards via compaction.
"""
@@ -196,6 +205,11 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder):
"image_layer_creation_check_threshold": "0",
}
neon_env_builder.storage_controller_config = {
# Default neon_local uses a small timeout: use a longer one to tolerate longer pageserver restarts.
"max_unavailable": "300s"
}
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
@@ -213,6 +227,10 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder):
# Split one shard into two
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=2)
# Let all shards move into their stable locations, so that during subsequent steps we
# don't have reconciles in progress (simpler to reason about what messages we expect in logs)
env.storage_controller.reconcile_until_idle()
# Check we got the shard IDs we expected
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 2)) is not None
assert env.storage_controller.inspect(TenantShardId(tenant_id, 1, 2)) is not None
@@ -237,6 +255,90 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder):
# Compaction shouldn't make anything unreadable
workload.validate()
# Force a generation increase: layer rewrites are a long-term thing and only happen after
# the generation has increased.
env.pageserver.stop()
env.pageserver.start()
# Cleanup part 2: once layers are outside the PITR window, they will be rewritten if they are partially redundant
env.storage_controller.pageserver_api().set_tenant_config(tenant_id, {"pitr_interval": "0s"})
env.storage_controller.reconcile_until_idle()
for shard in shards:
ps = env.get_tenant_pageserver(shard)
# Apply failpoints for the layer-rewriting phase: this is the area of code that has sensitive behavior
# across restarts, as we will have local layer files that temporarily disagree with the remote metadata
# for the same local layer file name.
if failpoint is not None:
ps.http_client().configure_failpoints((failpoint, "exit"))
# Do a GC to update gc_info (compaction uses this to decide whether a layer is to be rewritten)
# Set gc_horizon=0 to let PITR horizon control GC cutoff exclusively.
ps.http_client().timeline_gc(shard, timeline_id, gc_horizon=0)
# We will compare stats before + after compaction
detail_before = ps.http_client().timeline_detail(shard, timeline_id)
# Invoke compaction: this should rewrite layers that are behind the pitr horizon
try:
ps.http_client().timeline_compact(shard, timeline_id)
except requests.ConnectionError as e:
if failpoint is None:
raise e
else:
log.info(f"Compaction failed (failpoint={failpoint}): {e}")
if failpoint in (
"compact-shard-ancestors-localonly",
"compact-shard-ancestors-enqueued",
):
# If we left local files that don't match remote metadata, we expect warnings on next startup
env.pageserver.allowed_errors.append(
".*removing local file .+ because it has unexpected length.*"
)
# Post-failpoint: we check that the pageserver comes back online happily.
env.pageserver.running = False
env.pageserver.start()
else:
assert failpoint is None # We shouldn't reach success path if a failpoint was set
detail_after = ps.http_client().timeline_detail(shard, timeline_id)
# Physical size should shrink because layers are smaller
assert detail_after["current_physical_size"] < detail_before["current_physical_size"]
# Validate size statistics
for shard in shards:
ps = env.get_tenant_pageserver(shard)
timeline_info = ps.http_client().timeline_detail(shard, timeline_id)
reported_size = timeline_info["current_physical_size"]
layer_paths = ps.list_layers(shard, timeline_id)
measured_size = 0
for p in layer_paths:
abs_path = ps.timeline_dir(shard, timeline_id) / p
measured_size += os.stat(abs_path).st_size
log.info(
f"shard {shard} reported size {reported_size}, measured size {measured_size} ({len(layer_paths)} layers)"
)
if failpoint in (
"compact-shard-ancestors-localonly",
"compact-shard-ancestors-enqueued",
):
# If we injected a failure between local rewrite and remote upload, then after
# restart we may end up with neither version of the file on local disk (the new file
# is cleaned up because it doesn't matchc remote metadata). So local size isn't
# necessarily going to match remote physical size.
continue
assert measured_size == reported_size
# Compaction shouldn't make anything unreadable
workload.validate()
def test_sharding_split_smoke(
neon_env_builder: NeonEnvBuilder,

View File

@@ -194,7 +194,7 @@ files:
- metric_name: pg_stats_userdb
type: gauge
help: 'Stats for the oldest non-system db'
help: 'Stats for several oldest non-system dbs'
key_labels:
- datname
value_label: kind
@@ -205,9 +205,8 @@ files:
- inserted
- updated
- deleted
# We export stats for only one non-system database. Without this limit
# We export stats for 10 non-system database. Without this limit
# it is too easy to abuse the system by creating lots of databases.
# We can try lifting this limit in the future after we understand the needs better.
query: |
select pg_database_size(datname) as db_size, deadlocks,
tup_inserted as inserted, tup_updated as updated, tup_deleted as deleted,
@@ -218,7 +217,7 @@ files:
from pg_database
where datname <> 'postgres' and not datistemplate
order by oid
limit 1
limit 10
);
- metric_name: max_cluster_size
@@ -320,7 +319,7 @@ files:
- metric_name: wal_is_lost
type: gauge
help: 'Whether or not the replication slot\'s wal_status is lost'
help: 'Whether or not the replication slot wal_status is lost'
key_labels:
- slot_name
values: [wal_status_is_lost]

View File

@@ -59,7 +59,7 @@ regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] }
reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11", default-features = false, features = ["blocking", "default-tls", "stream"] }
reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11", default-features = false, features = ["blocking", "rustls-tls", "stream"] }
rustls = { version = "0.21", features = ["dangerous_configuration"] }
scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }