Compare commits

...

24 Commits

Author SHA1 Message Date
Christian Schwarz
c0881a1407 break bench_ingest compilation on all platforms 2025-04-24 16:51:42 +02:00
Christian Schwarz
9c6ff3aa2b refactor(BufferedWriter): flush task owns the VirtualFile & abstraction for cleanup on drop (#11549)
Main change:

- `BufferedWriter` owns the `W`; no more `Arc<W>`
- We introduce auto-delete-on-drop wrappers for `VirtualFile`:
  - `TempVirtualFile` for write-only users
  - `TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter` for
    `EphemeralFile`, which requires read access to the immutable prefix
    of the file (see doc comments for details)
- Users of `BufferedWriter` hand it such a wrapped `VirtualFile`.
- The wrapped `VirtualFile` moves to the background flush task.
- On `BufferedWriter` shutdown, ownership moves back.
- Callers remove the wrapper (`disarm_into_inner()`) after doing final
touches, e.g., flushing index blocks and summary for delta/image layer
writers.

If the BufferedWriter isn't shut down properly via
`BufferedWriter::shutdown`, or if there is an error during final
touches, the wrapper type ensures that the file gets unlinked.

We store a GateGuard inside the wrapper to ensure that the Timeline is
still alive when unlinking on drop.

Rust doesn't have async drop yet, so the unlinking happens using a
synchronous syscall.
NB: we don't fsync the surrounding directory.
This is how it was before this PR as well; I believe it is correct
because all of these files are temporary paths that get cleaned up on
timeline load.
Again, timeline load does not need to fsync, because the next timeline
load will unlink again if the file reappears.
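
As a minimal sketch of the auto-delete-on-drop idea, using std types in
place of `VirtualFile`/`GateGuard` (apart from `disarm_into_inner`, the
names here are placeholders, not the PR's actual types):

```rust
use std::fs::File;
use std::path::PathBuf;

struct TempFile {
    /// `Some` until `disarm_into_inner` takes ownership away from `Drop`.
    inner: Option<(File, PathBuf)>,
}

impl TempFile {
    fn create(path: PathBuf) -> std::io::Result<Self> {
        let file = File::create(&path)?;
        Ok(Self { inner: Some((file, path)) })
    }

    /// Final touches succeeded: keep the file and disable the drop-time unlink.
    fn disarm_into_inner(mut self) -> File {
        self.inner.take().expect("disarmed only once").0
    }
}

impl Drop for TempFile {
    fn drop(&mut self) {
        // No async drop in Rust: a synchronous unlink. Errors are ignored
        // in this sketch; real code would log them.
        if let Some((_file, path)) = self.inner.take() {
            let _ = std::fs::remove_file(&path);
        }
    }
}
```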

The auto-delete-on-drop can happen after a higher-level mechanism has
already retried, i.e., after a new temp file was created for the retry.
Therefore, we switch all users to monotonically increasing, never-reused
temp file disambiguators, so that a late unlink can never hit the
retry's file.

The aspects pointed out in the last two paragraphs will receive further
cleanup in a follow-up task:
- https://github.com/neondatabase/neon/issues/11692

Drive-by changes:
- It turns out we can remove the two-pronged code in the layer file
download code.
No need to make this a separate PR, because all of production has
already been using `tokio-epoll-uring` with the buffered writer for many
weeks.


Refs
- epic https://github.com/neondatabase/neon/issues/9868
- alternative to https://github.com/neondatabase/neon/pull/11544
2025-04-24 13:07:57 +00:00
Folke Behrens
9d472c79ce Fix what's currently flagged by cargo deny (#11693)
* Replace yanked papaya version
* Remove unused allowed license: OpenSSL
* Remove Zlib license from general allow list since it's listed in the
exceptions section per crate
* Drop clarification for ring since they have separate LICENSE files now
* List the tower-otel repo as an allowed source while we sort out the
OTel deps
2025-04-24 13:02:31 +00:00
Arpad Müller
b43203928f Switch tenant snapshot subcommand to remote_storage (#11685)
Switches the tenant snapshot subcommand of the storage scrubber to
`remote_storage`. As this is the last piece of the storage scrubber
still using the S3 SDK, this finishes the project started in #7547.

This allows us to do tenant snapshots on Azure as well.

Builds on #11671
Fixes #8830
2025-04-24 12:22:07 +00:00
Arpad Müller
c35d489539 versioning API for remote_storage (#11671)
Adds a versioning API to remote_storage. We want to use it in the
scrubber, both for tenant snapshots and for metadata checks.

for #8830
and for #11588
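
A hedged usage sketch based on the signatures visible in this PR's diff
further down; the helper itself and `ListingMode::NoDelimiter` are
assumptions:

```rust
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath, VersionKind};
use tokio_util::sync::CancellationToken;

// List every stored version (and deletion marker) under a prefix.
async fn print_versions(
    storage: &GenericRemoteStorage,
    prefix: &RemotePath,
) -> Result<(), DownloadError> {
    let cancel = CancellationToken::new();
    let listing = storage
        .list_versions(Some(prefix), ListingMode::NoDelimiter, None, &cancel)
        .await?;
    for v in listing.versions {
        match v.kind {
            // `VersionId` is a plain String newtype in this PR
            VersionKind::Version(ref id) => println!("{:?}: version {}", v.key, id.0),
            VersionKind::DeletionMarker => println!("{:?}: deletion marker", v.key),
        }
    }
    Ok(())
}
```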
2025-04-24 11:41:48 +00:00
Vlad Lazar
3a50d95b6d storage_controller: coordinate imports across shards in the storage controller (#11345)
## Problem

Pageservers notify the control plane directly when a shard import has
completed.
The control plane has to download the status of each shard from S3 and
figure out whether everything is truly done before proceeding with
branch activation.

Issues with this approach are:
* We can't control shard split behaviour on the storage controller side.
It's unsafe to split during an import.
* The control plane needs to know about shards and implement logic to
check that all timelines are indeed ready.

## Summary of changes

In short, the storage controller coordinates imports and notifies the
control plane only when everything is done.

Big rocks:
1. Store timeline imports in the storage controller database. Each
import stores the status of its shards in the database.
We hook into the timeline creation call as our entry point for this.
2. Pageservers get a new upcall endpoint to notify the storage
controller of shard import updates.
3. The storage controller handles these updates by updating persisted
state. If an update finalizes the import, it polls pageservers until
timeline activation and then notifies the control plane that the import
is complete (see the sketch below).
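
Illustrative only: how the "everything is done" decision could look,
given the `ShardImportStatus` type this PR adds (the module path and
the helper are assumptions):

```rust
use pageserver_api::models::ShardImportStatus;

/// None while any shard is still in progress; otherwise the combined
/// outcome across all shards of the timeline import.
fn import_outcome(shards: &[ShardImportStatus]) -> Option<Result<(), String>> {
    if shards.iter().any(|s| !s.is_terminal()) {
        return None; // keep waiting for more shard updates
    }
    for s in shards {
        if let ShardImportStatus::Error(e) = s {
            return Some(Err(e.clone())); // any shard error fails the import
        }
    }
    Some(Ok(())) // all shards are Done: notify control plane
}
```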

Cplane side change with new endpoint is in
https://github.com/neondatabase/cloud/pull/26166

Closes https://github.com/neondatabase/neon/issues/11566
2025-04-24 11:26:06 +00:00
Arpad Müller
d43b8e73ae Update sentry to 0.37 (#11686)
Update the sentry crate to 0.37. This deduplicates the `webpki-roots`
crate in our crate graph, and brings another dependency onto newer
rustls `0.23.18`.
2025-04-24 11:20:41 +00:00
devin-ai-integration[bot]
1808dad269 Add --dev CLI flag to pageserver and safekeeper binaries (#11526)
# Add --dev CLI flag to pageserver and safekeeper binaries

This PR adds the `--dev` CLI flag to both the pageserver and safekeeper
binaries without implementing any functionality yet. This is a precursor
to PR #11517, which will implement the full functionality to require
authentication by default unless the `--dev` flag is specified.

## Changes
- Add `dev_mode` config field to pageserver binary
- Add `--dev` CLI flag to safekeeper binary

This PR is needed for forward compatibility tests to work properly when
we try to merge #11517

Link to Devin run:
https://app.devin.ai/sessions/ad8231b4e2be430398072b6fc4e85d46
Requested by: John Spray (john@neon.tech)

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: John Spray <john@neon.tech>
2025-04-24 10:45:40 +00:00
Folke Behrens
7ba8519b43 proxy: Update zerocopy to 0.8 (#11681)
Also add some macros that might result in more efficient code.
2025-04-24 09:39:08 +00:00
Christian Schwarz
f8100d66d5 ci: extend 'Wait for extension build to finish' timeout (#11689)
Refs
- https://neondb.slack.com/archives/C059ZC138NR/p1745427571307149
2025-04-24 08:15:08 +00:00
Christian Schwarz
51cdb570eb bench_ingest: general overhaul & add parametrization over virtual_file_io_mode (#11667)
Changes:
- clean up existing parametrization & criterion `BenchmarkId`
- additional parametrization over `virtual_file_io_mode` (see the
sketch below)
- switch to `multi_thread` to be closer to production ([Slack
thread](https://neondb.slack.com/archives/C033RQ5SPDH/p1745339543093159))
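
Roughly, the parametrization pattern looks like the following criterion
sketch; the `IoMode` enum and the benchmark body are stand-ins, not the
PR's actual bench:

```rust
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};

#[derive(Debug, Clone, Copy)]
enum IoMode {
    Buffered,
    Direct,
}

fn bench_ingest(c: &mut Criterion) {
    let mut group = c.benchmark_group("ingest");
    for mode in [IoMode::Buffered, IoMode::Direct] {
        // One BenchmarkId per parameter value keeps results comparable
        group.bench_with_input(
            BenchmarkId::new("virtual_file_io_mode", format!("{mode:?}")),
            &mode,
            |b, _mode| b.iter(|| { /* ingest work under this IO mode */ }),
        );
    }
    group.finish();
}

criterion_group!(benches, bench_ingest);
criterion_main!(benches);
```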

Refs
- epic https://github.com/neondatabase/neon/issues/9868
- extracted from https://github.com/neondatabase/neon/pull/11558
2025-04-24 07:38:18 +00:00
devin-ai-integration[bot]
8e09ecf2ab Fix KeyError in physical replication benchmark test (#11675)
# Fix KeyError in physical replication benchmark test

This PR fixes the failing physical replication benchmark test that was
encountering a `KeyError: 'endpoints'`.

The issue was in accessing `project["project"]["endpoints"][0]["id"]`
when it should be `project["endpoints"][0]["id"]`, consistent with how
endpoints are accessed elsewhere in the codebase.

Fixed the issue in both test functions:
- test_ro_replica_lag
- test_replication_start_stop

Link to Devin run:
https://app.devin.ai/sessions/be3fe9a9ee5942e4b12e74a7055f541b
Requested by: Peter Bendel

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: peterbendel@neon.tech <peterbendel@neon.tech>
2025-04-23 14:51:08 +00:00
Mikhail Kot
c3534cea39 Rename object_storage->endpoint_storage (#11678)
1. Rename service to avoid ambiguity as discussed in Slack
2. Ignore endpoint_id in read paths as requested in
https://github.com/neondatabase/cloud/issues/26346#issuecomment-2806758224
2025-04-23 14:03:19 +00:00
Folke Behrens
21d3d60cef proxy/pglb: Add in-process connection support (#11677)
Define a `Connection` and a `Stream` type that resemble simple QUIC
connections and (multiplexed) streams.
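
The diff isn't shown here; as a rough sketch of what an in-process
connection/stream pair could look like on top of tokio (everything
beyond the two type names is an assumption):

```rust
use tokio::io::DuplexStream;
use tokio::sync::mpsc;

pub struct Stream(pub DuplexStream);

pub struct Connection {
    incoming: mpsc::Receiver<Stream>, // streams opened by the peer
    outgoing: mpsc::Sender<Stream>,   // hands our new streams to the peer
    buf_size: usize,
}

impl Connection {
    /// Two connected endpoints, like the two ends of one QUIC connection.
    pub fn pair(buf_size: usize) -> (Self, Self) {
        let (a_tx, a_rx) = mpsc::channel(16);
        let (b_tx, b_rx) = mpsc::channel(16);
        let a = Self { incoming: a_rx, outgoing: b_tx, buf_size };
        let b = Self { incoming: b_rx, outgoing: a_tx, buf_size };
        (a, b)
    }

    /// Open a multiplexed stream; the peer receives it via `accept_stream`.
    pub async fn open_stream(&self) -> Option<Stream> {
        let (ours, theirs) = tokio::io::duplex(self.buf_size);
        self.outgoing.send(Stream(theirs)).await.ok()?;
        Some(Stream(ours))
    }

    pub async fn accept_stream(&mut self) -> Option<Stream> {
        self.incoming.recv().await
    }
}
```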
2025-04-23 12:18:30 +00:00
Tristan Partin
b00db536bb Add CPU architecture to the remote extensions object key (#11590)
ARM computes are incoming and we need to account for that in remote
extensions. Previously, we just blindly assumed that all computes were
x86_64.

Note that we use the Go architecture naming convention instead of the
Rust one directly, to do our best to be consistent across the stack.

Part-of: https://github.com/neondatabase/cloud/issues/23148

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-22 22:47:22 +00:00
Arpad Müller
149cbd1e0a Support single and two safekeeper scenarios (#11483)
In tests and when one safekeeper is down in small regions, we need to
contend with one or two safekeepers. Before, we gave an error in
`safekeepers_for_new_timeline`. Now we just silently allow the timeline
to be created on one or two safekeepers.

Part of #9011
2025-04-22 21:27:01 +00:00
Alexander Lakhin
7b949daf13 fix(test): allow reconcile errors in test_storage_controller_heartbeats (#11665)
## Problem

test_storage_controller_heartbeats is flaky because of disallowed
reconciler errors (#11625)

## Summary of changes

Allow reconcile errors as in other tests in test_storage_controller.py.
2025-04-22 18:13:16 +00:00
Konstantin Knizhnik
132b6154bb Unlogged build debug compare local v2 (#11554)
## Problem

The init fork is used in DEBUG_COMPARE_LOCAL to detect an unlogged
relation or an unlogged build.
But it is created only after the relation is initialized, so the
relation can be swapped out before the init fork exists, producing a
`Page is evicted with zero LSN` error.

## Summary of changes

Create the init fork together with the main fork for unlogged relations
in DEBUG_COMPARE_LOCAL mode.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-22 18:07:45 +00:00
Alex Chi Z.
ad3519ebcb fix(pageserver): report synthetic size = 1 if all timelines offloaded (#11648)
## Problem

A quick workaround for https://github.com/neondatabase/neon/issues/11631

## Summary of changes

Report synthetic size == 1 if all timelines are offloaded.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-22 14:28:22 +00:00
Dmitrii Kovalkov
6173c0f44c safekeeper: add enable_tls_wal_service_api (#11520)
## Problem
Safekeeper doesn't use TLS in the WAL service
- Closes: https://github.com/neondatabase/cloud/issues/27302

## Summary of changes
- Add `enable_tls_wal_service_api` option to safekeeper's cmd arguments
- Propagate `tls_server_config` to `wal_service` if the option is
enabled
- Create `BACKGROUND_RUNTIME` for small background tasks and offload
the SSL certificate reloader to it (see the sketch below).

No integration tests for now because support from the compute side is
required: https://github.com/neondatabase/cloud/issues/25823
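
An illustrative sketch of such a dedicated runtime (the statics and the
reload loop here are assumptions, not the PR's exact code):

```rust
use once_cell::sync::Lazy;
use tokio::runtime::Runtime;

// Small shared runtime so background chores don't occupy the main runtimes.
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("background")
        .worker_threads(1)
        .enable_all()
        .build()
        .expect("failed to create background runtime")
});

fn spawn_cert_reloader() {
    BACKGROUND_RUNTIME.spawn(async {
        loop {
            // hypothetical: re-read certificates from disk periodically
            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        }
    });
}
```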
2025-04-22 13:19:03 +00:00
a-masterov
fd916abf25 Remove NOTICE messages, which can make the pg_repack regression test fail. (#11659)
## Problem
The pg_repack test can be flaky due to unpredictable `NOTICE` messages
about waiting for some processes.
E.g., 
```
 INFO: repacking table "public.issue3_2"
+NOTICE: Waiting for 1 transactions to finish. First PID: 427
```
## Summary of changes
Set `client_min_messages` to `warning` for the regression tests.
2025-04-22 11:43:45 +00:00
Alexander Bayandin
cd2e1fbc7c CI(benchmarks): upload perf results for passed tests (#11649)
## Problem

We run benchmarks in batches (five parallel jobs on different runners).
If any test in a batch fails, we won’t upload any results for that
batch, even for the tests that passed.

## Summary of changes
- Move the results upload to a separate step in the run-python-test-set
action, and execute this step even if tests fail.
2025-04-22 09:41:28 +00:00
Tristan Partin
5df4a747e6 Update pgbouncer in compute images to 1.24.1 (#11651)
Fixes CVE-2025-2291.

Link:
https://www.postgresql.org/about/news/pgbouncer-1241-released-fixes-cve-2025-2291-3059/

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-21 17:49:17 +00:00
Vlad Lazar
cbf442292b pageserver: handle empty get vectored queries (#11652)
## Problem

If all batched requests are excluded from the query by
`Timeline::get_rel_page_at_lsn_batched` (e.g. because they are past the
end of the relation), the read path would panic since it doesn't expect
empty queries. This is a change in behaviour that was introduced with
the scattered query implementation.

## Summary of Changes

Handle empty queries explicitly.
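
The shape of the fix, as an illustrative guard (all names here are
stand-ins, not the actual pageserver code):

```rust
struct PageRequest {
    past_rel_end: bool, // e.g. the requested block is past the relation end
}
struct PageResult;

fn get_rel_pages_batched(requests: Vec<PageRequest>) -> Vec<PageResult> {
    // Requests past the end of the relation are answered without a read.
    let remaining: Vec<PageRequest> = requests
        .into_iter()
        .filter(|r| !r.past_rel_end)
        .collect();
    // The fix: the read path doesn't expect empty queries, so
    // short-circuit before reaching it.
    if remaining.is_empty() {
        return Vec::new();
    }
    remaining.iter().map(|_| PageResult).collect() // stand-in for the read
}
```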
2025-04-21 17:45:16 +00:00
80 changed files with 2529 additions and 1238 deletions

View File

@@ -19,7 +19,7 @@
!pageserver/
!pgxn/
!proxy/
!object_storage/
!endpoint_storage/
!storage_scrubber/
!safekeeper/
!storage_broker/

View File

@@ -133,6 +133,7 @@ runs:
fi
PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)"
echo "PERF_REPORT_DIR=${PERF_REPORT_DIR}" >> ${GITHUB_ENV}
rm -rf $PERF_REPORT_DIR
TEST_SELECTION="test_runner/${{ inputs.test_selection }}"
@@ -209,11 +210,12 @@ runs:
--verbose \
-rA $TEST_SELECTION $EXTRA_PARAMS
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
export REPORT_FROM="$PERF_REPORT_DIR"
export REPORT_TO="$PLATFORM"
scripts/generate_and_push_perf_report.sh
fi
- name: Upload performance report
if: ${{ !cancelled() && inputs.save_perf_report == 'true' }}
shell: bash -euxo pipefail {0}
run: |
export REPORT_FROM="${PERF_REPORT_DIR}"
scripts/generate_and_push_perf_report.sh
- name: Upload compatibility snapshot
# Note, that we use `github.base_ref` which is a target branch for a PR

View File

@@ -1238,7 +1238,7 @@ jobs:
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
TIMEOUT=1800 # 30 minutes, usually it takes ~2-3 minutes, but if runners are busy, it might take longer
TIMEOUT=5400 # 90 minutes, usually it takes ~2-3 minutes, but if runners are busy, it might take longer
INTERVAL=15 # try each N seconds
last_status="" # a variable to carry the last status of the "build-and-upload-extensions" context

132
Cargo.lock generated
View File

@@ -40,7 +40,7 @@ dependencies = [
"getrandom 0.2.11",
"once_cell",
"version_check",
"zerocopy",
"zerocopy 0.7.31",
]
[[package]]
@@ -2037,6 +2037,33 @@ dependencies = [
"zeroize",
]
[[package]]
name = "endpoint_storage"
version = "0.0.1"
dependencies = [
"anyhow",
"axum",
"axum-extra",
"camino",
"camino-tempfile",
"futures",
"http-body-util",
"itertools 0.10.5",
"jsonwebtoken",
"prometheus",
"rand 0.8.5",
"remote_storage",
"serde",
"serde_json",
"test-log",
"tokio",
"tokio-util",
"tower 0.5.2",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "enum-map"
version = "2.5.0"
@@ -3998,33 +4025,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "object_storage"
version = "0.0.1"
dependencies = [
"anyhow",
"axum",
"axum-extra",
"camino",
"camino-tempfile",
"futures",
"http-body-util",
"itertools 0.10.5",
"jsonwebtoken",
"prometheus",
"rand 0.8.5",
"remote_storage",
"serde",
"serde_json",
"test-log",
"tokio",
"tokio-util",
"tower 0.5.2",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "once_cell"
version = "1.20.2"
@@ -4415,9 +4415,9 @@ dependencies = [
[[package]]
name = "papaya"
version = "0.2.0"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aab21828b6b5952fdadd6c377728ffae53ec3a21b2febc47319ab65741f7e2fd"
checksum = "6827e3fc394523c21d4464d02c0bb1c19966ea4a58a9844ad6d746214179d2bc"
dependencies = [
"equivalent",
"seize",
@@ -5204,7 +5204,7 @@ dependencies = [
"walkdir",
"workspace_hack",
"x509-cert",
"zerocopy",
"zerocopy 0.8.24",
]
[[package]]
@@ -5594,7 +5594,7 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots 0.26.1",
"webpki-roots",
"winreg",
]
@@ -6195,13 +6195,13 @@ checksum = "224e328af6e080cddbab3c770b1cf50f0351ba0577091ef2410c3951d835ff87"
[[package]]
name = "sentry"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00421ed8fa0c995f07cde48ba6c89e80f2b312f74ff637326f392fbfd23abe02"
checksum = "255914a8e53822abd946e2ce8baa41d4cded6b8e938913b7f7b9da5b7ab44335"
dependencies = [
"httpdate",
"reqwest",
"rustls 0.21.12",
"rustls 0.23.18",
"sentry-backtrace",
"sentry-contexts",
"sentry-core",
@@ -6209,14 +6209,14 @@ dependencies = [
"sentry-tracing",
"tokio",
"ureq",
"webpki-roots 0.25.2",
"webpki-roots",
]
[[package]]
name = "sentry-backtrace"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a79194074f34b0cbe5dd33896e5928bbc6ab63a889bd9df2264af5acb186921e"
checksum = "00293cd332a859961f24fd69258f7e92af736feaeb91020cff84dac4188a4302"
dependencies = [
"backtrace",
"once_cell",
@@ -6226,9 +6226,9 @@ dependencies = [
[[package]]
name = "sentry-contexts"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eba8870c5dba2bfd9db25c75574a11429f6b95957b0a78ac02e2970dd7a5249a"
checksum = "961990f9caa76476c481de130ada05614cd7f5aa70fb57c2142f0e09ad3fb2aa"
dependencies = [
"hostname",
"libc",
@@ -6240,9 +6240,9 @@ dependencies = [
[[package]]
name = "sentry-core"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46a75011ea1c0d5c46e9e57df03ce81f5c7f0a9e199086334a1f9c0a541e0826"
checksum = "1a6409d845707d82415c800290a5d63be5e3df3c2e417b0997c60531dfbd35ef"
dependencies = [
"once_cell",
"rand 0.8.5",
@@ -6253,9 +6253,9 @@ dependencies = [
[[package]]
name = "sentry-panic"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eaa3ecfa3c8750c78dcfd4637cfa2598b95b52897ed184b4dc77fcf7d95060d"
checksum = "609b1a12340495ce17baeec9e08ff8ed423c337c1a84dffae36a178c783623f3"
dependencies = [
"sentry-backtrace",
"sentry-core",
@@ -6263,9 +6263,9 @@ dependencies = [
[[package]]
name = "sentry-tracing"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f715932bf369a61b7256687c6f0554141b7ce097287e30e3f7ed6e9de82498fe"
checksum = "49f4e86402d5c50239dc7d8fd3f6d5e048221d5fcb4e026d8d50ab57fe4644cb"
dependencies = [
"sentry-backtrace",
"sentry-core",
@@ -6275,9 +6275,9 @@ dependencies = [
[[package]]
name = "sentry-types"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4519c900ce734f7a0eb7aba0869dfb225a7af8820634a7dd51449e3b093cfb7c"
checksum = "3d3f117b8755dbede8260952de2aeb029e20f432e72634e8969af34324591631"
dependencies = [
"debugid",
"hex",
@@ -6711,8 +6711,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"aws-config",
"aws-sdk-s3",
"camino",
"chrono",
"clap",
@@ -7801,7 +7799,7 @@ dependencies = [
"rustls 0.23.18",
"rustls-pki-types",
"url",
"webpki-roots 0.26.1",
"webpki-roots",
]
[[package]]
@@ -8169,12 +8167,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki-roots"
version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc"
[[package]]
name = "webpki-roots"
version = "0.26.1"
@@ -8482,6 +8474,8 @@ dependencies = [
"regex-syntax 0.8.2",
"reqwest",
"rustls 0.23.18",
"rustls-pki-types",
"rustls-webpki 0.102.8",
"scopeguard",
"sec1 0.7.3",
"serde",
@@ -8510,7 +8504,6 @@ dependencies = [
"tracing-log",
"url",
"uuid",
"zerocopy",
"zeroize",
"zstd",
"zstd-safe",
@@ -8614,8 +8607,16 @@ version = "0.7.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d"
dependencies = [
"byteorder",
"zerocopy-derive",
"zerocopy-derive 0.7.31",
]
[[package]]
name = "zerocopy"
version = "0.8.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879"
dependencies = [
"zerocopy-derive 0.8.24",
]
[[package]]
@@ -8629,6 +8630,17 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "zerofrom"
version = "0.1.5"

View File

@@ -40,7 +40,7 @@ members = [
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/tokio-postgres2",
"object_storage",
"endpoint_storage",
]
[workspace.package]
@@ -164,7 +164,7 @@ scopeguard = "1.1"
sysinfo = "0.29.2"
sd-notify = "0.4.1"
send-future = "0.1.0"
sentry = { version = "0.32", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
sentry = { version = "0.37", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_path_to_error = "0.1"
@@ -220,7 +220,7 @@ uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"
rustls-native-certs = "0.8"
whoami = "1.5.1"
zerocopy = { version = "0.7", features = ["derive"] }
zerocopy = { version = "0.8", features = ["derive", "simd"] }
json-structural-diff = { version = "0.2.0" }
x509-cert = { version = "0.2.5" }

View File

@@ -89,7 +89,7 @@ RUN set -e \
--bin storage_broker \
--bin storage_controller \
--bin proxy \
--bin object_storage \
--bin endpoint_storage \
--bin neon_local \
--bin storage_scrubber \
--locked --release
@@ -122,7 +122,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_controller /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/object_storage /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/endpoint_storage /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin

View File

@@ -1677,7 +1677,7 @@ RUN set -e \
&& apt clean && rm -rf /var/lib/apt/lists/*
# Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc)
ENV PGBOUNCER_TAG=pgbouncer_1_22_1
ENV PGBOUNCER_TAG=pgbouncer_1_24_1
RUN set -e \
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/pgbouncer/pgbouncer.git pgbouncer \
&& cd pgbouncer \

View File

@@ -11,6 +11,14 @@ index bf6edcb..89b4c7f 100644
USE_PGXS = 1 # use pgxs if not in contrib directory
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/regress/expected/init-extension.out b/regress/expected/init-extension.out
index 9f2e171..f6e4f8d 100644
--- a/regress/expected/init-extension.out
+++ b/regress/expected/init-extension.out
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out
index 8d0a94e..63b68bf 100644
--- a/regress/expected/nosuper.out
@@ -42,6 +50,14 @@ index 8d0a94e..63b68bf 100644
INFO: repacking table "public.tbl_cluster"
ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block
DETAIL: query was: RESET lock_timeout
diff --git a/regress/sql/init-extension.sql b/regress/sql/init-extension.sql
index 9f2e171..f6e4f8d 100644
--- a/regress/sql/init-extension.sql
+++ b/regress/sql/init-extension.sql
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql
index 072f0fa..dbe60f8 100644
--- a/regress/sql/nosuper.sql

View File

@@ -18,12 +18,11 @@ use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::spec::ComputeMode;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
use control_plane::local_env::{
InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf,
ObjectStorageConf, SafekeeperConf,
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
NeonLocalInitPageserverConf, SafekeeperConf,
};
use control_plane::object_storage::OBJECT_STORAGE_DEFAULT_PORT;
use control_plane::object_storage::ObjectStorage;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage_controller::{
@@ -93,7 +92,7 @@ enum NeonLocalCmd {
#[command(subcommand)]
Safekeeper(SafekeeperCmd),
#[command(subcommand)]
ObjectStorage(ObjectStorageCmd),
EndpointStorage(EndpointStorageCmd),
#[command(subcommand)]
Endpoint(EndpointCmd),
#[command(subcommand)]
@@ -460,14 +459,14 @@ enum SafekeeperCmd {
#[derive(clap::Subcommand)]
#[clap(about = "Manage object storage")]
enum ObjectStorageCmd {
Start(ObjectStorageStartCmd),
Stop(ObjectStorageStopCmd),
enum EndpointStorageCmd {
Start(EndpointStorageStartCmd),
Stop(EndpointStorageStopCmd),
}
#[derive(clap::Args)]
#[clap(about = "Start object storage")]
struct ObjectStorageStartCmd {
struct EndpointStorageStartCmd {
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
@@ -475,7 +474,7 @@ struct ObjectStorageStartCmd {
#[derive(clap::Args)]
#[clap(about = "Stop object storage")]
struct ObjectStorageStopCmd {
struct EndpointStorageStopCmd {
#[arg(value_enum, default_value = "fast")]
#[clap(
short = 'm',
@@ -797,7 +796,9 @@ fn main() -> Result<()> {
}
NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
NeonLocalCmd::ObjectStorage(subcmd) => rt.block_on(handle_object_storage(&subcmd, env)),
NeonLocalCmd::EndpointStorage(subcmd) => {
rt.block_on(handle_endpoint_storage(&subcmd, env))
}
NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
};
@@ -1014,8 +1015,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
}
})
.collect(),
object_storage: ObjectStorageConf {
port: OBJECT_STORAGE_DEFAULT_PORT,
endpoint_storage: EndpointStorageConf {
port: ENDPOINT_STORAGE_DEFAULT_PORT,
},
pg_distrib_dir: None,
neon_distrib_dir: None,
@@ -1735,12 +1736,15 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
Ok(())
}
async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::LocalEnv) -> Result<()> {
use ObjectStorageCmd::*;
let storage = ObjectStorage::from_env(env);
async fn handle_endpoint_storage(
subcmd: &EndpointStorageCmd,
env: &local_env::LocalEnv,
) -> Result<()> {
use EndpointStorageCmd::*;
let storage = EndpointStorage::from_env(env);
// In tests like test_forward_compatibility or test_graceful_cluster_restart
// old neon binaries (without object_storage) are present
// old neon binaries (without endpoint_storage) are present
if !storage.bin.exists() {
eprintln!(
"{} binary not found. Ignore if this is a compatibility test",
@@ -1750,13 +1754,13 @@ async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::Local
}
match subcmd {
Start(ObjectStorageStartCmd { start_timeout }) => {
Start(EndpointStorageStartCmd { start_timeout }) => {
if let Err(e) = storage.start(start_timeout).await {
eprintln!("object_storage start failed: {e}");
eprintln!("endpoint_storage start failed: {e}");
exit(1);
}
}
Stop(ObjectStorageStopCmd { stop_mode }) => {
Stop(EndpointStorageStopCmd { stop_mode }) => {
let immediate = match stop_mode {
StopMode::Fast => false,
StopMode::Immediate => true,
@@ -1866,10 +1870,10 @@ async fn handle_start_all_impl(
}
js.spawn(async move {
ObjectStorage::from_env(env)
EndpointStorage::from_env(env)
.start(&retry_timeout)
.await
.map_err(|e| e.context("start object_storage"))
.map_err(|e| e.context("start endpoint_storage"))
});
})();
@@ -1968,9 +1972,9 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
}
}
let storage = ObjectStorage::from_env(env);
let storage = EndpointStorage::from_env(env);
if let Err(e) = storage.stop(immediate) {
eprintln!("object_storage stop failed: {:#}", e);
eprintln!("endpoint_storage stop failed: {:#}", e);
}
for ps_conf in &env.pageservers {

View File

@@ -1,34 +1,33 @@
use crate::background_process::{self, start_process, stop_process};
use crate::local_env::LocalEnv;
use anyhow::anyhow;
use anyhow::{Context, Result};
use camino::Utf8PathBuf;
use std::io::Write;
use std::time::Duration;
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const OBJECT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/object_storage";
pub const OBJECT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub const ENDPOINT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/endpoint_storage";
pub const ENDPOINT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub struct ObjectStorage {
pub struct EndpointStorage {
pub bin: Utf8PathBuf,
pub data_dir: Utf8PathBuf,
pub pemfile: Utf8PathBuf,
pub port: u16,
}
impl ObjectStorage {
pub fn from_env(env: &LocalEnv) -> ObjectStorage {
ObjectStorage {
bin: Utf8PathBuf::from_path_buf(env.object_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.object_storage_data_dir()).unwrap(),
impl EndpointStorage {
pub fn from_env(env: &LocalEnv) -> EndpointStorage {
EndpointStorage {
bin: Utf8PathBuf::from_path_buf(env.endpoint_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.endpoint_storage_data_dir()).unwrap(),
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
port: env.object_storage.port,
port: env.endpoint_storage.port,
}
}
fn config_path(&self) -> Utf8PathBuf {
self.data_dir.join("object_storage.json")
self.data_dir.join("endpoint_storage.json")
}
fn listen_addr(&self) -> Utf8PathBuf {
@@ -49,7 +48,7 @@ impl ObjectStorage {
let cfg = Cfg {
listen: self.listen_addr(),
pemfile: parent.join(self.pemfile.clone()),
local_path: parent.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR),
local_path: parent.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR),
r#type: "LocalFs".to_string(),
};
std::fs::create_dir_all(self.config_path().parent().unwrap())?;
@@ -59,24 +58,19 @@ impl ObjectStorage {
}
pub async fn start(&self, retry_timeout: &Duration) -> Result<()> {
println!("Starting s3 proxy at {}", self.listen_addr());
println!("Starting endpoint_storage at {}", self.listen_addr());
std::io::stdout().flush().context("flush stdout")?;
let process_status_check = || async {
tokio::time::sleep(Duration::from_millis(500)).await;
let res = reqwest::Client::new()
.get(format!("http://{}/metrics", self.listen_addr()))
.send()
.await;
match res {
Ok(response) if response.status().is_success() => Ok(true),
Ok(_) => Err(anyhow!("Failed to query /metrics")),
Err(e) => Err(anyhow!("Failed to check node status: {e}")),
let res = reqwest::Client::new().get(format!("http://{}/metrics", self.listen_addr()));
match res.send().await {
Ok(res) => Ok(res.status().is_success()),
Err(_) => Ok(false),
}
};
let res = start_process(
"object_storage",
"endpoint_storage",
&self.data_dir.clone().into_std_path_buf(),
&self.bin.clone().into_std_path_buf(),
vec![self.config_path().to_string()],
@@ -94,14 +88,14 @@ impl ObjectStorage {
}
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
stop_process(immediate, "object_storage", &self.pid_file())
stop_process(immediate, "endpoint_storage", &self.pid_file())
}
fn log_file(&self) -> Utf8PathBuf {
self.data_dir.join("object_storage.log")
self.data_dir.join("endpoint_storage.log")
}
fn pid_file(&self) -> Utf8PathBuf {
self.data_dir.join("object_storage.pid")
self.data_dir.join("endpoint_storage.pid")
}
}

View File

@@ -9,8 +9,8 @@
mod background_process;
pub mod broker;
pub mod endpoint;
pub mod endpoint_storage;
pub mod local_env;
pub mod object_storage;
pub mod pageserver;
pub mod postgresql_conf;
pub mod safekeeper;

View File

@@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize};
use utils::auth::encode_from_key_file;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage};
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
@@ -72,7 +72,7 @@ pub struct LocalEnv {
pub safekeepers: Vec<SafekeeperConf>,
pub object_storage: ObjectStorageConf,
pub endpoint_storage: EndpointStorageConf,
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
// be propagated into each pageserver's configuration.
@@ -110,7 +110,7 @@ pub struct OnDiskConfig {
)]
pub pageservers: Vec<PageServerConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub object_storage: ObjectStorageConf,
pub endpoint_storage: EndpointStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub control_plane_compute_hook_api: Option<Url>,
@@ -144,7 +144,7 @@ pub struct NeonLocalInitConf {
pub storage_controller: Option<NeonStorageControllerConf>,
pub pageservers: Vec<NeonLocalInitPageserverConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub object_storage: ObjectStorageConf,
pub endpoint_storage: EndpointStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub generate_local_ssl_certs: bool,
@@ -152,7 +152,7 @@ pub struct NeonLocalInitConf {
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct ObjectStorageConf {
pub struct EndpointStorageConf {
pub port: u16,
}
@@ -413,8 +413,8 @@ impl LocalEnv {
self.pg_dir(pg_version, "lib")
}
pub fn object_storage_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("object_storage")
pub fn endpoint_storage_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("endpoint_storage")
}
pub fn pageserver_bin(&self) -> PathBuf {
@@ -450,8 +450,8 @@ impl LocalEnv {
self.base_data_dir.join("safekeepers").join(data_dir_name)
}
pub fn object_storage_data_dir(&self) -> PathBuf {
self.base_data_dir.join("object_storage")
pub fn endpoint_storage_data_dir(&self) -> PathBuf {
self.base_data_dir.join("endpoint_storage")
}
pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
@@ -615,7 +615,7 @@ impl LocalEnv {
control_plane_compute_hook_api: _,
branch_name_mappings,
generate_local_ssl_certs,
object_storage,
endpoint_storage,
} = on_disk_config;
LocalEnv {
base_data_dir: repopath.to_owned(),
@@ -632,7 +632,7 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings,
generate_local_ssl_certs,
object_storage,
endpoint_storage,
}
};
@@ -742,7 +742,7 @@ impl LocalEnv {
control_plane_compute_hook_api: None,
branch_name_mappings: self.branch_name_mappings.clone(),
generate_local_ssl_certs: self.generate_local_ssl_certs,
object_storage: self.object_storage.clone(),
endpoint_storage: self.endpoint_storage.clone(),
},
)
}
@@ -849,7 +849,7 @@ impl LocalEnv {
control_plane_api,
generate_local_ssl_certs,
control_plane_hooks_api,
object_storage,
endpoint_storage,
} = conf;
// Find postgres binaries.
@@ -901,7 +901,7 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings: Default::default(),
generate_local_ssl_certs,
object_storage,
endpoint_storage,
};
if generate_local_ssl_certs {
@@ -929,13 +929,13 @@ impl LocalEnv {
.context("pageserver init failed")?;
}
ObjectStorage::from_env(&env)
EndpointStorage::from_env(&env)
.init()
.context("object storage init failed")?;
// setup remote remote location for default LocalFs remote storage
std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
std::fs::create_dir_all(env.base_data_dir.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR))?;
std::fs::create_dir_all(env.base_data_dir.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR))?;
env.persist_config()
}

View File

@@ -45,9 +45,7 @@ allow = [
"ISC",
"MIT",
"MPL-2.0",
"OpenSSL",
"Unicode-3.0",
"Zlib",
]
confidence-threshold = 0.8
exceptions = [
@@ -56,14 +54,6 @@ exceptions = [
{ allow = ["Zlib"], name = "const_format", version = "*" },
]
[[licenses.clarify]]
name = "ring"
version = "*"
expression = "MIT AND ISC AND OpenSSL"
license-files = [
{ path = "LICENSE", hash = 0xbd0eed23 }
]
[licenses.private]
ignore = true
registries = []
@@ -116,7 +106,11 @@ name = "openssl"
unknown-registry = "warn"
unknown-git = "warn"
allow-registry = ["https://github.com/rust-lang/crates.io-index"]
allow-git = []
allow-git = [
# Crate pinned to commit in origin repo due to opentelemetry version.
# TODO: Remove this once crate is fetched from crates.io again.
"https://github.com/mattiapenati/tower-otel",
]
[sources.allow-org]
github = [

View File

@@ -1,5 +1,5 @@
[package]
name = "object_storage"
name = "endpoint_storage"
version = "0.0.1"
edition.workspace = true
license.workspace = true

View File

@@ -2,7 +2,7 @@ use anyhow::anyhow;
use axum::body::{Body, Bytes};
use axum::response::{IntoResponse, Response};
use axum::{Router, http::StatusCode};
use object_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
use endpoint_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
use remote_storage::TimeoutOrCancel;
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
@@ -46,12 +46,12 @@ async fn metrics() -> Result {
async fn get(S3Path { path }: S3Path, state: State) -> Result {
info!(%path, "downloading");
let download_err = |e| {
if let DownloadError::NotFound = e {
info!(%path, %e, "downloading"); // 404 is not an issue of _this_ service
let download_err = |err| {
if let DownloadError::NotFound = err {
info!(%path, %err, "downloading"); // 404 is not an issue of _this_ service
return not_found(&path);
}
internal_error(e, &path, "downloading")
internal_error(err, &path, "downloading")
};
let cancel = state.cancel.clone();
let opts = &DownloadOpts::default();
@@ -249,7 +249,7 @@ mod tests {
};
let proxy = Storage {
auth: object_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
auth: endpoint_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
storage,
cancel: cancel.clone(),
max_upload_file_limit: usize::MAX,
@@ -343,14 +343,14 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
fn token() -> String {
let claims = object_storage::Claims {
let claims = endpoint_storage::Claims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}
@@ -364,7 +364,10 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
vec![ENDPOINT_ID, "ep-ololo"]
)
.skip(1);
// first one is fully valid path, second path is valid for GET as
// read paths may have different endpoint if tenant and timeline matches
// (needed for prewarming RO->RW replica)
.skip(2);
for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
info!(%uri, %method, %tenant, %timeline, %endpoint);
@@ -475,6 +478,16 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
requests_chain(chain.into_iter(), |_| token()).await;
}
#[testlog(tokio::test)]
async fn read_other_endpoint_data() {
let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/other_endpoint/key");
let chain = vec![
(uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
(uri.clone(), "PUT", "", StatusCode::UNAUTHORIZED, false),
];
requests_chain(chain.into_iter(), |_| token()).await;
}
fn delete_prefix_token(uri: &str) -> String {
use serde::Serialize;
let parts = uri.split("/").collect::<Vec<&str>>();
@@ -482,7 +495,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
struct PrefixClaims {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
endpoint_id: Option<object_storage::EndpointId>,
endpoint_id: Option<endpoint_storage::EndpointId>,
exp: u64,
}
let claims = PrefixClaims {
@@ -492,7 +505,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}

View File

@@ -169,10 +169,19 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "decoding token"))?;
// Read paths may have different endpoint ids. For readonly -> readwrite replica
// prewarming, endpoint must read other endpoint's data.
let endpoint_id = if parts.method == axum::http::Method::GET {
claims.endpoint_id.clone()
} else {
path.endpoint_id.clone()
};
let route = Claims {
tenant_id: path.tenant_id,
timeline_id: path.timeline_id,
endpoint_id: path.endpoint_id.clone(),
endpoint_id,
exp: claims.exp,
};
if route != claims {

View File

@@ -1,4 +1,4 @@
//! `object_storage` is a service which provides API for uploading and downloading
//! `endpoint_storage` is a service which provides API for uploading and downloading
//! files. It is used by compute and control plane for accessing LFC prewarm data.
//! This service is deployed either as a separate component or as part of compute image
//! for large computes.
@@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> {
let config: String = std::env::args().skip(1).take(1).collect();
if config.is_empty() {
anyhow::bail!("Usage: object_storage config.json")
anyhow::bail!("Usage: endpoint_storage config.json")
}
info!("Reading config from {config}");
let config = std::fs::read_to_string(config.clone())?;
@@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> {
info!("Reading pemfile from {}", config.pemfile.clone());
let pemfile = std::fs::read(config.pemfile.clone())?;
info!("Loading public key from {}", config.pemfile.clone());
let auth = object_storage::JwtAuth::new(&pemfile)?;
let auth = endpoint_storage::JwtAuth::new(&pemfile)?;
let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap();
info!("listening on {}", listener.local_addr().unwrap());
@@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> {
let cancel = tokio_util::sync::CancellationToken::new();
app::check_storage_permissions(&storage, cancel.clone()).await?;
let proxy = std::sync::Arc::new(object_storage::Storage {
let proxy = std::sync::Arc::new(endpoint_storage::Storage {
auth,
storage,
cancel: cancel.clone(),

View File

@@ -242,13 +242,22 @@ impl RemoteExtSpec {
match self.extension_data.get(real_ext_name) {
Some(_ext_data) => {
// We have decided to use the Go naming convention due to Kubernetes.
let arch = match std::env::consts::ARCH {
"x86_64" => "amd64",
"aarch64" => "arm64",
arch => arch,
};
// Construct the path to the extension archive
// BUILD_TAG/PG_MAJOR_VERSION/extensions/EXTENSION_NAME.tar.zst
//
// Keep it in sync with path generation in
// https://github.com/neondatabase/build-custom-extensions/tree/main
let archive_path_str =
format!("{build_tag}/{pg_major_version}/extensions/{real_ext_name}.tar.zst");
let archive_path_str = format!(
"{build_tag}/{arch}/{pg_major_version}/extensions/{real_ext_name}.tar.zst"
);
Ok((
real_ext_name.to_string(),
RemotePath::from_string(&archive_path_str)?,

View File

@@ -181,6 +181,7 @@ pub struct ConfigToml {
pub generate_unarchival_heatmap: Option<bool>,
pub tracing: Option<Tracing>,
pub enable_tls_page_service_api: bool,
pub dev_mode: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -657,6 +658,7 @@ impl Default for ConfigToml {
generate_unarchival_heatmap: None,
tracing: None,
enable_tls_page_service_api: false,
dev_mode: false,
}
}
}

View File

@@ -320,6 +320,35 @@ pub struct TimelineCreateRequest {
pub mode: TimelineCreateRequestMode,
}
impl TimelineCreateRequest {
pub fn mode_tag(&self) -> &'static str {
match &self.mode {
TimelineCreateRequestMode::Branch { .. } => "branch",
TimelineCreateRequestMode::ImportPgdata { .. } => "import",
TimelineCreateRequestMode::Bootstrap { .. } => "bootstrap",
}
}
pub fn is_import(&self) -> bool {
matches!(self.mode, TimelineCreateRequestMode::ImportPgdata { .. })
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum ShardImportStatus {
InProgress,
Done,
Error(String),
}
impl ShardImportStatus {
pub fn is_terminal(&self) -> bool {
match self {
ShardImportStatus::InProgress => false,
ShardImportStatus::Done | ShardImportStatus::Error(_) => true,
}
}
}
/// Storage controller specific extensions to [`TimelineInfo`].
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateResponseStorcon {

View File

@@ -4,10 +4,10 @@
//! See docs/rfcs/025-generation-numbers.md
use serde::{Deserialize, Serialize};
use utils::id::NodeId;
use utils::id::{NodeId, TimelineId};
use crate::controller_api::NodeRegisterRequest;
use crate::models::LocationConfigMode;
use crate::models::{LocationConfigMode, ShardImportStatus};
use crate::shard::TenantShardId;
/// Upcall message sent by the pageserver to the configured `control_plane_api` on
@@ -62,3 +62,10 @@ pub struct ValidateResponseTenant {
pub id: TenantShardId,
pub valid: bool,
}
#[derive(Serialize, Deserialize)]
pub struct PutTimelineImportStatusRequest {
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,
pub status: ShardImportStatus,
}

View File

@@ -14,8 +14,9 @@ use anyhow::{Context, Result};
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
use azure_storage::StorageCredentials;
use azure_storage_blobs::blob::CopyStatus;
use azure_storage_blobs::blob::operations::GetBlobBuilder;
use azure_storage_blobs::blob::{Blob, CopyStatus};
use azure_storage_blobs::container::operations::ListBlobsBuilder;
use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
use bytes::Bytes;
use futures::FutureExt;
@@ -253,53 +254,15 @@ impl AzureBlobStorage {
download
}
async fn permit(
&self,
kind: RequestKind,
cancel: &CancellationToken,
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
let acquire = self.concurrency_limiter.acquire(kind);
tokio::select! {
permit = acquire => Ok(permit.expect("never closed")),
_ = cancel.cancelled() => Err(Cancelled),
}
}
pub fn container_name(&self) -> &str {
&self.container_name
}
}
fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
let mut res = Metadata::new();
for (k, v) in metadata.0.into_iter() {
res.insert(k, v);
}
res
}
fn to_download_error(error: azure_core::Error) -> DownloadError {
if let Some(http_err) = error.as_http_error() {
match http_err.status() {
StatusCode::NotFound => DownloadError::NotFound,
StatusCode::NotModified => DownloadError::Unmodified,
StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
_ => DownloadError::Other(anyhow::Error::new(error)),
}
} else {
DownloadError::Other(error.into())
}
}
impl RemoteStorage for AzureBlobStorage {
fn list_streaming(
fn list_streaming_for_fn<T: Default + ListingCollector>(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> {
request_kind: RequestKind,
customize_builder: impl Fn(ListBlobsBuilder) -> ListBlobsBuilder,
) -> impl Stream<Item = Result<T, DownloadError>> {
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| {
self.prefix_in_container.clone().map(|mut s| {
@@ -311,7 +274,7 @@ impl RemoteStorage for AzureBlobStorage {
});
async_stream::stream! {
let _permit = self.permit(RequestKind::List, cancel).await?;
let _permit = self.permit(request_kind, cancel).await?;
let mut builder = self.client.list_blobs();
@@ -327,6 +290,8 @@ impl RemoteStorage for AzureBlobStorage {
builder = builder.max_results(MaxResults::new(limit));
}
builder = customize_builder(builder);
let mut next_marker = None;
let mut timeout_try_cnt = 1;
@@ -382,26 +347,20 @@ impl RemoteStorage for AzureBlobStorage {
break;
};
let mut res = Listing::default();
let mut res = T::default();
next_marker = entry.continuation();
let prefix_iter = entry
.blobs
.prefixes()
.map(|prefix| self.name_to_relative_path(&prefix.name));
res.prefixes.extend(prefix_iter);
res.add_prefixes(self, prefix_iter);
let blob_iter = entry
.blobs
.blobs()
.map(|k| ListingObject{
key: self.name_to_relative_path(&k.name),
last_modified: k.properties.last_modified.into(),
size: k.properties.content_length,
}
);
.blobs();
for key in blob_iter {
res.keys.push(key);
res.add_blob(self, key);
if let Some(mut mk) = max_keys {
assert!(mk > 0);
@@ -423,6 +382,128 @@ impl RemoteStorage for AzureBlobStorage {
}
}
async fn permit(
&self,
kind: RequestKind,
cancel: &CancellationToken,
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
let acquire = self.concurrency_limiter.acquire(kind);
tokio::select! {
permit = acquire => Ok(permit.expect("never closed")),
_ = cancel.cancelled() => Err(Cancelled),
}
}
pub fn container_name(&self) -> &str {
&self.container_name
}
}
trait ListingCollector {
fn add_prefixes(&mut self, abs: &AzureBlobStorage, prefix_it: impl Iterator<Item = RemotePath>);
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob);
}
impl ListingCollector for Listing {
fn add_prefixes(
&mut self,
_abs: &AzureBlobStorage,
prefix_it: impl Iterator<Item = RemotePath>,
) {
self.prefixes.extend(prefix_it);
}
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
self.keys.push(ListingObject {
key: abs.name_to_relative_path(&blob.name),
last_modified: blob.properties.last_modified.into(),
size: blob.properties.content_length,
});
}
}
impl ListingCollector for crate::VersionListing {
fn add_prefixes(
&mut self,
_abs: &AzureBlobStorage,
_prefix_it: impl Iterator<Item = RemotePath>,
) {
// nothing
}
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
let id = crate::VersionId(blob.version_id.clone().expect("didn't find version ID"));
self.versions.push(crate::Version {
key: abs.name_to_relative_path(&blob.name),
last_modified: blob.properties.last_modified.into(),
kind: crate::VersionKind::Version(id),
});
}
}
fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
let mut res = Metadata::new();
for (k, v) in metadata.0.into_iter() {
res.insert(k, v);
}
res
}
fn to_download_error(error: azure_core::Error) -> DownloadError {
if let Some(http_err) = error.as_http_error() {
match http_err.status() {
StatusCode::NotFound => DownloadError::NotFound,
StatusCode::NotModified => DownloadError::Unmodified,
StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
_ => DownloadError::Other(anyhow::Error::new(error)),
}
} else {
DownloadError::Other(error.into())
}
}
impl RemoteStorage for AzureBlobStorage {
fn list_streaming(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> {
let customize_builder = |builder| builder;
let kind = RequestKind::ListVersions;
self.list_streaming_for_fn(prefix, mode, max_keys, cancel, kind, customize_builder)
}
async fn list_versions(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> std::result::Result<crate::VersionListing, DownloadError> {
let customize_builder = |mut builder: ListBlobsBuilder| {
builder = builder.include_versions(true);
builder
};
let kind = RequestKind::ListVersions;
let mut stream = std::pin::pin!(self.list_streaming_for_fn(
prefix,
mode,
max_keys,
cancel,
kind,
customize_builder
));
let mut combined: crate::VersionListing =
stream.next().await.expect("At least one item required")?;
while let Some(list) = stream.next().await {
let list = list?;
combined.versions.extend(list.versions.into_iter());
}
Ok(combined)
}
async fn head_object(
&self,
key: &RemotePath,
@@ -532,7 +613,12 @@ impl RemoteStorage for AzureBlobStorage {
let mut builder = blob_client.get();
if let Some(ref etag) = opts.etag {
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()))
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()));
}
if let Some(ref version_id) = opts.version_id {
let version_id = azure_storage_blobs::prelude::VersionId::new(version_id.0.clone());
builder = builder.blob_versioning(version_id);
}
if let Some((start, end)) = opts.byte_range() {

View File

@@ -176,6 +176,32 @@ pub struct Listing {
pub keys: Vec<ListingObject>,
}
#[derive(Default)]
pub struct VersionListing {
pub versions: Vec<Version>,
}
pub struct Version {
pub key: RemotePath,
pub last_modified: SystemTime,
pub kind: VersionKind,
}
impl Version {
pub fn version_id(&self) -> Option<&VersionId> {
match &self.kind {
VersionKind::Version(id) => Some(id),
VersionKind::DeletionMarker => None,
}
}
}
#[derive(Debug)]
pub enum VersionKind {
DeletionMarker,
Version(VersionId),
}
/// Options for downloads. The default value is a plain GET.
pub struct DownloadOpts {
/// If given, returns [`DownloadError::Unmodified`] if the object still has
@@ -186,6 +212,8 @@ pub struct DownloadOpts {
/// The end of the byte range to download, or unbounded. Must be after the
/// start bound.
pub byte_end: Bound<u64>,
/// Optionally request a specific version of a key
pub version_id: Option<VersionId>,
/// Indicate whether we're downloading something small or large: this indirectly controls
/// timeouts: for something like an index/manifest/heatmap, we should time out faster than
/// for layer files
@@ -197,12 +225,16 @@ pub enum DownloadKind {
Small,
}
#[derive(Debug, Clone)]
pub struct VersionId(pub String);
impl Default for DownloadOpts {
fn default() -> Self {
Self {
etag: Default::default(),
byte_start: Bound::Unbounded,
byte_end: Bound::Unbounded,
version_id: None,
kind: DownloadKind::Large,
}
}
@@ -295,6 +327,14 @@ pub trait RemoteStorage: Send + Sync + 'static {
Ok(combined)
}
async fn list_versions(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<VersionListing, DownloadError>;
/// Obtain metadata information about an object.
async fn head_object(
&self,
@@ -475,6 +515,22 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
// See [`RemoteStorage::list_versions`].
pub async fn list_versions<'a>(
&'a self,
prefix: Option<&'a RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &'a CancellationToken,
) -> Result<VersionListing, DownloadError> {
match self {
Self::LocalFs(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::AwsS3(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::AzureBlob(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::Unreliable(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
}
}
// See [`RemoteStorage::head_object`].
pub async fn head_object(
&self,
@@ -727,6 +783,7 @@ impl ConcurrencyLimiter {
RequestKind::Copy => &self.write,
RequestKind::TimeTravel => &self.write,
RequestKind::Head => &self.read,
RequestKind::ListVersions => &self.read,
}
}

View File

@@ -445,6 +445,16 @@ impl RemoteStorage for LocalFs {
}
}
async fn list_versions(
&self,
_prefix: Option<&RemotePath>,
_mode: ListingMode,
_max_keys: Option<NonZeroU32>,
_cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
unimplemented!()
}
async fn head_object(
&self,
key: &RemotePath,

View File

@@ -14,6 +14,7 @@ pub(crate) enum RequestKind {
Copy = 4,
TimeTravel = 5,
Head = 6,
ListVersions = 7,
}
use RequestKind::*;
@@ -29,6 +30,7 @@ impl RequestKind {
Copy => "copy_object",
TimeTravel => "time_travel_recover",
Head => "head_object",
ListVersions => "list_versions",
}
}
const fn as_index(&self) -> usize {
@@ -36,7 +38,10 @@ impl RequestKind {
}
}
const REQUEST_KIND_COUNT: usize = 7;
const REQUEST_KIND_LIST: &[RequestKind] =
&[Get, Put, Delete, List, Copy, TimeTravel, Head, ListVersions];
const REQUEST_KIND_COUNT: usize = REQUEST_KIND_LIST.len();
pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
impl<C> RequestTyped<C> {
@@ -45,12 +50,11 @@ impl<C> RequestTyped<C> {
}
fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
use RequestKind::*;
let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
let mut it = REQUEST_KIND_LIST.iter();
let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
let next = it.next().unwrap();
assert_eq!(index, next.as_index());
f(next)
f(*next)
});
if let Some(next) = it.next() {

View File

@@ -21,9 +21,8 @@ use aws_sdk_s3::config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::head_object::HeadObjectError;
use aws_sdk_s3::types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass};
use aws_sdk_s3::types::{Delete, ObjectIdentifier, StorageClass};
use aws_smithy_async::rt::sleep::TokioSleep;
use aws_smithy_types::DateTime;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::byte_stream::ByteStream;
use aws_smithy_types::date_time::ConversionError;
@@ -46,7 +45,7 @@ use crate::support::PermitCarrying;
use crate::{
ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
MAX_KEYS_PER_DELETE_S3, REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, RemoteStorage,
TimeTravelError, TimeoutOrCancel,
TimeTravelError, TimeoutOrCancel, Version, VersionId, VersionKind, VersionListing,
};
/// AWS S3 storage.
@@ -66,6 +65,7 @@ struct GetObjectRequest {
key: String,
etag: Option<String>,
range: Option<String>,
version_id: Option<String>,
}
impl S3Bucket {
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
@@ -251,6 +251,7 @@ impl S3Bucket {
.get_object()
.bucket(request.bucket)
.key(request.key)
.set_version_id(request.version_id)
.set_range(request.range);
if let Some(etag) = request.etag {
@@ -405,6 +406,124 @@ impl S3Bucket {
Ok(())
}
async fn list_versions_with_permit(
&self,
_permit: &tokio::sync::SemaphorePermit<'_>,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
// Use the explicitly passed prefix, or fall back to the configured prefix_in_bucket.
let prefix = prefix
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone());
let warn_threshold = 3;
let max_retries = 10;
let is_permanent = |e: &_| matches!(e, DownloadError::Cancelled);
let mut key_marker = None;
let mut version_id_marker = None;
let mut versions_and_deletes = Vec::new();
loop {
let response = backoff::retry(
|| async {
let mut request = self
.client
.list_object_versions()
.bucket(self.bucket_name.clone())
.set_prefix(prefix.clone())
.set_key_marker(key_marker.clone())
.set_version_id_marker(version_id_marker.clone());
if let ListingMode::WithDelimiter = mode {
request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
let op = request.send();
tokio::select! {
res = op => res.map_err(|e| DownloadError::Other(e.into())),
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
}
},
is_permanent,
warn_threshold,
max_retries,
"listing object versions",
cancel,
)
.await
.ok_or_else(|| DownloadError::Cancelled)
.and_then(|x| x)?;
tracing::trace!(
" Got List response version_id_marker={:?}, key_marker={:?}",
response.version_id_marker,
response.key_marker
);
let versions = response
.versions
.unwrap_or_default()
.into_iter()
.map(|version| {
let key = version.key.expect("response does not contain a key");
let key = self.s3_object_to_relative_path(&key);
let version_id = VersionId(version.version_id.expect("response does not contain a version id"));
let last_modified =
SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
Ok(Version {
key,
last_modified,
kind: crate::VersionKind::Version(version_id),
})
});
let deletes = response
.delete_markers
.unwrap_or_default()
.into_iter()
.map(|version| {
let key = version.key.expect("response does not contain a key");
let key = self.s3_object_to_relative_path(&key);
let last_modified =
SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
Ok(Version {
key,
last_modified,
kind: crate::VersionKind::DeletionMarker,
})
});
itertools::process_results(versions.chain(deletes), |n_vds| {
versions_and_deletes.extend(n_vds)
})
.map_err(DownloadError::Other)?;
fn none_if_empty(v: Option<String>) -> Option<String> {
v.filter(|v| !v.is_empty())
}
version_id_marker = none_if_empty(response.next_version_id_marker);
key_marker = none_if_empty(response.next_key_marker);
if version_id_marker.is_none() {
// The final response is not supposed to be truncated
if response.is_truncated.unwrap_or_default() {
return Err(DownloadError::Other(anyhow::anyhow!(
"Received truncated ListObjectVersions response for prefix={prefix:?}"
)));
}
break;
}
if let Some(max_keys) = max_keys {
if versions_and_deletes.len() >= max_keys.get().try_into().unwrap() {
return Err(DownloadError::Other(anyhow::anyhow!("too many versions")));
}
}
}
Ok(VersionListing {
versions: versions_and_deletes,
})
}
pub fn bucket_name(&self) -> &str {
&self.bucket_name
}
@@ -621,6 +740,19 @@ impl RemoteStorage for S3Bucket {
}
}
async fn list_versions(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
let kind = RequestKind::ListVersions;
let permit = self.permit(kind, cancel).await?;
self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel)
.await
}
async fn head_object(
&self,
key: &RemotePath,
@@ -801,6 +933,7 @@ impl RemoteStorage for S3Bucket {
key: self.relative_path_to_s3_object(from),
etag: opts.etag.as_ref().map(|e| e.to_string()),
range: opts.byte_range_header(),
version_id: opts.version_id.as_ref().map(|v| v.0.to_owned()),
},
cancel,
)
@@ -845,94 +978,25 @@ impl RemoteStorage for S3Bucket {
let kind = RequestKind::TimeTravel;
let permit = self.permit(kind, cancel).await?;
let timestamp = DateTime::from(timestamp);
let done_if_after = DateTime::from(done_if_after);
tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
// Use the explicitly passed prefix, or fall back to the configured prefix_in_bucket.
let prefix = prefix
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone());
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
let warn_threshold = 3;
let max_retries = 10;
let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
let mut key_marker = None;
let mut version_id_marker = None;
let mut versions_and_deletes = Vec::new();
loop {
let response = backoff::retry(
|| async {
let op = self
.client
.list_object_versions()
.bucket(self.bucket_name.clone())
.set_prefix(prefix.clone())
.set_key_marker(key_marker.clone())
.set_version_id_marker(version_id_marker.clone())
.send();
tokio::select! {
res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
_ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
}
},
is_permanent,
warn_threshold,
max_retries,
"listing object versions for time_travel_recover",
cancel,
)
let mode = ListingMode::NoDelimiter;
let version_listing = self
.list_versions_with_permit(&permit, prefix, mode, COMPLEXITY_LIMIT, cancel)
.await
.ok_or_else(|| TimeTravelError::Cancelled)
.and_then(|x| x)?;
tracing::trace!(
" Got List response version_id_marker={:?}, key_marker={:?}",
response.version_id_marker,
response.key_marker
);
let versions = response
.versions
.unwrap_or_default()
.into_iter()
.map(VerOrDelete::from_version);
let deletes = response
.delete_markers
.unwrap_or_default()
.into_iter()
.map(VerOrDelete::from_delete_marker);
itertools::process_results(versions.chain(deletes), |n_vds| {
versions_and_deletes.extend(n_vds)
})
.map_err(TimeTravelError::Other)?;
fn none_if_empty(v: Option<String>) -> Option<String> {
v.filter(|v| !v.is_empty())
}
version_id_marker = none_if_empty(response.next_version_id_marker);
key_marker = none_if_empty(response.next_key_marker);
if version_id_marker.is_none() {
// The final response is not supposed to be truncated
if response.is_truncated.unwrap_or_default() {
return Err(TimeTravelError::Other(anyhow::anyhow!(
"Received truncated ListObjectVersions response for prefix={prefix:?}"
)));
}
break;
}
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: usize = 100_000;
if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
return Err(TimeTravelError::TooManyVersions);
}
}
.map_err(|err| match err {
DownloadError::Other(e) => TimeTravelError::Other(e),
DownloadError::Cancelled => TimeTravelError::Cancelled,
other => TimeTravelError::Other(other.into()),
})?;
let versions_and_deletes = version_listing.versions;
tracing::info!(
"Built list for time travel with {} versions and deletions",
@@ -948,24 +1012,26 @@ impl RemoteStorage for S3Bucket {
let mut vds_for_key = HashMap::<_, Vec<_>>::new();
for vd in &versions_and_deletes {
let VerOrDelete {
version_id, key, ..
} = &vd;
if version_id == "null" {
let Version { key, .. } = &vd;
let version_id = vd.version_id().map(|v| v.0.as_str());
if version_id == Some("null") {
return Err(TimeTravelError::Other(anyhow!(
"Received ListVersions response for key={key} with version_id='null', \
indicating either disabled versioning, or legacy objects with null version id values"
)));
}
tracing::trace!(
"Parsing version key={key} version_id={version_id} kind={:?}",
vd.kind
);
tracing::trace!("Parsing version key={key} kind={:?}", vd.kind);
vds_for_key.entry(key).or_default().push(vd);
}
let warn_threshold = 3;
let max_retries = 10;
let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
for (key, versions) in vds_for_key {
let last_vd = versions.last().unwrap();
let key = self.relative_path_to_s3_object(key);
if last_vd.last_modified > done_if_after {
tracing::trace!("Key {key} has version later than done_if_after, skipping");
continue;
@@ -990,11 +1056,11 @@ impl RemoteStorage for S3Bucket {
do_delete = true;
} else {
match &versions[version_to_restore_to - 1] {
VerOrDelete {
kind: VerOrDeleteKind::Version,
version_id,
Version {
kind: VersionKind::Version(version_id),
..
} => {
let version_id = &version_id.0;
tracing::trace!("Copying old version {version_id} for {key}...");
// Restore the state to the last version by copying
let source_id =
@@ -1006,7 +1072,7 @@ impl RemoteStorage for S3Bucket {
.client
.copy_object()
.bucket(self.bucket_name.clone())
.key(key)
.key(&key)
.set_storage_class(self.upload_storage_class.clone())
.copy_source(&source_id)
.send();
@@ -1027,8 +1093,8 @@ impl RemoteStorage for S3Bucket {
.and_then(|x| x)?;
tracing::info!(%version_id, %key, "Copied old version in S3");
}
VerOrDelete {
kind: VerOrDeleteKind::DeleteMarker,
Version {
kind: VersionKind::DeletionMarker,
..
} => {
do_delete = true;
@@ -1036,7 +1102,7 @@ impl RemoteStorage for S3Bucket {
}
};
if do_delete {
if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
if matches!(last_vd.kind, VersionKind::DeletionMarker) {
// Key has since been deleted (but there was some history), no need to do anything
tracing::trace!("Key {key} already deleted, skipping.");
} else {
@@ -1064,62 +1130,6 @@ impl RemoteStorage for S3Bucket {
}
}
// Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
struct VerOrDelete {
kind: VerOrDeleteKind,
last_modified: DateTime,
version_id: String,
key: String,
}
#[derive(Debug)]
enum VerOrDeleteKind {
Version,
DeleteMarker,
}
impl VerOrDelete {
fn with_kind(
kind: VerOrDeleteKind,
last_modified: Option<DateTime>,
version_id: Option<String>,
key: Option<String>,
) -> anyhow::Result<Self> {
let lvk = (last_modified, version_id, key);
let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
anyhow::bail!(
"One (or more) of last_modified, key, and id is None. \
Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
lvk.0,
lvk.1,
lvk.2,
);
};
Ok(Self {
kind,
last_modified,
version_id,
key,
})
}
fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
Self::with_kind(
VerOrDeleteKind::Version,
v.last_modified,
v.version_id,
v.key,
)
}
fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
Self::with_kind(
VerOrDeleteKind::DeleteMarker,
v.last_modified,
v.version_id,
v.key,
)
}
}
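Editor's sketch of the per-key decision the recovery loop above makes, condensed into a pure function over the new `Version` type (hypothetical helper; assumes the per-key slice is sorted oldest-to-newest, which the `versions.last()` usage above implies):

fn restore_target(versions: &[Version], timestamp: SystemTime) -> Option<&Version> {
    // Index of the first version strictly newer than the target time.
    let idx = versions.partition_point(|v| v.last_modified <= timestamp);
    if idx == 0 {
        None // key did not exist at `timestamp`: plain delete
    } else {
        // Newest state at or before `timestamp`. If it is a DeletionMarker,
        // the caller deletes the current object instead of copying.
        Some(&versions[idx - 1])
    }
}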
#[cfg(test)]
mod tests {
use std::num::NonZeroUsize;

View File

@@ -139,6 +139,20 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.list(prefix, mode, max_keys, cancel).await
}
async fn list_versions(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
.map_err(DownloadError::Other)?;
self.inner
.list_versions(prefix, mode, max_keys, cancel)
.await
}
async fn head_object(
&self,
key: &RemotePath,

View File

@@ -11,6 +11,7 @@ use pageserver::task_mgr::TaskKind;
use pageserver::tenant::storage_layer::InMemoryLayer;
use pageserver::{page_cache, virtual_file};
use pageserver_api::key::Key;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use tokio_util::sync::CancellationToken;
@@ -28,6 +29,7 @@ fn murmurhash32(mut h: u32) -> u32 {
h
}
#[derive(serde::Serialize, Clone, Copy, Debug)]
enum KeyLayout {
/// Sequential unique keys
Sequential,
@@ -37,6 +39,7 @@ enum KeyLayout {
RandomReuse(u32),
}
#[derive(serde::Serialize, Clone, Copy, Debug)]
enum WriteDelta {
Yes,
No,
@@ -58,7 +61,7 @@ async fn ingest(
tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?;
let ctx =
let ctx2 =
RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error).with_scope_debug_tools();
let gate = utils::sync::gate::Gate::default();
@@ -138,12 +141,15 @@ async fn ingest(
/// Wrapper to instantiate a tokio runtime
fn ingest_main(
conf: &'static PageServerConf,
io_mode: IoMode,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
) {
let runtime = tokio::runtime::Builder::new_current_thread()
pageserver::virtual_file::set_io_mode(io_mode);
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
@@ -174,93 +180,207 @@ fn criterion_benchmark(c: &mut Criterion) {
virtual_file::init(
16384,
virtual_file::io_engine_for_bench(),
// immaterial, each `ingest_main` invocation below overrides this
conf.virtual_file_io_mode,
// without actually doing syncs, buffered writes have an unfair advantage over direct IO writes
virtual_file::SyncMode::Sync,
);
page_cache::init(conf.page_cache_size);
{
let mut group = c.benchmark_group("ingest-small-values");
let put_size = 100usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/100b seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Random,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand-1024keys", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::RandomReuse(0x3ff),
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
#[derive(serde::Serialize)]
struct ExplodedParameters {
io_mode: IoMode,
volume_mib: usize,
key_size: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
}
{
let mut group = c.benchmark_group("ingest-big-values");
let put_size = 8192usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
#[derive(Clone)]
struct HandPickedParameters {
volume_mib: usize,
key_size: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
}
let expect = vec![
// Small values (100b) tests
HandPickedParameters {
volume_mib: 128,
key_size: 100,
key_layout: KeyLayout::Sequential,
write_delta: WriteDelta::Yes,
},
HandPickedParameters {
volume_mib: 128,
key_size: 100,
key_layout: KeyLayout::Random,
write_delta: WriteDelta::Yes,
},
HandPickedParameters {
volume_mib: 128,
key_size: 100,
key_layout: KeyLayout::RandomReuse(0x3ff),
write_delta: WriteDelta::Yes,
},
HandPickedParameters {
volume_mib: 128,
key_size: 100,
key_layout: KeyLayout::Sequential,
write_delta: WriteDelta::No,
},
// Large values (8k) tests
HandPickedParameters {
volume_mib: 128,
key_size: 8192,
key_layout: KeyLayout::Sequential,
write_delta: WriteDelta::Yes,
},
HandPickedParameters {
volume_mib: 128,
key_size: 8192,
key_layout: KeyLayout::Sequential,
write_delta: WriteDelta::No,
},
];
let exploded_parameters = {
let mut out = Vec::new();
for io_mode in [
IoMode::Buffered,
#[cfg(target_os = "linux")]
IoMode::Direct,
] {
for param in expect.clone() {
let HandPickedParameters {
volume_mib,
key_size,
key_layout,
write_delta,
} = param;
out.push(ExplodedParameters {
io_mode,
volume_mib,
key_size,
key_layout,
write_delta,
});
}
}
out
};
impl ExplodedParameters {
fn benchmark_id(&self) -> String {
let ExplodedParameters {
io_mode,
volume_mib,
key_size,
key_layout,
write_delta,
} = self;
format!(
"io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?}"
)
}
}
let mut group = c.benchmark_group("ingest");
for params in exploded_parameters {
let id = params.benchmark_id();
let ExplodedParameters {
io_mode,
volume_mib,
key_size,
key_layout,
write_delta,
} = params;
let put_count = volume_mib * 1024 * 1024 / key_size;
group.throughput(criterion::Throughput::Bytes((key_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/8k seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/8k seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
group.bench_function(id, |b| {
b.iter(|| ingest_main(conf, io_mode, key_size, put_count, key_layout, write_delta))
});
}
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
/*
cargo bench --bench bench_ingest
im4gn.2xlarge:
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.8491 s 1.8540 s 1.8592 s]
thrpt: [68.847 MiB/s 69.039 MiB/s 69.222 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [2.6976 s 2.7123 s 2.7286 s]
thrpt: [46.911 MiB/s 47.193 MiB/s 47.450 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [1.7433 s 1.7510 s 1.7600 s]
thrpt: [72.729 MiB/s 73.099 MiB/s 73.423 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [499.63 ms 500.07 ms 500.46 ms]
thrpt: [255.77 MiB/s 255.96 MiB/s 256.19 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [456.97 ms 459.61 ms 461.92 ms]
thrpt: [277.11 MiB/s 278.50 MiB/s 280.11 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [158.82 ms 159.16 ms 159.56 ms]
thrpt: [802.22 MiB/s 804.24 MiB/s 805.93 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.8856 s 1.8997 s 1.9179 s]
thrpt: [66.740 MiB/s 67.380 MiB/s 67.882 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [2.7468 s 2.7625 s 2.7785 s]
thrpt: [46.068 MiB/s 46.335 MiB/s 46.600 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
time: [1.7689 s 1.7726 s 1.7767 s]
thrpt: [72.045 MiB/s 72.208 MiB/s 72.363 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [497.64 ms 498.60 ms 499.67 ms]
thrpt: [256.17 MiB/s 256.72 MiB/s 257.21 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [493.72 ms 505.07 ms 518.03 ms]
thrpt: [247.09 MiB/s 253.43 MiB/s 259.26 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [267.76 ms 267.85 ms 267.96 ms]
thrpt: [477.69 MiB/s 477.88 MiB/s 478.03 MiB/s]
Hetzner AX102:
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.0683 s 1.1006 s 1.1386 s]
thrpt: [112.42 MiB/s 116.30 MiB/s 119.82 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [1.5719 s 1.6012 s 1.6228 s]
thrpt: [78.877 MiB/s 79.938 MiB/s 81.430 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [1.1095 s 1.1331 s 1.1580 s]
thrpt: [110.53 MiB/s 112.97 MiB/s 115.37 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [303.20 ms 307.83 ms 311.90 ms]
thrpt: [410.39 MiB/s 415.81 MiB/s 422.16 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [406.34 ms 429.37 ms 451.63 ms]
thrpt: [283.42 MiB/s 298.11 MiB/s 315.00 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [134.01 ms 135.78 ms 137.48 ms]
thrpt: [931.03 MiB/s 942.68 MiB/s 955.12 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.0406 s 1.0580 s 1.0772 s]
thrpt: [118.83 MiB/s 120.98 MiB/s 123.00 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [1.5059 s 1.5339 s 1.5625 s]
thrpt: [81.920 MiB/s 83.448 MiB/s 84.999 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
time: [1.0714 s 1.0934 s 1.1161 s]
thrpt: [114.69 MiB/s 117.06 MiB/s 119.47 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [262.68 ms 265.14 ms 267.71 ms]
thrpt: [478.13 MiB/s 482.76 MiB/s 487.29 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [375.19 ms 393.80 ms 411.40 ms]
thrpt: [311.14 MiB/s 325.04 MiB/s 341.16 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [123.02 ms 123.85 ms 124.66 ms]
thrpt: [1.0027 GiB/s 1.0093 GiB/s 1.0161 GiB/s]
*/
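Editor's note: assuming the standard criterion harness, a single row above can be reproduced by passing a substring filter through the bench binary, e.g. cargo bench --bench bench_ingest -- 'io_mode=Direct volume_mib=128 key_size_bytes=8192'.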

View File

@@ -419,6 +419,23 @@ impl Client {
}
}
pub async fn timeline_detail(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<TimelineInfo> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
self.mgmt_api_endpoint
);
self.request(Method::GET, &uri, ())
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
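Editor's illustration of a call site for the new endpoint (error handling elided; identifiers assumed in scope):

// `info` is the deserialized TimelineInfo for this shard's view of the timeline.
let info = client.timeline_detail(tenant_shard_id, timeline_id).await?;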
pub async fn timeline_archival_config(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -225,6 +225,11 @@ pub struct PageServerConf {
/// Does not force TLS: the client negotiates TLS usage during the handshake.
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
pub enable_tls_page_service_api: bool,
/// Run in development mode, which disables certain safety checks
/// such as authentication requirements for HTTP and PostgreSQL APIs.
/// This is insecure and should only be used in development environments.
pub dev_mode: bool,
}
/// Token for authentication to safekeepers
@@ -398,6 +403,7 @@ impl PageServerConf {
generate_unarchival_heatmap,
tracing,
enable_tls_page_service_api,
dev_mode,
} = config_toml;
let mut conf = PageServerConf {
@@ -449,6 +455,7 @@ impl PageServerConf {
get_vectored_concurrent_io,
tracing,
enable_tls_page_service_api,
dev_mode,
// ------------------------------------------------------------
// fields that require additional validation or custom handling

View File

@@ -263,7 +263,9 @@ where
while let Some((tenant_id, tenant)) = tenants.next().await {
let mut tenant_resident_size = 0;
for timeline in tenant.list_timelines() {
let timelines = tenant.list_timelines();
let timelines_len = timelines.len();
for timeline in timelines {
let timeline_id = timeline.timeline_id;
match TimelineSnapshot::collect(&timeline, ctx) {
@@ -289,6 +291,11 @@ where
tenant_resident_size += timeline.resident_physical_size();
}
if timelines_len == 0 {
// Force it to 1 byte so the tenant is still reported when all timelines are offloaded.
tenant_resident_size = 1;
}
let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
}

View File

@@ -3,10 +3,11 @@ use std::collections::HashMap;
use futures::Future;
use pageserver_api::config::NodeMetadata;
use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest};
use pageserver_api::models::ShardImportStatus;
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
ValidateRequestTenant, ValidateResponse,
PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
ValidateRequest, ValidateRequestTenant, ValidateResponse,
};
use reqwest::Certificate;
use serde::Serialize;
@@ -14,7 +15,7 @@ use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
use url::Url;
use utils::generation::Generation;
use utils::id::NodeId;
use utils::id::{NodeId, TimelineId};
use utils::{backoff, failpoint_support};
use crate::config::PageServerConf;
@@ -46,6 +47,12 @@ pub trait StorageControllerUpcallApi {
&self,
tenants: Vec<(TenantShardId, Generation)>,
) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
fn put_timeline_import_status(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
status: ShardImportStatus,
) -> impl Future<Output = Result<(), RetryForeverError>> + Send;
}
impl StorageControllerUpcallClient {
@@ -273,4 +280,30 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
Ok(result.into_iter().collect())
}
/// Send a shard import status to the storage controller
///
/// The implementation must have at-least-once delivery semantics.
/// To this end, we retry the request until it succeeds. If the pageserver
/// restarts or crashes, the shard import will start again from the beginning.
#[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
async fn put_timeline_import_status(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
status: ShardImportStatus,
) -> Result<(), RetryForeverError> {
let url = self
.base_url
.join("timeline_import_status")
.expect("Failed to build path");
let request = PutTimelineImportStatusRequest {
tenant_shard_id,
timeline_id,
status,
};
self.retry_http_forever(&url, request).await
}
}
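Editor's sketch of the at-least-once contract described above (a hypothetical helper, not the real `retry_http_forever`; assumes tokio and tokio-util): retry until the request succeeds, giving up only on shutdown.

async fn retry_until_success<F, Fut, T, E>(mut op: F, cancel: &CancellationToken) -> Option<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    loop {
        tokio::select! {
            // Shutdown wins over delivery; the import restarts after a crash anyway.
            _ = cancel.cancelled() => return None,
            res = op() => match res {
                Ok(v) => return Some(v),
                Err(e) => {
                    tracing::warn!("request failed, retrying: {e}");
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                }
            },
        }
    }
}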

View File

@@ -787,6 +787,15 @@ mod test {
Ok(result)
}
async fn put_timeline_import_status(
&self,
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
_status: pageserver_api::models::ShardImportStatus,
) -> Result<(), RetryForeverError> {
unimplemented!()
}
}
async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {

View File

@@ -28,7 +28,7 @@ use tracing::warn;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile;
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
#[derive(Copy, Clone, Debug)]
@@ -218,7 +218,7 @@ pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
/// manually before dropping.
pub struct BlobWriter<const BUFFERED: bool> {
inner: VirtualFile,
inner: TempVirtualFile,
offset: u64,
/// A buffer to save on write calls, only used if BUFFERED=true
buf: Vec<u8>,
@@ -228,7 +228,7 @@ pub struct BlobWriter<const BUFFERED: bool> {
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub fn new(
inner: VirtualFile,
inner: TempVirtualFile,
start_offset: u64,
_gate: &utils::sync::gate::Gate,
_cancel: CancellationToken,
@@ -476,30 +476,17 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
}
}
impl BlobWriter<true> {
/// Access the underlying `VirtualFile`.
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
/// Finish this blob writer and return the underlying [`TempVirtualFile`].
///
/// This function flushes the internal buffer before giving access
/// to the underlying `VirtualFile`.
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
self.flush_buffer(ctx).await?;
/// If there is an internal buffer (depends on `BUFFERED`), it will
/// be flushed before this method returns.
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<TempVirtualFile, Error> {
if BUFFERED {
self.flush_buffer(ctx).await?;
}
Ok(self.inner)
}
/// Access the underlying `VirtualFile`.
///
/// Unlike [`into_inner`](Self::into_inner), this doesn't flush
/// the internal buffer before giving access.
pub fn into_inner_no_flush(self) -> VirtualFile {
self.inner
}
}
impl BlobWriter<false> {
/// Access the underlying `VirtualFile`.
pub fn into_inner(self) -> VirtualFile {
self.inner
}
}
#[cfg(test)]
@@ -512,6 +499,7 @@ pub(crate) mod tests {
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
use crate::tenant::block_io::BlockReaderRef;
use crate::virtual_file::VirtualFile;
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED>(blobs, false).await
@@ -530,7 +518,10 @@ pub(crate) mod tests {
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
let file = TempVirtualFile::new(
VirtualFile::create(pathbuf.as_path(), ctx).await?,
gate.enter().unwrap(),
);
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
for blob in blobs.iter() {
let (_, res) = if compression {
@@ -553,7 +544,9 @@ pub(crate) mod tests {
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
let offs = res?;
println!("Writing final blob at offs={offs}");
wtr.flush_buffer(ctx).await?;
let file = wtr.into_inner(ctx).await?;
file.disarm_into_inner();
}
Ok((temp_dir, pathbuf, offsets))
}

View File

@@ -12,6 +12,7 @@ use tokio_epoll_uring::{BoundedBuf, Slice};
use tokio_util::sync::CancellationToken;
use tracing::{error, info_span};
use utils::id::TimelineId;
use utils::sync::gate::GateGuard;
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
@@ -21,16 +22,33 @@ use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
use crate::virtual_file::{self, IoBufferMut, VirtualFile, owned_buffers_io};
use crate::virtual_file::{self, IoBufferMut, TempVirtualFile, VirtualFile, owned_buffers_io};
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub struct EphemeralFile {
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
page_cache_file_id: page_cache::FileId,
bytes_written: u64,
buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
_gate_guard: utils::sync::gate::GateGuard,
file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
buffered_writer: BufferedWriter,
}
type BufferedWriter = owned_buffers_io::write::BufferedWriter<
IoBufferMut,
TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
>;
/// A TempVirtualFile that is co-owned by the [`EphemeralFile`]` and [`BufferedWriter`].
///
/// (Actually [`BufferedWriter`] internally is just a client to a background flush task.
/// The co-ownership is between [`EphemeralFile`] and that flush task.)
///
/// Co-ownership allows us to serve reads for data that has already been flushed by the [`BufferedWriter`].
#[derive(Debug, Clone)]
struct TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
inner: Arc<TempVirtualFile>,
}
const TAIL_SZ: usize = 64 * 1024;
@@ -44,9 +62,12 @@ impl EphemeralFile {
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<EphemeralFile> {
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
// TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let filename = conf
.timeline_path(&tenant_shard_id, &timeline_id)
@@ -54,7 +75,7 @@ impl EphemeralFile {
"ephemeral-{filename_disambiguator}"
)));
let file = Arc::new(
let file = TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter::new(
VirtualFile::open_with_options_v2(
&filename,
virtual_file::OpenOptions::new()
@@ -64,6 +85,7 @@ impl EphemeralFile {
ctx,
)
.await?,
gate.enter()?,
);
let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
@@ -73,7 +95,8 @@ impl EphemeralFile {
_timeline_id: timeline_id,
page_cache_file_id,
bytes_written: 0,
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
file: file.clone(),
buffered_writer: BufferedWriter::new(
file,
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
@@ -81,29 +104,42 @@ impl EphemeralFile {
ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
),
_gate_guard: gate.enter()?,
})
}
}
impl Drop for EphemeralFile {
fn drop(&mut self) {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = self.buffered_writer.as_inner().path();
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
// just never log the not found errors, we cannot do anything for them; on detach
// the tenant directory is already gone.
//
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!("could not remove ephemeral file '{path}': {e}");
}
impl TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
fn new(file: VirtualFile, gate_guard: GateGuard) -> Self {
Self {
inner: Arc::new(TempVirtualFile::new(file, gate_guard)),
}
}
}
impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
fn write_all_at<Buf: owned_buffers_io::io_buf_aligned::IoBufAligned + Send>(
&self,
buf: owned_buffers_io::io_buf_ext::FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> impl std::future::Future<
Output = (
owned_buffers_io::io_buf_ext::FullSlice<Buf>,
std::io::Result<()>,
),
> + Send {
self.inner.write_all_at(buf, offset, ctx)
}
}
impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
type Target = VirtualFile;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
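Editor's toy illustration of the co-ownership rule above, using only std types (hypothetical names): the file is unlinked when the last Arc clone drops, which is why reads of the already-flushed prefix stay valid while the flush task co-owns the file.

use std::sync::Arc;

struct DeleteOnDrop(std::path::PathBuf);

impl Drop for DeleteOnDrop {
    fn drop(&mut self) {
        // Best-effort unlink once the last co-owner is gone.
        let _ = std::fs::remove_file(&self.0);
    }
}

fn co_own(path: std::path::PathBuf) -> (Arc<DeleteOnDrop>, Arc<DeleteOnDrop>) {
    let owner = Arc::new(DeleteOnDrop(path));
    (Arc::clone(&owner), owner) // one handle for the reader, one for the flush task
}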
#[derive(Debug, thiserror::Error)]
pub(crate) enum EphemeralFileWriteError {
#[error("{0}")]
@@ -262,9 +298,9 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
let dst = if written_range.len() > 0 {
let file: &VirtualFile = self.buffered_writer.as_inner();
let bounds = dst.bounds();
let slice = file
let slice = self
.file
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
.await?;
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
@@ -456,7 +492,7 @@ mod tests {
assert_eq!(&buf, &content[range]);
}
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
let file_contents = std::fs::read(file.file.path()).unwrap();
assert!(file_contents == content[0..cap * 2]);
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
@@ -489,7 +525,7 @@ mod tests {
// assert the state is as this test expects it to be
let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
let md = file.buffered_writer.as_inner().path().metadata().unwrap();
let md = file.file.path().metadata().unwrap();
assert_eq!(
md.len(),
2 * cap.into_u64(),

View File

@@ -6,6 +6,7 @@
use std::collections::HashSet;
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::time::SystemTime;
use anyhow::{Context, anyhow};
@@ -15,7 +16,7 @@ use remote_storage::{
DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
};
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::io::AsyncSeekExt;
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::warn;
@@ -40,7 +41,10 @@ use crate::span::{
use crate::tenant::Generation;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
use crate::virtual_file;
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
use crate::virtual_file::{IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{TempVirtualFile, owned_buffers_io};
///
/// If 'metadata' is given, we will validate that the downloaded file's size matches that
@@ -72,21 +76,36 @@ pub async fn download_layer_file<'a>(
layer_metadata.generation,
);
// Perform a rename inspired by durable_rename from file_utils.c.
// The sequence:
// write(tmp)
// fsync(tmp)
// rename(tmp, new)
// fsync(new)
// fsync(parent)
// For more context about durable_rename check this email from postgres mailing list:
// https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
// If pageserver crashes the temp file will be deleted on startup and re-downloaded.
let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
let bytes_amount = download_retry(
let (bytes_amount, temp_file) = download_retry(
|| async {
download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
// TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let temp_file_path = path_with_suffix_extension(
local_path,
&format!("{filename_disambiguator:x}.{TEMP_DOWNLOAD_EXTENSION}"),
);
let temp_file = TempVirtualFile::new(
// Not _v2 yet which is sensitive to virtual_file_io_mode.
// That'll happen in PR https://github.com/neondatabase/neon/pull/11558
VirtualFile::open_with_options(
&temp_file_path,
virtual_file::OpenOptions::new()
.create_new(true)
.write(true),
ctx,
)
.await
.with_context(|| format!("create a temp file for layer download: {temp_file_path}"))
.map_err(DownloadError::Other)?,
gate.enter().map_err(|_| DownloadError::Cancelled)?,
);
download_object(storage, &remote_path, temp_file, gate, cancel, ctx).await
},
&format!("download {remote_path:?}"),
cancel,
@@ -96,7 +115,8 @@ pub async fn download_layer_file<'a>(
let expected = layer_metadata.file_size;
if expected != bytes_amount {
return Err(DownloadError::Other(anyhow!(
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {:?}",
temp_file.path()
)));
}
@@ -106,11 +126,28 @@ pub async fn download_layer_file<'a>(
)))
});
fs::rename(&temp_file_path, &local_path)
// Try rename before disarming the temp file.
// That way, if rename fails for whatever reason, we clean up the temp file on the return path.
fs::rename(temp_file.path(), &local_path)
.await
.with_context(|| format!("rename download layer file to {local_path}"))
.map_err(DownloadError::Other)?;
// The temp file's VirtualFile points to the temp_file_path which we moved above.
// Drop it immediately, it's invalid.
// This will get better in https://github.com/neondatabase/neon/issues/11692
let _: VirtualFile = temp_file.disarm_into_inner();
// NB: The gate guard that was stored in `temp_file` is dropped but we continue
// to operate on it and on the parent timeline directory.
// Those operations are safe to do because higher-level code is holding another gate guard:
// - attached mode: the download task spawned by struct Layer is holding the gate guard
// - secondary mode: The TenantDownloader::download holds the gate open
// The rename above is not durable yet.
// It doesn't matter for crash consistency because pageserver startup deletes temp
// files and we'll re-download on demand if necessary.
// We use fatal_err() below because, after the rename above,
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
@@ -146,147 +183,58 @@ pub async fn download_layer_file<'a>(
async fn download_object(
storage: &GenericRemoteStorage,
src_path: &RemotePath,
dst_path: &Utf8PathBuf,
#[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
destination_file: TempVirtualFile,
gate: &utils::sync::gate::Gate,
cancel: &CancellationToken,
#[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
) -> Result<u64, DownloadError> {
let res = match crate::virtual_file::io_engine::get() {
crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
crate::virtual_file::io_engine::IoEngine::StdFs => {
async {
let destination_file = tokio::fs::File::create(dst_path)
.await
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;
ctx: &RequestContext,
) -> Result<(u64, TempVirtualFile), DownloadError> {
let mut download = storage
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
let download = storage
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
pausable_failpoint!("before-downloading-layer-stream-pausable");
pausable_failpoint!("before-downloading-layer-stream-pausable");
let dst_path = destination_file.path().to_owned();
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
cancel.child_token(),
ctx,
tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
);
let mut buf_writer =
tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
buf_writer.flush().await?;
let mut destination_file = buf_writer.into_inner();
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations
// that have not yet completed. To ensure that a file is closed immediately when it is dropped,
// you should call flush before dropping it.
//
// From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
// we assume that the `destination_file` is fully written, i.e., there are no pending .write(...).await operations.
// But for additional safety, let's check/wait for any pending operations.
destination_file
.flush()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("flush source file at {dst_path}"))
.map_err(DownloadError::Other)?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok(bytes_amount)
}
.await
}
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
use std::sync::Arc;
use crate::virtual_file::{IoBufferMut, owned_buffers_io};
async {
let destination_file = Arc::new(
VirtualFile::create(dst_path, ctx)
.await
.with_context(|| {
format!("create a destination file for layer '{dst_path}'")
})
.map_err(DownloadError::Other)?,
);
let mut download = storage
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
pausable_failpoint!("before-downloading-layer-stream-pausable");
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
cancel.child_token(),
ctx,
tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
);
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
while let Some(res) =
futures::StreamExt::next(&mut download.download_stream).await
{
let chunk = match res {
Ok(chunk) => chunk,
Err(e) => return Err(DownloadError::from(e)),
};
buffered
.write_buffered_borrowed(&chunk, ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
}
let inner = buffered
.flush_and_into_inner(ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
Ok(inner)
}
.await?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok(bytes_amount)
}
.await
}
};
// in case the download failed, clean up
match res {
Ok(bytes_amount) => Ok(bytes_amount),
Err(e) => {
if let Err(e) = tokio::fs::remove_file(dst_path).await {
if e.kind() != std::io::ErrorKind::NotFound {
on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
}
}
Err(e)
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
while let Some(res) = futures::StreamExt::next(&mut download.download_stream).await {
let chunk = match res {
Ok(chunk) => chunk,
Err(e) => return Err(DownloadError::from(e)),
};
buffered
.write_buffered_borrowed(&chunk, ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
}
let inner = buffered.shutdown(ctx).await.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
Ok(inner)
}
.await?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok((bytes_amount, destination_file))
}
const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
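Editor's note: the temp-name pattern this PR applies in three places, shown in isolation (helper name hypothetical). A process-wide monotonic counter guarantees a fresh filename even if a previous attempt's TempVirtualFile, with its delete-on-drop, has not finished dropping yet.

use std::sync::atomic::{AtomicU64, Ordering};

fn next_temp_name(base: &str, extension: &str) -> String {
    static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
    let n = NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, Ordering::Relaxed);
    format!("{base}.{n:x}.{extension}")
}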

View File

@@ -646,7 +646,7 @@ enum UpdateError {
NoData,
#[error("Insufficient local storage space")]
NoSpace,
#[error("Failed to download")]
#[error("Failed to download: {0}")]
DownloadError(DownloadError),
#[error(transparent)]
Deserialize(#[from] serde_json::Error),

View File

@@ -34,6 +34,7 @@ use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf};
@@ -45,8 +46,6 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBuf;
@@ -74,6 +73,7 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
@@ -288,19 +288,20 @@ impl DeltaLayer {
key_start: Key,
lsn_range: &Range<Lsn>,
) -> Utf8PathBuf {
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
// TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
conf.timeline_path(tenant_shard_id, timeline_id)
.join(format!(
"{}-XXX__{:016X}-{:016X}.{}.{}",
"{}-XXX__{:016X}-{:016X}.{:x}.{}",
key_start,
u64::from(lsn_range.start),
u64::from(lsn_range.end),
rand_string,
filename_disambiguator,
TEMP_FILE_SUFFIX,
))
}
@@ -421,7 +422,7 @@ impl DeltaLayerWriterInner {
let path =
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
let mut file = VirtualFile::create(&path, ctx).await?;
let mut file = TempVirtualFile::new(VirtualFile::create(&path, ctx).await?, gate.enter()?);
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
@@ -515,22 +516,6 @@ impl DeltaLayerWriterInner {
self,
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(key_end, ctx).await;
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}
}
result
}
async fn finish0(
self,
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
@@ -598,6 +583,10 @@ impl DeltaLayerWriterInner {
trace!("created delta layer {}", self.path);
// The gate guard stored in `file` is dropped. Callers (e.g., the flush loop or compaction)
// also keep the gate open, so it's safe for them to rename the file to its final destination.
file.disarm_into_inner();
Ok((desc, self.path))
}
}
@@ -726,17 +715,6 @@ impl DeltaLayerWriter {
}
}
impl Drop for DeltaLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
// We want to remove the virtual file here, so it's fine to not
// having completely flushed unwritten data.
let vfile = inner.blob_writer.into_inner_no_flush();
vfile.remove();
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
@@ -1609,8 +1587,8 @@ pub(crate) mod test {
use bytes::Bytes;
use itertools::MinMaxResult;
use pageserver_api::value::Value;
use rand::RngCore;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use rand::{Rng, RngCore};
use super::*;
use crate::DEFAULT_PG_VERSION;

View File

@@ -32,6 +32,7 @@ use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use bytes::Bytes;
@@ -43,8 +44,6 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
@@ -72,6 +71,7 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
@@ -252,14 +252,18 @@ impl ImageLayer {
tenant_shard_id: TenantShardId,
fname: &ImageLayerName,
) -> Utf8PathBuf {
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
// TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
conf.timeline_path(&tenant_shard_id, &timeline_id)
.join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
.join(format!(
"{fname}.{:x}.{TEMP_FILE_SUFFIX}",
filename_disambiguator
))
}
///
@@ -773,7 +777,7 @@ impl ImageLayerWriterInner {
},
);
trace!("creating image layer {}", path);
let mut file = {
let mut file = TempVirtualFile::new(
VirtualFile::open_with_options(
&path,
virtual_file::OpenOptions::new()
@@ -781,8 +785,9 @@ impl ImageLayerWriterInner {
.create_new(true),
ctx,
)
.await?
};
.await?,
gate.enter()?,
);
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
@@ -896,25 +901,6 @@ impl ImageLayerWriterInner {
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(ctx, end_key).await;
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}
}
result
}
///
/// Finish writing the image layer.
///
async fn finish0(
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
@@ -932,7 +918,7 @@ impl ImageLayerWriterInner {
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
};
let mut file = self.blob_writer.into_inner();
let mut file = self.blob_writer.into_inner(ctx).await?;
// Write out the index
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
@@ -1000,6 +986,10 @@ impl ImageLayerWriterInner {
trace!("created image layer {}", self.path);
// The gate guard stored in `file` is dropped. Callers (e.g., the flush loop or compaction)
// also keep the gate open, so it's safe for them to rename the file to its final destination.
file.disarm_into_inner();
Ok((desc, self.path))
}
}
@@ -1125,14 +1115,6 @@ impl ImageLayerWriter {
}
}
impl Drop for ImageLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
inner.blob_writer.into_inner().remove();
}
}
}
pub struct ImageLayerIterator<'a> {
image_layer: &'a ImageLayerInner,
ctx: &'a RequestContext,

View File

@@ -1285,6 +1285,10 @@ impl Timeline {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if query.is_empty() {
return Ok(BTreeMap::default());
}
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
Some(ReadPath::new(
query.total_keyspace(),

View File

@@ -1,20 +1,21 @@
use std::sync::Arc;
use anyhow::{Context, bail};
use pageserver_api::models::ShardImportStatus;
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span};
use tracing::info;
use utils::lsn::Lsn;
use super::Timeline;
use crate::context::RequestContext;
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
use crate::tenant::metadata::TimelineMetadata;
mod flow;
mod importbucket_client;
mod importbucket_format;
pub(crate) mod index_part_format;
pub(crate) mod upcall_api;
pub async fn doit(
timeline: &Arc<Timeline>,
@@ -34,23 +35,6 @@ pub async fn doit(
let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
info!("get spec early so we know we'll be able to upcall when done");
let Some(spec) = storage.get_spec().await? else {
bail!("spec not found")
};
let upcall_client =
upcall_api::Client::new(timeline.conf, cancel.clone()).context("create upcall client")?;
//
// send an early progress update to clean up k8s job early and generate potentially useful logs
//
info!("send early progress update");
upcall_client
.send_progress_until_success(&spec)
.instrument(info_span!("early_progress_update"))
.await?;
let status_prefix = RemotePath::from_string("status").unwrap();
//
@@ -176,7 +160,21 @@ pub async fn doit(
//
// Communicate that shard is done.
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.
//
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel)?
.expect("storcon configured");
storcon_client
.put_timeline_import_status(
timeline.tenant_shard_id,
timeline.timeline_id,
// TODO(vlad): What about import errors?
ShardImportStatus::Done,
)
.await
.map_err(|_err| anyhow::anyhow!("Shut down while putting timeline import status"))?;
storage
.put_json(
&shard_status_key,
@@ -186,16 +184,6 @@ pub async fn doit(
.context("put shard status")?;
}
//
// Ensure at-least-once delivery of the upcall to cplane
// before we mark the task as done and never come here again.
//
info!("send final progress update");
upcall_client
.send_progress_until_success(&spec)
.instrument(info_span!("final_progress_update"))
.await?;
//
// Mark as done in index_part.
// This makes subsequent timeline loads enter the normal load code path

View File

@@ -13,7 +13,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument};
use utils::lsn::Lsn;
use super::{importbucket_format, index_part_format};
use super::index_part_format;
use crate::assert_u64_eq_usize::U64IsUsize;
use crate::config::PageServerConf;
@@ -173,12 +173,6 @@ impl RemoteStorageWrapper {
res
}
pub async fn get_spec(&self) -> Result<Option<importbucket_format::Spec>, anyhow::Error> {
self.get_json(&RemotePath::from_string("spec.json").unwrap())
.await
.context("get spec")
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn get_json<T: DeserializeOwned>(
&self,
@@ -244,7 +238,8 @@ impl RemoteStorageWrapper {
kind: DownloadKind::Large,
etag: None,
byte_start: Bound::Included(start_inclusive),
byte_end: Bound::Excluded(end_exclusive)
byte_end: Bound::Excluded(end_exclusive),
version_id: None,
},
&self.cancel)
.await?;

View File

@@ -11,10 +11,3 @@ pub struct ShardStatus {
pub done: bool,
// TODO: remaining fields
}
// TODO: dedupe with fast_import code
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct Spec {
pub project_id: String,
pub branch_id: String,
}

View File

@@ -1,124 +0,0 @@
//! FIXME: most of this is copy-paste from mgmt_api.rs; dedupe into a `reqwest_utils::Client` crate.
use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt};
use reqwest::{Certificate, Method};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::error;
use super::importbucket_format::Spec;
use crate::config::PageServerConf;
pub struct Client {
base_url: String,
authorization_header: Option<String>,
client: reqwest::Client,
cancel: CancellationToken,
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Serialize, Deserialize, Debug)]
struct ImportProgressRequest {
// no fields yet, not sure if there ever will be any
}
#[derive(Serialize, Deserialize, Debug)]
struct ImportProgressResponse {
// we don't care
}
impl Client {
pub fn new(conf: &PageServerConf, cancel: CancellationToken) -> anyhow::Result<Self> {
let Some(ref base_url) = conf.import_pgdata_upcall_api else {
anyhow::bail!("import_pgdata_upcall_api is not configured")
};
let mut http_client = reqwest::Client::builder();
for cert in &conf.ssl_ca_certs {
http_client = http_client.add_root_certificate(Certificate::from_der(cert.contents())?);
}
let http_client = http_client.build()?;
Ok(Self {
base_url: base_url.to_string(),
client: http_client,
cancel,
authorization_header: conf
.import_pgdata_upcall_api_token
.as_ref()
.map(|secret_string| secret_string.get_contents())
.map(|jwt| format!("Bearer {jwt}")),
})
}
fn start_request<U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
) -> reqwest::RequestBuilder {
let req = self.client.request(method, uri);
if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
req
}
}
async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
self.start_request(method, uri)
.json(&body)
.send()
.await
.map_err(Error::ReceiveBody)
}
async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
let res = self.request_noerror(method, uri, body).await?;
let response = res.error_from_body().await?;
Ok(response)
}
pub async fn send_progress_once(&self, spec: &Spec) -> Result<()> {
let url = format!(
"{}/projects/{}/branches/{}/import_progress",
self.base_url, spec.project_id, spec.branch_id
);
let ImportProgressResponse {} = self
.request(Method::POST, url, &ImportProgressRequest {})
.await?
.json()
.await
.map_err(Error::ReceiveBody)?;
Ok(())
}
pub async fn send_progress_until_success(&self, spec: &Spec) -> anyhow::Result<()> {
loop {
match self.send_progress_once(spec).await {
Ok(()) => return Ok(()),
Err(Error::Cancelled) => return Err(anyhow::anyhow!("cancelled")),
Err(err) => {
error!(?err, "error sending progress, retrying");
if tokio::time::timeout(
std::time::Duration::from_secs(10),
self.cancel.cancelled(),
)
.await
.is_ok()
{
anyhow::bail!("cancelled while sending early progress update");
}
}
}
}
}
}

View File

@@ -25,29 +25,31 @@ use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlig
use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
use owned_buffers_io::io_buf_ext::FullSlice;
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
pub use pageserver_api::models::virtual_file as api;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
use self::owned_buffers_io::write::OwnedAsyncWriter;
use crate::assert_u64_eq_usize::UsizeIsU64;
use crate::context::RequestContext;
use crate::metrics::{STORAGE_IO_TIME_METRIC, StorageIoOperation};
use crate::page_cache::{PAGE_SZ, PageWriteGuard};
pub(crate) mod io_engine;
pub(crate) use api::IoMode;
pub(crate) use io_engine::IoEngineKind;
pub use io_engine::{
FeatureTestResult as IoEngineFeatureTestResult, feature_test as io_engine_feature_test,
io_engine_for_bench,
};
mod metadata;
mod open_options;
pub(crate) use api::IoMode;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
pub use pageserver_api::models::virtual_file as api;
pub use temporary::TempVirtualFile;
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub(crate) mod io_engine;
mod metadata;
mod open_options;
mod temporary;
pub(crate) mod owned_buffers_io {
//! Abstractions for IO with owned buffers.
//!
@@ -1369,7 +1371,7 @@ pub(crate) type IoPageSlice<'a> =
static IO_MODE: once_cell::sync::Lazy<AtomicU8> =
once_cell::sync::Lazy::new(|| AtomicU8::new(IoMode::preferred() as u8));
pub(crate) fn set_io_mode(mode: IoMode) {
pub fn set_io_mode(mode: IoMode) {
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
}

View File

@@ -1,5 +1,4 @@
mod flush;
use std::sync::Arc;
pub(crate) use flush::FlushControl;
use flush::FlushHandle;
@@ -41,7 +40,6 @@ pub trait OwnedAsyncWriter {
// TODO(yuchen): For large writes, implementing buffer bypass for the aligned parts of the write could be beneficial to throughput,
// since we would avoid copying the majority of the data into the internal buffer.
pub struct BufferedWriter<B: Buffer, W> {
writer: Arc<W>,
/// Clone of the buffer that was last submitted to the flush loop.
/// `None` if no flush request has been submitted, Some forever after.
pub(super) maybe_flushed: Option<FullSlice<B::IoBuf>>,
@@ -72,7 +70,7 @@ where
///
/// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
pub fn new(
writer: Arc<W>,
writer: W,
buf_new: impl Fn() -> B,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
@@ -80,7 +78,6 @@ where
flush_task_span: tracing::Span,
) -> Self {
Self {
writer: writer.clone(),
mutable: Some(buf_new()),
maybe_flushed: None,
flush_handle: FlushHandle::spawn_new(
@@ -95,10 +92,6 @@ where
}
}
pub fn as_inner(&self) -> &W {
&self.writer
}
/// Returns the number of bytes submitted to the background flush task.
pub fn bytes_submitted(&self) -> u64 {
self.bytes_submitted
@@ -116,20 +109,16 @@ where
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn flush_and_into_inner(
mut self,
ctx: &RequestContext,
) -> Result<(u64, Arc<W>), FlushTaskError> {
pub async fn shutdown(mut self, ctx: &RequestContext) -> Result<(u64, W), FlushTaskError> {
self.flush(ctx).await?;
let Self {
mutable: buf,
maybe_flushed: _,
writer,
mut flush_handle,
bytes_submitted: bytes_amount,
} = self;
flush_handle.shutdown().await?;
let writer = flush_handle.shutdown().await?;
assert!(buf.is_some());
Ok((bytes_amount, writer))
}
@@ -329,7 +318,7 @@ mod tests {
async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
let ctx = test_ctx();
let ctx = &ctx;
let recorder = Arc::new(RecorderWriter::default());
let recorder = RecorderWriter::default();
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
@@ -350,7 +339,7 @@ mod tests {
writer.write_buffered_borrowed(b"j", ctx).await?;
writer.write_buffered_borrowed(b"klmno", ctx).await?;
let (_, recorder) = writer.flush_and_into_inner(ctx).await?;
let (_, recorder) = writer.shutdown(ctx).await?;
assert_eq!(
recorder.get_writes(),
{

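The test above exercises the new ownership contract: the writer W moves into the background flush task at construction and moves back out of shutdown. A reduced sketch of that round trip with a toy writer and a plain mpsc channel (illustrative only; the real code uses a duplex channel and FlushHandle):

use tokio::sync::mpsc;

struct ToyWriter(Vec<u8>);

/// The flush task owns the writer; when the request channel closes,
/// ownership moves back to whoever joins the task.
fn spawn_flush_task(
    mut w: ToyWriter,
    mut rx: mpsc::Receiver<Vec<u8>>,
) -> tokio::task::JoinHandle<ToyWriter> {
    tokio::spawn(async move {
        while let Some(buf) = rx.recv().await {
            w.0.extend_from_slice(&buf); // stand-in for write_all_at
        }
        w // channel closed: hand the writer back
    })
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(1);
    let handle = spawn_flush_task(ToyWriter(Vec::new()), rx);
    tx.send(b"hello".to_vec()).await.unwrap();
    drop(tx); // shutdown: close the channel, like FlushHandle::shutdown
    let w = handle.await.unwrap(); // W moves back, as in BufferedWriter::shutdown
    assert_eq!(w.0, b"hello");
}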
View File

@@ -1,5 +1,4 @@
use std::ops::ControlFlow;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span, warn};
@@ -21,7 +20,7 @@ pub struct FlushHandleInner<Buf, W> {
/// and receives recycled buffers.
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
/// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>,
join_handle: tokio::task::JoinHandle<Result<W, FlushTaskError>>,
}
struct FlushRequest<Buf> {
@@ -120,7 +119,7 @@ where
/// The queue depth is 1, and the passed-in `buf` seeds the queue depth.
/// I.e., the passed-in buf is immediately available to the handle as a recycled buffer.
pub fn spawn_new<B>(
file: Arc<W>,
file: W,
buf: B,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
@@ -183,7 +182,7 @@ where
}
/// Cleans up the channel and joins the flush task.
pub async fn shutdown(&mut self) -> Result<Arc<W>, FlushTaskError> {
pub async fn shutdown(&mut self) -> Result<W, FlushTaskError> {
let handle = self
.inner
.take()
@@ -207,7 +206,7 @@ pub struct FlushBackgroundTask<Buf, W> {
/// and send back recycled buffer.
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
/// A writer for persisting data to disk.
writer: Arc<W>,
writer: W,
ctx: RequestContext,
cancel: CancellationToken,
/// Prevent timeline from shutting down until the flush background task finishes flushing all remaining buffers to disk.
@@ -228,7 +227,7 @@ where
/// Creates a new background flush task.
fn new(
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
file: Arc<W>,
file: W,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
ctx: RequestContext,
@@ -243,7 +242,7 @@ where
}
/// Runs the background flush task.
async fn run(mut self) -> Result<Arc<W>, FlushTaskError> {
async fn run(mut self) -> Result<W, FlushTaskError> {
// Exit condition: channel is closed and there is no remaining buffer to be flushed
while let Some(request) = self.channel.recv().await {
#[cfg(test)]

View File

@@ -0,0 +1,106 @@
use tracing::error;
use utils::sync::gate::GateGuard;
use crate::context::RequestContext;
use super::{
MaybeFatalIo, VirtualFile,
owned_buffers_io::{
io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice, write::OwnedAsyncWriter,
},
};
/// A wrapper around [`super::VirtualFile`] that deletes the file on drop.
/// For use as an [`OwnedAsyncWriter`] in [`super::owned_buffers_io::write::BufferedWriter`].
#[derive(Debug)]
pub struct TempVirtualFile {
inner: Option<Inner>,
}
#[derive(Debug)]
struct Inner {
file: VirtualFile,
/// The gate guard is held for as long as we need to do operations on the path (delete on drop)
_gate_guard: GateGuard,
}
impl OwnedAsyncWriter for TempVirtualFile {
fn write_all_at<Buf: IoBufAligned + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send {
VirtualFile::write_all_at(self, buf, offset, ctx)
}
}
impl Drop for TempVirtualFile {
fn drop(&mut self) {
let Some(Inner { file, _gate_guard }) = self.inner.take() else {
return;
};
let path = file.path();
if let Err(e) =
std::fs::remove_file(path).maybe_fatal_err("failed to remove the virtual file")
{
error!(err=%e, path=%path, "failed to remove");
}
drop(_gate_guard);
}
}
impl std::ops::Deref for TempVirtualFile {
type Target = VirtualFile;
fn deref(&self) -> &Self::Target {
&self
.inner
.as_ref()
.expect("only None after into_inner or drop")
.file
}
}
impl std::ops::DerefMut for TempVirtualFile {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self
.inner
.as_mut()
.expect("only None after into_inner or drop")
.file
}
}
impl TempVirtualFile {
/// The caller is responsible for ensuring that the path of `virtual_file` is not reused
/// until after this TempVirtualFile's `Drop` impl has completed.
/// Failure to do so will result in unlinking of the reused path by the original instance's Drop impl.
/// The best way to do so is by using a monotonic counter as a disambiguator.
/// TODO: centralize this disambiguator pattern inside this struct.
/// => <https://github.com/neondatabase/neon/pull/11549#issuecomment-2824592831>
pub fn new(virtual_file: VirtualFile, gate_guard: GateGuard) -> Self {
Self {
inner: Some(Inner {
file: virtual_file,
_gate_guard: gate_guard,
}),
}
}
/// Dismantle this wrapper and return the underlying [`VirtualFile`].
/// This disables the auto-unlinking functionality that is the essence of this wrapper.
///
/// The gate guard is dropped as well; it is the caller's responsibility to ensure that filesystem
/// operations after calls to this function are still gated by some other gate guard.
///
/// TODO:
/// - centralize the common usage pattern of callers (sync_all(self), rename(self, dst), sync_all(dst.parent))
/// => <https://github.com/neondatabase/neon/pull/11549#issuecomment-2824592831>
pub fn disarm_into_inner(mut self) -> VirtualFile {
self.inner
.take()
.expect("only None after into_inner or drop, and we are into_inner, and we consume")
.file
}
}

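The wrapper above is the classic arm/disarm RAII shape. A self-contained sketch of the same shape over plain std::fs (TempPath and write_then_publish are illustrative, not pageserver API; the real type also carries a GateGuard and goes through VirtualFile):

use std::path::{Path, PathBuf};

/// Delete-on-drop path wrapper; `disarm` hands the path back and disables cleanup.
struct TempPath(Option<PathBuf>);

impl TempPath {
    fn new(p: PathBuf) -> Self {
        Self(Some(p))
    }
    fn path(&self) -> &Path {
        self.0.as_deref().expect("only None after disarm")
    }
    fn disarm(mut self) -> PathBuf {
        self.0.take().expect("only None after disarm")
    }
}

impl Drop for TempPath {
    fn drop(&mut self) {
        if let Some(p) = self.0.take() {
            let _ = std::fs::remove_file(&p); // best-effort, mirroring TempVirtualFile's Drop
        }
    }
}

fn write_then_publish(dst: &Path) -> std::io::Result<()> {
    let tmp = TempPath::new(dst.with_extension("temp"));
    std::fs::write(tmp.path(), b"payload")?; // an early `?` drops `tmp` and unlinks
    let kept = tmp.disarm(); // final touches succeeded: keep the file
    std::fs::rename(kept, dst)
}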
View File

@@ -803,7 +803,13 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
#ifdef DEBUG_COMPARE_LOCAL
mdcreate(reln, forkNum, forkNum == INIT_FORKNUM || isRedo);
if (forkNum == MAIN_FORKNUM)
mdcreate(reln, INIT_FORKNUM, true);
#else
mdcreate(reln, forkNum, isRedo);
#endif
return;
default:
@@ -1973,6 +1979,10 @@ neon_start_unlogged_build(SMgrRelation reln)
case RELPERSISTENCE_UNLOGGED:
unlogged_build_rel = reln;
unlogged_build_phase = UNLOGGED_BUILD_NOT_PERMANENT;
#ifdef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
mdcreate(reln, INIT_FORKNUM, true);
#endif
return;
default:
@@ -1995,12 +2005,14 @@ neon_start_unlogged_build(SMgrRelation reln)
* FIXME: should we pass isRedo true to create the tablespace dir if it
* doesn't exist? Is it needed?
*/
#ifndef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
{
#ifndef DEBUG_COMPARE_LOCAL
mdcreate(reln, MAIN_FORKNUM, false);
#else
mdcreate(reln, INIT_FORKNUM, false);
mdcreate(reln, INIT_FORKNUM, true);
#endif
}
}
/*
@@ -2099,12 +2111,12 @@ neon_end_unlogged_build(SMgrRelation reln)
#ifndef DEBUG_COMPARE_LOCAL
/* use isRedo == true, so that we drop it immediately */
mdunlink(rinfob, forknum, true);
#else
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
}
#ifdef DEBUG_COMPARE_LOCAL
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
}
unlogged_build_rel = NULL;
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}

View File

@@ -91,6 +91,7 @@ mod jemalloc;
mod logging;
mod metrics;
mod parse;
mod pglb;
mod protocol2;
mod proxy;
mod rate_limiter;

proxy/src/pglb/inprocess.rs Normal file
View File

@@ -0,0 +1,193 @@
#![allow(dead_code, reason = "TODO: work in progress")]
use std::pin::{Pin, pin};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::{fmt, io};
use tokio::io::{AsyncRead, AsyncWrite, DuplexStream, ReadBuf};
use tokio::sync::mpsc;
const STREAM_CHANNEL_SIZE: usize = 16;
const MAX_STREAM_BUFFER_SIZE: usize = 4096;
#[derive(Debug)]
pub struct Connection {
stream_sender: mpsc::Sender<Stream>,
stream_receiver: mpsc::Receiver<Stream>,
stream_id_counter: Arc<AtomicUsize>,
}
impl Connection {
pub fn new() -> (Connection, Connection) {
let (sender_a, receiver_a) = mpsc::channel(STREAM_CHANNEL_SIZE);
let (sender_b, receiver_b) = mpsc::channel(STREAM_CHANNEL_SIZE);
let stream_id_counter = Arc::new(AtomicUsize::new(1));
let conn_a = Connection {
stream_sender: sender_a,
stream_receiver: receiver_b,
stream_id_counter: Arc::clone(&stream_id_counter),
};
let conn_b = Connection {
stream_sender: sender_b,
stream_receiver: receiver_a,
stream_id_counter,
};
(conn_a, conn_b)
}
#[inline]
fn next_stream_id(&self) -> StreamId {
StreamId(self.stream_id_counter.fetch_add(1, Ordering::Relaxed))
}
#[tracing::instrument(skip_all, fields(stream_id = tracing::field::Empty, err))]
pub async fn open_stream(&self) -> io::Result<Stream> {
let (local, remote) = tokio::io::duplex(MAX_STREAM_BUFFER_SIZE);
let stream_id = self.next_stream_id();
tracing::Span::current().record("stream_id", stream_id.0);
let local = Stream {
inner: local,
id: stream_id,
};
let remote = Stream {
inner: remote,
id: stream_id,
};
self.stream_sender
.send(remote)
.await
.map_err(io::Error::other)?;
Ok(local)
}
#[tracing::instrument(skip_all, fields(stream_id = tracing::field::Empty, err))]
pub async fn accept_stream(&mut self) -> io::Result<Option<Stream>> {
Ok(self.stream_receiver.recv().await.inspect(|stream| {
tracing::Span::current().record("stream_id", stream.id.0);
}))
}
}
#[derive(Copy, Clone, Debug)]
pub struct StreamId(usize);
impl fmt::Display for StreamId {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
// TODO: Proper closing. Currently Streams can outlive their Connections.
// Carry WeakSender and check strong_count?
#[derive(Debug)]
pub struct Stream {
inner: DuplexStream,
id: StreamId,
}
impl Stream {
#[inline]
pub fn id(&self) -> StreamId {
self.id
}
}
impl AsyncRead for Stream {
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
pin!(&mut self.inner).poll_read(cx, buf)
}
}
impl AsyncWrite for Stream {
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
pin!(&mut self.inner).poll_write(cx, buf)
}
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
pin!(&mut self.inner).poll_flush(cx)
}
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
pin!(&mut self.inner).poll_shutdown(cx)
}
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
pin!(&mut self.inner).poll_write_vectored(cx, bufs)
}
#[inline]
fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
}
}
#[cfg(test)]
mod tests {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::*;
#[tokio::test]
async fn test_simple_roundtrip() {
let (client, mut server) = Connection::new();
let server_task = tokio::spawn(async move {
while let Some(mut stream) = server.accept_stream().await.unwrap() {
tokio::spawn(async move {
let mut buf = [0; 64];
loop {
match stream.read(&mut buf).await.unwrap() {
0 => break,
n => stream.write(&buf[..n]).await.unwrap(),
};
}
});
}
});
let mut stream = client.open_stream().await.unwrap();
stream.write_all(b"hello!").await.unwrap();
let mut buf = [0; 64];
let n = stream.read(&mut buf).await.unwrap();
assert_eq!(n, 6);
assert_eq!(&buf[..n], b"hello!");
drop(stream);
drop(client);
server_task.await.unwrap();
}
}

proxy/src/pglb/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod inprocess;

View File

@@ -12,7 +12,7 @@ use pin_project_lite::pin_project;
use smol_str::SmolStr;
use strum_macros::FromRepr;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
use zerocopy::{FromBytes, FromZeroes};
use zerocopy::{FromBytes, Immutable, KnownLayout, Unaligned, network_endian};
pin_project! {
/// A chained [`AsyncRead`] with [`AsyncWrite`] passthrough
@@ -339,49 +339,49 @@ trait BufExt: Sized {
}
impl BufExt for BytesMut {
fn try_get<T: FromBytes>(&mut self) -> Option<T> {
let res = T::read_from_prefix(self)?;
let (res, _) = T::read_from_prefix(self).ok()?;
self.advance(size_of::<T>());
Some(res)
}
}
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(C)]
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(C, packed)]
struct ProxyProtocolV2Header {
signature: [u8; 12],
version_and_command: u8,
protocol_and_family: u8,
len: zerocopy::byteorder::network_endian::U16,
len: network_endian::U16,
}
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(C)]
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(C, packed)]
struct ProxyProtocolV2HeaderV4 {
src_addr: NetworkEndianIpv4,
dst_addr: NetworkEndianIpv4,
src_port: zerocopy::byteorder::network_endian::U16,
dst_port: zerocopy::byteorder::network_endian::U16,
src_port: network_endian::U16,
dst_port: network_endian::U16,
}
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(C)]
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(C, packed)]
struct ProxyProtocolV2HeaderV6 {
src_addr: NetworkEndianIpv6,
dst_addr: NetworkEndianIpv6,
src_port: zerocopy::byteorder::network_endian::U16,
dst_port: zerocopy::byteorder::network_endian::U16,
src_port: network_endian::U16,
dst_port: network_endian::U16,
}
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(C)]
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(C, packed)]
struct TlvHeader {
kind: u8,
len: zerocopy::byteorder::network_endian::U16,
len: network_endian::U16,
}
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(transparent)]
struct NetworkEndianIpv4(zerocopy::byteorder::network_endian::U32);
struct NetworkEndianIpv4(network_endian::U32);
impl NetworkEndianIpv4 {
#[inline]
fn get(self) -> Ipv4Addr {
@@ -389,9 +389,9 @@ impl NetworkEndianIpv4 {
}
}
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(transparent)]
struct NetworkEndianIpv6(zerocopy::byteorder::network_endian::U128);
struct NetworkEndianIpv6(network_endian::U128);
impl NetworkEndianIpv6 {
#[inline]
fn get(self) -> Ipv6Addr {

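The churn above is the zerocopy 0.8 upgrade: read_from_prefix now returns Ok((value, suffix)) instead of Option, and the packed, Unaligned, endian-typed structs make the byte-level parsing explicit. A standalone sketch of the same derive set on the TlvHeader shape (assuming zerocopy 0.8 as a dependency):

use zerocopy::byteorder::network_endian;
use zerocopy::{FromBytes, Immutable, KnownLayout, Unaligned};

#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(C, packed)]
struct TlvHeader {
    kind: u8,
    len: network_endian::U16, // align-1 endian type: safe inside repr(packed)
}

fn main() {
    let wire = [0x01u8, 0x00, 0x05, 0xAA, 0xBB];
    // 0.8 returns Ok((value, rest)); earlier versions returned Option<T>.
    let (hdr, rest) = TlvHeader::read_from_prefix(&wire[..]).unwrap();
    let (kind, len) = (hdr.kind, hdr.len); // copy fields out of the packed struct
    assert_eq!(kind, 1);
    assert_eq!(len.get(), 5);
    assert_eq!(rest, &[0xAA, 0xBB]);
}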
View File

@@ -14,6 +14,7 @@ use clap::{ArgAction, Parser};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use http_utils::tls_certs::ReloadingCertificateResolver;
use metrics::set_build_info_metric;
use remote_storage::RemoteStorageConfig;
use safekeeper::defaults::{
@@ -23,8 +24,8 @@ use safekeeper::defaults::{
DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
};
use safekeeper::{
BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, WAL_SERVICE_RUNTIME, broker,
control_file, http, wal_backup, wal_service,
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service,
};
use sd_notify::NotifyState;
use storage_broker::{DEFAULT_ENDPOINT, Uri};
@@ -215,16 +216,26 @@ struct Args {
ssl_cert_file: Utf8PathBuf,
/// Period to reload certificate and private key from files.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
pub ssl_cert_reload_period: Duration,
ssl_cert_reload_period: Duration,
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
pub ssl_ca_file: Option<Utf8PathBuf>,
ssl_ca_file: Option<Utf8PathBuf>,
/// Flag to use https for requests to peer's safekeeper API.
#[arg(long)]
pub use_https_safekeeper_api: bool,
use_https_safekeeper_api: bool,
/// Path to the JWT auth token used to authenticate with other safekeepers.
#[arg(long)]
auth_token_path: Option<Utf8PathBuf>,
/// Enable TLS in WAL service API.
/// Does not force TLS: the client negotiates TLS usage during the handshake.
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
#[arg(long)]
enable_tls_wal_service_api: bool,
/// Run in development mode (disables security checks)
#[arg(long, help = "Run in development mode (disables security checks)")]
dev: bool,
}
// Like PathBufValueParser, but allows empty string.
@@ -418,6 +429,7 @@ async fn main() -> anyhow::Result<()> {
ssl_cert_reload_period: args.ssl_cert_reload_period,
ssl_ca_certs,
use_https_safekeeper_api: args.use_https_safekeeper_api,
enable_tls_wal_service_api: args.enable_tls_wal_service_api,
});
// initialize sentry if SENTRY_DSN is provided
@@ -517,6 +529,36 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
info!("running in current thread runtime");
}
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_wal_service_api {
let ssl_key_file = conf.ssl_key_file.clone();
let ssl_cert_file = conf.ssl_cert_file.clone();
let ssl_cert_reload_period = conf.ssl_cert_reload_period;
// Create resolver in BACKGROUND_RUNTIME, so the background certificate reloading
// task is run in this runtime.
let cert_resolver = current_thread_rt
.as_ref()
.unwrap_or_else(|| BACKGROUND_RUNTIME.handle())
.spawn(async move {
ReloadingCertificateResolver::new(
"main",
&ssl_key_file,
&ssl_cert_file,
ssl_cert_reload_period,
)
.await
})
.await??;
let config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(cert_resolver);
Some(Arc::new(config))
} else {
None
};
let wal_service_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
@@ -524,6 +566,9 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
conf.clone(),
pg_listener,
Scope::SafekeeperData,
conf.enable_tls_wal_service_api
.then(|| tls_server_config.clone())
.flatten(),
global_timelines.clone(),
))
// wrap with task name for error reporting
@@ -552,6 +597,9 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
conf.clone(),
pg_listener_tenant_only,
Scope::Tenant,
conf.enable_tls_wal_service_api
.then(|| tls_server_config.clone())
.flatten(),
global_timelines.clone(),
))
// wrap with task name for error reporting
@@ -577,6 +625,7 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
.spawn(http::task_main_https(
conf.clone(),
https_listener,
tls_server_config.expect("tls_server_config is set earlier if https is enabled"),
global_timelines.clone(),
))
.map(|res| ("HTTPS service main".to_owned(), res));

View File

@@ -1,7 +1,6 @@
pub mod routes;
use std::sync::Arc;
use http_utils::tls_certs::ReloadingCertificateResolver;
pub use routes::make_router;
pub use safekeeper_api::models;
use tokio_util::sync::CancellationToken;
@@ -28,21 +27,10 @@ pub async fn task_main_http(
pub async fn task_main_https(
conf: Arc<SafeKeeperConf>,
https_listener: std::net::TcpListener,
tls_config: Arc<rustls::ServerConfig>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
let cert_resolver = ReloadingCertificateResolver::new(
"main",
&conf.ssl_key_file,
&conf.ssl_cert_file,
conf.ssl_cert_reload_period,
)
.await?;
let server_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(cert_resolver);
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));
let tls_acceptor = tokio_rustls::TlsAcceptor::from(tls_config);
let router = make_router(conf, global_timelines)
.build()

View File

@@ -122,6 +122,7 @@ pub struct SafeKeeperConf {
pub ssl_cert_reload_period: Duration,
pub ssl_ca_certs: Vec<Pem>,
pub use_https_safekeeper_api: bool,
pub enable_tls_wal_service_api: bool,
}
impl SafeKeeperConf {
@@ -172,6 +173,7 @@ impl SafeKeeperConf {
ssl_cert_reload_period: Duration::from_secs(60),
ssl_ca_certs: Vec::new(),
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
}
}
}
@@ -209,3 +211,12 @@ pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
.build()
.expect("Failed to create WAL backup runtime")
});
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("background worker")
.worker_threads(1) // there is only one task now (ssl certificate reloading); having more threads doesn't make sense
.enable_all()
.build()
.expect("Failed to create background runtime")
});

View File

@@ -29,6 +29,7 @@ pub async fn task_main(
conf: Arc<SafeKeeperConf>,
pg_listener: std::net::TcpListener,
allowed_auth_scope: Scope,
tls_config: Option<Arc<rustls::ServerConfig>>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
// Tokio's from_std won't do this for us, per its comment.
@@ -43,9 +44,10 @@ pub async fn task_main(
let conf = conf.clone();
let conn_id = issue_connection_id(&mut connection_count);
let global_timelines = global_timelines.clone();
let tls_config = tls_config.clone();
tokio::spawn(
async move {
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, global_timelines).await {
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, tls_config, global_timelines).await {
error!("connection handler exited: {}", err);
}
}
@@ -61,6 +63,7 @@ async fn handle_socket(
conf: Arc<SafeKeeperConf>,
conn_id: ConnectionId,
allowed_auth_scope: Scope,
tls_config: Option<Arc<rustls::ServerConfig>>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<(), QueryError> {
socket.set_nodelay(true)?;
@@ -110,7 +113,8 @@ async fn handle_socket(
auth_pair,
global_timelines,
);
let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
let pgbackend =
PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.
pgbackend

View File

@@ -185,6 +185,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
ssl_cert_reload_period: Duration::ZERO,
ssl_ca_certs: Vec::new(),
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
};
let mut global = GlobalMap::new(disk, conf.clone())?;

View File

@@ -0,0 +1 @@
DROP TABLE timeline_imports;

View File

@@ -0,0 +1,6 @@
CREATE TABLE timeline_imports (
tenant_id VARCHAR NOT NULL,
timeline_id VARCHAR NOT NULL,
shard_statuses JSONB NOT NULL,
PRIMARY KEY(tenant_id, timeline_id)
);

View File

@@ -30,7 +30,9 @@ use pageserver_api::models::{
TimelineArchivalConfigRequest, TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
use pageserver_api::upcall_api::{
PutTimelineImportStatusRequest, ReAttachRequest, ValidateRequest,
};
use pageserver_client::{BlockUnblock, mgmt_api};
use routerify::Middleware;
use tokio_util::sync::CancellationToken;
@@ -154,6 +156,28 @@ async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError>
json_response(StatusCode::OK, state.service.validate(validate_req).await?)
}
async fn handle_put_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::GenerationsApi)?;
let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let put_req = json_request::<PutTimelineImportStatusRequest>(&mut req).await?;
let state = get_state(&req);
json_response(
StatusCode::OK,
state
.service
.handle_timeline_shard_import_progress_upcall(put_req)
.await?,
)
}
/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
/// (in the real control plane this is unnecessary, because the same program is managing
/// generation numbers and doing attachments).
@@ -1961,6 +1985,13 @@ pub fn make_router(
.post("/upcall/v1/validate", |r| {
named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
})
.post("/upcall/v1/timeline_import_status", |r| {
named_request_span(
r,
handle_put_timeline_import_status,
RequestName("upcall_v1_timeline_import_status"),
)
})
// Test/dev/debug endpoints
.post("/debug/v1/attach-hook", |r| {
named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))

View File

@@ -23,6 +23,7 @@ mod scheduler;
mod schema;
pub mod service;
mod tenant_shard;
mod timeline_import;
#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Serialize)]
struct Sequence(u64);

View File

@@ -212,6 +212,21 @@ impl PageserverClient {
)
}
pub(crate) async fn timeline_detail(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<TimelineInfo> {
measured_request!(
"timeline_detail",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner
.timeline_detail(tenant_shard_id, timeline_id)
.await
)
}
pub(crate) async fn tenant_shard_split(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -22,7 +22,7 @@ use pageserver_api::controller_api::{
AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy,
SafekeeperDescribeResponse, ShardSchedulingPolicy, SkSchedulingPolicy,
};
use pageserver_api::models::TenantConfig;
use pageserver_api::models::{ShardImportStatus, TenantConfig};
use pageserver_api::shard::{
ShardConfigError, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
};
@@ -40,6 +40,9 @@ use crate::metrics::{
DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
};
use crate::node::Node;
use crate::timeline_import::{
TimelineImport, TimelineImportUpdateError, TimelineImportUpdateFollowUp,
};
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
/// ## What do we store?
@@ -127,6 +130,9 @@ pub(crate) enum DatabaseOperation {
RemoveTimelineReconcile,
ListTimelineReconcile,
ListTimelineReconcileStartup,
InsertTimelineImport,
UpdateTimelineImport,
DeleteTimelineImport,
}
#[must_use]
@@ -1614,6 +1620,129 @@ impl Persistence {
Ok(())
}
pub(crate) async fn insert_timeline_import(
&self,
import: TimelineImportPersistence,
) -> DatabaseResult<bool> {
self.with_measured_conn(DatabaseOperation::InsertTimelineImport, move |conn| {
Box::pin({
let import = import.clone();
async move {
let inserted = diesel::insert_into(crate::schema::timeline_imports::table)
.values(import)
.execute(conn)
.await?;
Ok(inserted == 1)
}
})
})
.await
}
pub(crate) async fn delete_timeline_import(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<()> {
use crate::schema::timeline_imports::dsl;
self.with_measured_conn(DatabaseOperation::DeleteTimelineImport, move |conn| {
Box::pin(async move {
diesel::delete(crate::schema::timeline_imports::table)
.filter(
dsl::tenant_id
.eq(tenant_id.to_string())
.and(dsl::timeline_id.eq(timeline_id.to_string())),
)
.execute(conn)
.await?;
Ok(())
})
})
.await
}
/// Idempotently update the status of one shard for an ongoing timeline import
///
/// If the update was persisted to the database, then the current state of the
/// import is returned to the caller. In case of logical errors a bespoke
/// [`TimelineImportUpdateError`] instance is returned. Other database errors
/// are covered by the outer [`DatabaseError`].
pub(crate) async fn update_timeline_import(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
shard_status: ShardImportStatus,
) -> DatabaseResult<Result<Option<TimelineImport>, TimelineImportUpdateError>> {
use crate::schema::timeline_imports::dsl;
self.with_measured_conn(DatabaseOperation::UpdateTimelineImport, move |conn| {
Box::pin({
let shard_status = shard_status.clone();
async move {
// Load the current state from the database
let mut from_db: Vec<TimelineImportPersistence> = dsl::timeline_imports
.filter(
dsl::tenant_id
.eq(tenant_shard_id.tenant_id.to_string())
.and(dsl::timeline_id.eq(timeline_id.to_string())),
)
.load(conn)
.await?;
assert!(from_db.len() <= 1);
let mut status = match from_db.pop() {
Some(some) => TimelineImport::from_persistent(some).unwrap(),
None => {
return Ok(Err(TimelineImportUpdateError::ImportNotFound {
tenant_id: tenant_shard_id.tenant_id,
timeline_id,
}));
}
};
// Perform the update in-memory
let follow_up = match status.update(tenant_shard_id.to_index(), shard_status) {
Ok(ok) => ok,
Err(err) => {
return Ok(Err(err));
}
};
let new_persistent = status.to_persistent();
// Write back if required (in the same transaction)
match follow_up {
TimelineImportUpdateFollowUp::Persist => {
let updated = diesel::update(dsl::timeline_imports)
.filter(
dsl::tenant_id
.eq(tenant_shard_id.tenant_id.to_string())
.and(dsl::timeline_id.eq(timeline_id.to_string())),
)
.set(dsl::shard_statuses.eq(new_persistent.shard_statuses))
.execute(conn)
.await?;
if updated != 1 {
return Ok(Err(TimelineImportUpdateError::ImportNotFound {
tenant_id: tenant_shard_id.tenant_id,
timeline_id,
}));
}
Ok(Ok(Some(status)))
}
TimelineImportUpdateFollowUp::None => Ok(Ok(None)),
}
}
})
})
.await
}
}
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
@@ -2171,3 +2300,11 @@ impl ToSql<diesel::sql_types::VarChar, Pg> for SafekeeperTimelineOpKind {
.map_err(Into::into)
}
}
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Clone)]
#[diesel(table_name = crate::schema::timeline_imports)]
pub(crate) struct TimelineImportPersistence {
pub(crate) tenant_id: String,
pub(crate) timeline_id: String,
pub(crate) shard_statuses: serde_json::Value,
}

View File

@@ -76,6 +76,14 @@ diesel::table! {
}
}
diesel::table! {
timeline_imports (tenant_id, timeline_id) {
tenant_id -> Varchar,
timeline_id -> Varchar,
shard_statuses -> Jsonb,
}
}
diesel::table! {
use diesel::sql_types::*;
use super::sql_types::PgLsn;
@@ -99,5 +107,6 @@ diesel::allow_tables_to_appear_in_same_query!(
safekeeper_timeline_pending_ops,
safekeepers,
tenant_shards,
timeline_imports,
timelines,
);

View File

@@ -40,14 +40,14 @@ use pageserver_api::models::{
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon,
TimelineInfo, TopTenantShardItem, TopTenantShardsRequest,
TimelineInfo, TimelineState, TopTenantShardItem, TopTenantShardsRequest,
};
use pageserver_api::shard::{
DEFAULT_STRIPE_SIZE, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
};
use pageserver_api::upcall_api::{
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateResponse,
ValidateResponseTenant,
PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
ValidateRequest, ValidateResponse, ValidateResponseTenant,
};
use pageserver_client::{BlockUnblock, mgmt_api};
use reqwest::{Certificate, StatusCode};
@@ -97,6 +97,7 @@ use crate::tenant_shard::{
ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter,
ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
};
use crate::timeline_import::{ShardImportStatuses, TimelineImport, UpcallClient};
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
@@ -3732,11 +3733,14 @@ impl Service {
create_req: TimelineCreateRequest,
) -> Result<TimelineCreateResponseStorcon, ApiError> {
let safekeepers = self.config.timelines_onto_safekeepers;
let timeline_id = create_req.new_timeline_id;
tracing::info!(
mode=%create_req.mode_tag(),
%safekeepers,
"Creating timeline {}/{}",
tenant_id,
create_req.new_timeline_id,
timeline_id,
);
let _tenant_lock = trace_shared_lock(
@@ -3746,15 +3750,62 @@ impl Service {
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let create_mode = create_req.mode.clone();
let is_import = create_req.is_import();
let timeline_info = self
.tenant_timeline_create_pageservers(tenant_id, create_req)
.await?;
let safekeepers = if safekeepers {
let selected_safekeepers = if is_import {
let shards = {
let locked = self.inner.read().unwrap();
locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.map(|(ts_id, _)| ts_id.to_index())
.collect::<Vec<_>>()
};
if !shards
.iter()
.map(|shard_index| shard_index.shard_count)
.all_equal()
{
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Inconsistent shard count"
)));
}
let import = TimelineImport {
tenant_id,
timeline_id,
shard_statuses: ShardImportStatuses::new(shards),
};
let inserted = self
.persistence
.insert_timeline_import(import.to_persistent())
.await
.context("timeline import insert")
.map_err(ApiError::InternalServerError)?;
match inserted {
true => {
tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import");
}
false => {
tracing::info!(%tenant_id, %timeline_id, "Timeline import entry already present");
}
}
None
} else if safekeepers {
// Note that we do not support creating the timeline on the safekeepers
// for imported timelines. The `start_lsn` of the timeline is not known
// until the import finishes.
// https://github.com/neondatabase/neon/issues/11569
let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode)
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
.await?;
Some(res)
@@ -3764,10 +3815,168 @@ impl Service {
Ok(TimelineCreateResponseStorcon {
timeline_info,
safekeepers,
safekeepers: selected_safekeepers,
})
}
pub(crate) async fn handle_timeline_shard_import_progress_upcall(
self: &Arc<Self>,
req: PutTimelineImportStatusRequest,
) -> Result<(), ApiError> {
let res = self
.persistence
.update_timeline_import(req.tenant_shard_id, req.timeline_id, req.status)
.await;
let timeline_import = match res {
Ok(Ok(Some(timeline_import))) => timeline_import,
Ok(Ok(None)) => {
// Idempotency: we've already seen and handled this update.
return Ok(());
}
Ok(Err(logical_err)) => {
return Err(logical_err.into());
}
Err(db_err) => {
return Err(db_err.into());
}
};
tracing::info!(
tenant_id=%req.tenant_shard_id.tenant_id,
timeline_id=%req.timeline_id,
shard_id=%req.tenant_shard_id.shard_slug(),
"Updated timeline import status to: {timeline_import:?}");
if timeline_import.is_complete() {
tokio::task::spawn({
let this = self.clone();
async move { this.finalize_timeline_import(timeline_import).await }
});
}
Ok(())
}
#[instrument(skip_all, fields(
tenant_id=%import.tenant_id,
timeline_id=%import.timeline_id,
))]
async fn finalize_timeline_import(
self: &Arc<Self>,
import: TimelineImport,
) -> anyhow::Result<()> {
// TODO(vlad): On start-up, load up the imports and notify cplane of the
// ones that have been completed. This assumes the new cplane API will
// be idempotent. If that's not possible, bang a flag in the database.
// https://github.com/neondatabase/neon/issues/11570
tracing::info!("Finalizing timeline import");
let import_failed = import.completion_error().is_some();
if !import_failed {
loop {
if self.cancel.is_cancelled() {
anyhow::bail!("Shut down requested while finalizing import");
}
let active = self.timeline_active_on_all_shards(&import).await?;
match active {
true => {
tracing::info!("Timeline became active on all shards");
break;
}
false => {
tracing::info!("Timeline not active on all shards yet");
tokio::select! {
_ = self.cancel.cancelled() => {
anyhow::bail!("Shut down requested while finalizing import");
},
_ = tokio::time::sleep(Duration::from_secs(5)) => {}
};
}
}
}
}
tracing::info!(%import_failed, "Notifying cplane of import completion");
let client = UpcallClient::new(self.get_config(), self.cancel.child_token());
client.notify_import_complete(&import).await?;
if let Err(err) = self
.persistence
.delete_timeline_import(import.tenant_id, import.timeline_id)
.await
{
tracing::warn!("Failed to delete timeline import entry from database: {err}");
}
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
// https://github.com/neondatabase/neon/issues/11569
tracing::info!(%import_failed, "Timeline import complete");
Ok(())
}
async fn timeline_active_on_all_shards(
self: &Arc<Self>,
import: &TimelineImport,
) -> anyhow::Result<bool> {
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
for (tenant_shard_id, shard) in locked
.tenants
.range(TenantShardId::tenant_range(import.tenant_id))
{
if !import
.shard_statuses
.0
.contains_key(&tenant_shard_id.to_index())
{
anyhow::bail!("Shard layout change detected on completion");
}
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
} else {
return Ok(false);
}
}
targets
};
let results = self
.tenant_for_shards_api(
targets,
|tenant_shard_id, client| async move {
client
.timeline_detail(tenant_shard_id, import.timeline_id)
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
Ok(results.into_iter().all(|res| match res {
Ok(info) => info.state == TimelineState::Active,
Err(_) => false,
}))
}
pub(crate) async fn tenant_timeline_archival_config(
&self,
tenant_id: TenantId,

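finalize_timeline_import's wait loop is a generic poll-until-true with prompt cancellation. Factored out as a sketch (wait_until is a hypothetical helper, not code from this PR):

use std::time::Duration;

use tokio_util::sync::CancellationToken;

/// Poll `check` until it reports true, sleeping between attempts and bailing
/// out promptly if `cancel` fires, like the select! in the loop above.
async fn wait_until<F, Fut>(cancel: &CancellationToken, mut check: F) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<bool>>,
{
    loop {
        if check().await? {
            return Ok(());
        }
        tokio::select! {
            _ = cancel.cancelled() => anyhow::bail!("shut down requested while waiting"),
            _ = tokio::time::sleep(Duration::from_secs(5)) => {}
        }
    }
}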
View File

@@ -15,7 +15,7 @@ use http_utils::error::ApiError;
use pageserver_api::controller_api::{
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
};
use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -151,11 +151,39 @@ impl Service {
"Got {} non-successful responses from initial creation request of total {total_result_count} responses",
remaining.len()
);
if remaining.len() >= 2 {
let target_sk_count = timeline_persistence.sk_set.len();
let quorum_size = match target_sk_count {
0 => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"timeline configured without any safekeepers",
)));
}
1 | 2 => {
#[cfg(feature = "testing")]
{
// In test settings, it is allowed to have one or two safekeepers
target_sk_count
}
#[cfg(not(feature = "testing"))]
{
// The region is misconfigured: we need at least three safekeepers to be configured
// in order to schedule work to them
tracing::warn!(
"couldn't find at least 3 safekeepers for timeline, found: {:?}",
timeline_persistence.sk_set
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find at least 3 safekeepers to put timeline to"
)));
}
}
_ => target_sk_count / 2 + 1,
};
let success_count = target_sk_count - remaining.len();
if success_count < quorum_size {
// Failure
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"not enough successful reconciliations to reach quorum, please retry: {} errored",
remaining.len()
"not enough successful reconciliations to reach quorum size: {success_count} of {quorum_size} of total {target_sk_count}"
)));
}
@@ -179,7 +207,6 @@ impl Service {
self: &Arc<Self>,
tenant_id: TenantId,
timeline_info: &TimelineInfo,
create_mode: models::TimelineCreateRequestMode,
) -> Result<SafekeepersInfo, ApiError> {
let timeline_id = timeline_info.timeline_id;
let pg_version = timeline_info.pg_version * 10000;
@@ -189,15 +216,8 @@ impl Service {
// previously existed as on retries in theory endpoint might have
// already written some data and advanced last_record_lsn, while we want
// safekeepers to have consistent start_lsn.
let start_lsn = match create_mode {
models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
)))?;
}
};
let start_lsn = timeline_info.last_record_lsn;
// Choose initial set of safekeepers respecting affinity
let sks = self.safekeepers_for_new_timeline().await?;
let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
@@ -492,8 +512,6 @@ impl Service {
pub(crate) async fn safekeepers_for_new_timeline(
&self,
) -> Result<Vec<SafekeeperInfo>, ApiError> {
// Number of safekeepers in different AZs we are looking for
let wanted_count = 3;
let mut all_safekeepers = {
let locked = self.inner.read().unwrap();
locked
@@ -532,6 +550,19 @@ impl Service {
sk.1.id.0,
)
});
// Number of safekeepers in different AZs we are looking for
let wanted_count = match all_safekeepers.len() {
0 => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find any active safekeeper for new timeline",
)));
}
// Have laxer requirements in testing mode, as we don't want to
// spin up three safekeepers for every single test
#[cfg(feature = "testing")]
1 | 2 => all_safekeepers.len(),
_ => 3,
};
let mut sks = Vec::new();
let mut azs = HashSet::new();
for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {

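The two quorum-related hunks above follow the standard majority rule, with 1- and 2-safekeeper sets special-cased behind the testing feature. The arithmetic itself, isolated (quorum_size is illustrative, not a function in this PR):

/// Majority quorum for a configured set of n >= 3 safekeepers.
fn quorum_size(n: usize) -> usize {
    n / 2 + 1
}

fn main() {
    assert_eq!(quorum_size(3), 2); // tolerates 1 failed initial creation
    assert_eq!(quorum_size(4), 3);
    assert_eq!(quorum_size(5), 3); // tolerates 2
    // e.g. target_sk_count = 3 with remaining.len() = 2 gives success_count = 1 < 2: error.
}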
View File

@@ -0,0 +1,260 @@
use std::time::Duration;
use std::{collections::HashMap, str::FromStr};
use http_utils::error::ApiError;
use reqwest::Method;
use serde::{Deserialize, Serialize};
use pageserver_api::models::ShardImportStatus;
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TimelineId},
shard::ShardIndex,
};
use crate::{persistence::TimelineImportPersistence, service::Config};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
impl ShardImportStatuses {
pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
ShardImportStatuses(
shards
.into_iter()
.map(|ts_id| (ts_id, ShardImportStatus::InProgress))
.collect(),
)
}
}
#[derive(Debug)]
pub(crate) struct TimelineImport {
pub(crate) tenant_id: TenantId,
pub(crate) timeline_id: TimelineId,
pub(crate) shard_statuses: ShardImportStatuses,
}
pub(crate) enum TimelineImportUpdateFollowUp {
Persist,
None,
}
pub(crate) enum TimelineImportUpdateError {
ImportNotFound {
tenant_id: TenantId,
timeline_id: TimelineId,
},
MismatchedShards,
UnexpectedUpdate,
}
impl From<TimelineImportUpdateError> for ApiError {
fn from(err: TimelineImportUpdateError) -> ApiError {
match err {
TimelineImportUpdateError::ImportNotFound {
tenant_id,
timeline_id,
} => ApiError::NotFound(
anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
),
TimelineImportUpdateError::MismatchedShards => {
ApiError::InternalServerError(anyhow::anyhow!(
"Import shards do not match update request, likely a shard split happened during import, this is a bug"
))
}
TimelineImportUpdateError::UnexpectedUpdate => {
ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
}
}
}
}
impl TimelineImport {
pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
Ok(TimelineImport {
tenant_id,
timeline_id,
shard_statuses,
})
}
pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
TimelineImportPersistence {
tenant_id: self.tenant_id.to_string(),
timeline_id: self.timeline_id.to_string(),
shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
}
}
pub(crate) fn update(
&mut self,
shard: ShardIndex,
status: ShardImportStatus,
) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
use std::collections::hash_map::Entry::*;
match self.shard_statuses.0.entry(shard) {
Occupied(mut occ) => {
let crnt = occ.get_mut();
if *crnt == status {
Ok(TimelineImportUpdateFollowUp::None)
} else if crnt.is_terminal() && !status.is_terminal() {
Err(TimelineImportUpdateError::UnexpectedUpdate)
} else {
*crnt = status;
Ok(TimelineImportUpdateFollowUp::Persist)
}
}
Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
}
}
pub(crate) fn is_complete(&self) -> bool {
self.shard_statuses
.0
.values()
.all(|status| status.is_terminal())
}
pub(crate) fn completion_error(&self) -> Option<String> {
assert!(self.is_complete());
let shard_errors: HashMap<_, _> = self
.shard_statuses
.0
.iter()
.filter_map(|(shard, status)| {
if let ShardImportStatus::Error(err) = status {
Some((*shard, err.clone()))
} else {
None
}
})
.collect();
if shard_errors.is_empty() {
None
} else {
Some(serde_json::to_string(&shard_errors).unwrap())
}
}
}
pub(crate) struct UpcallClient {
authorization_header: Option<String>,
client: reqwest::Client,
cancel: CancellationToken,
base_url: String,
}
const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Serialize, Deserialize, Debug)]
struct ImportCompleteRequest {
tenant_id: TenantId,
timeline_id: TimelineId,
error: Option<String>,
}
impl UpcallClient {
pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
let authorization_header = config
.control_plane_jwt_token
.clone()
.map(|jwt| format!("Bearer {}", jwt));
let client = reqwest::ClientBuilder::new()
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
.build()
.expect("Failed to construct HTTP client");
let base_url = config
.control_plane_url
.clone()
.expect("must be configured");
Self {
authorization_header,
client,
cancel,
base_url,
}
}
/// Notify control plane of a completed import
///
/// This method guarantees at-least-once delivery semantics, assuming
/// eventual cplane availability. The cplane API is idempotent.
pub(crate) async fn notify_import_complete(
&self,
import: &TimelineImport,
) -> anyhow::Result<()> {
let endpoint = if self.base_url.ends_with('/') {
format!("{}import_complete", self.base_url)
} else {
format!("{}/import_complete", self.base_url)
};
tracing::info!("Endpoint is {endpoint}");
let request = self
.client
.request(Method::PUT, endpoint)
.json(&ImportCompleteRequest {
tenant_id: import.tenant_id,
timeline_id: import.timeline_id,
error: import.completion_error(),
})
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
let request = if let Some(auth) = &self.authorization_header {
request.header(reqwest::header::AUTHORIZATION, auth)
} else {
request
};
const RETRY_DELAY: Duration = Duration::from_secs(1);
let mut attempt = 1;
loop {
if self.cancel.is_cancelled() {
return Err(anyhow::anyhow!(
"Shutting down while notifying cplane of import completion"
));
}
match request.try_clone().unwrap().send().await {
Ok(response) if response.status().is_success() => {
return Ok(());
}
Ok(response) => {
tracing::warn!(
"Import complete notification failed with status {}, attempt {}",
response.status(),
attempt
);
}
Err(e) => {
tracing::warn!(
"Import complete notification failed with error: {}, attempt {}",
e,
attempt
);
}
}
tokio::select! {
_ = tokio::time::sleep(RETRY_DELAY) => {}
_ = self.cancel.cancelled() => {
return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
}
}
attempt += 1;
}
}
}

View File

@@ -5,8 +5,6 @@ edition = "2024"
license.workspace = true
[dependencies]
aws-config.workspace = true
aws-sdk-s3.workspace = true
either.workspace = true
anyhow.workspace = true
hex.workspace = true

View File

@@ -12,14 +12,9 @@ pub mod tenant_snapshot;
use std::env;
use std::fmt::Display;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use anyhow::Context;
use aws_config::retry::{RetryConfigBuilder, RetryMode};
use aws_sdk_s3::Client;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::error::DisplayErrorContext;
use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
use futures::{Stream, StreamExt};
@@ -28,7 +23,7 @@ use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_time
use pageserver_api::shard::TenantShardId;
use remote_storage::{
DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig,
RemoteStorageKind, S3Config,
RemoteStorageKind, VersionId,
};
use reqwest::Url;
use serde::{Deserialize, Serialize};
@@ -351,21 +346,6 @@ pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
}
}
async fn init_s3_client(bucket_region: Region) -> Client {
let mut retry_config_builder = RetryConfigBuilder::new();
retry_config_builder
.set_max_attempts(Some(3))
.set_mode(Some(RetryMode::Adaptive));
let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
.region(bucket_region)
.retry_config(retry_config_builder.build())
.load()
.await;
Client::new(&config)
}
fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
match node_kind {
NodeKind::Pageserver => "pageserver/v1/",
@@ -385,23 +365,6 @@ fn make_root_target(desc_str: String, prefix_in_bucket: String, node_kind: NodeK
}
}
async fn init_remote_s3(
bucket_config: S3Config,
node_kind: NodeKind,
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
let bucket_region = Region::new(bucket_config.bucket_region);
let s3_client = Arc::new(init_s3_client(bucket_region).await);
let default_prefix = default_prefix_in_bucket(node_kind).to_string();
let s3_root = make_root_target(
bucket_config.bucket_name,
bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
node_kind,
);
Ok((s3_client, s3_root))
}
async fn init_remote(
mut storage_config: BucketConfig,
node_kind: NodeKind,
@@ -499,7 +462,7 @@ async fn list_objects_with_retries(
remote_client.bucket_name().unwrap_or_default(),
s3_target.prefix_in_bucket,
s3_target.delimiter,
DisplayErrorContext(e),
e,
);
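// Exponential backoff: 2^trial seconds, capped at 32 seconds.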
let backoff_time = 1 << trial.min(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
@@ -549,14 +512,18 @@ async fn download_object_with_retries(
anyhow::bail!("Failed to download object with key {key} after {MAX_RETRIES} attempts")
}
async fn download_object_to_file_s3(
s3_client: &Client,
bucket_name: &str,
key: &str,
version_id: Option<&str>,
async fn download_object_to_file(
remote_storage: &GenericRemoteStorage,
key: &RemotePath,
version_id: Option<VersionId>,
local_path: &Utf8Path,
) -> anyhow::Result<()> {
let opts = DownloadOpts {
version_id: version_id.clone(),
..Default::default()
};
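// Download into a sibling `.tmp` path and rename into place at the end, so
// a partial download is never visible at `local_path`.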
let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
let cancel = CancellationToken::new();
for _ in 0..MAX_RETRIES {
tokio::fs::remove_file(&tmp_path)
.await
@@ -566,28 +533,24 @@ async fn download_object_to_file_s3(
.await
.context("Opening output file")?;
let request = s3_client.get_object().bucket(bucket_name).key(key);
let res = remote_storage.download(key, &opts, &cancel).await;
let request = match version_id {
Some(version_id) => request.version_id(version_id),
None => request,
};
let response_stream = match request.send().await {
let download = match res {
Ok(response) => response,
Err(e) => {
error!(
"Failed to download object for key {key} version {}: {e:#}",
version_id.unwrap_or("")
"Failed to download object for key {key} version {:?}: {e:#}",
&version_id.as_ref().unwrap_or(&VersionId(String::new()))
);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
let mut read_stream = response_stream.body.into_async_read();
//response_stream.download_stream
tokio::io::copy(&mut read_stream, &mut file).await?;
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
tokio::io::copy(&mut body, &mut file).await?;
tokio::fs::rename(&tmp_path, local_path).await?;
return Ok(());

View File

@@ -1,31 +1,30 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Context;
use async_stream::stream;
use aws_sdk_s3::Client;
use camino::Utf8PathBuf;
use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::IndexPart;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver::tenant::storage_layer::LayerName;
use pageserver_api::shard::TenantShardId;
use remote_storage::{GenericRemoteStorage, S3Config};
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use utils::generation::Generation;
use utils::id::TenantId;
use crate::checks::{BlobDataParseResult, RemoteTimelineBlobData, list_timeline_blobs};
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
use crate::{
BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, download_object_to_file_s3,
init_remote, init_remote_s3,
BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, download_object_to_file, init_remote,
};
pub struct SnapshotDownloader {
s3_client: Arc<Client>,
s3_root: RootTarget,
remote_client: GenericRemoteStorage,
#[allow(dead_code)]
target: RootTarget,
bucket_config: BucketConfig,
bucket_config_s3: S3Config,
tenant_id: TenantId,
output_path: Utf8PathBuf,
concurrency: usize,
@@ -38,17 +37,13 @@ impl SnapshotDownloader {
output_path: Utf8PathBuf,
concurrency: usize,
) -> anyhow::Result<Self> {
let bucket_config_s3 = match &bucket_config.0.storage {
remote_storage::RemoteStorageKind::AwsS3(config) => config.clone(),
_ => panic!("only S3 configuration is supported for snapshot downloading"),
};
let (s3_client, s3_root) =
init_remote_s3(bucket_config_s3.clone(), NodeKind::Pageserver).await?;
let (remote_client, target) =
init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
Ok(Self {
s3_client,
s3_root,
remote_client,
target,
bucket_config,
bucket_config_s3,
tenant_id,
output_path,
concurrency,
@@ -61,6 +56,7 @@ impl SnapshotDownloader {
layer_name: LayerName,
layer_metadata: LayerFileMetadata,
) -> anyhow::Result<(LayerName, LayerFileMetadata)> {
let cancel = CancellationToken::new();
// Note: "local" here means a local copy of the S3 data, not the pageserver's
// local format. The two use different layer names (the remote style carries
// the generation suffix).
let local_path = self.output_path.join(format!(
@@ -82,30 +78,27 @@ impl SnapshotDownloader {
} else {
tracing::debug!("{} requires download...", local_path);
let timeline_root = self.s3_root.timeline_root(&ttid);
let remote_layer_path = format!(
"{}{}{}",
timeline_root.prefix_in_bucket,
layer_name,
layer_metadata.generation.get_suffix()
let remote_path = remote_layer_path(
&ttid.tenant_shard_id.tenant_id,
&ttid.timeline_id,
layer_metadata.shard,
&layer_name,
layer_metadata.generation,
);
let mode = remote_storage::ListingMode::NoDelimiter;
// List versions: the object might be deleted.
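// remote_storage is assumed to return versions newest-first (matching S3's
// ListObjectVersions ordering), so `first()` below selects the latest.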
let versions = self
.s3_client
.list_object_versions()
.bucket(self.bucket_config_s3.bucket_name.clone())
.prefix(&remote_layer_path)
.send()
.remote_client
.list_versions(Some(&remote_path), mode, None, &cancel)
.await?;
let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else {
return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
let Some(version) = versions.versions.first() else {
return Err(anyhow::anyhow!("No versions found for {remote_path}"));
};
download_object_to_file_s3(
&self.s3_client,
&self.bucket_config_s3.bucket_name,
&remote_layer_path,
version.version_id.as_deref(),
download_object_to_file(
&self.remote_client,
&remote_path,
version.version_id().cloned(),
&local_path,
)
.await?;

View File

@@ -417,14 +417,14 @@ class NeonLocalCli(AbstractNeonCli):
cmd.append(f"--instance-id={instance_id}")
return self.raw_cli(cmd)
def object_storage_start(self, timeout_in_seconds: int | None = None):
cmd = ["object-storage", "start"]
def endpoint_storage_start(self, timeout_in_seconds: int | None = None):
cmd = ["endpoint-storage", "start"]
if timeout_in_seconds is not None:
cmd.append(f"--start-timeout={timeout_in_seconds}s")
return self.raw_cli(cmd)
def object_storage_stop(self, immediate: bool):
cmd = ["object-storage", "stop"]
def endpoint_storage_stop(self, immediate: bool):
cmd = ["endpoint-storage", "stop"]
if immediate:
cmd.extend(["-m", "immediate"])
return self.raw_cli(cmd)

View File

@@ -1029,7 +1029,7 @@ class NeonEnvBuilder:
self.env.broker.assert_no_errors()
self.env.object_storage.assert_no_errors()
self.env.endpoint_storage.assert_no_errors()
try:
self.overlay_cleanup_teardown()
@@ -1126,7 +1126,7 @@ class NeonEnv:
pagectl_env_vars["RUST_LOG"] = self.rust_log_override
self.pagectl = Pagectl(extra_env=pagectl_env_vars, binpath=self.neon_binpath)
self.object_storage = ObjectStorage(self)
self.endpoint_storage = EndpointStorage(self)
# The URL for the pageserver to use as its control_plane_api config
if config.storage_controller_port_override is not None:
@@ -1183,7 +1183,7 @@ class NeonEnv:
},
"safekeepers": [],
"pageservers": [],
"object_storage": {"port": self.port_distributor.get_port()},
"endpoint_storage": {"port": self.port_distributor.get_port()},
"generate_local_ssl_certs": self.generate_local_ssl_certs,
}
@@ -1420,7 +1420,7 @@ class NeonEnv:
self.storage_controller.on_safekeeper_deploy(sk_id, body)
self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active")
self.object_storage.start(timeout_in_seconds=timeout_in_seconds)
self.endpoint_storage.start(timeout_in_seconds=timeout_in_seconds)
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
"""
@@ -1439,7 +1439,7 @@ class NeonEnv:
except Exception as e:
raise_later = e
self.object_storage.stop(immediate=immediate)
self.endpoint_storage.stop(immediate=immediate)
# Stop storage controller before pageservers: we don't want it to spuriously
# detect a pageserver "failure" during test teardown
@@ -2660,24 +2660,24 @@ class NeonStorageController(MetricsGetter, LogUtils):
self.stop(immediate=True)
class ObjectStorage(LogUtils):
class EndpointStorage(LogUtils):
def __init__(self, env: NeonEnv):
service_dir = env.repo_dir / "object_storage"
super().__init__(logfile=service_dir / "object_storage.log")
self.conf_path = service_dir / "object_storage.json"
service_dir = env.repo_dir / "endpoint_storage"
super().__init__(logfile=service_dir / "endpoint_storage.log")
self.conf_path = service_dir / "endpoint_storage.json"
self.env = env
def base_url(self):
return json.loads(self.conf_path.read_text())["listen"]
def start(self, timeout_in_seconds: int | None = None):
self.env.neon_cli.object_storage_start(timeout_in_seconds)
self.env.neon_cli.endpoint_storage_start(timeout_in_seconds)
def stop(self, immediate: bool = False):
self.env.neon_cli.object_storage_stop(immediate)
self.env.neon_cli.endpoint_storage_stop(immediate)
def assert_no_errors(self):
assert_no_errors(self.logfile, "object_storage", [])
assert_no_errors(self.logfile, "endpoint_storage", [])
class NeonProxiedStorageController(NeonStorageController):

View File

@@ -65,7 +65,7 @@ def test_ro_replica_lag(
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
neon_api.wait_for_operation_to_finish(project_id)
error_occurred = False
try:
@@ -198,7 +198,7 @@ def test_replication_start_stop(
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
neon_api.wait_for_operation_to_finish(project_id)
try:
branch_id = project["branch"]["id"]

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import os
import platform
import shutil
import tarfile
from typing import TYPE_CHECKING
@@ -58,7 +59,18 @@ def test_remote_extensions(
extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway"
build_tag = os.environ.get("BUILD_TAG", "latest")
archive_route = f"{build_tag}/v{pg_version}/extensions/test_extension.tar.zst"
# We use the Go architecture naming convention (arm64/amd64) because that is what Kubernetes uses.
arch = platform.machine()
match arch:
case "aarch64":
arch = "arm64"
case "x86_64":
arch = "amd64"
case _:
pass
archive_route = f"{build_tag}/{arch}/v{pg_version}/extensions/test_extension.tar.zst"
tarball = test_output_dir / "test_extension.tar"
extension_dir = (
base_dir / "test_runner" / "regress" / "data" / "test_remote_extensions" / "test_extension"

View File

@@ -8,7 +8,7 @@ from jwcrypto import jwk, jwt
@pytest.mark.asyncio
async def test_object_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
"""
Inserts, retrieves, and deletes a test file using a JWT token.
"""
@@ -31,7 +31,7 @@ async def test_object_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
token.make_signed_token(key)
token = token.serialize()
base_url = env.object_storage.base_url()
base_url = env.endpoint_storage.base_url()
key = f"http://{base_url}/{tenant_id}/{timeline_id}/{endpoint_id}/key"
headers = {"Authorization": f"Bearer {token}"}
log.info(f"cache key url {key}")

View File

@@ -1,9 +1,9 @@
import base64
import json
import re
import time
from enum import Enum
from pathlib import Path
from threading import Event
import psycopg2
import psycopg2.errors
@@ -14,12 +14,11 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PgProtocol, VanillaPostgres
from fixtures.pageserver.http import (
ImportPgdataIdemptencyKey,
PageserverApiException,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server, RemoteStorageKind
from fixtures.utils import shared_buffers_for_max_cu
from fixtures.utils import shared_buffers_for_max_cu, skip_in_debug_build, wait_until
from mypy_boto3_kms import KMSClient
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
from mypy_boto3_s3 import S3Client
@@ -44,6 +43,7 @@ smoke_params = [
]
@skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
def test_pgdata_import_smoke(
vanilla_pg: VanillaPostgres,
@@ -56,24 +56,29 @@ def test_pgdata_import_smoke(
#
# Setup fake control plane for import progress
#
import_completion_signaled = Event()
def handler(request: Request) -> Response:
log.info(f"control plane request: {request.json}")
log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler)
cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
env = neon_env_builder.init_start()
# The test needs LocalFs support, which is only built in testing mode.
env.pageserver.is_testing_enabled_or_skip()
env.pageserver.patch_config_toml_nonrecursive(
{
"import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api"
}
)
env.pageserver.stop()
env.pageserver.start()
@@ -193,40 +198,11 @@ def test_pgdata_import_smoke(
)
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
while True:
locations = env.storage_controller.locate(tenant_id)
active_count = 0
for location in locations:
shard_id = TenantShardId.parse(location["shard_id"])
ps = env.get_pageserver(location["node_id"])
try:
detail = ps.http_client().timeline_detail(shard_id, timeline_id)
state = detail["state"]
log.info(f"shard {shard_id} state: {state}")
if state == "Active":
active_count += 1
except PageserverApiException as e:
if e.status_code == 404:
log.info("not found, import is in progress")
continue
elif e.status_code == 429:
log.info("import is in progress")
continue
else:
raise
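# wait_until() below retries this check until it passes or the timeout expires.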
def cplane_notified():
assert import_completion_signaled.is_set()
shard_status_file = statusdir / f"shard-{shard_id.shard_index}"
if state == "Active":
shard_status_file_contents = (
shard_status_file.read_text()
) # Active state implies import is done
shard_status = json.loads(shard_status_file_contents)
assert shard_status["done"] is True
if active_count == len(locations):
log.info("all shards are active")
break
time.sleep(1)
# Generous timeout for the MULTIPLE_RELATION_SEGMENTS test variants
wait_until(cplane_notified, timeout=90)
import_duration = time.monotonic() - start
log.info(f"import complete; duration={import_duration:.2f}s")
@@ -372,19 +348,27 @@ def test_fast_import_with_pageserver_ingest(
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
# Setup pageserver and fake cplane for import progress
import_completion_signaled = Event()
def handler(request: Request) -> Response:
log.info(f"control plane request: {request.json}")
log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler)
cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
env = neon_env_builder.init_start()
env.pageserver.patch_config_toml_nonrecursive(
{
"import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api",
# because the import_pgdata code uses this endpoint, not the one in the common remote storage config
# TODO: maybe use common remote_storage config in pageserver?
"import_pgdata_aws_endpoint_url": env.s3_mock_server.endpoint(),
@@ -476,42 +460,10 @@ def test_fast_import_with_pageserver_ingest(
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
validate_vanilla_equivalence(conn)
# Poll pageserver statuses in s3
while True:
locations = env.storage_controller.locate(tenant_id)
active_count = 0
for location in locations:
shard_id = TenantShardId.parse(location["shard_id"])
ps = env.get_pageserver(location["node_id"])
try:
detail = ps.http_client().timeline_detail(shard_id, timeline_id)
log.info(f"timeline {tenant_id}/{timeline_id} detail: {detail}")
state = detail["state"]
log.info(f"shard {shard_id} state: {state}")
if state == "Active":
active_count += 1
except PageserverApiException as e:
if e.status_code == 404:
log.info("not found, import is in progress")
continue
elif e.status_code == 429:
log.info("import is in progress")
continue
else:
raise
def cplane_notified():
assert import_completion_signaled.is_set()
if state == "Active":
key = f"{key_prefix}/status/shard-{shard_id.shard_index}"
shard_status_file_contents = (
mock_s3_client.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
)
shard_status = json.loads(shard_status_file_contents)
assert shard_status["done"] is True
if active_count == len(locations):
log.info("all shards are active")
break
time.sleep(0.5)
wait_until(cplane_notified, timeout=60)
import_duration = time.monotonic() - start
log.info(f"import complete; duration={import_duration:.2f}s")

View File

@@ -138,7 +138,7 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
env.neon_cli.pageserver_stop(env.pageserver.id)
env.neon_cli.safekeeper_stop()
env.neon_cli.storage_controller_stop(False)
env.neon_cli.object_storage_stop(False)
env.neon_cli.endpoint_storage_stop(False)
env.neon_cli.storage_broker_stop()
# Keep NeonEnv state up to date, it usually owns starting/stopping services
@@ -185,7 +185,7 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder):
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 1)
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 2)
env.neon_cli.object_storage_stop(False)
env.neon_cli.endpoint_storage_stop(False)
# Stop this to get out of the way of the following `start`
env.neon_cli.storage_controller_stop(False)

View File

@@ -95,7 +95,7 @@ def test_storage_controller_smoke(
env.pageservers[1].start()
for sk in env.safekeepers:
sk.start()
env.object_storage.start()
env.endpoint_storage.start()
# The pageservers we started should have registered with the sharding service on startup
nodes = env.storage_controller.node_list()
@@ -347,7 +347,7 @@ def prepare_onboarding_env(
env = neon_env_builder.init_configs()
env.broker.start()
env.storage_controller.start()
env.object_storage.start()
env.endpoint_storage.start()
# This is the pageserver where we'll initially create the tenant. Run it in emergency
# mode so that it doesn't talk to the storage controller, and do not register it.
@@ -1612,16 +1612,18 @@ def test_storage_controller_heartbeats(
env = neon_env_builder.init_configs()
env.start()
# Default log allow list permits connection errors, but this test will use error responses on
# the utilization endpoint.
env.storage_controller.allowed_errors.append(
".*Call to node.*management API.*failed.*failpoint.*"
)
# The server starts listening to the socket before sending re-attach request,
# but it starts serving HTTP only when re-attach is completed.
# If re-attach is slow (last scenario), storcon's heartbeat requests will time out.
env.storage_controller.allowed_errors.append(
".*Call to node.*management API.*failed.* Timeout.*"
env.storage_controller.allowed_errors.extend(
[
# Default log allow list permits connection errors, but this test will use error responses on
# the utilization endpoint.
".*Call to node.*management API.*failed.*failpoint.*",
# The server starts listening to the socket before sending re-attach request,
# but it starts serving HTTP only when re-attach is completed.
# If re-attach is slow (last scenario), storcon's heartbeat requests will time out.
".*Call to node.*management API.*failed.* Timeout.*",
# We will intentionally cause reconcile errors
".*Reconcile error.*",
]
)
# Initially we have two online pageservers
@@ -4240,6 +4242,63 @@ def test_storcon_create_delete_sk_down(
wait_until(timeline_deleted_on_sk)
@run_only_on_default_postgres("PG version is not interesting here")
@pytest.mark.parametrize("num_safekeepers", [1, 2, 3])
@pytest.mark.parametrize("deletetion_subject", [DeletionSubject.TENANT, DeletionSubject.TIMELINE])
def test_storcon_few_sk(
neon_env_builder: NeonEnvBuilder,
num_safekeepers: int,
deletion_subject: DeletionSubject,
):
"""
Test that the storcon can create and delete tenants and timelines with a
small or unusual number of safekeepers.
- num_safekeepers: number of safekeepers to run with.
- deletion_subject: exercise both single-timeline and whole-tenant deletion.
"""
neon_env_builder.num_safekeepers = num_safekeepers
safekeeper_list = list(range(1, num_safekeepers + 1))
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
}
env = neon_env_builder.init_start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.create_tenant(tenant_id, timeline_id)
child_timeline_id = env.create_branch("child_of_main", tenant_id)
env.safekeepers[0].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
config_lines = [
"neon.safekeeper_proto_version = 3",
]
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
with env.endpoints.create(
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
if deletion_subject is DeletionSubject.TENANT:
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
else:
env.storage_controller.pageserver_api().timeline_delete(tenant_id, child_timeline_id)
# Ensure that there are log messages for the third safekeeper too.
def timeline_deleted_on_sk():
env.safekeepers[0].assert_log_contains(
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
)
wait_until(timeline_deleted_on_sk)
@pytest.mark.parametrize("wrong_az", [True, False])
def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool):
"""

View File

@@ -77,6 +77,8 @@ regex-automata = { version = "0.4", default-features = false, features = ["dfa-o
regex-syntax = { version = "0.8" }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
rustls = { version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] }
rustls-pki-types = { version = "1", features = ["std"] }
rustls-webpki = { version = "0.102", default-features = false, features = ["ring", "std"] }
scopeguard = { version = "1" }
sec1 = { version = "0.7", features = ["pem", "serde", "std", "subtle"] }
serde = { version = "1", features = ["alloc", "derive"] }
@@ -103,7 +105,6 @@ tracing-core = { version = "0.1" }
tracing-log = { version = "0.2" }
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4", "v7"] }
zerocopy = { version = "0.7", features = ["derive", "simd"] }
zeroize = { version = "1", features = ["derive", "serde"] }
zstd = { version = "0.13" }
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
@@ -146,7 +147,6 @@ serde = { version = "1", features = ["alloc", "derive"] }
syn = { version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] }
time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] }
toml_edit = { version = "0.22", features = ["serde"] }
zerocopy = { version = "0.7", features = ["derive", "simd"] }
zstd = { version = "0.13" }
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }