Compare commits

..

16 Commits

Author SHA1 Message Date
John Spray
8f943d9c1f DNM: disable rerun_failed 2025-04-25 15:01:43 +02:00
John Spray
9b0d02e61d tests: fail tests which write too much data 2025-04-25 15:01:43 +02:00
Vlad Lazar
6f0046b688 storage_controller: ensure mutual exclusion for imports and shard splits (#11632)
## Problem

Shard splits break timeline imports.

## Summary of Changes

Ensure mutual exclusion for imports and shard splits.

On the shard split code path:
1. Right before shard splitting, check the database to ensure that
no-import is on-going for the tenant. Exclusion is guaranteed because
this validation is done while holding the exclusive tenant lock.
Timeline creation (and import creation implicitly) requires a shared
tenant lock.
2. When selecting a shard to split, use the in-mem state to exclude
shards with an on-going import. This is opportunistic since an import
might start after the check, but allows shard splits to make progres
instead of continously retrying to split the same shard.

On the timeline creation code path:
1. Check the in-memory splitting flag on all shards of the tenant. If
any of them are splitting, error out asking the client to retry. On the
happy path this is not required, due to the tenant lock set-up described
above, but it covers the case where we restart with a pending
shard-split.

Closes https://github.com/neondatabase/neon/issues/11567
2025-04-25 11:46:15 +00:00
Em Sharnoff
2b0248cd76 fix(proxy): s/Console/Control plane/ in cplane error (#11716)
I got bamboozled by the error message while debugging, seems no
objections to updating it.

ref https://neondb.slack.com/archives/C060N3SEF9D/p1745570961111509

ref https://neondb.slack.com/archives/C039YKBRZB4/p1745570811957019?thread_ts=1745393368.283599
2025-04-25 11:09:56 +00:00
Fedor Dikarev
7b03216dca CI(check-macos-build): use gh native cache (#11707)
## Problem
- using Hetzner buckets for cache requires secrets, we either need
`secrets: inherit` to make it works
- we don't have self-hosted MacOs runners, so actually GH native cache
is more optimal solution there

## Summary of changes
- switch to GH native cache for macos builds
2025-04-25 09:18:20 +00:00
a-masterov
992aa91075 Refresh the codestyle of docker compose test script (#11715)
## Problem
The docker compose test script (`docker_compose_test.sh`) had
inconsistent codestyle, mixing legacy syntax with modern approaches and
not following best practices at all. This inconsistency could lead to
potential issues with variable expansion, path handling, and
maintainability.
## Summary of changes
This PR modernizes the test script with several codestyle improvements:
* Variable scoping and exports:
  * Added proper export declarations for environment variables
  * Added explicit COMPOSE_PROFILES export to avoid repetitive flags
* Modern Bash syntax:
  * Replaced [ ] with [[ ]] for safer conditional testing
  * Used arithmetic operations (( cnt += 3 )) instead of expr
  * Added proper variable expansion with braces ${variable}
  * Added proper quoting around variables and paths with "${variable}"
* Docker Compose commands:
  * Replaced hardcoded container names with service names
  * Used docker compose exec instead of docker exec $CONTAINER_NAME
  * Removed repetitive flags by using environment variables
* Shell script best practices:
  * Added function keyword before function definition
  * Used safer path handling with "$(dirname "${0}")"
These changes make the script more maintainable, less error-prone, and
more consistent with modern shell scripting standards.
2025-04-25 09:13:35 +00:00
Conrad Ludgate
afe9b27983 fix(compute/tls): support for checking certificate chains (#11683)
## Problem

It seems are production-ready cert-manager setup now includes a full
certificate chain. This was not accounted for and the decoder would
error.

## Summary of changes

Change the way we decode certificates to support cert-chains, ignoring
all but the first cert.

This also changes a log line to not use multi-line errors.

~~I have tested this code manually against real certificates/keys, I
didn't want to embed those in a test just yet, not until the cert
expires in 24 hours.~~
2025-04-25 09:09:14 +00:00
Alex Chi Z.
5d91d4e843 fix(pageserver): reduce gc-compaction memory usage (#11709)
## Problem

close https://github.com/neondatabase/neon/issues/11694

We had the delta layer iterator and image layer iterator set to buffer
at most 8MB data. Note that 8MB is the compressed size, so it is
possible for those iterators contain more than 8MB data in memory.

For the recent OOM case, gc-compaction was running over 556 layers,
which means that we will have 556 active iterators. So in theory, it
could take up to 556*8=4448MB memory when the compaction is going on. If
images get compressed and the compression ratio is high (for that
tenant, we see 3x compression ratio across image layers), then that's
13344MB memory.

Also we have layer rewrites, which explains the memory taken by
gc-compaction itself (versus the iterators). We rewrite 424 out of 556
layers, and each of such rewrites need a pair of delta layer writer. So
we are buffering a lot of deltas in the memory.

The flamegraph shows that gc-compaction itself takes 6GB memory, delta
iterator 7GB, and image iterator 2GB, which can be explained by the
above theory.

## Summary of changes

- Reduce the buffer sizes.
- Estimate memory consumption and if it is too high.
- Also give up if the number of layers-to-rewrite is too high.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-25 08:45:31 +00:00
Alexander Bayandin
2465e9141f test_runner: bump httpcore to 1.0.9 and h11 to 0.16.0 (#11711)
## Problem

https://github.com/advisories/GHSA-vqfr-h8mv-ghfj

## Summary of changes
- Bump `h11` to 0.16.0 (required to bump `httpcore` to 1.0.9)
2025-04-25 08:44:40 +00:00
Tristan Partin
2526f6aea1 Add remote extension test with library component (#11301)
The current test was just SQL files only, but we also want to test a
remote extension which includes a loadable library. With both extensions
we should cover a larger portion of compute_ctl's remote extension code
paths.

Fixes: https://github.com/neondatabase/neon/issues/11146

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-24 22:33:46 +00:00
Vlad Lazar
5ba7315c84 storage_controller: reconcile completed imports at start-up (#11614)
## Problem

In https://github.com/neondatabase/neon/pull/11345 coordination of
imports moved to the storage controller.
It involves notifying cplane when the import has been completed by
calling an idempotent endpoint.

If the storage controller shuts down in the middle of finalizing an
import, it would never be retried.

## Summary of changes

Reconcile imports at start-up by fetching the complete imports from the
database and spawning a background
task which notifies cplane.

Closes: https://github.com/neondatabase/neon/issues/11570
2025-04-24 18:39:19 +00:00
Vlad Lazar
6f7e3c18e4 storage_controller: make leadership protocol more robust (#11703)
## Problem

We saw the following scenario in staging:
1. Pod A starts up. Becomes leader and steps down the previous pod
cleanly.
2. Pod B starts up (deployment).
3. Step down request from pod B to pod A times out. Pod A did not manage
to stop its reconciliations within 10 seconds and exited with return
code 1
([code](7ba8519b43/storage_controller/src/service.rs (L8686-L8702))).
4. Pod B marks itself as the leader and finishes start-up
5. k8s restarts pod A
6. k8s marks pod B as ready
7. pod A sends step down request to pod A - this succeeds => pod A is
now the leader
8. k8s kills pod A because it thinks pod B is healthy and pod A is part
of the old replica set

We end up in a situation where the only pod we have (B) is stepped down
and attempts to forward requests to a leader that doesn't exist. k8s
can't detect that pod B is in a bad state since the /status endpoint
simply returns 200 hundred if the pod is running.

## Summary of changes

This PR includes a number of robustness improvements to the leadership
protocol:
* use a single step down task per controller
* add a new endpoint to be used as k8s liveness probe and check
leadership status there
* handle restarts explicitly (i.e. don't step yourself down)
* increase the step down retry count
* don't kill the process on long step down since k8s will just restart
it
2025-04-24 16:59:56 +00:00
Christian Schwarz
8afb783708 feat: Direct IO for the pageserver write path (#11558)
# Problem

The Pageserver read path exclusively uses direct IO if
`virtual_file_io_mode=direct`.

The write path is half-finished. Here is what the various writing
components use:

|what|buffering|flags on <br/>`v_f_io_mode`<br/>=`buffered`|flags on
<br/>`virtual_file_io_mode`<br/>=`direct`|
|-|-|-|-|
|`DeltaLayerWriter`| BlobWriter<BUFFERED=true> | () | () |
|`ImageLayerWriter`| BlobWriter<BUFFERED=false> | () | () |
|`download_layer_file`|BufferedWriter|()|()|
|`InMemoryLayer`|BufferedWriter|()|O_DIRECT|


The vehicle towards direct IO support is `BufferedWriter` which
- largely takes care of O_DIRECT alignment & size-multiple requirements 
- double-buffering to mask latency

`DeltaLayerWriter`, `ImageLayerWriter` use `blob_io::BlobWriter` , which
has neither of these.

# Changes

## High-Level

At a high-level this PR makes the following primary changes:

- switch the two layer writer types to use `BufferedWriter` & make
sensitive to `virtual_file_io_mode` (via open_with_options_**v2**)
- make `download_layer_file` sensitive to `virtual_file_io_mode` (also
via open_with_options_**v2**)
- add `virtual_file_io_mode=direct-rw` as a feature gate
- we're hackish-ly piggybacking on OpenOptions's ask for write access
here
- this means with just `=direct` InMemoryLayer reads and writes no
longer uses O_DIRECT
- this is transitory and we'll remove the `direct-rw` variant once the
rollout is complete

(The `_v2` APIs for opening / creating VirtualFile are those that are
sensitive to `virtual_file_io_mode`)

The result is:

|what|uses <br/>`BufferedWriter`|flags on
<br/>`v_f_io_mode`<br/>=`buffered`|flags on
<br/>`v_f_io_mode`<br/>=`direct`|flags on
<br/>`v_f_io_mode`<br/>=`direct-rw`|
|-|-|-|-|-|
|`DeltaLayerWriter`| ~~Blob~~BufferedWriter | () | () |  O_DIRECT |
|`ImageLayerWriter`| ~~Blob~~BufferedWriter | () | () |  O_DIRECT |
|`download_layer_file`|BufferedWriter|()|()|O_DIRECT|
|`InMemoryLayer`|BufferedWriter|()|~~O_DIRECT~~()|O_DIRECT|


## Code-Level


The main change is:
- Switch `blob_io::BlobWriter` away from its own buffering method to use
`BufferedWriter`.

Additional prep for upholding `O_DIRECT` requirements:
- Layer writer `finish()` methods switched to use IoBufferMut for
guaranteed buffer address alignment. The size of the buffers is PAGE_SZ
and thereby implicitly assumed to fulfill O_DIRECT requirements.

For the hacky feature-gating via `=direct-rw`:
- Track `OpenOptions::write(true|false)` in a field; bunch of mechanical
churn.
- Consolidate the APIs in which we "open" or "create" VirtualFile for
better overview over which parts of the code use the `_v2` APIs.

Necessary refactorings & infra work:
- Add doc comments explaining how BufferedWriter ensures that writes are
compliant with O_DIRECT alignment & size constraints. This isn't new,
but should be spelled out.
- Add the concept of shutdown modes to `BufferedWriter::shutdown` to
make writer shutdown adhere to these constraints.
- The `PadThenTruncate` mode might not be necessary in practice because
I believe all layer files ever written are sized in multiples `PAGE_SZ`
and since `PAGE_SZ` is larger than the current alignment requirements
(512/4k depending on platform), it won't be necesary to pad.
- Some test (I believe `round_trip_test_compressed`?) required it though
- [ ] TODO: decide if we want to accept that complexity; if we do then
address TODO in the code to separate alignment requirement from buffer
capacity
- Add `set_len` (=`ftruncate`) VirtualFile operation to support the
above.
- Allow `BufferedWriter` to start at a non-zero offset (to make room for
the summary block).

Cleanups unlocked by this change:
- Remove non-positional APIs from VirtualFile (e.g. seek, write_full,
read_full)

Drive-by fixes:
- PR https://github.com/neondatabase/neon/pull/11585 aimed to run unit
tests for all `virtual_file_io_mode` combinations but didn't because of
a missing `_` in the env var.

# Performance

This section assesses this PR's impact on deployments with current
production setting (`=direct`) and anticipated impact of switching to
(`=direct-rw`).

For `DeltaLayerWriter`, `=direct` should remain unchanged to slightly
improved on throughput because the `BlobWriter`'s buffer had the same
size as the `BufferedWriter`'s buffer, but it didn't have the
double-buffering that `BufferedWriter` has.
The `=direct-rw` enables direct IO; throughput should not be suffering
because of double-buffering; benchmarks will show if this is true.

The `ImageLayerWriter` was previously not doing any buffering
(`BUFFERED=false`).
It went straight to issuing the IO operation to the underlying
VirtualFile and the buffering was done by the kernel.
The switch to `BufferedWriter` under `=direct` adds an additional memcpy
into the BufferedWriter's buffer.
We will win back that memcpy when enabling direct IO via `=direct-rw`.

A nice win from the switch to `BufferedWriter` is that ImageLayerWriter
performs >=16x fewer write operations to VirtualFile (the BlobWriter
performs one write per len field and one write per image value).
This should save low tens of microseconds of CPU overhead from doing all
these syscalls/io_uring operations, regardless of `=direct` or
`=direct-rw`.
Aside from problems with alignment, this write frequency without
double-buffering is prohibitive if we actually have to wait for the
disk, which is what will happen when we enable direct IO via
(`=direct-rw`).
Throughput should not be suffering because of BufferedWrite's
double-buffering; benchmarks will show if this is true.

`InMemoryLayer` at `=direct` will flip back to using buffered IO but
remain on BufferedWriter.
The buffered IO adds back one memcpy of CPU overhead.
Throughput should not suffer and will might improve on
not-memory-pressured Pageservers but let's remember that we're doing the
whole direct IO thing to eliminate global memory pressure as a source of
perf variability.

## bench_ingest

I reran `bench_ingest` on `im4gn.2xlarge` and `Hetzner AX102`.
Use `git diff` with `--word-diff` or similar to see the change.

General guidance on interpretation:
- immediate production impact of this PR without production config
change can be gauged by comparing the same `io_mode=Direct`
- end state of production switched over to `io_mode=DirectRw` can be
gauged by comparing old results' `io_mode=Direct` to new results'
`io_mode=DirectRw`

Given above guidance, on `im4gn.2xlarge`
- immediate impact is a significant improvement in all cases
- end state after switching has same significant improvements in all
cases
- ... except `ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192
key_layout=Sequential write_delta=Yes` which only achieves `238 MiB/s`
instead of `253.43 MiB/s`
  - this is a 6% degradation
  - this workload is typical for image layer creation

# Refs
- epic https://github.com/neondatabase/neon/issues/9868
- stacked atop
  - preliminary refactor https://github.com/neondatabase/neon/pull/11549
- bench_ingest overhaul https://github.com/neondatabase/neon/pull/11667
- derived from https://github.com/neondatabase/neon/pull/10063

Co-authored-by: Yuchen Liang <yuchen@neon.tech>
2025-04-24 14:57:36 +00:00
Konstantin Knizhnik
1531712555 Undo commit d1728a6bcd because it causes problems with creating pg_search extension (#11700)
## Problem

See https://neondb.slack.com/archives/C03H1K0PGKH/p1745489241982209

pg_search extension now can not be created.

## Summary of changes

Undo d1728a6bcd

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-24 14:46:10 +00:00
Alexander Bayandin
5e989a3148 CI(build-tools): bump packages in build-tools image (#11697)
## Problem

`cargo-deny` 0.16.2 spits a bunch of warnings like:
```
warning[index-failure]: unable to check for yanked crates
```

The issue is fixed for the latest version of `cargo-deny` (0.18.2). And
while we're here, let's bump all the packages we have in `build-tools`
image

## Summary of changes
- bump cargo-hakari to 0.9.36
- bump cargo-deny to 0.18.2
- bump cargo-hack to 0.6.36
- bump cargo-nextest to 0.9.94
- bump diesel_cli to 2.2.9
- bump s5cmd to 2.3.0
- bump mold to 2.37.1
- bump python to 3.11.12
2025-04-24 14:13:04 +00:00
Alexey Kondratov
985056be37 feat(compute): Introduce Postgres downtime metrics (#11346)
## Problem

Currently, we only report the timestamp of the last moment we think
Postgres was active. The problem is that if Postgres gets completely
unresponsive, we still report some old timestamp, and it's impossible to
distinguish situations 'Postgres is effectively down' and 'Postgres is
running, but no client activity'.

## Summary of changes

Refactor the `compute_ctl`'s compute monitor so that it was easier to
track the connection errors and failed activity checks, and report
- `now() - last_successful_check` as current downtime on any failure
- cumulative Postgres downtime during the whole compute lifetime

After adding a test, I also noticed that the compute monitor may not
reconnect even though queries fail with `connection closed` or `error
communicating with the server: Connection reset by peer (os error 54)`,
but for some reason we do not catch it with `client.is_closed()`, so I
added an explicit reconnect in case of any failures.

Discussion:
https://neondb.slack.com/archives/C03TN5G758R/p1742489426966639
2025-04-24 13:51:09 +00:00
62 changed files with 2575 additions and 1247 deletions

View File

@@ -275,7 +275,7 @@ jobs:
for io_mode in buffered direct direct-rw ; do
NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO=$get_vectored_concurrent_io \
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine \
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOMODE=$io_mode \
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE=$io_mode \
${cov_prefix} \
cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
done
@@ -381,7 +381,7 @@ jobs:
run_with_real_s3: true
real_s3_bucket: neon-github-ci-tests
real_s3_region: eu-central-1
rerun_failed: true
rerun_failed: false
pg_version: ${{ matrix.pg_version }}
sanitizers: ${{ inputs.sanitizers }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
@@ -395,7 +395,7 @@ jobs:
BUILD_TAG: ${{ inputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct-rw
USE_LFC: ${{ matrix.lfc_state == 'with-lfc' && 'true' || 'false' }}
# Temporary disable this step until we figure out why it's so flaky

View File

@@ -63,13 +63,8 @@ jobs:
- name: Cache postgres ${{ matrix.postgres-version }} build
id: cache_pg
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/${{ matrix.postgres-version }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ matrix.postgres-version }}-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
@@ -134,25 +129,15 @@ jobs:
- name: Cache postgres v17 build
id: cache_pg
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
@@ -218,57 +203,32 @@ jobs:
- name: Cache postgres v14 build
id: cache_pg
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v14-${{ steps.pg_rev_v14.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v15 build
id: cache_pg_v15
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v15-${{ steps.pg_rev_v15.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v16 build
id: cache_pg_v16
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v16-${{ steps.pg_rev_v16.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v17 build
id: cache_pg_v17
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache cargo deps (only for v17)
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: |
~/.cargo/registry
!~/.cargo/registry/src
@@ -278,13 +238,8 @@ jobs:
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}

View File

@@ -324,7 +324,7 @@ jobs:
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct-rw
SYNC_BETWEEN_TESTS: true
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones

2
Cargo.lock generated
View File

@@ -1323,7 +1323,6 @@ dependencies = [
"serde_json",
"serde_with",
"signal-hook",
"spki 0.7.3",
"tar",
"thiserror 1.0.69",
"tokio",
@@ -4302,6 +4301,7 @@ dependencies = [
"remote_storage",
"reqwest",
"rpds",
"rstest",
"rustls 0.23.18",
"scopeguard",
"send-future",

View File

@@ -173,7 +173,7 @@ RUN curl -fsSL "https://github.com/protocolbuffers/protobuf/releases/download/v$
&& rm -rf protoc.zip protoc
# s5cmd
ENV S5CMD_VERSION=2.2.2
ENV S5CMD_VERSION=2.3.0
RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/s5cmd_${S5CMD_VERSION}_Linux-$(uname -m | sed 's/x86_64/64bit/g' | sed 's/aarch64/arm64/g').tar.gz" | tar zxvf - s5cmd \
&& chmod +x s5cmd \
&& mv s5cmd /usr/local/bin/s5cmd
@@ -206,7 +206,7 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-$(uname -m).zip" -o "aws
&& rm awscliv2.zip
# Mold: A Modern Linker
ENV MOLD_VERSION=v2.34.1
ENV MOLD_VERSION=v2.37.1
RUN set -e \
&& git clone https://github.com/rui314/mold.git \
&& mkdir mold/build \
@@ -268,7 +268,7 @@ WORKDIR /home/nonroot
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /home/nonroot/.curlrc
# Python
ENV PYTHON_VERSION=3.11.10 \
ENV PYTHON_VERSION=3.11.12 \
PYENV_ROOT=/home/nonroot/.pyenv \
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
RUN set -e \
@@ -296,12 +296,12 @@ ENV RUSTC_VERSION=1.86.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1
ARG CARGO_HAKARI_VERSION=0.9.33
ARG CARGO_DENY_VERSION=0.16.2
ARG CARGO_HACK_VERSION=0.6.33
ARG CARGO_NEXTEST_VERSION=0.9.85
ARG CARGO_HAKARI_VERSION=0.9.36
ARG CARGO_DENY_VERSION=0.18.2
ARG CARGO_HACK_VERSION=0.6.36
ARG CARGO_NEXTEST_VERSION=0.9.94
ARG CARGO_CHEF_VERSION=0.1.71
ARG CARGO_DIESEL_CLI_VERSION=2.2.6
ARG CARGO_DIESEL_CLI_VERSION=2.2.9
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
chmod +x rustup-init && \
./rustup-init -y --default-toolchain ${RUSTC_VERSION} && \

View File

@@ -44,7 +44,6 @@ serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
spki = { version = "0.7.3", features = ["std"] }
tar.workspace = true
tower.workspace = true
tower-http.workspace = true

View File

@@ -57,13 +57,24 @@ use tracing::{error, info};
use url::Url;
use utils::failpoint_support;
// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL
fn parse_remote_ext_config(arg: &str) -> Result<String> {
if arg.starts_with("http") {
Ok(arg.trim_end_matches('/').to_string())
} else {
Ok("http://pg-ext-s3-gateway".to_string())
}
}
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
#[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
pub pgbin: String,
#[arg(short = 'r', long)]
#[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
pub remote_ext_config: Option<String>,
/// The port to bind the external listening HTTP server to. Clients running

View File

@@ -1,8 +1,8 @@
use metrics::core::{AtomicF64, Collector, GenericGauge};
use metrics::core::{AtomicF64, AtomicU64, Collector, GenericCounter, GenericGauge};
use metrics::proto::MetricFamily;
use metrics::{
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter_vec,
register_int_gauge_vec, register_uint_gauge_vec,
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec,
};
use once_cell::sync::Lazy;
@@ -81,6 +81,22 @@ pub(crate) static COMPUTE_CTL_UP: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static PG_CURR_DOWNTIME_MS: Lazy<GenericGauge<AtomicF64>> = Lazy::new(|| {
register_gauge!(
"compute_pg_current_downtime_ms",
"Non-cumulative duration of Postgres downtime in ms; resets after successful check",
)
.expect("failed to define a metric")
});
pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy<GenericCounter<AtomicU64>> = Lazy::new(|| {
register_int_counter!(
"compute_pg_downtime_ms_total",
"Cumulative duration of Postgres downtime in ms",
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
@@ -88,5 +104,7 @@ pub fn collect() -> Vec<MetricFamily> {
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
metrics.extend(DB_MIGRATION_FAILED.collect());
metrics.extend(AUDIT_LOG_DIR_SIZE.collect());
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
metrics
}

View File

@@ -6,197 +6,294 @@ use chrono::{DateTime, Utc};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeFeature;
use postgres::{Client, NoTls};
use tracing::{debug, error, info, warn};
use tracing::{Level, error, info, instrument, span};
use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
// Spin in a loop and figure out the last activity time in the Postgres.
// Then update it in the shared state. This function never errors out.
// NB: the only expected panic is at `Mutex` unwrap(), all other errors
// should be handled gracefully.
fn watch_compute_activity(compute: &ComputeNode) {
// Suppose that `connstr` doesn't change
let connstr = compute.params.connstr.clone();
let conf = compute.get_conn_conf(Some("compute_ctl:activity_monitor"));
struct ComputeMonitor {
compute: Arc<ComputeNode>,
// During startup and configuration we connect to every Postgres database,
// but we don't want to count this as some user activity. So wait until
// the compute fully started before monitoring activity.
wait_for_postgres_start(compute);
/// The moment when Postgres had some activity,
/// that should prevent compute from being suspended.
last_active: Option<DateTime<Utc>>,
// Define `client` outside of the loop to reuse existing connection if it's active.
let mut client = conf.connect(NoTls);
/// The moment when we last tried to check Postgres.
last_checked: DateTime<Utc>,
/// The last moment we did a successful Postgres check.
last_up: DateTime<Utc>,
let mut sleep = false;
let mut prev_active_time: Option<f64> = None;
let mut prev_sessions: Option<i64> = None;
/// Only used for internal statistics change tracking
/// between monitor runs and can be outdated.
active_time: Option<f64>,
/// Only used for internal statistics change tracking
/// between monitor runs and can be outdated.
sessions: Option<i64>,
if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
info!("starting experimental activity monitor for {}", connstr);
} else {
info!("starting activity monitor for {}", connstr);
/// Use experimental statistics-based activity monitor. It's no longer
/// 'experimental' per se, as it's enabled for everyone, but we still
/// keep the flag as an option to turn it off in some cases if it will
/// misbehave.
experimental: bool,
}
impl ComputeMonitor {
fn report_down(&self) {
let now = Utc::now();
// Calculate and report current downtime
// (since the last time Postgres was up)
let downtime = now.signed_duration_since(self.last_up);
PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);
// Calculate and update total downtime
// (cumulative duration of Postgres downtime in ms)
let inc = now
.signed_duration_since(self.last_checked)
.num_milliseconds();
PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
}
loop {
// We use `continue` a lot, so it's more convenient to sleep at the top of the loop.
// But skip the first sleep, so we can connect to Postgres immediately.
if sleep {
// Should be outside of the mutex lock to allow others to read while we sleep.
thread::sleep(MONITOR_CHECK_INTERVAL);
} else {
sleep = true;
}
fn report_up(&mut self) {
self.last_up = Utc::now();
PG_CURR_DOWNTIME_MS.set(0.0);
}
match &mut client {
Ok(cli) => {
if cli.is_closed() {
info!("connection to Postgres is closed, trying to reconnect");
fn downtime_info(&self) -> String {
format!(
"total_ms: {}, current_ms: {}, last_up: {}",
PG_TOTAL_DOWNTIME_MS.get(),
PG_CURR_DOWNTIME_MS.get(),
self.last_up
)
}
// Connection is closed, reconnect and try again.
client = conf.connect(NoTls);
continue;
}
/// Spin in a loop and figure out the last activity time in the Postgres.
/// Then update it in the shared state. This function never errors out.
/// NB: the only expected panic is at `Mutex` unwrap(), all other errors
/// should be handled gracefully.
#[instrument(skip_all)]
pub fn run(&mut self) {
// Suppose that `connstr` doesn't change
let connstr = self.compute.params.connstr.clone();
let conf = self
.compute
.get_conn_conf(Some("compute_ctl:compute_monitor"));
// This is a new logic, only enable if the feature flag is set.
// TODO: remove this once we are sure that it works OR drop it altogether.
if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
// First, check if the total active time or sessions across all databases has changed.
// If it did, it means that user executed some queries. In theory, it can even go down if
// some databases were dropped, but it's still a user activity.
match get_database_stats(cli) {
Ok((active_time, sessions)) => {
let mut detected_activity = false;
// During startup and configuration we connect to every Postgres database,
// but we don't want to count this as some user activity. So wait until
// the compute fully started before monitoring activity.
wait_for_postgres_start(&self.compute);
prev_active_time = match prev_active_time {
Some(prev_active_time) => {
if active_time != prev_active_time {
detected_activity = true;
}
Some(active_time)
}
None => Some(active_time),
};
prev_sessions = match prev_sessions {
Some(prev_sessions) => {
if sessions != prev_sessions {
detected_activity = true;
}
Some(sessions)
}
None => Some(sessions),
};
// Define `client` outside of the loop to reuse existing connection if it's active.
let mut client = conf.connect(NoTls);
if detected_activity {
// Update the last active time and continue, we don't need to
// check backends state change.
compute.update_last_active(Some(Utc::now()));
continue;
}
}
Err(e) => {
error!("could not get database statistics: {}", e);
continue;
}
}
}
info!("starting compute monitor for {}", connstr);
// Second, if database statistics is the same, check all backends state change,
// maybe there is some with more recent activity. `get_backends_state_change()`
// can return None or stale timestamp, so it's `compute.update_last_active()`
// responsibility to check if the new timestamp is more recent than the current one.
// This helps us to discover new sessions, that did nothing yet.
match get_backends_state_change(cli) {
Ok(last_active) => {
compute.update_last_active(last_active);
}
Err(e) => {
error!("could not get backends state change: {}", e);
}
}
// Finally, if there are existing (logical) walsenders, do not suspend.
//
// walproposer doesn't currently show up in pg_stat_replication,
// but protect if it will be
let ws_count_query = "select count(*) from pg_stat_replication where application_name != 'walproposer';";
match cli.query_one(ws_count_query, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_ws) => {
if num_ws > 0 {
compute.update_last_active(Some(Utc::now()));
continue;
}
}
Err(e) => {
warn!("failed to parse walsenders count: {:?}", e);
continue;
}
},
Err(e) => {
warn!("failed to get list of walsenders: {:?}", e);
continue;
}
}
//
// Don't suspend compute if there is an active logical replication subscription
//
// `where pid is not null` to filter out read only computes and subscription on branches
//
let logical_subscriptions_query =
"select count(*) from pg_stat_subscription where pid is not null;";
match cli.query_one(logical_subscriptions_query, &[]) {
Ok(row) => match row.try_get::<&str, i64>("count") {
Ok(num_subscribers) => {
if num_subscribers > 0 {
compute.update_last_active(Some(Utc::now()));
continue;
}
}
Err(e) => {
warn!("failed to parse `pg_stat_subscription` count: {:?}", e);
continue;
}
},
Err(e) => {
warn!(
"failed to get list of active logical replication subscriptions: {:?}",
e
loop {
match &mut client {
Ok(cli) => {
if cli.is_closed() {
info!(
downtime_info = self.downtime_info(),
"connection to Postgres is closed, trying to reconnect"
);
continue;
}
}
//
// Do not suspend compute if autovacuum is running
//
let autovacuum_count_query = "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
match cli.query_one(autovacuum_count_query, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_workers) => {
if num_workers > 0 {
compute.update_last_active(Some(Utc::now()));
continue;
self.report_down();
// Connection is closed, reconnect and try again.
client = conf.connect(NoTls);
} else {
match self.check(cli) {
Ok(_) => {
self.report_up();
self.compute.update_last_active(self.last_active);
}
Err(e) => {
// Although we have many places where we can return errors in `check()`,
// normally it shouldn't happen. I.e., we will likely return error if
// connection got broken, query timed out, Postgres returned invalid data, etc.
// In all such cases it's suspicious, so let's report this as downtime.
self.report_down();
error!(
downtime_info = self.downtime_info(),
"could not check Postgres: {}", e
);
// Reconnect to Postgres just in case. During tests, I noticed
// that queries in `check()` can fail with `connection closed`,
// but `cli.is_closed()` above doesn't detect it. Even if old
// connection is still alive, it will be dropped when we reassign
// `client` to a new connection.
client = conf.connect(NoTls);
}
}
Err(e) => {
warn!("failed to parse autovacuum workers count: {:?}", e);
continue;
}
},
Err(e) => {
warn!("failed to get list of autovacuum workers: {:?}", e);
continue;
}
}
}
Err(e) => {
debug!("could not connect to Postgres: {}, retrying", e);
Err(e) => {
info!(
downtime_info = self.downtime_info(),
"could not connect to Postgres: {}, retrying", e
);
self.report_down();
// Establish a new connection and try again.
client = conf.connect(NoTls);
// Establish a new connection and try again.
client = conf.connect(NoTls);
}
}
// Reset the `last_checked` timestamp and sleep before the next iteration.
self.last_checked = Utc::now();
thread::sleep(MONITOR_CHECK_INTERVAL);
}
}
#[instrument(skip_all)]
fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
// This is new logic, only enable if the feature flag is set.
// TODO: remove this once we are sure that it works OR drop it altogether.
if self.experimental {
// Check if the total active time or sessions across all databases has changed.
// If it did, it means that user executed some queries. In theory, it can even go down if
// some databases were dropped, but it's still user activity.
match get_database_stats(cli) {
Ok((active_time, sessions)) => {
let mut detected_activity = false;
if let Some(prev_active_time) = self.active_time {
if active_time != prev_active_time {
detected_activity = true;
}
}
self.active_time = Some(active_time);
if let Some(prev_sessions) = self.sessions {
if sessions != prev_sessions {
detected_activity = true;
}
}
self.sessions = Some(sessions);
if detected_activity {
// Update the last active time and continue, we don't need to
// check backends state change.
self.last_active = Some(Utc::now());
return Ok(());
}
}
Err(e) => {
return Err(anyhow::anyhow!("could not get database statistics: {}", e));
}
}
}
// If database statistics are the same, check all backends for state changes.
// Maybe there are some with more recent activity. `get_backends_state_change()`
// can return None or stale timestamp, so it's `compute.update_last_active()`
// responsibility to check if the new timestamp is more recent than the current one.
// This helps us to discover new sessions that have not done anything yet.
match get_backends_state_change(cli) {
Ok(last_active) => match (last_active, self.last_active) {
(Some(last_active), Some(prev_last_active)) => {
if last_active > prev_last_active {
self.last_active = Some(last_active);
return Ok(());
}
}
(Some(last_active), None) => {
self.last_active = Some(last_active);
return Ok(());
}
_ => {}
},
Err(e) => {
return Err(anyhow::anyhow!(
"could not get backends state change: {}",
e
));
}
}
// If there are existing (logical) walsenders, do not suspend.
//
// N.B. walproposer doesn't currently show up in pg_stat_replication,
// but protect if it will.
const WS_COUNT_QUERY: &str =
"select count(*) from pg_stat_replication where application_name != 'walproposer';";
match cli.query_one(WS_COUNT_QUERY, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_ws) => {
if num_ws > 0 {
self.last_active = Some(Utc::now());
return Ok(());
}
}
Err(e) => {
let err: anyhow::Error = e.into();
return Err(err.context("failed to parse walsenders count"));
}
},
Err(e) => {
return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
}
}
// Don't suspend compute if there is an active logical replication subscription
//
// `where pid is not null` to filter out read only computes and subscription on branches
const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
"select count(*) from pg_stat_subscription where pid is not null;";
match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
Ok(row) => match row.try_get::<&str, i64>("count") {
Ok(num_subscribers) => {
if num_subscribers > 0 {
self.last_active = Some(Utc::now());
return Ok(());
}
}
Err(e) => {
return Err(anyhow::anyhow!(
"failed to parse 'pg_stat_subscription' count: {}",
e
));
}
},
Err(e) => {
return Err(anyhow::anyhow!(
"failed to get list of active logical replication subscriptions: {}",
e
));
}
}
// Do not suspend compute if autovacuum is running
const AUTOVACUUM_COUNT_QUERY: &str =
"select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_workers) => {
if num_workers > 0 {
self.last_active = Some(Utc::now());
return Ok(());
};
}
Err(e) => {
return Err(anyhow::anyhow!(
"failed to parse autovacuum workers count: {}",
e
));
}
},
Err(e) => {
return Err(anyhow::anyhow!(
"failed to get list of autovacuum workers: {}",
e
));
}
}
Ok(())
}
}
@@ -315,9 +412,24 @@ fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime
/// Launch a separate compute monitor thread and return its `JoinHandle`.
pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
let compute = Arc::clone(compute);
let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
let now = Utc::now();
let mut monitor = ComputeMonitor {
compute,
last_active: None,
last_checked: now,
last_up: now,
active_time: None,
sessions: None,
experimental,
};
let span = span!(Level::INFO, "compute_monitor");
thread::Builder::new()
.name("compute-monitor".into())
.spawn(move || watch_compute_activity(&compute))
.spawn(move || {
let _enter = span.enter();
monitor.run();
})
.expect("cannot launch compute monitor thread")
}

View File

@@ -3,7 +3,6 @@ use std::{io::Write, os::unix::fs::OpenOptionsExt, path::Path, time::Duration};
use anyhow::{Context, Result, bail};
use compute_api::responses::TlsConfig;
use ring::digest;
use spki::der::{Decode, PemReader};
use x509_cert::Certificate;
#[derive(Clone, Copy)]
@@ -52,7 +51,7 @@ pub fn update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) {
match try_update_key_path_blocking(pg_data, tls_config) {
Ok(()) => break,
Err(e) => {
tracing::error!("could not create key file {e:?}");
tracing::error!(error = ?e, "could not create key file");
std::thread::sleep(Duration::from_secs(1))
}
}
@@ -92,8 +91,14 @@ fn try_update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) -> Resul
fn verify_key_cert(key: &str, cert: &str) -> Result<()> {
use x509_cert::der::oid::db::rfc5912::ECDSA_WITH_SHA_256;
let cert = Certificate::decode(&mut PemReader::new(cert.as_bytes()).context("pem reader")?)
.context("decode cert")?;
let certs = Certificate::load_pem_chain(cert.as_bytes())
.context("decoding PEM encoded certificates")?;
// First certificate is our server-cert,
// all the rest of the certs are the CA cert chain.
let Some(cert) = certs.first() else {
bail!("no certificates found");
};
match cert.signature_algorithm.oid {
ECDSA_WITH_SHA_256 => {
@@ -115,3 +120,82 @@ fn verify_key_cert(key: &str, cert: &str) -> Result<()> {
Ok(())
}
#[cfg(test)]
mod tests {
use super::verify_key_cert;
/// Real certificate chain file, generated by cert-manager in dev.
/// The server auth certificate has expired since 2025-04-24T15:41:35Z.
const CERT: &str = "
-----BEGIN CERTIFICATE-----
MIICCDCCAa+gAwIBAgIQKhLomFcNULbZA/bPdGzaSzAKBggqhkjOPQQDAjBEMQsw
CQYDVQQGEwJVUzESMBAGA1UEChMJTmVvbiBJbmMuMSEwHwYDVQQDExhOZW9uIEs4
cyBJbnRlcm1lZGlhdGUgQ0EwHhcNMjUwNDIzMTU0MTM1WhcNMjUwNDI0MTU0MTM1
WjBBMT8wPQYDVQQDEzZjb21wdXRlLXdpc3B5LWdyYXNzLXcwY21laWp3LmRlZmF1
bHQuc3ZjLmNsdXN0ZXIubG9jYWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAATF
QCcG2m/EVHAiZtSsYgVnHgoTjUL/Jtwfdrpvz2t0bVRZmBmSKhlo53uPV9Y5eKFG
AmR54p9/gT2eO3xU7vAgo4GFMIGCMA4GA1UdDwEB/wQEAwIFoDAMBgNVHRMBAf8E
AjAAMB8GA1UdIwQYMBaAFFR2JAhXkeiNQNEixTvAYIwxUu3QMEEGA1UdEQQ6MDiC
NmNvbXB1dGUtd2lzcHktZ3Jhc3MtdzBjbWVpancuZGVmYXVsdC5zdmMuY2x1c3Rl
ci5sb2NhbDAKBggqhkjOPQQDAgNHADBEAiBLG22wKG8XS9e9RxBT+kmUx/kIThcP
DIpp7jx0PrFcdQIgEMTdnXpx5Cv/Z0NIEDxtMHUD7G0vuRPfztki36JuakM=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIICFzCCAb6gAwIBAgIUbbX98N2Ip6lWAONRk8dU9hSz+YIwCgYIKoZIzj0EAwIw
RDELMAkGA1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEhMB8GA1UEAxMYTmVv
biBBV1MgSW50ZXJtZWRpYXRlIENBMB4XDTI1MDQyMjE1MTAxMFoXDTI1MDcyMTE1
MTAxMFowRDELMAkGA1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEhMB8GA1UE
AxMYTmVvbiBLOHMgSW50ZXJtZWRpYXRlIENBMFkwEwYHKoZIzj0CAQYIKoZIzj0D
AQcDQgAE5++m5owqNI4BPMTVNIUQH0qvU7pYhdpHGVGhdj/Lgars6ROvE6uSNQV4
SAmJN5HBzj5/6kLQaTPWpXW7EHXjK6OBjTCBijAOBgNVHQ8BAf8EBAMCAQYwEgYD
VR0TAQH/BAgwBgEB/wIBADAdBgNVHQ4EFgQUVHYkCFeR6I1A0SLFO8BgjDFS7dAw
HwYDVR0jBBgwFoAUgHfNXfyKtHO0V9qoLOWCjkNiaI8wJAYDVR0eAQH/BBowGKAW
MBSCEi5zdmMuY2x1c3Rlci5sb2NhbDAKBggqhkjOPQQDAgNHADBEAiBObVFFdXaL
QpOXmN60dYUNnQRwjKreFduEkQgOdOlssgIgVAdJJQFgvlrvEOBhY8j5WyeKRwUN
k/ALs6KpgaFBCGY=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIB4jCCAYegAwIBAgIUFlxWFn/11yoGdmD+6gf+yQMToS0wCgYIKoZIzj0EAwIw
ODELMAkGA1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEVMBMGA1UEAxMMTmVv
biBSb290IENBMB4XDTI1MDQwMzA3MTUyMloXDTI2MDQwMzA3MTUyMlowRDELMAkG
A1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEhMB8GA1UEAxMYTmVvbiBBV1Mg
SW50ZXJtZWRpYXRlIENBMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEqonG/IQ6
ZxtEtOUTkkoNopPieXDO5CBKUkNFTGeJEB7OxRlSpYJgsBpaYIaD6Vc4sVk3thIF
p+pLw52idQOIN6NjMGEwDgYDVR0PAQH/BAQDAgEGMA8GA1UdEwEB/wQFMAMBAf8w
HQYDVR0OBBYEFIB3zV38irRztFfaqCzlgo5DYmiPMB8GA1UdIwQYMBaAFKh7M4/G
FHvr/ORDQZt4bMLlJvHCMAoGCCqGSM49BAMCA0kAMEYCIQCbS4x7QPslONzBYbjC
UQaQ0QLDW4CJHvQ4u4gbWFG87wIhAJMsHQHjP9qTT27Q65zQCR7O8QeLAfha1jrH
Ag/LsxSr
-----END CERTIFICATE-----
";
/// The key corresponding to [`CERT`]
const KEY: &str = "
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIDnAnrqmIJjndCLWP1iIO5X3X63Aia48TGpGuMXwvm6IoAoGCCqGSM49
AwEHoUQDQgAExUAnBtpvxFRwImbUrGIFZx4KE41C/ybcH3a6b89rdG1UWZgZkioZ
aOd7j1fWOXihRgJkeeKff4E9njt8VO7wIA==
-----END EC PRIVATE KEY-----
";
/// An incorrect key.
const INCORRECT_KEY: &str = "
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIL6WqqBDyvM0HWz7Ir5M5+jhFWB7IzOClGn26OPrzHCXoAoGCCqGSM49
AwEHoUQDQgAE7XVvdOy5lfwtNKb+gJEUtnG+DrnnXLY5LsHDeGQKV9PTRcEMeCrG
YZzHyML4P6Sr4yi2ts+4B9i47uvAG8+XwQ==
-----END EC PRIVATE KEY-----
";
#[test]
fn certificate_verification() {
verify_key_cert(KEY, CERT).unwrap();
}
#[test]
#[should_panic(expected = "private key file does not match certificate")]
fn certificate_verification_fail() {
verify_key_cert(INCORRECT_KEY, CERT).unwrap();
}
}

View File

@@ -9,21 +9,20 @@
# to verify custom image builds (e.g pre-published ones).
#
# A test script for postgres extensions
# Currently supports only v16
# Currently supports only v16+
#
set -eux -o pipefail
COMPOSE_FILE='docker-compose.yml'
cd $(dirname $0)
COMPUTE_CONTAINER_NAME=docker-compose-compute-1
TEST_CONTAINER_NAME=docker-compose-neon-test-extensions-1
export COMPOSE_FILE='docker-compose.yml'
export COMPOSE_PROFILES=test-extensions
cd "$(dirname "${0}")"
PSQL_OPTION="-h localhost -U cloud_admin -p 55433 -d postgres"
cleanup() {
function cleanup() {
echo "show container information"
docker ps
echo "stop containers..."
docker compose --profile test-extensions -f $COMPOSE_FILE down
docker compose down
}
for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
@@ -31,55 +30,55 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
echo "clean up containers if exists"
cleanup
PG_TEST_VERSION=$((pg_version < 16 ? 16 : pg_version))
PG_VERSION=$pg_version PG_TEST_VERSION=$PG_TEST_VERSION docker compose --profile test-extensions -f $COMPOSE_FILE up --quiet-pull --build -d
PG_VERSION=${pg_version} PG_TEST_VERSION=${PG_TEST_VERSION} docker compose up --quiet-pull --build -d
echo "wait until the compute is ready. timeout after 60s. "
cnt=0
while sleep 3; do
# check timeout
cnt=`expr $cnt + 3`
if [ $cnt -gt 60 ]; then
(( cnt += 3 ))
if [[ ${cnt} -gt 60 ]]; then
echo "timeout before the compute is ready."
exit 1
fi
if docker compose --profile test-extensions -f $COMPOSE_FILE logs "compute_is_ready" | grep -q "accepting connections"; then
if docker compose logs "compute_is_ready" | grep -q "accepting connections"; then
echo "OK. The compute is ready to connect."
echo "execute simple queries."
docker exec $COMPUTE_CONTAINER_NAME /bin/bash -c "psql $PSQL_OPTION"
docker compose exec compute /bin/bash -c "psql ${PSQL_OPTION} -c 'SELECT 1'"
break
fi
done
if [ $pg_version -ge 16 ]; then
if [[ ${pg_version} -ge 16 ]]; then
# This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
echo Adding dummy config
docker exec $COMPUTE_CONTAINER_NAME touch /var/db/postgres/compute/compute_ctl_temp_override.conf
docker compose exec compute touch /var/db/postgres/compute/compute_ctl_temp_override.conf
# The following block copies the files for the pg_hintplan test to the compute node for the extension test in an isolated docker-compose environment
TMPDIR=$(mktemp -d)
docker cp $TEST_CONTAINER_NAME:/ext-src/pg_hint_plan-src/data $TMPDIR/data
docker cp $TMPDIR/data $COMPUTE_CONTAINER_NAME:/ext-src/pg_hint_plan-src/
rm -rf $TMPDIR
docker compose cp neon-test-extensions:/ext-src/pg_hint_plan-src/data "${TMPDIR}/data"
docker compose cp "${TMPDIR}/data" compute:/ext-src/pg_hint_plan-src/
rm -rf "${TMPDIR}"
# The following block does the same for the contrib/file_fdw test
TMPDIR=$(mktemp -d)
docker cp $TEST_CONTAINER_NAME:/postgres/contrib/file_fdw/data $TMPDIR/data
docker cp $TMPDIR/data $COMPUTE_CONTAINER_NAME:/postgres/contrib/file_fdw/data
rm -rf $TMPDIR
docker compose cp neon-test-extensions:/postgres/contrib/file_fdw/data "${TMPDIR}/data"
docker compose cp "${TMPDIR}/data" compute:/postgres/contrib/file_fdw/data
rm -rf "${TMPDIR}"
# Apply patches
cat ../compute/patches/contrib_pg${pg_version}.patch | docker exec -i $TEST_CONTAINER_NAME bash -c "(cd /postgres && patch -p1)"
docker compose exec -i neon-test-extensions bash -c "(cd /postgres && patch -p1)" <"../compute/patches/contrib_pg${pg_version}.patch"
# We are running tests now
rm -f testout.txt testout_contrib.txt
docker exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
$TEST_CONTAINER_NAME /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
docker exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
$TEST_CONTAINER_NAME /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0
if [ $EXT_SUCCESS -eq 0 ] || [ $CONTRIB_SUCCESS -eq 0 ]; then
docker compose exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
neon-test-extensions /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
docker compose exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
neon-test-extensions /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0
if [[ ${EXT_SUCCESS} -eq 0 || ${CONTRIB_SUCCESS} -eq 0 ]]; then
CONTRIB_FAILED=
FAILED=
[ $EXT_SUCCESS -eq 0 ] && FAILED=$(tail -1 testout.txt | awk '{for(i=1;i<=NF;i++){print "/ext-src/"$i;}}')
[ $CONTRIB_SUCCESS -eq 0 ] && CONTRIB_FAILED=$(tail -1 testout_contrib.txt | awk '{for(i=0;i<=NF;i++){print "/postgres/contrib/"$i;}}')
for d in $FAILED $CONTRIB_FAILED; do
docker exec $TEST_CONTAINER_NAME bash -c 'for file in $(find '"$d"' -name regression.diffs -o -name regression.out); do cat $file; done' || [ $? -eq 1 ]
[[ ${EXT_SUCCESS} -eq 0 ]] && FAILED=$(tail -1 testout.txt | awk '{for(i=1;i<=NF;i++){print "/ext-src/"$i;}}')
[[ ${CONTRIB_SUCCESS} -eq 0 ]] && CONTRIB_FAILED=$(tail -1 testout_contrib.txt | awk '{for(i=0;i<=NF;i++){print "/postgres/contrib/"$i;}}')
for d in ${FAILED} ${CONTRIB_FAILED}; do
docker compose exec neon-test-extensions bash -c 'for file in $(find '"${d}"' -name regression.diffs -o -name regression.out); do cat ${file}; done' || [[ ${?} -eq 1 ]]
done
exit 1
fi

View File

@@ -169,6 +169,8 @@ pub struct TenantDescribeResponseShard {
pub is_pending_compute_notification: bool,
/// A shard split is currently underway
pub is_splitting: bool,
/// A timeline is being imported into this tenant
pub is_importing: bool,
pub scheduling_policy: ShardSchedulingPolicy,

View File

@@ -1803,6 +1803,8 @@ pub struct TopTenantShardsResponse {
}
pub mod virtual_file {
use std::sync::LazyLock;
#[derive(
Copy,
Clone,
@@ -1840,35 +1842,33 @@ pub mod virtual_file {
pub enum IoMode {
/// Uses buffered IO.
Buffered,
/// Uses direct IO, error out if the operation fails.
/// Uses direct IO for reads only.
#[cfg(target_os = "linux")]
Direct,
/// Use direct IO for reads and writes.
#[cfg(target_os = "linux")]
DirectRw,
}
impl IoMode {
pub fn preferred() -> Self {
// The default behavior when running Rust unit tests without any further
// flags is to use the newest behavior if available on the platform (Direct).
// flags is to use the newest behavior (DirectRw).
// The CI uses the following environment variable to unit tests for all
// different modes.
// NB: the Python regression & perf tests have their own defaults management
// that writes pageserver.toml; they do not use this variable.
if cfg!(test) {
use once_cell::sync::Lazy;
static CACHED: Lazy<IoMode> = Lazy::new(|| {
static CACHED: LazyLock<IoMode> = LazyLock::new(|| {
utils::env::var_serde_json_string(
"NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE",
)
.unwrap_or({
.unwrap_or(
#[cfg(target_os = "linux")]
{
IoMode::Direct
}
IoMode::DirectRw,
#[cfg(not(target_os = "linux"))]
{
IoMode::Buffered
}
})
IoMode::Buffered,
)
});
*CACHED
} else {
@@ -1885,6 +1885,8 @@ pub mod virtual_file {
v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
#[cfg(target_os = "linux")]
v if v == (IoMode::Direct as u8) => IoMode::Direct,
#[cfg(target_os = "linux")]
v if v == (IoMode::DirectRw as u8) => IoMode::DirectRw,
x => return Err(x),
})
}

View File

@@ -106,6 +106,7 @@ hex-literal.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
indoc.workspace = true
uuid.workspace = true
rstest.workspace = true
[[bench]]
name = "bench_layer_map"

View File

@@ -61,7 +61,7 @@ async fn ingest(
tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?;
let ctx2 =
let ctx =
RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error).with_scope_debug_tools();
let gate = utils::sync::gate::Gate::default();
@@ -248,6 +248,8 @@ fn criterion_benchmark(c: &mut Criterion) {
IoMode::Buffered,
#[cfg(target_os = "linux")]
IoMode::Direct,
#[cfg(target_os = "linux")]
IoMode::DirectRw,
] {
for param in expect.clone() {
let HandPickedParameters {
@@ -309,78 +311,114 @@ cargo bench --bench bench_ingest
im4gn.2xlarge:
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.8491 s 1.8540 s 1.8592 s]
thrpt: [68.847 MiB/s 69.039 MiB/s 69.222 MiB/s]
time: [1.2901 s 1.2943 s 1.2991 s]
thrpt: [98.533 MiB/s 98.892 MiB/s 99.220 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [2.6976 s 2.7123 s 2.7286 s]
thrpt: [46.911 MiB/s 47.193 MiB/s 47.450 MiB/s]
time: [2.1387 s 2.1623 s 2.1845 s]
thrpt: [58.595 MiB/s 59.197 MiB/s 59.851 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [1.7433 s 1.7510 s 1.7600 s]
thrpt: [72.729 MiB/s 73.099 MiB/s 73.423 MiB/s]
time: [1.2036 s 1.2074 s 1.2122 s]
thrpt: [105.60 MiB/s 106.01 MiB/s 106.35 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [499.63 ms 500.07 ms 500.46 ms]
thrpt: [255.77 MiB/s 255.96 MiB/s 256.19 MiB/s]
time: [520.55 ms 521.46 ms 522.57 ms]
thrpt: [244.94 MiB/s 245.47 MiB/s 245.89 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [456.97 ms 459.61 ms 461.92 ms]
thrpt: [277.11 MiB/s 278.50 MiB/s 280.11 MiB/s]
time: [440.33 ms 442.24 ms 444.10 ms]
thrpt: [288.22 MiB/s 289.43 MiB/s 290.69 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [158.82 ms 159.16 ms 159.56 ms]
thrpt: [802.22 MiB/s 804.24 MiB/s 805.93 MiB/s]
time: [168.78 ms 169.42 ms 170.18 ms]
thrpt: [752.16 MiB/s 755.52 MiB/s 758.40 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.8856 s 1.8997 s 1.9179 s]
thrpt: [66.740 MiB/s 67.380 MiB/s 67.882 MiB/s]
time: [1.2978 s 1.3094 s 1.3227 s]
thrpt: [96.775 MiB/s 97.758 MiB/s 98.632 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [2.7468 s 2.7625 s 2.7785 s]
thrpt: [46.068 MiB/s 46.335 MiB/s 46.600 MiB/s]
time: [2.1976 s 2.2067 s 2.2154 s]
thrpt: [57.777 MiB/s 58.006 MiB/s 58.245 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
time: [1.7689 s 1.7726 s 1.7767 s]
thrpt: [72.045 MiB/s 72.208 MiB/s 72.363 MiB/s]
time: [1.2103 s 1.2160 s 1.2233 s]
thrpt: [104.64 MiB/s 105.26 MiB/s 105.76 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [497.64 ms 498.60 ms 499.67 ms]
thrpt: [256.17 MiB/s 256.72 MiB/s 257.21 MiB/s]
time: [525.05 ms 526.37 ms 527.79 ms]
thrpt: [242.52 MiB/s 243.17 MiB/s 243.79 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [493.72 ms 505.07 ms 518.03 ms]
thrpt: [247.09 MiB/s 253.43 MiB/s 259.26 MiB/s]
time: [443.06 ms 444.88 ms 447.15 ms]
thrpt: [286.26 MiB/s 287.72 MiB/s 288.90 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [267.76 ms 267.85 ms 267.96 ms]
thrpt: [477.69 MiB/s 477.88 MiB/s 478.03 MiB/s]
time: [169.40 ms 169.80 ms 170.17 ms]
thrpt: [752.21 MiB/s 753.81 MiB/s 755.60 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.2844 s 1.2915 s 1.2990 s]
thrpt: [98.536 MiB/s 99.112 MiB/s 99.657 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [2.1431 s 2.1663 s 2.1900 s]
thrpt: [58.446 MiB/s 59.087 MiB/s 59.726 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [1.1906 s 1.1926 s 1.1947 s]
thrpt: [107.14 MiB/s 107.33 MiB/s 107.51 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [516.86 ms 518.25 ms 519.47 ms]
thrpt: [246.40 MiB/s 246.98 MiB/s 247.65 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [536.50 ms 536.53 ms 536.60 ms]
thrpt: [238.54 MiB/s 238.57 MiB/s 238.59 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [267.77 ms 267.90 ms 268.04 ms]
thrpt: [477.53 MiB/s 477.79 MiB/s 478.02 MiB/s]
Hetzner AX102:
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.0683 s 1.1006 s 1.1386 s]
thrpt: [112.42 MiB/s 116.30 MiB/s 119.82 MiB/s]
time: [836.58 ms 861.93 ms 886.57 ms]
thrpt: [144.38 MiB/s 148.50 MiB/s 153.00 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [1.5719 s 1.6012 s 1.6228 s]
thrpt: [78.877 MiB/s 79.938 MiB/s 81.430 MiB/s]
time: [1.2782 s 1.3191 s 1.3665 s]
thrpt: [93.668 MiB/s 97.037 MiB/s 100.14 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [1.1095 s 1.1331 s 1.1580 s]
thrpt: [110.53 MiB/s 112.97 MiB/s 115.37 MiB/s]
time: [791.27 ms 807.08 ms 822.95 ms]
thrpt: [155.54 MiB/s 158.60 MiB/s 161.77 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [303.20 ms 307.83 ms 311.90 ms]
thrpt: [410.39 MiB/s 415.81 MiB/s 422.16 MiB/s]
time: [310.78 ms 314.66 ms 318.47 ms]
thrpt: [401.92 MiB/s 406.79 MiB/s 411.87 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [406.34 ms 429.37 ms 451.63 ms]
thrpt: [283.42 MiB/s 298.11 MiB/s 315.00 MiB/s]
time: [377.11 ms 387.77 ms 399.21 ms]
thrpt: [320.63 MiB/s 330.10 MiB/s 339.42 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [134.01 ms 135.78 ms 137.48 ms]
thrpt: [931.03 MiB/s 942.68 MiB/s 955.12 MiB/s]
time: [128.37 ms 132.96 ms 138.55 ms]
thrpt: [923.83 MiB/s 962.69 MiB/s 997.11 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.0406 s 1.0580 s 1.0772 s]
thrpt: [118.83 MiB/s 120.98 MiB/s 123.00 MiB/s]
time: [900.38 ms 914.88 ms 928.86 ms]
thrpt: [137.80 MiB/s 139.91 MiB/s 142.16 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [1.5059 s 1.5339 s 1.5625 s]
thrpt: [81.920 MiB/s 83.448 MiB/s 84.999 MiB/s]
time: [1.2538 s 1.2936 s 1.3313 s]
thrpt: [96.149 MiB/s 98.946 MiB/s 102.09 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
time: [1.0714 s 1.0934 s 1.1161 s]
thrpt: [114.69 MiB/s 117.06 MiB/s 119.47 MiB/s]
time: [787.17 ms 803.89 ms 820.63 ms]
thrpt: [155.98 MiB/s 159.23 MiB/s 162.61 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [262.68 ms 265.14 ms 267.71 ms]
thrpt: [478.13 MiB/s 482.76 MiB/s 487.29 MiB/s]
time: [318.78 ms 321.89 ms 324.74 ms]
thrpt: [394.16 MiB/s 397.65 MiB/s 401.53 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [375.19 ms 393.80 ms 411.40 ms]
thrpt: [311.14 MiB/s 325.04 MiB/s 341.16 MiB/s]
time: [374.01 ms 383.45 ms 393.20 ms]
thrpt: [325.53 MiB/s 333.81 MiB/s 342.24 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [123.02 ms 123.85 ms 124.66 ms]
thrpt: [1.0027 GiB/s 1.0093 GiB/s 1.0161 GiB/s]
time: [137.98 ms 141.31 ms 143.57 ms]
thrpt: [891.58 MiB/s 905.79 MiB/s 927.66 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [613.69 ms 622.48 ms 630.97 ms]
thrpt: [202.86 MiB/s 205.63 MiB/s 208.57 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [1.0299 s 1.0766 s 1.1273 s]
thrpt: [113.55 MiB/s 118.90 MiB/s 124.29 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [637.80 ms 647.78 ms 658.01 ms]
thrpt: [194.53 MiB/s 197.60 MiB/s 200.69 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [266.09 ms 267.20 ms 268.31 ms]
thrpt: [477.06 MiB/s 479.04 MiB/s 481.04 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [269.34 ms 273.27 ms 277.69 ms]
thrpt: [460.95 MiB/s 468.40 MiB/s 475.24 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [123.18 ms 124.24 ms 125.15 ms]
thrpt: [1022.8 MiB/s 1.0061 GiB/s 1.0148 GiB/s]
*/

View File

@@ -1289,6 +1289,7 @@ pub(crate) enum StorageIoOperation {
Seek,
Fsync,
Metadata,
SetLen,
}
impl StorageIoOperation {
@@ -1303,6 +1304,7 @@ impl StorageIoOperation {
StorageIoOperation::Seek => "seek",
StorageIoOperation::Fsync => "fsync",
StorageIoOperation::Metadata => "metadata",
StorageIoOperation::SetLen => "set_len",
}
}
}

View File

@@ -3816,6 +3816,24 @@ impl TenantShard {
MaybeDeletedIndexPart::IndexPart(p) => p,
};
// A shard split may not take place while a timeline import is on-going
// for the tenant. Timeline imports run as part of each tenant shard
// and rely on the sharding scheme to split the work among pageservers.
// If we were to split in the middle of this process, we would have to
// either ensure that it's driven to completion on the old shard set
// or transfer it to the new shard set. It's technically possible, but complex.
match index_part.import_pgdata {
Some(ref import) if !import.is_done() => {
anyhow::bail!(
"Cannot split due to import with idempotency key: {:?}",
import.idempotency_key()
);
}
Some(_) | None => {
// fallthrough
}
}
for child_shard in child_shards {
tracing::info!(%timeline_id, "Uploading index_part for child {}", child_shard.to_index());
upload_index_part(

View File

@@ -15,21 +15,23 @@
//! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use std::cmp::min;
use std::io::Error;
use anyhow::Context;
use async_compression::Level;
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::owned_buffers_io::write::{BufferedWriter, FlushTaskError};
use crate::virtual_file::owned_buffers_io::write::{BufferedWriterShutdownMode, OwnedAsyncWriter};
#[derive(Copy, Clone, Debug)]
pub struct CompressionInfo {
@@ -50,12 +52,9 @@ pub struct Header {
impl Header {
/// Decodes a header from a byte slice.
pub fn decode(bytes: &[u8]) -> Result<Self, std::io::Error> {
pub fn decode(bytes: &[u8]) -> anyhow::Result<Self> {
let Some(&first_header_byte) = bytes.first() else {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"zero-length blob header",
));
anyhow::bail!("zero-length blob header");
};
// If the first bit is 0, this is just a 1-byte length prefix up to 128 bytes.
@@ -69,12 +68,9 @@ impl Header {
// Otherwise, this is a 4-byte header containing compression information and length.
const HEADER_LEN: usize = 4;
let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN].try_into().map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("blob header too short: {bytes:?}"),
)
})?;
let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN]
.try_into()
.map_err(|_| anyhow::anyhow!("blob header too short: {bytes:?}"))?;
// TODO: verify the compression bits and convert to an enum.
let compression_bits = header_buf[0] & LEN_COMPRESSION_BIT_MASK;
@@ -94,6 +90,16 @@ impl Header {
}
}
#[derive(Debug, thiserror::Error)]
pub enum WriteBlobError {
#[error(transparent)]
Flush(FlushTaskError),
#[error("blob too large ({len} bytes)")]
BlobTooLarge { len: usize },
#[error(transparent)]
WriteBlobRaw(anyhow::Error),
}
impl BlockCursor<'_> {
/// Read a blob into a new buffer.
pub async fn read_blob(
@@ -213,143 +219,64 @@ pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
/// A wrapper of `VirtualFile` that allows users to write blobs.
///
/// If a `BlobWriter` is dropped, the internal buffer will be
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
/// manually before dropping.
pub struct BlobWriter<const BUFFERED: bool> {
inner: TempVirtualFile,
offset: u64,
/// A buffer to save on write calls, only used if BUFFERED=true
buf: Vec<u8>,
pub struct BlobWriter<W> {
/// We do tiny writes for the length headers; they need to be in an owned buffer;
io_buf: Option<BytesMut>,
writer: BufferedWriter<IoBufferMut, W>,
offset: u64,
}
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
impl<W> BlobWriter<W>
where
W: OwnedAsyncWriter + std::fmt::Debug + Send + Sync + 'static,
{
/// See [`BufferedWriter`] struct-level doc comment for semantics of `start_offset`.
pub fn new(
inner: TempVirtualFile,
file: W,
start_offset: u64,
_gate: &utils::sync::gate::Gate,
_cancel: CancellationToken,
_ctx: &RequestContext,
) -> Self {
Self {
inner,
offset: start_offset,
buf: Vec::with_capacity(Self::CAPACITY),
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> anyhow::Result<Self> {
Ok(Self {
io_buf: Some(BytesMut::new()),
}
writer: BufferedWriter::new(
file,
start_offset,
|| IoBufferMut::with_capacity(Self::CAPACITY),
gate.enter()?,
cancel,
ctx,
flush_task_span,
),
offset: start_offset,
})
}
pub fn size(&self) -> u64 {
self.offset
}
const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
const CAPACITY: usize = 64 * 1024;
/// Writes the given buffer directly to the underlying `VirtualFile`.
/// You need to make sure that the internal buffer is empty, otherwise
/// data will be written in wrong order.
#[inline(always)]
async fn write_all_unbuffered<Buf: IoBuf + Send>(
&mut self,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), Error>) {
let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
let nbytes = match res {
Ok(nbytes) => nbytes,
Err(e) => return (src_buf, Err(e)),
};
self.offset += nbytes as u64;
(src_buf, Ok(()))
}
#[inline(always)]
/// Flushes the internal buffer to the underlying `VirtualFile`.
pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
let buf = std::mem::take(&mut self.buf);
let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await;
res?;
let mut buf = slice.into_raw_slice().into_inner();
buf.clear();
self.buf = buf;
Ok(())
}
#[inline(always)]
/// Writes as much of `src_buf` into the internal buffer as it fits
fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
let remaining = Self::CAPACITY - self.buf.len();
let to_copy = src_buf.len().min(remaining);
self.buf.extend_from_slice(&src_buf[..to_copy]);
self.offset += to_copy as u64;
to_copy
}
/// Internal, possibly buffered, write function
/// Writes `src_buf` to the file at the current offset.
async fn write_all<Buf: IoBuf + Send>(
&mut self,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), Error>) {
let src_buf = src_buf.into_raw_slice();
let src_buf_bounds = src_buf.bounds();
let restore = move |src_buf_slice: Slice<_>| {
FullSlice::must_new(Slice::from_buf_bounds(
src_buf_slice.into_inner(),
src_buf_bounds,
))
};
) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
let res = self
.writer
// TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
// Can remove all the complexity around owned buffers upstack
.write_buffered_borrowed(&src_buf, ctx)
.await
.map(|len| {
self.offset += len as u64;
});
if !BUFFERED {
assert!(self.buf.is_empty());
return self
.write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
.await;
}
let remaining = Self::CAPACITY - self.buf.len();
let src_buf_len = src_buf.bytes_init();
if src_buf_len == 0 {
return (restore(src_buf), Ok(()));
}
let mut src_buf = src_buf.slice(0..src_buf_len);
// First try to copy as much as we can into the buffer
if remaining > 0 {
let copied = self.write_into_buffer(&src_buf);
src_buf = src_buf.slice(copied..);
}
// Then, if the buffer is full, flush it out
if self.buf.len() == Self::CAPACITY {
if let Err(e) = self.flush_buffer(ctx).await {
return (restore(src_buf), Err(e));
}
}
// Finally, write the tail of src_buf:
// If it wholly fits into the buffer without
// completely filling it, then put it there.
// If not, write it out directly.
let src_buf = if !src_buf.is_empty() {
assert_eq!(self.buf.len(), 0);
if src_buf.len() < Self::CAPACITY {
let copied = self.write_into_buffer(&src_buf);
// We just verified above that src_buf fits into our internal buffer.
assert_eq!(copied, src_buf.len());
restore(src_buf)
} else {
let (src_buf, res) = self
.write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
.await;
if let Err(e) = res {
return (src_buf, Err(e));
}
src_buf
}
} else {
restore(src_buf)
};
(src_buf, Ok(()))
(src_buf, res)
}
/// Write a blob of data. Returns the offset that it was written to,
@@ -358,7 +285,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
srcbuf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<u64, Error>) {
) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
let (buf, res) = self
.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
.await;
@@ -372,7 +299,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
srcbuf: FullSlice<Buf>,
ctx: &RequestContext,
algorithm: ImageCompressionAlgorithm,
) -> (FullSlice<Buf>, Result<(u64, CompressionInfo), Error>) {
) -> (
FullSlice<Buf>,
Result<(u64, CompressionInfo), WriteBlobError>,
) {
let offset = self.offset;
let mut compression_info = CompressionInfo {
written_compressed: false,
@@ -388,14 +318,16 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
let res = res.map_err(WriteBlobError::Flush);
((slice, res), srcbuf)
} else {
// Write a 4-byte length header
if len > MAX_SUPPORTED_BLOB_LEN {
return (
(
io_buf.slice_len(),
Err(Error::other(format!("blob too large ({len} bytes)"))),
Err(WriteBlobError::BlobTooLarge { len }),
),
srcbuf,
);
@@ -429,7 +361,9 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
assert_eq!(len_buf[0] & 0xf0, 0);
len_buf[0] |= high_bit_mask;
io_buf.extend_from_slice(&len_buf[..]);
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
let res = res.map_err(WriteBlobError::Flush);
((slice, res), srcbuf)
}
}
.await;
@@ -444,6 +378,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
} else {
self.write_all(srcbuf, ctx).await
};
let res = res.map_err(WriteBlobError::Flush);
(srcbuf, res.map(|_| (offset, compression_info)))
}
@@ -452,9 +387,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
raw_with_header: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<u64, Error>) {
) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
// Verify the header, to ensure we don't write invalid/corrupt data.
let header = match Header::decode(&raw_with_header) {
let header = match Header::decode(&raw_with_header)
.context("decoding blob header")
.map_err(WriteBlobError::WriteBlobRaw)
{
Ok(header) => header,
Err(err) => return (raw_with_header, Err(err)),
};
@@ -463,29 +401,26 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
let raw_len = raw_with_header.len();
return (
raw_with_header,
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("header length mismatch: {header_total_len} != {raw_len}"),
)),
Err(WriteBlobError::WriteBlobRaw(anyhow::anyhow!(
"header length mismatch: {header_total_len} != {raw_len}"
))),
);
}
let offset = self.offset;
let (raw_with_header, result) = self.write_all(raw_with_header, ctx).await;
let result = result.map_err(WriteBlobError::Flush);
(raw_with_header, result.map(|_| offset))
}
}
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
/// Finish this blob writer and return the underlying [`TempVirtualFile`].
///
/// If there is an internal buffer (depends on `BUFFERED`), it will
/// be flushed before this method returns.
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<TempVirtualFile, Error> {
if BUFFERED {
self.flush_buffer(ctx).await?;
}
Ok(self.inner)
/// Finish this blob writer and return the underlying `W`.
pub async fn shutdown(
self,
mode: BufferedWriterShutdownMode,
ctx: &RequestContext,
) -> Result<W, FlushTaskError> {
let (_, file) = self.writer.shutdown(mode, ctx).await?;
Ok(file)
}
}
@@ -494,22 +429,25 @@ pub(crate) mod tests {
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use rand::{Rng, SeedableRng};
use tracing::info_span;
use super::*;
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
use crate::tenant::block_io::BlockReaderRef;
use crate::virtual_file;
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::VirtualFile;
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED>(blobs, false).await
async fn round_trip_test(blobs: &[Vec<u8>]) -> anyhow::Result<()> {
round_trip_test_compressed(blobs, false).await
}
pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
pub(crate) async fn write_maybe_compressed(
blobs: &[Vec<u8>],
compression: bool,
ctx: &RequestContext,
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
) -> anyhow::Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>)> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let gate = utils::sync::gate::Gate::default();
@@ -519,10 +457,18 @@ pub(crate) mod tests {
let mut offsets = Vec::new();
{
let file = TempVirtualFile::new(
VirtualFile::create(pathbuf.as_path(), ctx).await?,
gate.enter().unwrap(),
VirtualFile::open_with_options_v2(
pathbuf.as_path(),
virtual_file::OpenOptions::new()
.create_new(true)
.write(true),
ctx,
)
.await?,
gate.enter()?,
);
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
let mut wtr =
BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
for blob in blobs.iter() {
let (_, res) = if compression {
let res = wtr
@@ -539,28 +485,28 @@ pub(crate) mod tests {
let offs = res?;
offsets.push(offs);
}
// Write out one page worth of zeros so that we can
// read again with read_blk
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
let offs = res?;
println!("Writing final blob at offs={offs}");
let file = wtr.into_inner(ctx).await?;
file.disarm_into_inner();
}
let file = wtr
.shutdown(
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
file.disarm_into_inner()
};
Ok((temp_dir, pathbuf, offsets))
}
async fn round_trip_test_compressed<const BUFFERED: bool>(
async fn round_trip_test_compressed(
blobs: &[Vec<u8>],
compression: bool,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
let ctx =
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
let (_temp_dir, pathbuf, offsets) =
write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
write_maybe_compressed(blobs, compression, &ctx).await?;
let file = VirtualFile::open(pathbuf, &ctx).await?;
println!("Done writing!");
let file = VirtualFile::open_v2(pathbuf, &ctx).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new_with_compression(rdr, compression);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
@@ -579,30 +525,27 @@ pub(crate) mod tests {
}
#[tokio::test]
async fn test_one() -> Result<(), Error> {
async fn test_one() -> anyhow::Result<()> {
let blobs = &[vec![12, 21, 22]];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test(blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_hello_simple() -> Result<(), Error> {
async fn test_hello_simple() -> anyhow::Result<()> {
let blobs = &[
vec![0, 1, 2, 3],
b"Hello, World!".to_vec(),
Vec::new(),
b"foobar".to_vec(),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false>(blobs, true).await?;
round_trip_test_compressed::<true>(blobs, true).await?;
round_trip_test(blobs).await?;
round_trip_test_compressed(blobs, true).await?;
Ok(())
}
#[tokio::test]
async fn test_really_big_array() -> Result<(), Error> {
async fn test_really_big_array() -> anyhow::Result<()> {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
@@ -611,25 +554,22 @@ pub(crate) mod tests {
vec![0xf3; 24 * PAGE_SZ],
b"foobar".to_vec(),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false>(blobs, true).await?;
round_trip_test_compressed::<true>(blobs, true).await?;
round_trip_test(blobs).await?;
round_trip_test_compressed(blobs, true).await?;
Ok(())
}
#[tokio::test]
async fn test_arrays_inc() -> Result<(), Error> {
async fn test_arrays_inc() -> anyhow::Result<()> {
let blobs = (0..PAGE_SZ / 8)
.map(|v| random_array(v * 16))
.collect::<Vec<_>>();
round_trip_test::<false>(&blobs).await?;
round_trip_test::<true>(&blobs).await?;
round_trip_test(&blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_arrays_random_size() -> Result<(), Error> {
async fn test_arrays_random_size() -> anyhow::Result<()> {
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
let blobs = (0..1024)
.map(|_| {
@@ -641,20 +581,18 @@ pub(crate) mod tests {
random_array(sz.into())
})
.collect::<Vec<_>>();
round_trip_test::<false>(&blobs).await?;
round_trip_test::<true>(&blobs).await?;
round_trip_test(&blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_arrays_page_boundary() -> Result<(), Error> {
async fn test_arrays_page_boundary() -> anyhow::Result<()> {
let blobs = &[
random_array(PAGE_SZ - 4),
random_array(PAGE_SZ - 4),
random_array(PAGE_SZ - 4),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test(blobs).await?;
Ok(())
}
}

View File

@@ -4,14 +4,12 @@
use std::ops::Deref;
use bytes::Bytes;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult};
#[cfg(test)]
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::VirtualFile;
use crate::virtual_file::{IoBuffer, VirtualFile};
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
@@ -247,17 +245,17 @@ pub trait BlockWriter {
/// 'buf' must be of size PAGE_SZ. Returns the block number the page was
/// written to.
///
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error>;
}
///
/// A simple in-memory buffer of blocks.
///
pub struct BlockBuf {
pub blocks: Vec<Bytes>,
pub blocks: Vec<IoBuffer>,
}
impl BlockWriter for BlockBuf {
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
let blknum = self.blocks.len();
self.blocks.push(buf);

View File

@@ -25,7 +25,7 @@ use std::{io, result};
use async_stream::try_stream;
use byteorder::{BE, ReadBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use bytes::BufMut;
use either::Either;
use futures::{Stream, StreamExt};
use hex;
@@ -34,6 +34,7 @@ use tracing::error;
use crate::context::RequestContext;
use crate::tenant::block_io::{BlockReader, BlockWriter};
use crate::virtual_file::{IoBuffer, IoBufferMut, owned_buffers_io::write::Buffer};
// The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
pub const VALUE_SZ: usize = 5;
@@ -787,12 +788,12 @@ impl<const L: usize> BuildNode<L> {
///
/// Serialize the node to on-disk format.
///
fn pack(&self) -> Bytes {
fn pack(&self) -> IoBuffer {
assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
assert!(self.num_children > 0);
let mut buf = BytesMut::new();
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
buf.put_u16(self.num_children);
buf.put_u8(self.level);
@@ -805,7 +806,7 @@ impl<const L: usize> BuildNode<L> {
assert!(buf.len() == self.size);
assert!(buf.len() <= PAGE_SZ);
buf.resize(PAGE_SZ, 0);
buf.extend_with(0, PAGE_SZ - buf.len());
buf.freeze()
}
@@ -839,7 +840,7 @@ pub(crate) mod tests {
#[derive(Clone, Default)]
pub(crate) struct TestDisk {
blocks: Vec<Bytes>,
blocks: Vec<IoBuffer>,
}
impl TestDisk {
fn new() -> Self {
@@ -857,7 +858,7 @@ pub(crate) mod tests {
}
}
impl BlockWriter for &mut TestDisk {
fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
fn write_blk(&mut self, buf: IoBuffer) -> io::Result<u32> {
let blknum = self.blocks.len();
self.blocks.push(buf);
Ok(blknum as u32)

View File

@@ -79,9 +79,9 @@ impl EphemeralFile {
VirtualFile::open_with_options_v2(
&filename,
virtual_file::OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.create(true),
.write(true),
ctx,
)
.await?,
@@ -98,6 +98,7 @@ impl EphemeralFile {
file: file.clone(),
buffered_writer: BufferedWriter::new(
file,
0,
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
cancel.child_token(),
@@ -130,6 +131,14 @@ impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter
> + Send {
self.inner.write_all_at(buf, offset, ctx)
}
fn set_len(
&self,
len: u64,
ctx: &RequestContext,
) -> impl Future<Output = std::io::Result<()>> + Send {
self.inner.set_len(len, ctx)
}
}
impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {

View File

@@ -91,9 +91,7 @@ pub async fn download_layer_file<'a>(
);
let temp_file = TempVirtualFile::new(
// Not _v2 yet which is sensitive to virtual_file_io_mode.
// That'll happen in PR https://github.com/neondatabase/neon/pull/11558
VirtualFile::open_with_options(
VirtualFile::open_with_options_v2(
&temp_file_path,
virtual_file::OpenOptions::new()
.create_new(true)
@@ -197,6 +195,7 @@ async fn download_object(
let dst_path = destination_file.path().to_owned();
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
0,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
cancel.child_token(),
@@ -219,10 +218,15 @@ async fn download_object(
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
}
let inner = buffered.shutdown(ctx).await.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
Ok(inner)
buffered
.shutdown(
owned_buffers_io::write::BufferedWriterShutdownMode::PadThenTruncate,
ctx,
)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})
}
.await?;

View File

@@ -1521,12 +1521,11 @@ async fn load_heatmap(
path: &Utf8PathBuf,
ctx: &RequestContext,
) -> Result<Option<HeatMapTenant>, anyhow::Error> {
let mut file = match VirtualFile::open(path, ctx).await {
Ok(file) => file,
let st = match VirtualFile::read_to_string(path, ctx).await {
Ok(st) => st,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => Err(e)?,
};
let st = file.read_to_string(ctx).await?;
let htm = serde_json::from_str(&st)?;
Ok(Some(htm))
}

View File

@@ -29,7 +29,6 @@
//!
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
@@ -52,6 +51,7 @@ use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -75,7 +75,8 @@ use crate::tenant::vectored_blob_io::{
};
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -113,6 +114,15 @@ impl From<&DeltaLayer> for Summary {
}
impl Summary {
/// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`.
pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
Self::ser_into(self, &mut buf)?;
// Pad zeroes to the buffer so the length is a multiple of the alignment.
buf.extend_with(0, buf.capacity() - buf.len());
Ok(buf.freeze())
}
pub(super) fn expected(
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -392,7 +402,7 @@ struct DeltaLayerWriterInner {
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
blob_writer: BlobWriter<true>,
blob_writer: BlobWriter<TempVirtualFile>,
// Number of key-lsns in the layer.
num_keys: usize,
@@ -416,16 +426,29 @@ impl DeltaLayerWriterInner {
// Create the file initially with a temporary filename. We don't know
// the end key yet, so we cannot form the final filename yet. We will
// rename it when we're done.
//
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let path =
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
let file = TempVirtualFile::new(
VirtualFile::open_with_options_v2(
&path,
virtual_file::OpenOptions::new()
.create_new(true)
.write(true),
ctx,
)
.await?,
gate.enter()?,
);
let mut file = TempVirtualFile::new(VirtualFile::create(&path, ctx).await?, gate.enter()?);
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
// Start at PAGE_SZ, make room for the header block
let blob_writer = BlobWriter::new(
file,
PAGE_SZ as u64,
gate,
cancel,
ctx,
info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -519,15 +542,24 @@ impl DeltaLayerWriterInner {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
let mut file = self.blob_writer.into_inner(ctx).await?;
let file = self
.blob_writer
.shutdown(
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
// Write out the index
let (index_root_blk, block_buf) = self.tree.finish()?;
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
offset += PAGE_SZ as u64;
}
assert!(self.lsn_range.start < self.lsn_range.end);
// Fill in the summary on blk 0
@@ -542,11 +574,9 @@ impl DeltaLayerWriterInner {
index_root_blk,
};
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
let metadata = file
@@ -738,7 +768,7 @@ impl DeltaLayer {
where
F: Fn(Summary) -> Summary,
{
let mut file = VirtualFile::open_with_options(
let file = VirtualFile::open_with_options_v2(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
@@ -755,11 +785,8 @@ impl DeltaLayer {
let new_summary = rewrite(actual_summary);
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here, but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let buf = new_summary.ser_into_page().context("serialize")?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
Ok(())
}
@@ -1415,6 +1442,19 @@ impl DeltaLayerInner {
}
pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
self.iter_with_options(
ctx,
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
)
}
pub fn iter_with_options<'a>(
&'a self,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> DeltaLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
@@ -1424,10 +1464,7 @@ impl DeltaLayerInner {
index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
key_values_batch: std::collections::VecDeque::new(),
is_end: false,
planner: StreamingVectoredReadPlanner::new(
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
),
planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
}
}

View File

@@ -27,7 +27,6 @@
//! actual page images are stored in the "values" part.
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
@@ -50,6 +49,7 @@ use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -73,7 +73,8 @@ use crate::tenant::vectored_blob_io::{
};
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -112,6 +113,15 @@ impl From<&ImageLayer> for Summary {
}
impl Summary {
/// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`.
pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
Self::ser_into(self, &mut buf)?;
// Pad zeroes to the buffer so the length is a multiple of the alignment.
buf.extend_with(0, buf.capacity() - buf.len());
Ok(buf.freeze())
}
pub(super) fn expected(
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -353,7 +363,7 @@ impl ImageLayer {
where
F: Fn(Summary) -> Summary,
{
let mut file = VirtualFile::open_with_options(
let file = VirtualFile::open_with_options_v2(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
@@ -370,11 +380,8 @@ impl ImageLayer {
let new_summary = rewrite(actual_summary);
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let buf = new_summary.ser_into_page().context("serialize")?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
Ok(())
}
@@ -678,6 +685,19 @@ impl ImageLayerInner {
}
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
self.iter_with_options(
ctx,
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
)
}
pub(crate) fn iter_with_options<'a>(
&'a self,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> ImageLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
@@ -687,10 +707,7 @@ impl ImageLayerInner {
index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
key_values_batch: VecDeque::new(),
is_end: false,
planner: StreamingVectoredReadPlanner::new(
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
),
planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
}
}
@@ -743,7 +760,7 @@ struct ImageLayerWriterInner {
// Number of keys in the layer.
num_keys: usize,
blob_writer: BlobWriter<false>,
blob_writer: BlobWriter<TempVirtualFile>,
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
#[cfg(feature = "testing")]
@@ -777,20 +794,27 @@ impl ImageLayerWriterInner {
},
);
trace!("creating image layer {}", path);
let mut file = TempVirtualFile::new(
VirtualFile::open_with_options(
let file = TempVirtualFile::new(
VirtualFile::open_with_options_v2(
&path,
virtual_file::OpenOptions::new()
.write(true)
.create_new(true),
.create_new(true)
.write(true),
ctx,
)
.await?,
gate.enter()?,
);
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
// Start at `PAGE_SZ` to make room for the header block.
let blob_writer = BlobWriter::new(
file,
PAGE_SZ as u64,
gate,
cancel,
ctx,
info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -918,15 +942,24 @@ impl ImageLayerWriterInner {
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
};
let mut file = self.blob_writer.into_inner(ctx).await?;
let file = self
.blob_writer
.shutdown(
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
// Write out the index
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
let (index_root_blk, block_buf) = self.tree.finish()?;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer?
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
offset += PAGE_SZ as u64;
}
let final_key_range = if let Some(end_key) = end_key {
@@ -947,11 +980,9 @@ impl ImageLayerWriterInner {
index_root_blk,
};
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
let metadata = file

View File

@@ -19,6 +19,7 @@ pub(crate) enum LayerRef<'a> {
}
impl<'a> LayerRef<'a> {
#[allow(dead_code)]
fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
match self {
Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
@@ -26,6 +27,22 @@ impl<'a> LayerRef<'a> {
}
}
fn iter_with_options(
self,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> LayerIterRef<'a> {
match self {
Self::Image(x) => {
LayerIterRef::Image(x.iter_with_options(ctx, max_read_size, max_batch_size))
}
Self::Delta(x) => {
LayerIterRef::Delta(x.iter_with_options(ctx, max_read_size, max_batch_size))
}
}
}
fn layer_dbg_info(&self) -> String {
match self {
Self::Image(x) => x.layer_dbg_info(),
@@ -66,6 +83,8 @@ pub(crate) enum IteratorWrapper<'a> {
first_key_lower_bound: (Key, Lsn),
layer: LayerRef<'a>,
source_desc: Arc<PersistentLayerKey>,
max_read_size: u64,
max_batch_size: usize,
},
Loaded {
iter: PeekableLayerIterRef<'a>,
@@ -146,6 +165,8 @@ impl<'a> IteratorWrapper<'a> {
pub fn create_from_image_layer(
image_layer: &'a ImageLayerInner,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> Self {
Self::NotLoaded {
layer: LayerRef::Image(image_layer),
@@ -157,12 +178,16 @@ impl<'a> IteratorWrapper<'a> {
is_delta: false,
}
.into(),
max_read_size,
max_batch_size,
}
}
pub fn create_from_delta_layer(
delta_layer: &'a DeltaLayerInner,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> Self {
Self::NotLoaded {
layer: LayerRef::Delta(delta_layer),
@@ -174,6 +199,8 @@ impl<'a> IteratorWrapper<'a> {
is_delta: true,
}
.into(),
max_read_size,
max_batch_size,
}
}
@@ -204,11 +231,13 @@ impl<'a> IteratorWrapper<'a> {
first_key_lower_bound,
layer,
source_desc,
max_read_size,
max_batch_size,
} = self
else {
unreachable!()
};
let iter = layer.iter(ctx);
let iter = layer.iter_with_options(ctx, *max_read_size, *max_batch_size);
let iter = PeekableLayerIterRef::create(iter).await?;
if let Some((k1, l1, _)) = iter.peek() {
let (k2, l2) = first_key_lower_bound;
@@ -293,21 +322,41 @@ impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
}
impl<'a> MergeIterator<'a> {
pub fn create_with_options(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> Self {
let mut heap = Vec::with_capacity(images.len() + deltas.len());
for image in images {
heap.push(IteratorWrapper::create_from_image_layer(
image,
ctx,
max_read_size,
max_batch_size,
));
}
for delta in deltas {
heap.push(IteratorWrapper::create_from_delta_layer(
delta,
ctx,
max_read_size,
max_batch_size,
));
}
Self {
heap: BinaryHeap::from(heap),
}
}
pub fn create(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
ctx: &'a RequestContext,
) -> Self {
let mut heap = Vec::with_capacity(images.len() + deltas.len());
for image in images {
heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
}
for delta in deltas {
heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
}
Self {
heap: BinaryHeap::from(heap),
}
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
}
pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {

View File

@@ -2828,6 +2828,41 @@ impl Timeline {
Ok(())
}
/// Check if the memory usage is within the limit.
async fn check_memory_usage(
self: &Arc<Self>,
layer_selection: &[Layer],
) -> Result<(), CompactionError> {
let mut estimated_memory_usage_mb = 0.0;
let mut num_image_layers = 0;
let mut num_delta_layers = 0;
let target_layer_size_bytes = 256 * 1024 * 1024;
for layer in layer_selection {
let layer_desc = layer.layer_desc();
if layer_desc.is_delta() {
// Delta layers at most have 1MB buffer; 3x to make it safe (there're deltas as large as 16KB).
// Multiply the layer size so that tests can pass.
estimated_memory_usage_mb +=
3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
num_delta_layers += 1;
} else {
// Image layers at most have 1MB buffer but it might be compressed; assume 5x compression ratio.
estimated_memory_usage_mb +=
5.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
num_image_layers += 1;
}
}
if estimated_memory_usage_mb > 1024.0 {
return Err(CompactionError::Other(anyhow!(
"estimated memory usage is too high: {}MB, giving up compaction; num_image_layers={}, num_delta_layers={}",
estimated_memory_usage_mb,
num_image_layers,
num_delta_layers
)));
}
Ok(())
}
/// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
/// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
/// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
@@ -3264,6 +3299,17 @@ impl Timeline {
self.check_compaction_space(&job_desc.selected_layers)
.await?;
self.check_memory_usage(&job_desc.selected_layers).await?;
if job_desc.selected_layers.len() > 100
&& job_desc.rewrite_layers.len() as f64 >= job_desc.selected_layers.len() as f64 * 0.7
{
return Err(CompactionError::Other(anyhow!(
"too many layers to rewrite: {} / {}, giving up compaction",
job_desc.rewrite_layers.len(),
job_desc.selected_layers.len()
)));
}
// Generate statistics for the compaction
for layer in &job_desc.selected_layers {
let desc = layer.layer_desc();
@@ -3359,7 +3405,13 @@ impl Timeline {
.context("failed to collect gc compaction keyspace")
.map_err(CompactionError::Other)?;
let mut merge_iter = FilterIterator::create(
MergeIterator::create(&delta_layers, &image_layers, ctx),
MergeIterator::create_with_options(
&delta_layers,
&image_layers,
ctx,
128 * 8192, /* 1MB buffer for each of the inner iterators */
128,
),
dense_ks,
sparse_ks,
)

View File

@@ -507,7 +507,9 @@ impl<'a> VectoredBlobReader<'a> {
for (blob_start, meta) in blobs_at.iter().copied() {
let header_start = (blob_start - read.start) as usize;
let header = Header::decode(&buf[header_start..])?;
let header = Header::decode(&buf[header_start..]).map_err(|anyhow_err| {
std::io::Error::new(std::io::ErrorKind::InvalidData, anyhow_err)
})?;
let data_start = header_start + header.header_len;
let end = data_start + header.data_len;
let compression_bits = header.compression_bits;
@@ -662,7 +664,6 @@ impl StreamingVectoredReadPlanner {
#[cfg(test)]
mod tests {
use anyhow::Error;
use super::super::blob_io::tests::{random_array, write_maybe_compressed};
use super::*;
@@ -945,13 +946,16 @@ mod tests {
}
}
async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
async fn round_trip_test_compressed(
blobs: &[Vec<u8>],
compression: bool,
) -> anyhow::Result<()> {
let ctx =
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
let (_temp_dir, pathbuf, offsets) =
write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
write_maybe_compressed(blobs, compression, &ctx).await?;
let file = VirtualFile::open(&pathbuf, &ctx).await?;
let file = VirtualFile::open_v2(&pathbuf, &ctx).await?;
let file_len = std::fs::metadata(&pathbuf)?.len();
// Multiply by two (compressed data might need more space), and add a few bytes for the header
@@ -997,7 +1001,7 @@ mod tests {
}
#[tokio::test]
async fn test_really_big_array() -> Result<(), Error> {
async fn test_really_big_array() -> anyhow::Result<()> {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
@@ -1012,7 +1016,7 @@ mod tests {
}
#[tokio::test]
async fn test_arrays_inc() -> Result<(), Error> {
async fn test_arrays_inc() -> anyhow::Result<()> {
let blobs = (0..PAGE_SZ / 8)
.map(|v| random_array(v * 16))
.collect::<Vec<_>>();

View File

@@ -12,10 +12,11 @@
//! src/backend/storage/file/fd.c
//!
use std::fs::File;
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::io::{Error, ErrorKind};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
use std::sync::LazyLock;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
use camino::{Utf8Path, Utf8PathBuf};
@@ -96,69 +97,38 @@ impl VirtualFile {
Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let inner = VirtualFileInner::create(path, ctx).await?;
Ok(VirtualFile {
inner,
_mode: IoMode::Buffered,
})
}
pub async fn create_v2<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
VirtualFile::open_with_options_v2(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
ctx,
)
.await
}
pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(VirtualFile {
inner,
_mode: IoMode::Buffered,
})
}
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let file = match get_io_mode() {
IoMode::Buffered => {
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
VirtualFile {
inner,
_mode: IoMode::Buffered,
}
}
let mode = get_io_mode();
let set_o_direct = match (mode, open_options.is_write()) {
(IoMode::Buffered, _) => false,
#[cfg(target_os = "linux")]
IoMode::Direct => {
let inner = VirtualFileInner::open_with_options(
path,
open_options.clone().custom_flags(nix::libc::O_DIRECT),
ctx,
)
.await?;
VirtualFile {
inner,
_mode: IoMode::Direct,
}
}
(IoMode::Direct, false) => true,
#[cfg(target_os = "linux")]
(IoMode::Direct, true) => false,
#[cfg(target_os = "linux")]
(IoMode::DirectRw, _) => true,
};
Ok(file)
let open_options = open_options.clone();
let open_options = if set_o_direct {
#[cfg(target_os = "linux")]
{
let mut open_options = open_options;
open_options.custom_flags(nix::libc::O_DIRECT);
open_options
}
#[cfg(not(target_os = "linux"))]
unreachable!(
"O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
);
} else {
open_options
};
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(VirtualFile { inner, _mode: mode })
}
pub fn path(&self) -> &Utf8Path {
@@ -187,18 +157,14 @@ impl VirtualFile {
self.inner.sync_data().await
}
pub async fn set_len(&self, len: u64, ctx: &RequestContext) -> Result<(), Error> {
self.inner.set_len(len, ctx).await
}
pub async fn metadata(&self) -> Result<Metadata, Error> {
self.inner.metadata().await
}
pub fn remove(self) {
self.inner.remove();
}
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
self.inner.seek(pos).await
}
pub async fn read_exact_at<Buf>(
&self,
slice: Slice<Buf>,
@@ -229,25 +195,31 @@ impl VirtualFile {
self.inner.write_all_at(buf, offset, ctx).await
}
pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
pub(crate) async fn read_to_string<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<usize, Error>) {
self.inner.write_all(buf, ctx).await
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
self.inner.read_to_end(buf, ctx).await
}
pub(crate) async fn read_to_string(
&mut self,
ctx: &RequestContext,
) -> Result<String, anyhow::Error> {
) -> std::io::Result<String> {
let file = VirtualFile::open(path, ctx).await?; // TODO: open_v2
let mut buf = Vec::new();
self.read_to_end(&mut buf, ctx).await?;
Ok(String::from_utf8(buf)?)
let mut tmp = vec![0; 128];
let mut pos: u64 = 0;
loop {
let slice = tmp.slice(..128);
let (slice, res) = file.inner.read_at(slice, pos, ctx).await;
match res {
Ok(0) => break,
Ok(n) => {
pos += n as u64;
buf.extend_from_slice(&slice[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
tmp = slice.into_inner();
}
String::from_utf8(buf).map_err(|_| {
std::io::Error::new(ErrorKind::InvalidData, "file contents are not valid UTF-8")
})
}
}
@@ -294,9 +266,6 @@ pub struct VirtualFileInner {
/// belongs to a different VirtualFile.
handle: RwLock<SlotHandle>,
/// Current file position
pos: u64,
/// File path and options to use to open it.
///
/// Note: this only contains the options needed to re-open it. For example,
@@ -561,21 +530,7 @@ impl VirtualFileInner {
path: P,
ctx: &RequestContext,
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
ctx,
)
.await
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true).clone(), ctx).await
}
/// Open a file with given options.
@@ -585,7 +540,7 @@ impl VirtualFileInner {
/// on the first time. Make sure that's sane!
pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
open_options: OpenOptions,
_ctx: &RequestContext,
) -> Result<VirtualFileInner, std::io::Error> {
let path = path.as_ref();
@@ -610,7 +565,6 @@ impl VirtualFileInner {
let vfile = VirtualFileInner {
handle: RwLock::new(handle),
pos: 0,
path: path.to_owned(),
open_options: reopen_options,
};
@@ -677,6 +631,13 @@ impl VirtualFileInner {
})
}
pub async fn set_len(&self, len: u64, _ctx: &RequestContext) -> Result<(), Error> {
with_file!(self, StorageIoOperation::SetLen, |file_guard| {
let (_file_guard, res) = io_engine::get().set_len(file_guard, len).await;
res.maybe_fatal_err("set_len")
})
}
/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
@@ -744,38 +705,6 @@ impl VirtualFileInner {
})
}
pub fn remove(self) {
let path = self.path.clone();
drop(self);
std::fs::remove_file(path).expect("failed to remove the virtual file");
}
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match pos {
SeekFrom::Start(offset) => {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
.with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
if pos < 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"offset would be negative",
));
}
if pos > u64::MAX as i128 {
return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
}
self.pos = pos as u64;
}
}
Ok(self.pos)
}
/// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
///
/// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
@@ -859,59 +788,7 @@ impl VirtualFileInner {
(restore(buf), Ok(()))
}
/// Writes `buf` to the file at the current offset.
///
/// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller.
pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<usize, Error>) {
let buf = buf.into_raw_slice();
let bounds = buf.bounds();
let restore =
|buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
let nbytes = buf.len();
let mut buf = buf;
while !buf.is_empty() {
let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await;
buf = tmp.into_raw_slice();
match res {
Ok(0) => {
return (
restore(buf),
Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
);
}
Ok(n) => {
buf = buf.slice(n..);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return (restore(buf), Err(e)),
}
}
(restore(buf), Ok(nbytes))
}
async fn write<B: IoBuf + Send>(
&mut self,
buf: FullSlice<B>,
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, std::io::Error>) {
let pos = self.pos;
let (buf, res) = self.write_at(buf, pos, ctx).await;
let n = match res {
Ok(n) => n,
Err(e) => return (buf, Err(e)),
};
self.pos += n as u64;
(buf, Ok(n))
}
pub(crate) async fn read_at<Buf>(
pub(super) async fn read_at<Buf>(
&self,
buf: tokio_epoll_uring::Slice<Buf>,
offset: u64,
@@ -939,23 +816,11 @@ impl VirtualFileInner {
})
}
/// The function aborts the process if the error is fatal.
async fn write_at<B: IoBuf + Send>(
&self,
buf: FullSlice<B>,
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, Error>) {
let (slice, result) = self.write_at_inner(buf, offset, ctx).await;
let result = result.maybe_fatal_err("write_at");
(slice, result)
}
async fn write_at_inner<B: IoBuf + Send>(
&self,
buf: FullSlice<B>,
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, Error>) {
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
@@ -964,30 +829,13 @@ impl VirtualFileInner {
observe_duration!(StorageIoOperation::Write, {
let ((_file_guard, buf), result) =
io_engine::get().write_at(file_guard, offset, buf).await;
let result = result.maybe_fatal_err("write_at");
if let Ok(size) = result {
ctx.io_size_metrics().write.add(size.into_u64());
}
(buf, result)
})
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
let mut tmp = vec![0; 128];
loop {
let slice = tmp.slice(..128);
let (slice, res) = self.read_at(slice, self.pos, ctx).await;
match res {
Ok(0) => return Ok(()),
Ok(n) => {
self.pos += n as u64;
buf.extend_from_slice(&slice[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
tmp = slice.into_inner();
}
}
}
// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
@@ -1202,19 +1050,6 @@ impl FileGuard {
let _ = file.into_raw_fd();
res
}
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
fn with_std_file_mut<F, R>(&mut self, with: F) -> R
where
F: FnOnce(&mut File) -> R,
{
// SAFETY:
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
// - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
let res = with(&mut file);
let _ = file.into_raw_fd();
res
}
}
impl tokio_epoll_uring::IoFd for FileGuard {
@@ -1304,6 +1139,9 @@ impl OwnedAsyncWriter for VirtualFile {
) -> (FullSlice<Buf>, std::io::Result<()>) {
VirtualFile::write_all_at(self, buf, offset, ctx).await
}
async fn set_len(&self, len: u64, ctx: &RequestContext) -> std::io::Result<()> {
VirtualFile::set_len(self, len, ctx).await
}
}
impl OpenFiles {
@@ -1368,8 +1206,7 @@ pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment()
pub(crate) type IoPageSlice<'a> =
AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
static IO_MODE: once_cell::sync::Lazy<AtomicU8> =
once_cell::sync::Lazy::new(|| AtomicU8::new(IoMode::preferred() as u8));
static IO_MODE: LazyLock<AtomicU8> = LazyLock::new(|| AtomicU8::new(IoMode::preferred() as u8));
pub fn set_io_mode(mode: IoMode) {
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
@@ -1383,7 +1220,6 @@ static SYNC_MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8);
#[cfg(test)]
mod tests {
use std::io::Write;
use std::os::unix::fs::FileExt;
use std::sync::Arc;
@@ -1436,43 +1272,6 @@ mod tests {
MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
}
}
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all(buf, ctx).await;
res.map(|_| ())
}
MaybeVirtualFile::File(file) => file.write_all(&buf[..]),
}
}
// Helper function to slurp contents of a file, starting at the current position,
// into a string
async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
use std::io::Read;
let mut buf = String::new();
match self {
MaybeVirtualFile::VirtualFile(file) => {
let mut buf = Vec::new();
file.read_to_end(&mut buf, ctx).await?;
return Ok(String::from_utf8(buf).unwrap());
}
MaybeVirtualFile::File(file) => {
file.read_to_string(&mut buf)?;
}
}
Ok(buf)
}
// Helper function to slurp a portion of a file into a string
async fn read_string_at(
@@ -1508,7 +1307,7 @@ mod tests {
opts: OpenOptions,
ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> {
let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
let vf = VirtualFile::open_with_options_v2(&path, &opts, ctx).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
}
}
@@ -1568,48 +1367,23 @@ mod tests {
.await?;
file_a
.write_all(b"foobar".to_vec().slice_len(), &ctx)
.write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx)
.await?;
// cannot read from a file opened in write-only mode
let _ = file_a.read_string(&ctx).await.unwrap_err();
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
// Close the file and re-open for reading
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
// cannot write to a file opened in read-only mode
let _ = file_a
.write_all(b"bar".to_vec().slice_len(), &ctx)
.write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx)
.await
.unwrap_err();
// Try simple read
assert_eq!("foobar", file_a.read_string(&ctx).await?);
// It's positioned at the EOF now.
assert_eq!("", file_a.read_string(&ctx).await?);
// Test seeks.
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
assert_eq!("oobar", file_a.read_string(&ctx).await?);
assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
assert_eq!("ar", file_a.read_string(&ctx).await?);
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
assert_eq!("bar", file_a.read_string(&ctx).await?);
assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
assert_eq!("oobar", file_a.read_string(&ctx).await?);
// Test erroneous seeks to before byte 0
file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
// the erroneous seek should have left the position unchanged
assert_eq!("oobar", file_a.read_string(&ctx).await?);
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
// Create another test file, and try FileExt functions on it.
let path_b = testdir.join("file_b");
@@ -1635,9 +1409,6 @@ mod tests {
// Open a lot of files, enough to cause some evictions. (Or to be precise,
// open the same file many times. The effect is the same.)
//
// leave file_a positioned at offset 1 before we start
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
let mut vfiles = Vec::new();
for _ in 0..100 {
@@ -1647,7 +1418,7 @@ mod tests {
&ctx,
)
.await?;
assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
vfiles.push(vfile);
}
@@ -1655,8 +1426,8 @@ mod tests {
assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
// The underlying file descriptor for 'file_a' should be closed now. Try to read
// from it again. We left the file positioned at offset 1 above.
assert_eq!("oobar", file_a.read_string(&ctx).await?);
// from it again.
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
// Check that all the other FDs still work too. Use them in random order for
// good measure.
@@ -1695,7 +1466,7 @@ mod tests {
for _ in 0..VIRTUAL_FILES {
let f = VirtualFileInner::open_with_options(
&test_file_path,
OpenOptions::new().read(true),
OpenOptions::new().read(true).clone(),
&ctx,
)
.await?;
@@ -1750,7 +1521,7 @@ mod tests {
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string(&ctx).await.unwrap();
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);
@@ -1759,7 +1530,7 @@ mod tests {
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string(&ctx).await.unwrap();
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
assert_eq!(post, "bar");
assert!(!tmp_path.exists());
drop(file);
@@ -1784,7 +1555,7 @@ mod tests {
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string(&ctx).await.unwrap();
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);

View File

@@ -209,6 +209,27 @@ impl IoEngine {
}
}
}
pub(super) async fn set_len(
&self,
file_guard: FileGuard,
len: u64,
) -> (FileGuard, std::io::Result<()>) {
match self {
IoEngine::NotSet => panic!("not initialized"),
IoEngine::StdFs => {
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
(file_guard, res)
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
// TODO: ftruncate op for tokio-epoll-uring
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
(file_guard, res)
}
}
}
pub(super) async fn write_at<B: IoBuf + Send>(
&self,
file_guard: FileGuard,

View File

@@ -6,7 +6,12 @@ use std::path::Path;
use super::io_engine::IoEngine;
#[derive(Debug, Clone)]
pub enum OpenOptions {
pub struct OpenOptions {
write: bool,
inner: Inner,
}
#[derive(Debug, Clone)]
enum Inner {
StdFs(std::fs::OpenOptions),
#[cfg(target_os = "linux")]
TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions),
@@ -14,13 +19,17 @@ pub enum OpenOptions {
impl Default for OpenOptions {
fn default() -> Self {
match super::io_engine::get() {
let inner = match super::io_engine::get() {
IoEngine::NotSet => panic!("io engine not set"),
IoEngine::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
IoEngine::StdFs => Inner::StdFs(std::fs::OpenOptions::new()),
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
Inner::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
}
};
Self {
write: false,
inner,
}
}
}
@@ -30,13 +39,17 @@ impl OpenOptions {
Self::default()
}
pub(super) fn is_write(&self) -> bool {
self.write
}
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.read(read);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
Inner::TokioEpollUring(x) => {
let _ = x.read(read);
}
}
@@ -44,12 +57,13 @@ impl OpenOptions {
}
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
self.write = write;
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.write(write);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
Inner::TokioEpollUring(x) => {
let _ = x.write(write);
}
}
@@ -57,12 +71,12 @@ impl OpenOptions {
}
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.create(create);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
Inner::TokioEpollUring(x) => {
let _ = x.create(create);
}
}
@@ -70,12 +84,12 @@ impl OpenOptions {
}
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.create_new(create_new);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
Inner::TokioEpollUring(x) => {
let _ = x.create_new(create_new);
}
}
@@ -83,12 +97,12 @@ impl OpenOptions {
}
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.truncate(truncate);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
Inner::TokioEpollUring(x) => {
let _ = x.truncate(truncate);
}
}
@@ -96,10 +110,10 @@ impl OpenOptions {
}
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
match self {
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
match &self.inner {
Inner::StdFs(x) => x.open(path).map(|file| file.into()),
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
Inner::TokioEpollUring(x) => {
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
system.open(path, x).await.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
@@ -114,12 +128,12 @@ impl OpenOptions {
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.mode(mode);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
Inner::TokioEpollUring(x) => {
let _ = x.mode(mode);
}
}
@@ -127,12 +141,12 @@ impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
}
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.custom_flags(flags);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
Inner::TokioEpollUring(x) => {
let _ = x.custom_flags(flags);
}
}

View File

@@ -282,6 +282,17 @@ unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
}
}
impl<A: Alignment> std::io::Write for AlignedBufferMut<A> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {

View File

@@ -1,14 +1,19 @@
mod flush;
use bytes::BufMut;
pub(crate) use flush::FlushControl;
use flush::FlushHandle;
pub(crate) use flush::FlushTaskError;
use flush::ShutdownRequest;
use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use tracing::trace;
use super::io_buf_aligned::IoBufAligned;
use super::io_buf_aligned::IoBufAlignedMut;
use super::io_buf_ext::{FullSlice, IoBufExt};
use crate::context::RequestContext;
use crate::virtual_file::UsizeIsU64;
use crate::virtual_file::{IoBuffer, IoBufferMut};
pub(crate) trait CheapCloneForRead {
@@ -33,12 +38,49 @@ pub trait OwnedAsyncWriter {
offset: u64,
ctx: &RequestContext,
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
fn set_len(
&self,
len: u64,
ctx: &RequestContext,
) -> impl Future<Output = std::io::Result<()>> + Send;
}
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
/// small writes into larger writes of size [`Buffer::cap`].
///
/// The buffer is flushed if and only if it is full ([`Buffer::pending`] == [`Buffer::cap`]).
/// This guarantees that writes to the filesystem happen
/// - at offsets that are multiples of [`Buffer::cap`]
/// - in lengths that are multiples of [`Buffer::cap`]
///
/// Above property is useful for Direct IO, where whatever the
/// effectively dominating disk-sector/filesystem-block/memory-page size
/// determines the requirements on
/// - the alignment of the pointer passed to the read/write operation
/// - the value of `count` (i.e., the length of the read/write operation)
/// which must be a multiple of the dominating sector/block/page size.
///
/// See [`BufferedWriter::shutdown`] / [`BufferedWriterShutdownMode`] for different
/// ways of dealing with the special case that the buffer is not full by the time
/// we are done writing.
///
/// The first flush to the underlying `W` happens at offset `start_offset` (arg of [`BufferedWriter::new`]).
/// The next flush is to offset `start_offset + Buffer::cap`. The one after at `start_offset + 2 * Buffer::cap` and so on.
///
/// TODO: decouple buffer capacity from alignment requirement.
/// Right now we assume [`Buffer::cap`] is the alignment requirement,
/// but actually [`Buffer::cap`] should only determine how often we flush
/// while writing, while a separate alignment requirement argument should
/// be passed to determine alignment requirement. This could be used by
/// [`BufferedWriterShutdownMode::PadThenTruncate`] to avoid excessive
/// padding of zeroes. For example, today, with a capacity of 64KiB, we
/// would pad up to 64KiB-1 bytes of zeroes, then truncate off 64KiB-1.
/// This is wasteful, e.g., if the alignment requirement is 4KiB, we only
/// need to pad & truncate up to 4KiB-1 bytes of zeroes
///
// TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput,
// since we would avoid copying majority of the data into the internal buffer.
// https://github.com/neondatabase/neon/issues/10101
pub struct BufferedWriter<B: Buffer, W> {
/// Clone of the buffer that was last submitted to the flush loop.
/// `None` if no flush request has been submitted, Some forever after.
@@ -60,9 +102,24 @@ pub struct BufferedWriter<B: Buffer, W> {
bytes_submitted: u64,
}
/// How [`BufferedWriter::shutdown`] should deal with pending (=not-yet-flushed) data.
///
/// Cf the [`BufferedWriter`] comment's paragraph for context on why we need to think about this.
pub enum BufferedWriterShutdownMode {
/// Drop pending data, don't write back to file.
DropTail,
/// Pad the pending data with zeroes (cf [`usize::next_multiple_of`]).
ZeroPadToNextMultiple(usize),
/// Fill the IO buffer with zeroes, flush to disk, the `ftruncate` the
/// file to the exact number of bytes written to [`Self`].
///
/// TODO: see in [`BufferedWriter`] comment about decoupling buffer capacity from alignment requirement.
PadThenTruncate,
}
impl<B, Buf, W> BufferedWriter<B, W>
where
B: Buffer<IoBuf = Buf> + Send + 'static,
B: IoBufAlignedMut + Buffer<IoBuf = Buf> + Send + 'static,
Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
{
@@ -71,6 +128,7 @@ where
/// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
pub fn new(
writer: W,
start_offset: u64,
buf_new: impl Fn() -> B,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
@@ -88,7 +146,7 @@ where
ctx.attached_child(),
flush_task_span,
),
bytes_submitted: 0,
bytes_submitted: start_offset,
}
}
@@ -109,18 +167,80 @@ where
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn shutdown(mut self, ctx: &RequestContext) -> Result<(u64, W), FlushTaskError> {
self.flush(ctx).await?;
pub async fn shutdown(
mut self,
mode: BufferedWriterShutdownMode,
ctx: &RequestContext,
) -> Result<(u64, W), FlushTaskError> {
let mut mutable = self.mutable.take().expect("must not use after an error");
let unpadded_pending = mutable.pending();
let final_len: u64;
let shutdown_req;
match mode {
BufferedWriterShutdownMode::DropTail => {
trace!(pending=%mutable.pending(), "dropping pending data");
drop(mutable);
final_len = self.bytes_submitted;
shutdown_req = ShutdownRequest { set_len: None };
}
BufferedWriterShutdownMode::ZeroPadToNextMultiple(next_multiple) => {
let len = mutable.pending();
let cap = mutable.cap();
assert!(
len <= cap,
"buffer impl ensures this, but let's check because the extend_with below would panic if we go beyond"
);
let padded_len = len.next_multiple_of(next_multiple);
assert!(
padded_len <= cap,
"caller specified a multiple that is larger than the buffer capacity"
);
let count = padded_len - len;
mutable.extend_with(0, count);
trace!(count, "padding with zeros");
self.mutable = Some(mutable);
final_len = self.bytes_submitted + padded_len.into_u64();
shutdown_req = ShutdownRequest { set_len: None };
}
BufferedWriterShutdownMode::PadThenTruncate => {
let len = mutable.pending();
let cap = mutable.cap();
// TODO: see struct comment TODO on decoupling buffer capacity from alignment requirement.
let alignment_requirement = cap;
assert!(len <= cap, "buffer impl should ensure this");
let padding_end_offset = len.next_multiple_of(alignment_requirement);
assert!(
padding_end_offset <= cap,
"{padding_end_offset} <= {cap} ({alignment_requirement})"
);
let count = padding_end_offset - len;
mutable.extend_with(0, count);
trace!(count, "padding with zeros");
self.mutable = Some(mutable);
final_len = self.bytes_submitted + len.into_u64();
shutdown_req = ShutdownRequest {
// Avoid set_len call if we didn't need to pad anything.
set_len: if count > 0 { Some(final_len) } else { None },
};
}
};
let padded_pending = self.mutable.as_ref().map(|b| b.pending());
trace!(unpadded_pending, padded_pending, "padding done");
if self.mutable.is_some() {
self.flush(ctx).await?;
}
let Self {
mutable: buf,
mutable: _,
maybe_flushed: _,
mut flush_handle,
bytes_submitted: bytes_amount,
bytes_submitted: _,
} = self;
let writer = flush_handle.shutdown().await?;
assert!(buf.is_some());
Ok((bytes_amount, writer))
let writer = flush_handle.shutdown(shutdown_req).await?;
Ok((final_len, writer))
}
#[cfg(test)]
@@ -224,6 +344,10 @@ pub trait Buffer {
/// panics if `other.len() > self.cap() - self.pending()`.
fn extend_from_slice(&mut self, other: &[u8]);
/// Add `count` bytes `val` into `self`.
/// Panics if `count > self.cap() - self.pending()`.
fn extend_with(&mut self, val: u8, count: usize);
/// Number of bytes in the buffer.
fn pending(&self) -> usize;
@@ -251,6 +375,14 @@ impl Buffer for IoBufferMut {
IoBufferMut::extend_from_slice(self, other);
}
fn extend_with(&mut self, val: u8, count: usize) {
if self.len() + count > self.cap() {
panic!("Buffer capacity exceeded");
}
IoBufferMut::put_bytes(self, val, count);
}
fn pending(&self) -> usize {
self.len()
}
@@ -273,26 +405,22 @@ impl Buffer for IoBufferMut {
mod tests {
use std::sync::Mutex;
use rstest::rstest;
use super::*;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::TaskKind;
#[derive(Debug, PartialEq, Eq)]
enum Op {
Write { buf: Vec<u8>, offset: u64 },
SetLen { len: u64 },
}
#[derive(Default, Debug)]
struct RecorderWriter {
/// record bytes and write offsets.
writes: Mutex<Vec<(Vec<u8>, u64)>>,
}
impl RecorderWriter {
/// Gets recorded bytes and write offsets.
fn get_writes(&self) -> Vec<Vec<u8>> {
self.writes
.lock()
.unwrap()
.iter()
.map(|(buf, _)| buf.clone())
.collect()
}
recording: Mutex<Vec<Op>>,
}
impl OwnedAsyncWriter for RecorderWriter {
@@ -302,28 +430,42 @@ mod tests {
offset: u64,
_: &RequestContext,
) -> (FullSlice<Buf>, std::io::Result<()>) {
self.writes
.lock()
.unwrap()
.push((Vec::from(&buf[..]), offset));
self.recording.lock().unwrap().push(Op::Write {
buf: Vec::from(&buf[..]),
offset,
});
(buf, Ok(()))
}
async fn set_len(&self, len: u64, _ctx: &RequestContext) -> std::io::Result<()> {
self.recording.lock().unwrap().push(Op::SetLen { len });
Ok(())
}
}
fn test_ctx() -> RequestContext {
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
}
#[rstest]
#[tokio::test]
async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
async fn test_write_all_borrowed_always_goes_through_buffer(
#[values(
BufferedWriterShutdownMode::DropTail,
BufferedWriterShutdownMode::ZeroPadToNextMultiple(2),
BufferedWriterShutdownMode::PadThenTruncate
)]
mode: BufferedWriterShutdownMode,
) -> anyhow::Result<()> {
let ctx = test_ctx();
let ctx = &ctx;
let recorder = RecorderWriter::default();
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let cap = 4;
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
recorder,
|| IoBufferMut::with_capacity(2),
0,
|| IoBufferMut::with_capacity(cap),
gate.enter()?,
cancel,
ctx,
@@ -333,23 +475,89 @@ mod tests {
writer.write_buffered_borrowed(b"abc", ctx).await?;
writer.write_buffered_borrowed(b"", ctx).await?;
writer.write_buffered_borrowed(b"d", ctx).await?;
writer.write_buffered_borrowed(b"e", ctx).await?;
writer.write_buffered_borrowed(b"fg", ctx).await?;
writer.write_buffered_borrowed(b"hi", ctx).await?;
writer.write_buffered_borrowed(b"j", ctx).await?;
writer.write_buffered_borrowed(b"klmno", ctx).await?;
writer.write_buffered_borrowed(b"efg", ctx).await?;
writer.write_buffered_borrowed(b"hijklm", ctx).await?;
let (_, recorder) = writer.shutdown(ctx).await?;
assert_eq!(
recorder.get_writes(),
{
let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
expect
let mut expect = {
[(0, b"abcd"), (4, b"efgh"), (8, b"ijkl")]
.into_iter()
.map(|(offset, v)| Op::Write {
offset,
buf: v[..].to_vec(),
})
.collect::<Vec<_>>()
};
let expect_next_offset = 12;
match &mode {
BufferedWriterShutdownMode::DropTail => (),
// We test the case with padding to next multiple of 2 so that it's different
// from the alignment requirement of 4 inferred from buffer capacity.
// See TODOs in the `BufferedWriter` struct comment on decoupling buffer capacity from alignment requirement.
BufferedWriterShutdownMode::ZeroPadToNextMultiple(2) => {
expect.push(Op::Write {
offset: expect_next_offset,
// it's legitimate for pad-to-next multiple 2 to be < alignment requirement 4 inferred from buffer capacity
buf: b"m\0".to_vec(),
});
}
.iter()
.map(|v| v[..].to_vec())
.collect::<Vec<_>>()
BufferedWriterShutdownMode::ZeroPadToNextMultiple(_) => unimplemented!(),
BufferedWriterShutdownMode::PadThenTruncate => {
expect.push(Op::Write {
offset: expect_next_offset,
buf: b"m\0\0\0".to_vec(),
});
expect.push(Op::SetLen { len: 13 });
}
}
let (_, recorder) = writer.shutdown(mode, ctx).await?;
assert_eq!(&*recorder.recording.lock().unwrap(), &expect);
Ok(())
}
#[tokio::test]
async fn test_set_len_is_skipped_if_not_needed() -> anyhow::Result<()> {
let ctx = test_ctx();
let ctx = &ctx;
let recorder = RecorderWriter::default();
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let cap = 4;
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
recorder,
0,
|| IoBufferMut::with_capacity(cap),
gate.enter()?,
cancel,
ctx,
tracing::Span::none(),
);
// write a multiple of `cap`
writer.write_buffered_borrowed(b"abc", ctx).await?;
writer.write_buffered_borrowed(b"defgh", ctx).await?;
let (_, recorder) = writer
.shutdown(BufferedWriterShutdownMode::PadThenTruncate, ctx)
.await?;
let expect = {
[(0, b"abcd"), (4, b"efgh")]
.into_iter()
.map(|(offset, v)| Op::Write {
offset,
buf: v[..].to_vec(),
})
.collect::<Vec<_>>()
};
assert_eq!(
&*recorder.recording.lock().unwrap(),
&expect,
"set_len should not be called if the buffer is already aligned"
);
Ok(())
}
}

View File

@@ -1,7 +1,7 @@
use std::ops::ControlFlow;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span, warn};
use tracing::{Instrument, info_span, warn};
use utils::sync::duplex;
use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
@@ -18,7 +18,7 @@ pub struct FlushHandle<Buf, W> {
pub struct FlushHandleInner<Buf, W> {
/// A bi-directional channel that sends (buffer, offset) for writes,
/// and receives recyled buffer.
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
channel: duplex::mpsc::Duplex<Request<Buf>, FullSlice<Buf>>,
/// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<Result<W, FlushTaskError>>,
}
@@ -27,9 +27,27 @@ struct FlushRequest<Buf> {
slice: FullSlice<Buf>,
offset: u64,
#[cfg(test)]
ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
ready_to_flush_rx: Option<tokio::sync::oneshot::Receiver<()>>,
#[cfg(test)]
done_flush_tx: tokio::sync::oneshot::Sender<()>,
done_flush_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
pub struct ShutdownRequest {
pub set_len: Option<u64>,
}
enum Request<Buf> {
Flush(FlushRequest<Buf>),
Shutdown(ShutdownRequest),
}
impl<Buf> Request<Buf> {
fn op_str(&self) -> &'static str {
match self {
Request::Flush(_) => "flush",
Request::Shutdown(_) => "shutdown",
}
}
}
/// Constructs a request and a control object for a new flush operation.
@@ -51,8 +69,8 @@ fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>,
let request = FlushRequest {
slice,
offset,
ready_to_flush_rx,
done_flush_tx,
ready_to_flush_rx: Some(ready_to_flush_rx),
done_flush_tx: Some(done_flush_tx),
};
(request, control)
}
@@ -159,10 +177,7 @@ where
let (request, flush_control) = new_flush_op(slice, offset);
// Submits the buffer to the background task.
let submit = self.inner_mut().channel.send(request).await;
if submit.is_err() {
return self.handle_error().await;
}
self.send(Request::Flush(request)).await?;
// Wait for an available buffer from the background flush task.
// This is the BACKPRESSURE mechanism: if the flush task can't keep up,
@@ -174,15 +189,28 @@ where
Ok((recycled, flush_control))
}
/// Sends poison pill to flush task and waits for it to exit.
pub async fn shutdown(&mut self, req: ShutdownRequest) -> Result<W, FlushTaskError> {
self.send(Request::Shutdown(req)).await?;
self.wait().await
}
async fn send(&mut self, request: Request<Buf>) -> Result<(), FlushTaskError> {
let submit = self.inner_mut().channel.send(request).await;
if submit.is_err() {
return self.handle_error().await;
}
Ok(())
}
async fn handle_error<T>(&mut self) -> Result<T, FlushTaskError> {
Err(self
.shutdown()
.wait()
.await
.expect_err("flush task only disconnects duplex if it exits with an error"))
}
/// Cleans up the channel, join the flush task.
pub async fn shutdown(&mut self) -> Result<W, FlushTaskError> {
async fn wait(&mut self) -> Result<W, FlushTaskError> {
let handle = self
.inner
.take()
@@ -204,7 +232,7 @@ where
pub struct FlushBackgroundTask<Buf, W> {
/// A bi-directional channel that receives (buffer, offset) for writes,
/// and send back recycled buffer.
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
/// A writter for persisting data to disk.
writer: W,
ctx: RequestContext,
@@ -226,7 +254,7 @@ where
{
/// Creates a new background flush task.
fn new(
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
file: W,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
@@ -245,15 +273,9 @@ where
async fn run(mut self) -> Result<W, FlushTaskError> {
// Exit condition: channel is closed and there is no remaining buffer to be flushed
while let Some(request) = self.channel.recv().await {
#[cfg(test)]
{
// In test, wait for control to signal that we are ready to flush.
if request.ready_to_flush_rx.await.is_err() {
tracing::debug!("control dropped");
}
}
let op_kind = request.op_str();
// Write slice to disk at `offset`.
// Perform the requested operation.
//
// Error handling happens according to the current policy of crashing
// on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
@@ -262,52 +284,112 @@ where
//
// TODO: use utils::backoff::retry once async closures are actually usable
//
let mut slice_storage = Some(request.slice);
let mut request_storage = Some(request);
for attempt in 1.. {
if self.cancel.is_cancelled() {
return Err(FlushTaskError::Cancelled);
}
let result = async {
if attempt > 1 {
info!("retrying flush");
}
let slice = slice_storage.take().expect(
let request: Request<Buf> = request_storage .take().expect(
"likely previous invocation of this future didn't get polled to completion",
);
// Don't cancel this write by doing tokio::select with self.cancel.cancelled().
match &request {
Request::Shutdown(ShutdownRequest { set_len: None }) => {
request_storage = Some(request);
return ControlFlow::Break(());
},
Request::Flush(_) | Request::Shutdown(ShutdownRequest { set_len: Some(_) }) => {
},
}
if attempt > 1 {
warn!(op=%request.op_str(), "retrying");
}
// borrows so we can async move the requests into async block while not moving these borrows here
let writer = &self.writer;
let request_storage = &mut request_storage;
let ctx = &self.ctx;
let io_fut = match request {
Request::Flush(FlushRequest { slice, offset, #[cfg(test)] ready_to_flush_rx, #[cfg(test)] done_flush_tx }) => futures::future::Either::Left(async move {
#[cfg(test)]
if let Some(ready_to_flush_rx) = ready_to_flush_rx {
{
// In test, wait for control to signal that we are ready to flush.
if ready_to_flush_rx.await.is_err() {
tracing::debug!("control dropped");
}
}
}
let (slice, res) = writer.write_all_at(slice, offset, ctx).await;
*request_storage = Some(Request::Flush(FlushRequest {
slice,
offset,
#[cfg(test)]
ready_to_flush_rx: None, // the contract is that we notify before first attempt
#[cfg(test)]
done_flush_tx
}));
res
}),
Request::Shutdown(ShutdownRequest { set_len }) => futures::future::Either::Right(async move {
let set_len = set_len.expect("we filter out the None case above");
let res = writer.set_len(set_len, ctx).await;
*request_storage = Some(Request::Shutdown(ShutdownRequest {
set_len: Some(set_len),
}));
res
}),
};
// Don't cancel the io_fut by doing tokio::select with self.cancel.cancelled().
// The underlying tokio-epoll-uring slot / kernel operation is still ongoing and occupies resources.
// If we retry indefinitely, we'll deplete those resources.
// Future: teach tokio-epoll-uring io_uring operation cancellation, but still,
// wait for cancelled ops to complete and discard their error.
let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
slice_storage = Some(slice);
let res = io_fut.await;
let res = res.maybe_fatal_err("owned_buffers_io flush");
let Err(err) = res else {
if attempt > 1 {
warn!(op=%op_kind, "retry succeeded");
}
return ControlFlow::Break(());
};
warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &self.cancel).await;
ControlFlow::Continue(())
}
.instrument(info_span!("flush_attempt", %attempt))
.instrument(info_span!("attempt", %attempt, %op_kind))
.await;
match result {
ControlFlow::Break(()) => break,
ControlFlow::Continue(()) => continue,
}
}
let slice = slice_storage.expect("loop must have run at least once");
let request = request_storage.expect("loop must have run at least once");
#[cfg(test)]
{
// In test, tell control we are done flushing buffer.
if request.done_flush_tx.send(()).is_err() {
tracing::debug!("control dropped");
let slice = match request {
Request::Flush(FlushRequest {
slice,
#[cfg(test)]
mut done_flush_tx,
..
}) => {
#[cfg(test)]
{
// In test, tell control we are done flushing buffer.
if done_flush_tx.take().expect("always Some").send(()).is_err() {
tracing::debug!("control dropped");
}
}
slice
}
}
Request::Shutdown(_) => {
// next iteration will observe recv() returning None
continue;
}
};
// Sends the buffer back to the handle for reuse. The handle is in charged of cleaning the buffer.
if self.channel.send(slice).await.is_err() {
let send_res = self.channel.send(slice).await;
if send_res.is_err() {
// Although channel is closed. Still need to finish flushing the remaining buffers.
continue;
}

View File

@@ -33,6 +33,10 @@ impl OwnedAsyncWriter for TempVirtualFile {
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send {
VirtualFile::write_all_at(self, buf, offset, ctx)
}
async fn set_len(&self, len: u64, ctx: &RequestContext) -> std::io::Result<()> {
VirtualFile::set_len(self, len, ctx).await
}
}
impl Drop for TempVirtualFile {

18
poetry.lock generated
View File

@@ -1274,14 +1274,14 @@ files = [
[[package]]
name = "h11"
version = "0.14.0"
version = "0.16.0"
description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1"
optional = false
python-versions = ">=3.7"
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"},
{file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"},
{file = "h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86"},
{file = "h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1"},
]
[[package]]
@@ -1314,25 +1314,25 @@ files = [
[[package]]
name = "httpcore"
version = "1.0.3"
version = "1.0.9"
description = "A minimal low-level HTTP client."
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "httpcore-1.0.3-py3-none-any.whl", hash = "sha256:9a6a501c3099307d9fd76ac244e08503427679b1e81ceb1d922485e2f2462ad2"},
{file = "httpcore-1.0.3.tar.gz", hash = "sha256:5c0f9546ad17dac4d0772b0808856eb616eb8b48ce94f49ed819fd6982a8a544"},
{file = "httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55"},
{file = "httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8"},
]
[package.dependencies]
certifi = "*"
h11 = ">=0.13,<0.15"
h11 = ">=0.16"
[package.extras]
asyncio = ["anyio (>=4.0,<5.0)"]
http2 = ["h2 (>=3,<5)"]
socks = ["socksio (==1.*)"]
trio = ["trio (>=0.22.0,<0.24.0)"]
trio = ["trio (>=0.22.0,<1.0)"]
[[package]]
name = "httpx"

View File

@@ -8,7 +8,7 @@ use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::proxy::retry::CouldRetry;
/// A go-to error message which doesn't leak any detail.
pub(crate) const REQUEST_FAILED: &str = "Console request failed";
pub(crate) const REQUEST_FAILED: &str = "Control plane request failed";
/// Common console API error.
#[derive(Debug, Error)]

View File

@@ -72,6 +72,7 @@ impl HttpState {
neon_metrics: NeonMetrics::new(build_info),
allowlist_routes: &[
"/status",
"/live",
"/ready",
"/metrics",
"/profile/cpu",
@@ -1260,16 +1261,8 @@ async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError
ForwardOutcome::NotForwarded(req) => req,
};
// Spawn a background task: once we start stepping down, we must finish: if the client drops
// their request we should avoid stopping in some part-stepped-down state.
let handle = tokio::spawn(async move {
let state = get_state(&req);
state.service.step_down().await
});
let result = handle
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
let state = get_state(&req);
let result = state.service.step_down().await;
json_response(StatusCode::OK, result)
}
@@ -1401,6 +1394,8 @@ async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiE
}
/// Status endpoint is just used for checking that our HTTP listener is up
///
/// This serves as our k8s startup probe.
async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
@@ -1412,6 +1407,30 @@ async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, ())
}
/// Liveness endpoint indicates that this storage controller is in a state
/// where it can fulfill it's responsibilties. Namely, startup has finished
/// and it is the current leader.
///
/// This serves as our k8s liveness probe.
async fn handle_live(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let state = get_state(&req);
let live = state.service.startup_complete.is_ready()
&& state.service.get_leadership_status() == LeadershipStatus::Leader;
if live {
json_response(StatusCode::OK, ())
} else {
json_response(StatusCode::SERVICE_UNAVAILABLE, ())
}
}
/// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
/// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -1745,6 +1764,7 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
const NOT_FOR_FORWARD: &[&str] = &[
"/control/v1/step_down",
"/status",
"/live",
"/ready",
"/metrics",
"/profile/cpu",
@@ -1969,6 +1989,9 @@ pub fn make_router(
.get("/status", |r| {
named_request_span(r, handle_status, RequestName("status"))
})
.get("/live", |r| {
named_request_span(r, handle_live, RequestName("live"))
})
.get("/ready", |r| {
named_request_span(r, handle_ready, RequestName("ready"))
})

View File

@@ -43,6 +43,19 @@ impl Leadership {
&self,
) -> Result<(Option<ControllerPersistence>, Option<GlobalObservedState>)> {
let leader = self.current_leader().await?;
if leader.as_ref().map(|l| &l.address)
== self
.config
.address_for_peers
.as_ref()
.map(Uri::to_string)
.as_ref()
{
// We already are the current leader. This is a restart.
return Ok((leader, None));
}
let leader_step_down_state = if let Some(ref leader) = leader {
if self.config.start_as_candidate {
self.request_step_down(leader).await

View File

@@ -55,9 +55,12 @@ impl ResponseErrorMessageExt for reqwest::Response {
}
}
#[derive(Serialize, Deserialize, Debug, Default)]
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
const STEP_DOWN_RETRIES: u32 = 8;
const STEP_DOWN_TIMEOUT: Duration = Duration::from_secs(1);
impl PeerClient {
pub(crate) fn new(http_client: reqwest::Client, uri: Uri, jwt: Option<String>) -> Self {
Self {
@@ -76,7 +79,7 @@ impl PeerClient {
req
};
let req = req.timeout(Duration::from_secs(2));
let req = req.timeout(STEP_DOWN_TIMEOUT);
let res = req
.send()
@@ -94,8 +97,7 @@ impl PeerClient {
}
/// Request the peer to step down and return its current observed state
/// All errors are retried with exponential backoff for a maximum of 4 attempts.
/// Assuming all retries are performed, the function times out after roughly 4 seconds.
/// All errors are re-tried
pub(crate) async fn step_down(
&self,
cancel: &CancellationToken,
@@ -104,7 +106,7 @@ impl PeerClient {
|| self.request_step_down(),
|_e| false,
2,
4,
STEP_DOWN_RETRIES,
"Send step down request",
cancel,
)

View File

@@ -133,6 +133,8 @@ pub(crate) enum DatabaseOperation {
InsertTimelineImport,
UpdateTimelineImport,
DeleteTimelineImport,
ListTimelineImports,
IsTenantImportingTimeline,
}
#[must_use]
@@ -1640,6 +1642,30 @@ impl Persistence {
.await
}
pub(crate) async fn list_timeline_imports(&self) -> DatabaseResult<Vec<TimelineImport>> {
use crate::schema::timeline_imports::dsl;
let persistent = self
.with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| {
Box::pin(async move {
let from_db: Vec<TimelineImportPersistence> =
dsl::timeline_imports.load(conn).await?;
Ok(from_db)
})
})
.await?;
let imports: Result<Vec<TimelineImport>, _> = persistent
.into_iter()
.map(TimelineImport::from_persistent)
.collect();
match imports {
Ok(ok) => Ok(ok.into_iter().collect()),
Err(err) => Err(DatabaseError::Logical(format!(
"failed to deserialize import: {err}"
))),
}
}
pub(crate) async fn delete_timeline_import(
&self,
tenant_id: TenantId,
@@ -1743,6 +1769,25 @@ impl Persistence {
})
.await
}
pub(crate) async fn is_tenant_importing_timeline(
&self,
tenant_id: TenantId,
) -> DatabaseResult<bool> {
use crate::schema::timeline_imports::dsl;
self.with_measured_conn(DatabaseOperation::IsTenantImportingTimeline, move |conn| {
Box::pin(async move {
let imports: i64 = dsl::timeline_imports
.filter(dsl::tenant_id.eq(tenant_id.to_string()))
.count()
.get_result(conn)
.await?;
Ok(imports > 0)
})
})
.await
}
}
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {

View File

@@ -11,7 +11,7 @@ use std::num::NonZeroU32;
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant, SystemTime};
use anyhow::Context;
@@ -97,7 +97,9 @@ use crate::tenant_shard::{
ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter,
ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
};
use crate::timeline_import::{ShardImportStatuses, TimelineImport, UpcallClient};
use crate::timeline_import::{
ShardImportStatuses, TimelineImport, TimelineImportState, UpcallClient,
};
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
@@ -524,6 +526,9 @@ pub struct Service {
/// HTTP client with proper CA certs.
http_client: reqwest::Client,
/// Handle for the step down background task if one was ever requested
step_down_barrier: OnceLock<tokio::sync::watch::Receiver<Option<GlobalObservedState>>>,
}
impl From<ReconcileWaitError> for ApiError {
@@ -875,6 +880,40 @@ impl Service {
});
}
// Reconcile the timeline imports:
// 1. Mark each tenant shard of tenants with an importing timeline as importing.
// 2. Finalize the completed imports in the background. This handles the case where
// the previous storage controller instance shut down whilst finalizing imports.
let imports = self.persistence.list_timeline_imports().await;
match imports {
Ok(mut imports) => {
{
let mut locked = self.inner.write().unwrap();
for import in &imports {
locked
.tenants
.range_mut(TenantShardId::tenant_range(import.tenant_id))
.for_each(|(_id, shard)| {
shard.importing = TimelineImportState::Importing
});
}
}
imports.retain(|import| import.is_complete());
tokio::task::spawn({
let finalize_imports_self = self.clone();
async move {
finalize_imports_self
.finalize_timeline_imports(imports)
.await
}
});
}
Err(err) => {
tracing::error!("Could not retrieve completed imports from database: {err}");
}
}
tracing::info!(
"Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)"
);
@@ -1745,6 +1784,7 @@ impl Service {
tenant_op_locks: Default::default(),
node_op_locks: Default::default(),
http_client,
step_down_barrier: Default::default(),
});
let result_task_this = this.clone();
@@ -3752,6 +3792,22 @@ impl Service {
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let is_import = create_req.is_import();
if is_import {
// Ensure that there is no split on-going.
// [`Self::tenant_shard_split`] holds the exclusive tenant lock
// for the duration of the split, but here we handle the case
// where we restarted and the split is being aborted.
let locked = self.inner.read().unwrap();
let splitting = locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.any(|(_id, shard)| shard.splitting != SplitState::Idle);
if splitting {
return Err(ApiError::Conflict("Tenant is splitting shard".to_string()));
}
}
let timeline_info = self
.tenant_timeline_create_pageservers(tenant_id, create_req)
.await?;
@@ -3789,6 +3845,14 @@ impl Service {
.context("timeline import insert")
.map_err(ApiError::InternalServerError)?;
// Set the importing flag on the tenant shards
self.inner
.write()
.unwrap()
.tenants
.range_mut(TenantShardId::tenant_range(tenant_id))
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Importing);
match inserted {
true => {
tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import");
@@ -3865,13 +3929,10 @@ impl Service {
self: &Arc<Self>,
import: TimelineImport,
) -> anyhow::Result<()> {
// TODO(vlad): On start-up, load up the imports and notify cplane of the
// ones that have been completed. This assumes the new cplane API will
// be idempotent. If that's not possible, bang a flag in the database.
// https://github.com/neondatabase/neon/issues/11570
tracing::info!("Finalizing timeline import");
pausable_failpoint!("timeline-import-pre-cplane-notification");
let import_failed = import.completion_error().is_some();
if !import_failed {
@@ -3914,6 +3975,13 @@ impl Service {
tracing::warn!("Failed to delete timeline import entry from database: {err}");
}
self.inner
.write()
.unwrap()
.tenants
.range_mut(TenantShardId::tenant_range(import.tenant_id))
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle);
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
// https://github.com/neondatabase/neon/issues/11569
@@ -3922,6 +3990,15 @@ impl Service {
Ok(())
}
async fn finalize_timeline_imports(self: &Arc<Self>, imports: Vec<TimelineImport>) {
futures::future::join_all(
imports
.into_iter()
.map(|import| self.finalize_timeline_import(import)),
)
.await;
}
async fn timeline_active_on_all_shards(
self: &Arc<Self>,
import: &TimelineImport,
@@ -4888,6 +4965,7 @@ impl Service {
is_reconciling: shard.reconciler.is_some(),
is_pending_compute_notification: shard.pending_compute_notification,
is_splitting: matches!(shard.splitting, SplitState::Splitting),
is_importing: shard.importing == TimelineImportState::Importing,
scheduling_policy: shard.get_scheduling_policy(),
preferred_az_id: shard.preferred_az().map(ToString::to_string),
})
@@ -5378,6 +5456,27 @@ impl Service {
.enter()
.map_err(|_| ApiError::ShuttingDown)?;
// Timeline imports on the pageserver side can't handle shard-splits.
// If the tenant is importing a timeline, dont't shard split it.
match self
.persistence
.is_tenant_importing_timeline(tenant_id)
.await
{
Ok(importing) => {
if importing {
return Err(ApiError::Conflict(
"Cannot shard split during timeline import".to_string(),
));
}
}
Err(err) => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Failed to check for running imports: {err}"
)));
}
}
let new_shard_count = ShardCount::new(split_req.new_shard_count);
let new_stripe_size = split_req.new_stripe_size;
@@ -8050,12 +8149,25 @@ impl Service {
candidates.extend(size_candidates);
}
// Filter out tenants in a prohibiting scheduling mode.
// Filter out tenants in a prohibiting scheduling modes
// and tenants with an ongoing import.
//
// Note that the import check here is oportunistic. An import might start
// after the check before we actually update [`TenantShard::splitting`].
// [`Self::tenant_shard_split`] checks the database whilst holding the exclusive
// tenant lock. Imports might take a long time, so the check here allows us
// to split something else instead of trying the same shard over and over.
{
let state = self.inner.read().unwrap();
candidates.retain(|i| {
let policy = state.tenants.get(&i.id).map(|s| s.get_scheduling_policy());
policy == Some(ShardSchedulingPolicy::Active)
let shard = state.tenants.get(&i.id);
match shard {
Some(t) => {
t.get_scheduling_policy() == ShardSchedulingPolicy::Active
&& t.importing == TimelineImportState::Idle
}
None => false,
}
});
}
@@ -8886,27 +8998,59 @@ impl Service {
self.inner.read().unwrap().get_leadership_status()
}
pub(crate) async fn step_down(&self) -> GlobalObservedState {
/// Handler for step down requests
///
/// Step down runs in separate task since once it's called it should
/// be driven to completion. Subsequent requests will wait on the same
/// step down task.
pub(crate) async fn step_down(self: &Arc<Self>) -> GlobalObservedState {
let handle = self.step_down_barrier.get_or_init(|| {
let step_down_self = self.clone();
let (tx, rx) = tokio::sync::watch::channel::<Option<GlobalObservedState>>(None);
tokio::spawn(async move {
let state = step_down_self.step_down_task().await;
tx.send(Some(state))
.expect("Task Arc<Service> keeps receiver alive");
});
rx
});
handle
.clone()
.wait_for(|observed_state| observed_state.is_some())
.await
.expect("Task Arc<Service> keeps sender alive")
.deref()
.clone()
.expect("Checked above")
}
async fn step_down_task(&self) -> GlobalObservedState {
tracing::info!("Received step down request from peer");
failpoint_support::sleep_millis_async!("sleep-on-step-down-handling");
self.inner.write().unwrap().step_down();
// Wait for reconciliations to stop, or terminate this process if they
// fail to stop in time (this indicates a bug in shutdown)
tokio::select! {
_ = self.stop_reconciliations(StopReconciliationsReason::SteppingDown) => {
tracing::info!("Reconciliations stopped, proceeding with step down");
}
_ = async {
failpoint_support::sleep_millis_async!("step-down-delay-timeout");
tokio::time::sleep(Duration::from_secs(10)).await
} => {
tracing::warn!("Step down timed out while waiting for reconciliation gate, terminating process");
let stop_reconciliations =
self.stop_reconciliations(StopReconciliationsReason::SteppingDown);
let mut stop_reconciliations = std::pin::pin!(stop_reconciliations);
// The caller may proceed to act as leader when it sees this request fail: reduce the chance
// of a split-brain situation by terminating this controller instead of leaving it up in a partially-shut-down state.
std::process::exit(1);
let started_at = Instant::now();
// Wait for reconciliations to stop and warn if that's taking a long time
loop {
tokio::select! {
_ = &mut stop_reconciliations => {
tracing::info!("Reconciliations stopped, proceeding with step down");
break;
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
tracing::warn!(
elapsed_sec=%started_at.elapsed().as_secs(),
"Stopping reconciliations during step down is taking too long"
);
}
}
}

View File

@@ -33,6 +33,7 @@ use crate::scheduler::{
RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag,
};
use crate::service::ReconcileResultRequest;
use crate::timeline_import::TimelineImportState;
use crate::{Sequence, service};
/// Serialization helper
@@ -100,6 +101,10 @@ pub(crate) struct TenantShard {
/// reconciliation, and timeline creation.
pub(crate) splitting: SplitState,
/// Flag indicating whether the tenant has an in-progress timeline import.
/// Used to disallow shard splits while an import is in progress.
pub(crate) importing: TimelineImportState,
/// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
/// is set. This flag is cleared when the tenant is popped off the delay queue.
pub(crate) delayed_reconcile: bool,
@@ -583,6 +588,7 @@ impl TenantShard {
config: TenantConfig::default(),
reconciler: None,
splitting: SplitState::Idle,
importing: TimelineImportState::Idle,
sequence: Sequence(1),
delayed_reconcile: false,
waiter: Arc::new(SeqWait::new(Sequence(0))),
@@ -1844,6 +1850,8 @@ impl TenantShard {
config: serde_json::from_str(&tsp.config).unwrap(),
reconciler: None,
splitting: tsp.splitting,
// Filled in during [`Service::startup_reconcile`]
importing: TimelineImportState::Idle,
waiter: Arc::new(SeqWait::new(Sequence::initial())),
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
last_error: Arc::default(),

View File

@@ -14,6 +14,12 @@ use utils::{
use crate::{persistence::TimelineImportPersistence, service::Config};
#[derive(Deserialize, Serialize, PartialEq, Eq)]
pub(crate) enum TimelineImportState {
Importing,
Idle,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
@@ -103,7 +109,7 @@ impl TimelineImport {
let crnt = occ.get_mut();
if *crnt == status {
Ok(TimelineImportUpdateFollowUp::None)
} else if crnt.is_terminal() && !status.is_terminal() {
} else if crnt.is_terminal() && *crnt != status {
Err(TimelineImportUpdateError::UnexpectedUpdate)
} else {
*crnt = status;

View File

@@ -16,4 +16,5 @@ pytest_plugins = (
"fixtures.slow",
"fixtures.reruns",
"fixtures.fast_import",
"fixtures.pg_config",
)

View File

@@ -100,6 +100,29 @@ class MetricsGetter:
return result
def get_metric_sum(
self, names: list[str], filter: dict[str, str] | None = None, absence_ok: bool = False
) -> float:
"""
Fetch all metrics matching `names` and `filter`, and sum their values
"""
metrics = self.get_metrics()
samples = []
for name in names:
samples.extend(metrics.query_all(name, filter=filter))
found = False
result = 0.0
for sample in samples:
result += sample.value
found = True
if not found and not absence_ok:
log.info(f"Metrics found: {metrics.metrics}")
raise RuntimeError(f"could not find any metrics matching {names}, {filter}")
return result
def parse_metrics(text: str, name: str = "") -> Metrics:
metrics = Metrics(name)

View File

@@ -975,7 +975,56 @@ class NeonEnvBuilder:
traceback: TracebackType | None,
):
# Stop all the nodes.
bytes_written: int = 0
getpage_requests: int = 0
if self.env:
log.info("Checking for lots of I/O in tests that shouldn't")
sk_bytes_written: float = 0
if self.env.safekeepers[0].running:
try:
_sk_bytes_written = (
self.env.safekeepers[0]
.http_client()
.get_metric_value("safekeeper_write_wal_bytes_sum")
)
except requests.exceptions.ConnectionError:
_sk_bytes_written = 0
if _sk_bytes_written is not None:
sk_bytes_written = int(_sk_bytes_written)
ps_bytes_written: float = 0
for pageserver in self.env.pageservers:
if pageserver.running:
try:
_tmp_bytes_written = pageserver.http_client().get_metric_sum(
["pageserver_io_operations_bytes_total"],
{"operation": "write"},
absence_ok=True,
)
except requests.exceptions.ConnectionError:
_tmp_bytes_written = 0
if _tmp_bytes_written is not None:
ps_bytes_written += int(_tmp_bytes_written)
try:
_tmp_getpage = pageserver.http_client().get_metric_value(
"pageserver_smgr_query_started_global_count_total",
{"smgr_query_type": "get_page_at_lsn"},
)
except requests.exceptions.ConnectionError:
_tmp_getpage = 0
if _tmp_getpage is not None:
getpage_requests += int(_tmp_getpage)
assert ps_bytes_written is not None
log.info(f"Bytes written: SK {sk_bytes_written}, PS {ps_bytes_written}")
log.info(f"GetPage@LSN requests: {getpage_requests}")
bytes_written = int(max(ps_bytes_written, sk_bytes_written))
log.info("Cleaning up all storage and compute nodes")
self.env.stop(
immediate=False,
@@ -1038,6 +1087,31 @@ class NeonEnvBuilder:
if cleanup_error is not None:
cleanup_error = e
if (
os.environ.get("BUILD_TYPE") == "debug"
and bytes_written
and bytes_written > 512 * 1024 * 1024
):
raise RuntimeError(
f"This test wrote too much data in debug mode: {bytes_written} bytes"
)
elif bytes_written > 1024 * 1024 * 1024:
raise RuntimeError(
f"This test wrote too much data in release mode: {bytes_written} bytes"
)
else:
log.info(f"This test wrote {bytes_written} bytes")
# Fail tests that do more than 100MB of GetPage@LSN requests in debug mode
if os.environ.get("BUILD_TYPE") == "debug" and getpage_requests > 12800:
raise RuntimeError(
f"This test read too much data from pageservers in debug mode: {getpage_requests * 8192} bytes"
)
elif getpage_requests > 128000:
raise RuntimeError(
f"This test read too much data from pageservers in release mode: {getpage_requests * 8192} bytes"
)
class NeonEnv:
"""
@@ -1291,7 +1365,11 @@ class NeonEnv:
ps_cfg[key] = value
if self.pageserver_virtual_file_io_mode is not None:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
# TODO(christian): https://github.com/neondatabase/neon/issues/11598
if not config.test_may_use_compatibility_snapshot_binaries:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
else:
log.info("ignoring virtual_file_io_mode parametrization for compatibility test")
if self.pageserver_wal_receiver_protocol is not None:
key, value = PageserverWalReceiverProtocol.to_config_key_value(
@@ -1450,6 +1528,7 @@ class NeonEnv:
for sk in self.safekeepers:
sk.stop(immediate=immediate)
for pageserver in self.pageservers:
if ps_assert_metric_no_errors:
try:
@@ -3380,6 +3459,9 @@ class VanillaPostgres(PgProtocol):
"""Return size of pgdatadir subdirectory in bytes."""
return get_dir_size(self.pgdatadir / subdir)
def is_running(self) -> bool:
return self.running
def __enter__(self) -> Self:
return self

View File

@@ -0,0 +1,249 @@
from __future__ import annotations
import shlex
from enum import StrEnum
from pathlib import Path
from typing import TYPE_CHECKING, cast, final
import pytest
if TYPE_CHECKING:
from collections.abc import Iterator
from typing import IO
from fixtures.neon_fixtures import PgBin
@final
class PgConfigKey(StrEnum):
BINDIR = "BINDIR"
DOCDIR = "DOCDIR"
HTMLDIR = "HTMLDIR"
INCLUDEDIR = "INCLUDEDIR"
PKGINCLUDEDIR = "PKGINCLUDEDIR"
INCLUDEDIR_SERVER = "INCLUDEDIR-SERVER"
LIBDIR = "LIBDIR"
PKGLIBDIR = "PKGLIBDIR"
LOCALEDIR = "LOCALEDIR"
MANDIR = "MANDIR"
SHAREDIR = "SHAREDIR"
SYSCONFDIR = "SYSCONFDIR"
PGXS = "PGXS"
CONFIGURE = "CONFIGURE"
CC = "CC"
CPPFLAGS = "CPPFLAGS"
CFLAGS = "CFLAGS"
CFLAGS_SL = "CFLAGS_SL"
LDFLAGS = "LDFLAGS"
LDFLAGS_EX = "LDFLAGS_EX"
LDFLAGS_SL = "LDFLAGS_SL"
LIBS = "LIBS"
VERSION = "VERSION"
if TYPE_CHECKING:
# TODO: This could become a TypedDict if Python ever allows StrEnums to be
# keys.
PgConfig = dict[PgConfigKey, str | Path | list[str]]
def __get_pg_config(pg_bin: PgBin) -> PgConfig:
"""Get pg_config values by invoking the command"""
cmd = pg_bin.run_nonblocking(["pg_config"])
cmd.wait()
if cmd.returncode != 0:
pytest.exit("")
assert cmd.stdout
stdout = cast("IO[str]", cmd.stdout)
# Parse the output into a dictionary
values: PgConfig = {}
for line in stdout.readlines():
if "=" in line:
key, value = line.split("=", 1)
value = value.strip()
match PgConfigKey(key.strip()):
case (
(
PgConfigKey.CC
| PgConfigKey.CPPFLAGS
| PgConfigKey.CFLAGS
| PgConfigKey.CFLAGS_SL
| PgConfigKey.LDFLAGS
| PgConfigKey.LDFLAGS_EX
| PgConfigKey.LDFLAGS_SL
| PgConfigKey.LIBS
) as k
):
values[k] = shlex.split(value)
case (
(
PgConfigKey.BINDIR
| PgConfigKey.DOCDIR
| PgConfigKey.HTMLDIR
| PgConfigKey.INCLUDEDIR
| PgConfigKey.PKGINCLUDEDIR
| PgConfigKey.INCLUDEDIR_SERVER
| PgConfigKey.LIBDIR
| PgConfigKey.PKGLIBDIR
| PgConfigKey.LOCALEDIR
| PgConfigKey.MANDIR
| PgConfigKey.SHAREDIR
| PgConfigKey.SYSCONFDIR
| PgConfigKey.PGXS
) as k
):
values[k] = Path(value)
case _ as k:
values[k] = value
return values
@pytest.fixture(scope="function")
def pg_config(pg_bin: PgBin) -> Iterator[PgConfig]:
"""Dictionary of all pg_config values from the system"""
yield __get_pg_config(pg_bin)
@pytest.fixture(scope="function")
def pg_config_bindir(pg_config: PgConfig) -> Iterator[Path]:
"""BINDIR value from pg_config"""
yield cast("Path", pg_config[PgConfigKey.BINDIR])
@pytest.fixture(scope="function")
def pg_config_docdir(pg_config: PgConfig) -> Iterator[Path]:
"""DOCDIR value from pg_config"""
yield cast("Path", pg_config[PgConfigKey.DOCDIR])
@pytest.fixture(scope="function")
def pg_config_htmldir(pg_config: PgConfig) -> Iterator[Path]:
"""HTMLDIR value from pg_config"""
yield cast("Path", pg_config[PgConfigKey.HTMLDIR])
@pytest.fixture(scope="function")
def pg_config_includedir(
pg_config: dict[PgConfigKey, str | Path | list[str]],
) -> Iterator[Path]:
"""INCLUDEDIR value from pg_config"""
yield cast("Path", pg_config[PgConfigKey.INCLUDEDIR])
@pytest.fixture(scope="function")
def pg_config_pkgincludedir(pg_config: PgConfig) -> Iterator[Path]:
"""PKGINCLUDEDIR value from pg_config"""
yield cast("Path", pg_config[PgConfigKey.PKGINCLUDEDIR])
@pytest.fixture(scope="function")
def pg_config_includedir_server(pg_config: PgConfig) -> Iterator[Path]:
"""INCLUDEDIR-SERVER value from pg_config"""
yield cast("Path", pg_config[PgConfigKey.INCLUDEDIR_SERVER])
@pytest.fixture(scope="function")
def pg_config_libdir(pg_config: PgConfig) -> Iterator[Path]:
"""LIBDIR value from pg_config"""
yield cast("Path", pg_config[PgConfigKey.LIBDIR])
@pytest.fixture(scope="function")
def pg_config_pkglibdir(pg_config: PgConfig) -> Iterator[Path]:
"""PKGLIBDIR value from pg_config"""
yield cast("Path", pg_config[PgConfigKey.PKGLIBDIR])
@pytest.fixture(scope="function")
def pg_config_localedir(pg_config: PgConfig) -> Iterator[Path]:
"""LOCALEDIR value from pg_config"""
yield cast("Path", pg_config[PgConfigKey.LOCALEDIR])
@pytest.fixture(scope="function")
def pg_config_mandir(pg_config: PgConfig) -> Iterator[Path]:
"""MANDIR value from pg_config"""
yield cast("Path", pg_config[PgConfigKey.MANDIR])
@pytest.fixture(scope="function")
def pg_config_sharedir(pg_config: PgConfig) -> Iterator[Path]:
"""SHAREDIR value from pg_config"""
yield cast("Path", pg_config[PgConfigKey.SHAREDIR])
@pytest.fixture(scope="function")
def pg_config_sysconfdir(pg_config: PgConfig) -> Iterator[Path]:
"""SYSCONFDIR value from pg_config"""
yield cast("Path", pg_config[PgConfigKey.SYSCONFDIR])
@pytest.fixture(scope="function")
def pg_config_pgxs(pg_config: PgConfig) -> Iterator[Path]:
"""PGXS value from pg_config"""
yield cast("Path", pg_config[PgConfigKey.PGXS])
@pytest.fixture(scope="function")
def pg_config_configure(pg_config: PgConfig) -> Iterator[str]:
"""CONFIGURE value from pg_config"""
yield cast("str", pg_config[PgConfigKey.CONFIGURE])
@pytest.fixture(scope="function")
def pg_config_cc(pg_config: PgConfig) -> Iterator[list[str]]:
"""CC value from pg_config"""
yield cast("list[str]", pg_config[PgConfigKey.CC])
@pytest.fixture(scope="function")
def pg_config_cppflags(pg_config: PgConfig) -> Iterator[list[str]]:
"""CPPFLAGS value from pg_config"""
yield cast("list[str]", pg_config[PgConfigKey.CPPFLAGS])
@pytest.fixture(scope="function")
def pg_config_cflags(pg_config: PgConfig) -> Iterator[list[str]]:
"""CFLAGS value from pg_config"""
yield cast("list[str]", pg_config[PgConfigKey.CFLAGS])
@pytest.fixture(scope="function")
def pg_config_cflags_sl(pg_config: PgConfig) -> Iterator[list[str]]:
"""CFLAGS_SL value from pg_config"""
yield cast("list[str]", pg_config[PgConfigKey.CFLAGS_SL])
@pytest.fixture(scope="function")
def pg_config_ldflags(pg_config: PgConfig) -> Iterator[list[str]]:
"""LDFLAGS value from pg_config"""
yield cast("list[str]", pg_config[PgConfigKey.LDFLAGS])
@pytest.fixture(scope="function")
def pg_config_ldflags_ex(pg_config: PgConfig) -> Iterator[list[str]]:
"""LDFLAGS_EX value from pg_config"""
yield cast("list[str]", pg_config[PgConfigKey.LDFLAGS_EX])
@pytest.fixture(scope="function")
def pg_config_ldflags_sl(pg_config: PgConfig) -> Iterator[list[str]]:
"""LDFLAGS_SL value from pg_config"""
yield cast("list[str]", pg_config[PgConfigKey.LDFLAGS_SL])
@pytest.fixture(scope="function")
def pg_config_libs(pg_config: PgConfig) -> Iterator[list[str]]:
"""LIBS value from pg_config"""
yield cast("list[str]", pg_config[PgConfigKey.LIBS])
@pytest.fixture(scope="function")
def pg_config_version(pg_config: PgConfig) -> Iterator[str]:
"""VERSION value from pg_config"""
yield cast("str", pg_config[PgConfigKey.VERSION])

View File

@@ -323,6 +323,7 @@ class NeonProject:
if self.restart_pgbench_on_console_errors and (
"ERROR: Couldn't connect to compute node" in err
or "ERROR: Console request failed" in err
or "ERROR: Control plane request failed" in err
):
log.info("Restarting benchmark for %s", target)
self.benchmarks.pop(target)

View File

@@ -1,12 +0,0 @@
\echo Use "CREATE EXTENSION test_extension" to load this file. \quit
CREATE SCHEMA test_extension;
CREATE FUNCTION test_extension.motd()
RETURNS void
IMMUTABLE LEAKPROOF PARALLEL SAFE
AS $$
BEGIN
RAISE NOTICE 'Have a great day';
END;
$$ LANGUAGE 'plpgsql';

View File

@@ -1,6 +1,6 @@
\echo Use "ALTER EXTENSION test_extension UPDATE TO '1.1'" to load this file. \quit
\echo Use "ALTER EXTENSION test_extension_sql_only UPDATE TO '1.1'" to load this file. \quit
CREATE FUNCTION test_extension.fun_fact()
CREATE FUNCTION test_extension_sql_only.fun_fact()
RETURNS void
IMMUTABLE LEAKPROOF PARALLEL SAFE
AS $$

View File

@@ -0,0 +1,12 @@
\echo Use "CREATE EXTENSION test_extension_sql_only" to load this file. \quit
CREATE SCHEMA test_extension_sql_only;
CREATE FUNCTION test_extension_sql_only.motd()
RETURNS void
IMMUTABLE LEAKPROOF PARALLEL SAFE
AS $$
BEGIN
RAISE NOTICE 'Have a great day';
END;
$$ LANGUAGE 'plpgsql';

View File

@@ -0,0 +1 @@
comment = 'Test extension SQL only'

View File

@@ -0,0 +1,6 @@
\echo Use "ALTER EXTENSION test_extension_with_lib UPDATE TO '1.1'" to load this file. \quit
CREATE FUNCTION test_extension_with_lib.fun_fact()
RETURNS void
IMMUTABLE LEAKPROOF PARALLEL SAFE
AS 'MODULE_PATHNAME', 'fun_fact' LANGUAGE C;

View File

@@ -0,0 +1,8 @@
\echo Use "CREATE EXTENSION test_extension_with_lib" to load this file. \quit
CREATE SCHEMA test_extension_with_lib;
CREATE FUNCTION test_extension_with_lib.motd()
RETURNS void
IMMUTABLE LEAKPROOF PARALLEL SAFE
AS 'MODULE_PATHNAME', 'motd' LANGUAGE C;

View File

@@ -0,0 +1,34 @@
#include <postgres.h>
#include <fmgr.h>
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(motd);
PG_FUNCTION_INFO_V1(fun_fact);
/* Old versions of Postgres didn't pre-declare this in fmgr.h */
#if PG_MAJORVERSION_NUM <= 15
void _PG_init(void);
#endif
void
_PG_init(void)
{
}
Datum
motd(PG_FUNCTION_ARGS)
{
elog(NOTICE, "Have a great day");
PG_RETURN_VOID();
}
Datum
fun_fact(PG_FUNCTION_ARGS)
{
elog(NOTICE, "Neon has a melting point of -246.08 C");
PG_RETURN_VOID();
}

View File

@@ -0,0 +1,2 @@
comment = 'Test extension with lib'
module_pathname = '$libdir/test_extension_with_lib'

View File

@@ -0,0 +1,70 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from fixtures.metrics import parse_metrics
from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
def test_compute_monitor(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can detect Postgres going down (unresponsive) and
reconnect when it comes back online. Also check that the downtime metrics
are properly emitted.
"""
TEST_DB = "test_compute_monitor"
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
# Check that default postgres database is present
with endpoint.cursor() as cursor:
cursor.execute("SELECT datname FROM pg_database WHERE datname = 'postgres'")
catalog_db = cursor.fetchone()
assert catalog_db is not None
assert len(catalog_db) == 1
# Create a new database
cursor.execute(f"CREATE DATABASE {TEST_DB}")
# Drop database 'postgres'
with endpoint.cursor(dbname=TEST_DB) as cursor:
# Use FORCE to terminate all connections to the database
cursor.execute("DROP DATABASE postgres WITH (FORCE)")
client = endpoint.http_client()
def check_metrics_down():
raw_metrics = client.metrics()
metrics = parse_metrics(raw_metrics)
compute_pg_current_downtime_ms = metrics.query_all("compute_pg_current_downtime_ms")
assert len(compute_pg_current_downtime_ms) == 1
assert compute_pg_current_downtime_ms[0].value > 0
compute_pg_downtime_ms_total = metrics.query_all("compute_pg_downtime_ms_total")
assert len(compute_pg_downtime_ms_total) == 1
assert compute_pg_downtime_ms_total[0].value > 0
wait_until(check_metrics_down)
# Recreate postgres database
with endpoint.cursor(dbname=TEST_DB) as cursor:
cursor.execute("CREATE DATABASE postgres")
# Current downtime should reset to 0, but not total downtime
def check_metrics_up():
raw_metrics = client.metrics()
metrics = parse_metrics(raw_metrics)
compute_pg_current_downtime_ms = metrics.query_all("compute_pg_current_downtime_ms")
assert len(compute_pg_current_downtime_ms) == 1
assert compute_pg_current_downtime_ms[0].value == 0
compute_pg_downtime_ms_total = metrics.query_all("compute_pg_downtime_ms_total")
assert len(compute_pg_downtime_ms_total) == 1
assert compute_pg_downtime_ms_total[0].value > 0
wait_until(check_metrics_up)
# Just a sanity check that we log the downtime info
endpoint.log_contains("downtime_info")

View File

@@ -4,12 +4,17 @@ import os
import platform
import shutil
import tarfile
from typing import TYPE_CHECKING
from enum import StrEnum
from pathlib import Path
from typing import TYPE_CHECKING, cast, final
import pytest
import zstandard
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.paths import BASE_DIR
from fixtures.pg_config import PgConfigKey
from fixtures.utils import subprocess_capture
from werkzeug.wrappers.response import Response
if TYPE_CHECKING:
@@ -20,6 +25,7 @@ if TYPE_CHECKING:
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.pg_config import PgConfig
from fixtures.pg_version import PgVersion
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
@@ -46,46 +52,108 @@ def neon_env_builder_local(
return neon_env_builder
@final
class RemoteExtension(StrEnum):
SQL_ONLY = "test_extension_sql_only"
WITH_LIB = "test_extension_with_lib"
@property
def compressed_tarball_name(self) -> str:
return f"{self.tarball_name}.zst"
@property
def control_file_name(self) -> str:
return f"{self}.control"
@property
def directory(self) -> Path:
return BASE_DIR / "test_runner" / "regress" / "data" / "test_remote_extensions" / self
@property
def shared_library_name(self) -> str:
return f"{self}.so"
@property
def tarball_name(self) -> str:
return f"{self}.tar"
def archive_route(self, build_tag: str, arch: str, pg_version: PgVersion) -> str:
return f"{build_tag}/{arch}/v{pg_version}/extensions/{self.compressed_tarball_name}"
def build(self, pg_config: PgConfig, output_dir: Path) -> None:
if self is not RemoteExtension.WITH_LIB:
return
cmd: list[str] = [
*cast("list[str]", pg_config[PgConfigKey.CC]),
*cast("list[str]", pg_config[PgConfigKey.CPPFLAGS]),
*["-I", str(cast("Path", pg_config[PgConfigKey.INCLUDEDIR_SERVER]))],
*cast("list[str]", pg_config[PgConfigKey.CFLAGS]),
*cast("list[str]", pg_config[PgConfigKey.CFLAGS_SL]),
*cast("list[str]", pg_config[PgConfigKey.LDFLAGS_EX]),
*cast("list[str]", pg_config[PgConfigKey.LDFLAGS_SL]),
"-shared",
*["-o", str(output_dir / self.shared_library_name)],
str(self.directory / "src" / f"{self}.c"),
]
subprocess_capture(output_dir, cmd, check=True)
def control_file_contents(self) -> str:
with open(self.directory / self.control_file_name, encoding="utf-8") as f:
return f.read()
def files(self, output_dir: Path) -> dict[Path, str]:
files = {
# self.directory / self.control_file_name: f"share/extension/{self.control_file_name}",
self.directory / "sql" / f"{self}--1.0.sql": f"share/extension/{self}--1.0.sql",
self.directory
/ "sql"
/ f"{self}--1.0--1.1.sql": f"share/extension/{self}--1.0--1.1.sql",
}
if self is RemoteExtension.WITH_LIB:
files[output_dir / self.shared_library_name] = f"lib/{self.shared_library_name}"
return files
def package(self, output_dir: Path) -> Path:
tarball = output_dir / self.tarball_name
with tarfile.open(tarball, "x") as tarf:
for file, arcname in self.files(output_dir).items():
tarf.add(file, arcname=arcname)
return tarball
def remove(self, output_dir: Path, pg_version: PgVersion) -> None:
for file in self.files(output_dir).values():
if file.startswith("share/extension"):
file = f"share/postgresql/extension/{os.path.basename(file)}"
if file.startswith("lib"):
file = f"lib/postgresql/{os.path.basename(file)}"
(output_dir / "pg_install" / f"v{pg_version}" / file).unlink()
@pytest.mark.parametrize(
"extension",
(RemoteExtension.SQL_ONLY, RemoteExtension.WITH_LIB),
ids=["sql_only", "with_lib"],
)
def test_remote_extensions(
httpserver: HTTPServer,
neon_env_builder_local: NeonEnvBuilder,
httpserver_listen_address: ListenAddress,
test_output_dir: Path,
base_dir: Path,
pg_version: PgVersion,
pg_config: PgConfig,
extension: RemoteExtension,
):
# Setup a mock nginx S3 gateway which will return our test extension.
(host, port) = httpserver_listen_address
extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway"
build_tag = os.environ.get("BUILD_TAG", "latest")
# We have decided to use the Go naming convention due to Kubernetes.
arch = platform.machine()
match arch:
case "aarch64":
arch = "arm64"
case "x86_64":
arch = "amd64"
case _:
pass
archive_route = f"{build_tag}/{arch}/v{pg_version}/extensions/test_extension.tar.zst"
tarball = test_output_dir / "test_extension.tar"
extension_dir = (
base_dir / "test_runner" / "regress" / "data" / "test_remote_extensions" / "test_extension"
)
# Create tarball
with tarfile.open(tarball, "x") as tarf:
tarf.add(
extension_dir / "sql" / "test_extension--1.0.sql",
arcname="share/extension/test_extension--1.0.sql",
)
tarf.add(
extension_dir / "sql" / "test_extension--1.0--1.1.sql",
arcname="share/extension/test_extension--1.0--1.1.sql",
)
extension.build(pg_config, test_output_dir)
tarball = extension.package(test_output_dir)
def handler(request: Request) -> Response:
log.info(f"request: {request}")
@@ -104,8 +172,19 @@ def test_remote_extensions(
direct_passthrough=True,
)
# We have decided to use the Go naming convention due to Kubernetes.
arch = platform.machine()
match arch:
case "aarch64":
arch = "arm64"
case "x86_64":
arch = "amd64"
case _:
pass
httpserver.expect_request(
f"/pg-ext-s3-gateway/{archive_route}", method="GET"
f"/pg-ext-s3-gateway/{extension.archive_route(build_tag=os.environ.get('BUILD_TAG', 'latest'), arch=arch, pg_version=pg_version)}",
method="GET",
).respond_with_handler(handler)
# Start a compute node with remote_extension spec
@@ -114,21 +193,18 @@ def test_remote_extensions(
env.create_branch("test_remote_extensions")
endpoint = env.endpoints.create("test_remote_extensions")
with open(extension_dir / "test_extension.control", encoding="utf-8") as f:
control_data = f.read()
# mock remote_extensions spec
spec: dict[str, Any] = {
"public_extensions": ["test_extension"],
"public_extensions": [extension],
"custom_extensions": None,
"library_index": {
"test_extension": "test_extension",
extension: extension,
},
"extension_data": {
"test_extension": {
extension: {
"archive_path": "",
"control_data": {
"test_extension.control": control_data,
extension.control_file_name: extension.control_file_contents(),
},
},
},
@@ -141,8 +217,8 @@ def test_remote_extensions(
with endpoint.connect() as conn:
with conn.cursor() as cur:
# Check that appropriate files were downloaded
cur.execute("CREATE EXTENSION test_extension VERSION '1.0'")
cur.execute("SELECT test_extension.motd()")
cur.execute(f"CREATE EXTENSION {extension} VERSION '1.0'")
cur.execute(f"SELECT {extension}.motd()")
httpserver.check()
@@ -153,7 +229,7 @@ def test_remote_extensions(
remote_ext_requests = metrics.query_all(
"compute_ctl_remote_ext_requests_total",
# Check that we properly report the filename in the metrics
{"filename": "test_extension.tar.zst"},
{"filename": extension.compressed_tarball_name},
)
assert len(remote_ext_requests) == 1
for sample in remote_ext_requests:
@@ -162,20 +238,7 @@ def test_remote_extensions(
endpoint.stop()
# Remove the extension files to force a redownload of the extension.
for file in (
"test_extension.control",
"test_extension--1.0.sql",
"test_extension--1.0--1.1.sql",
):
(
test_output_dir
/ "pg_install"
/ f"v{pg_version}"
/ "share"
/ "postgresql"
/ "extension"
/ file
).unlink()
extension.remove(test_output_dir, pg_version)
endpoint.start(remote_ext_config=extensions_endpoint)
@@ -183,8 +246,8 @@ def test_remote_extensions(
with endpoint.connect() as conn:
with conn.cursor() as cur:
# Check that appropriate files were downloaded
cur.execute("ALTER EXTENSION test_extension UPDATE TO '1.1'")
cur.execute("SELECT test_extension.fun_fact()")
cur.execute(f"ALTER EXTENSION {extension} UPDATE TO '1.1'")
cur.execute(f"SELECT {extension}.fun_fact()")
# Check that we properly recorded downloads in the metrics
client = endpoint.http_client()
@@ -193,7 +256,7 @@ def test_remote_extensions(
remote_ext_requests = metrics.query_all(
"compute_ctl_remote_ext_requests_total",
# Check that we properly report the filename in the metrics
{"filename": "test_extension.tar.zst"},
{"filename": extension.compressed_tarball_name},
)
assert len(remote_ext_requests) == 1
for sample in remote_ext_requests:

View File

@@ -18,7 +18,12 @@ from fixtures.pageserver.http import (
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server, RemoteStorageKind
from fixtures.utils import shared_buffers_for_max_cu, skip_in_debug_build, wait_until
from fixtures.utils import (
run_only_on_default_postgres,
shared_buffers_for_max_cu,
skip_in_debug_build,
wait_until,
)
from mypy_boto3_kms import KMSClient
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
from mypy_boto3_s3 import S3Client
@@ -43,6 +48,24 @@ smoke_params = [
]
def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
"""
Mock the import S3 bucket into a local directory for a provided vanilla PG instance.
"""
assert not vanilla_pg.is_running()
path.mkdir()
# what cplane writes before scheduling fast_import
specpath = path / "spec.json"
specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
# what fast_import writes
vanilla_pg.pgdatadir.rename(path / "pgdata")
statusdir = path / "status"
statusdir.mkdir()
(statusdir / "pgdata").write_text(json.dumps({"done": True}))
(statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
@skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
def test_pgdata_import_smoke(
@@ -155,17 +178,8 @@ def test_pgdata_import_smoke(
# TODO: actually exercise fast_import here
# TODO: test s3 remote storage
#
importbucket = neon_env_builder.repo_dir / "importbucket"
importbucket.mkdir()
# what cplane writes before scheduling fast_import
specpath = importbucket / "spec.json"
specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
# what fast_import writes
vanilla_pg.pgdatadir.rename(importbucket / "pgdata")
statusdir = importbucket / "status"
statusdir.mkdir()
(statusdir / "pgdata").write_text(json.dumps({"done": True}))
(statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
importbucket_path = neon_env_builder.repo_dir / "importbucket"
mock_import_bucket(vanilla_pg, importbucket_path)
#
# Do the import
@@ -192,7 +206,7 @@ def test_pgdata_import_smoke(
"new_timeline_id": str(timeline_id),
"import_pgdata": {
"idempotency_key": str(idempotency),
"location": {"LocalFs": {"path": str(importbucket.absolute())}},
"location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
},
},
)
@@ -319,6 +333,87 @@ def test_pgdata_import_smoke(
br_initdb_endpoint.safe_psql("select * from othertable")
@run_only_on_default_postgres(reason="PG version is irrelevant here")
def test_import_completion_on_restart(
neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
):
"""
Validate that the storage controller delivers the import completion notification
eventually even if it was restarted when the import initially completed.
"""
# Set up mock control plane HTTP server to listen for import completions
import_completion_signaled = Event()
def handler(request: Request) -> Response:
log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
# Plug the cplane mock in
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
# The import will specifiy a local filesystem path mocking remote storage
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
vanilla_pg.start()
vanilla_pg.stop()
env = neon_env_builder.init_configs()
env.start()
importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket"
mock_import_bucket(vanilla_pg, importbucket_path)
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
idempotency = ImportPgdataIdemptencyKey.random()
# Pause before sending the notification
failpoint_name = "timeline-import-pre-cplane-notification"
env.storage_controller.configure_failpoints((failpoint_name, "pause"))
env.storage_controller.tenant_create(tenant_id)
env.storage_controller.timeline_create(
tenant_id,
{
"new_timeline_id": str(timeline_id),
"import_pgdata": {
"idempotency_key": str(idempotency),
"location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
},
},
)
def hit_failpoint():
log.info("Checking log for pattern...")
try:
assert env.storage_controller.log_contains(f".*at failpoint {failpoint_name}.*")
except Exception:
log.exception("Failed to find pattern in log")
raise
wait_until(hit_failpoint)
assert not import_completion_signaled.is_set()
# Restart the storage controller before signalling control plane.
# This clears the failpoint and we expect that the import start-up reconciliation
# kicks in and notifies cplane.
env.storage_controller.stop()
env.storage_controller.start()
def cplane_notified():
assert import_completion_signaled.is_set()
wait_until(cplane_notified)
def test_fast_import_with_pageserver_ingest(
test_output_dir,
vanilla_pg: VanillaPostgres,

View File

@@ -2894,12 +2894,10 @@ def test_storage_controller_leadership_transfer(
)
@pytest.mark.parametrize("step_down_times_out", [False, True])
def test_storage_controller_leadership_transfer_during_split(
neon_env_builder: NeonEnvBuilder,
storage_controller_proxy: StorageControllerProxy,
port_distributor: PortDistributor,
step_down_times_out: bool,
):
"""
Exercise a race between shard splitting and graceful leadership transfer. This is
@@ -2940,8 +2938,8 @@ def test_storage_controller_leadership_transfer_during_split(
)
env.storage_controller.reconcile_until_idle()
# We are testing scenarios where the step down API does not complete: either because it is stuck
# doing a shard split, or because it totally times out on some other failpoint.
# We are testing scenarios where the step down API does not complete: it is stuck
# doing a shard split
env.storage_controller.allowed_errors.extend(
[
".*step_down.*request was dropped before completing.*",
@@ -2949,6 +2947,7 @@ def test_storage_controller_leadership_transfer_during_split(
".*Send step down request failed, will retry.*",
".*Send step down request still failed after.*retries.*",
".*Leader .+ did not respond to step-down request.*",
".*Stopping reconciliations during step down is taking too long.*",
]
)
@@ -2960,13 +2959,6 @@ def test_storage_controller_leadership_transfer_during_split(
pause_failpoint = "shard-split-pre-complete"
env.storage_controller.configure_failpoints((pause_failpoint, "pause"))
if not step_down_times_out:
# Prevent the timeout self-terminate code from executing: we will block step down on the
# shard split itself
env.storage_controller.configure_failpoints(
("step-down-delay-timeout", "return(3600000)")
)
split_fut = executor.submit(
env.storage_controller.tenant_shard_split, list(tenants)[0], shard_count * 2
)
@@ -2985,13 +2977,9 @@ def test_storage_controller_leadership_transfer_during_split(
timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port
)
if step_down_times_out:
# Step down will time out, original controller will terminate itself
env.storage_controller.allowed_errors.extend([".*terminating process.*"])
else:
# Step down does not time out: original controller hits its shard split completion
# code path and realises that it must not purge the parent shards from the database.
env.storage_controller.allowed_errors.extend([".*Enqueuing background abort.*"])
# Step down does not time out: original controller hits its shard split completion
# code path and realises that it must not purge the parent shards from the database.
env.storage_controller.allowed_errors.extend([".*Enqueuing background abort.*"])
def passed_split_abort():
try:
@@ -3007,42 +2995,34 @@ def test_storage_controller_leadership_transfer_during_split(
wait_until(passed_split_abort, interval=0.1, status_interval=1.0)
assert env.storage_controller.log_contains(".*Aborting shard split.*")
if step_down_times_out:
# We will let the old controller hit a timeout path where it terminates itself, rather than
# completing step_down and trying to complete a shard split
def old_controller_terminated():
assert env.storage_controller.log_contains(".*terminating process.*")
# Proxy is still talking to original controller here: disable its pause failpoint so
# that its shard split can run to completion.
log.info("Disabling failpoint")
# Bypass the proxy: the python test HTTPServer is single threaded and still blocked
# on handling the shard split request.
env.storage_controller.request(
"PUT",
f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
json=[{"name": "shard-split-pre-complete", "actions": "off"}],
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
wait_until(old_controller_terminated)
else:
# Proxy is still talking to original controller here: disable its pause failpoint so
# that its shard split can run to completion.
log.info("Disabling failpoint")
# Bypass the proxy: the python test HTTPServer is single threaded and still blocked
# on handling the shard split request.
env.storage_controller.request(
"PUT",
f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
json=[{"name": "shard-split-pre-complete", "actions": "off"}],
headers=env.storage_controller.headers(TokenScope.ADMIN),
def previous_stepped_down():
assert (
env.storage_controller.get_leadership_status()
== StorageControllerLeadershipStatus.STEPPED_DOWN
)
def previous_stepped_down():
assert (
env.storage_controller.get_leadership_status()
== StorageControllerLeadershipStatus.STEPPED_DOWN
)
log.info("Awaiting step down")
wait_until(previous_stepped_down)
log.info("Awaiting step down")
wait_until(previous_stepped_down)
# Let the shard split complete: this may happen _after_ the replacement has come up
# and tried to clean up the databases
log.info("Unblocking & awaiting shard split")
with pytest.raises(Exception, match="Unexpected child shard count"):
# This split fails when it tries to persist results, because it encounters
# changes already made by the new controller's abort-on-startup
split_fut.result()
# Let the shard split complete: this may happen _after_ the replacement has come up
# and tried to clean up the databases
log.info("Unblocking & awaiting shard split")
with pytest.raises(Exception, match="Unexpected child shard count"):
# This split fails when it tries to persist results, because it encounters
# changes already made by the new controller's abort-on-startup
split_fut.result()
log.info("Routing to new leader")
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}")
@@ -3060,14 +3040,13 @@ def test_storage_controller_leadership_transfer_during_split(
env.storage_controller.wait_until_ready()
env.storage_controller.consistency_check()
if not step_down_times_out:
# Check that the stepped down instance forwards requests
# to the new leader while it's still running.
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
env.storage_controller.tenant_shard_dump()
env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
status = env.storage_controller.node_status(env.pageservers[0].id)
assert status["scheduling"] == "Pause"
# Check that the stepped down instance forwards requests
# to the new leader while it's still running.
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
env.storage_controller.tenant_shard_dump()
env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
status = env.storage_controller.node_status(env.pageservers[0].id)
assert status["scheduling"] == "Pause"
def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):