Compare commits

..

32 Commits

Author SHA1 Message Date
Konstantin Knizhnik
bb86bd230d Edit revisions.json 2024-08-05 14:57:26 +03:00
Konstantin Knizhnik
0f855757a8 Bump postgres version 2024-08-05 14:55:33 +03:00
Konstantin Knizhnik
33f2a2bc55 Use prefetch for reading DuckDB pages 2024-08-05 14:53:27 +03:00
Konstantin Knizhnik
06b8f013f0 Supprot opaque mode for log_newpages 2024-08-05 14:51:16 +03:00
Alexander Bayandin
bd845c7587 CI(trigger-e2e-tests): wait for promote-images job from the last commit (#8592)
## Problem

We don't trigger e2e tests for draft PRs, but we do trigger them once a
PR is in the "Ready for review" state.
Sometimes, a PR can be marked as "Ready for review" before we finish
image building. In such cases, triggering e2e tests fails.

## Summary of changes
- Make `trigger-e2e-tests` job poll status of `promote-images` job from
the build-and-test workflow for the last commit. And trigger only if the
status is `success`
- Remove explicit image checking from the workflow
- Add `concurrency` for `triggere-e2e-tests` workflow to make it
possible to cancel jobs in progress (if PR moves from "Draft" to "Ready
for review" several times in a row)
2024-08-05 12:25:23 +01:00
Konstantin Knizhnik
f63c8e5a8c Update Postgres versions to use smgrexists() instead of access() to check if Oid is used (#8597)
## Problem

PR #7992 was merged without correspondent changes in Postgres submodules
and this is why test_oid_overflow.py is failed now.

## Summary of changes

Bump Postgres versions

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-08-05 14:24:54 +03:00
Alex Chi Z.
200fa56b04 feat(pageserver): support split delta layers (#8599)
part of https://github.com/neondatabase/neon/issues/8002

Similar to https://github.com/neondatabase/neon/pull/8574, we add
auto-split support for delta layers. Tests are reused from image layer
split writers.


---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-08-05 10:30:49 +00:00
dotdister
0f3dac265b safekeeper: remove unused partial_backup_enabled option (#8547)
## Problem
There is an unused safekeeper option `partial_backup_enabled`.

`partial_backup_enabled` was implemented in #6530, but this option was
always turned into enabled in #8022.

If you intended to keep this option for a specific reason, I will close
this PR.

## Summary of changes
I removed an unused safekeeper option `partial_backup_enabled`.
2024-08-05 09:23:59 +02:00
Alex Chi Z.
1dc496a2c9 feat(pageserver): support auto split layers based on size (#8574)
part of https://github.com/neondatabase/neon/issues/8002

## Summary of changes

Add a `SplitImageWriter` that automatically splits image layer based on
estimated target image layer size. This does not consider compression
and we might need a better metrics.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2024-08-05 06:55:36 +01:00
Alex Chi Z.
6814bdd30b fix(pageserver): deadlock in gc-compaction (#8590)
We need both compaction and gc lock for gc-compaction. The lock order
should be the same everywhere, otherwise there could be a deadlock where
A waits for B and B waits for A.

We also had a double-lock issue. The compaction lock gets acquired in
the outer `compact` function. Note that the unit tests directly call
`compact_with_gc`, and therefore not triggering the issue.

## Summary of changes

Ensure all places acquire compact lock and then gc lock. Remove an extra
compact lock acqusition.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-08-03 00:52:04 +01:00
John Spray
0a667bc8ef tests: add test_historic_storage_formats (#8423)
## Problem

Currently, our backward compatibility tests only look one release back.
That means, for example, that when we switch on image layer compression
by default, we'll test reading of uncompressed layers for one release,
and then stop doing it. When we make an index_part.json format change,
we'll test against the old format for a week, then stop (unless we write
separate unit tests for each old format).

The reality in the field is that data in old formats will continue to
exist for weeks/months/years. When we make major format changes, we
should retain examples of the old format data, and continuously verify
that the latest code can still read them.

This test uses contents from a new path in the public S3 bucket,
`compatibility-data-snapshots/`. It is populated by hand. The first
important artifact is one from before we switch on compression, so that
we will keep testing reads of uncompressed data. We will generate more
artifacts ahead of other key changes, like when we update remote storage
format for archival timelines.

Closes: https://github.com/neondatabase/cloud/issues/15576
2024-08-02 18:28:23 +01:00
Arthur Petukhovsky
f3acfb2d80 Improve safekeepers eviction rate limiting (#8456)
This commit tries to fix regular load spikes on staging, caused by too
many eviction and partial upload operations running at the same time.
Usually it was hapenning after restart, for partial backup the load was
delayed.
- Add a semaphore for evictions (2 permits by default)
- Rename `resident_since` to `evict_not_before` and smooth out the curve
by using random duration
- Use random duration in partial uploads as well

related to https://github.com/neondatabase/neon/issues/6338
some discussion in
https://neondb.slack.com/archives/C033RQ5SPDH/p1720601531744029
2024-08-02 15:26:46 +01:00
Arpad Müller
8c828c586e Wait for completion of the upload queue in flush_frozen_layer (#8550)
Makes `flush_frozen_layer` add a barrier to the upload queue and makes
it wait for that barrier to be reached until it lets the flushing be
completed.

This gives us backpressure and ensures that writes can't build up in an
unbounded fashion.

Fixes #7317
2024-08-02 13:07:12 +02:00
John Spray
2334fed762 storage_controller: start adding chaos hooks (#7946)
Chaos injection bridges the gap between automated testing (where we do
lots of different things with small, short-lived tenants), and staging
(where we do many fewer things, but with larger, long-lived tenants).

This PR adds a first type of chaos which isn't really very chaotic: it's
live migration of tenants between healthy pageservers. This nevertheless
provides continuous checks that things like clean, prompt shutdown of
tenants works for realistically deployed pageservers with realistically
large tenants.
2024-08-02 09:37:44 +01:00
John Spray
c53799044d pageserver: refine how we delete timelines after shard split (#8436)
## Problem

Previously, when we do a timeline deletion, shards will delete layers
that belong to an ancestor. That is not a correctness issue, because
when we delete a timeline, we're always deleting it from all shards, and
destroying data for that timeline is clearly fine.

However, there exists a race where one shard might start doing this
deletion while another shard has not yet received the deletion request,
and might try to access an ancestral layer. This creates ambiguity over
the "all layers referenced by my index should always exist" invariant,
which is important to detecting and reporting corruption.

Now that we have a GC mode for clearing up ancestral layers, we can rely
on that to clean up such layers, and avoid deleting them right away.
This makes things easier to reason about: there are now no cases where a
shard will delete a layer that belongs to a ShardIndex other than
itself.

## Summary of changes

- Modify behavior of RemoteTimelineClient::delete_all
- Add `test_scrubber_physical_gc_timeline_deletion` to exercise this
case
- Tweak AWS SDK config in the scrubber to enable retries. Motivated by
seeing the test for this feature encounter some transient "service
error" S3 errors (which are probably nothing to do with the changes in
this PR)
2024-08-02 08:00:46 +01:00
Alexander Bayandin
e7477855b7 test_runner: don't create artifacts if Allure is not enabled (#8580)
## Problem

`allure_attach_from_dir` method might create `tar.zst` archives even
if `--alluredir` is not set (i.e. Allure results collection is disabled)

## Summary of changes
- Don't run `allure_attach_from_dir` if `--alluredir`  is not set
2024-08-01 15:55:43 +00:00
Alex Chi Z.
f4a668a27d fix(pageserver): skip existing layers for btm-gc-compaction (#8498)
part of https://github.com/neondatabase/neon/issues/8002

Due to the limitation of the current layer map implementation, we cannot
directly replace a layer. It's interpreted as an insert and a deletion,
and there will be file exist error when renaming the newly-created layer
to replace the old layer. We work around that by changing the end key of
the image layer. A long-term fix would involve a refactor around the
layer file naming. For delta layers, we simply skip layers with the same
key range produced, though it is possible to add an extra key as an
alternative solution.

* The image layer range for the layers generated from gc-compaction will
be Key::MIN..(Key..MAX-1), to avoid being recognized as an L0 delta
layer.
* Skip existing layers if it turns out that we need to generate a layer
with the same persistent key in the same generation.

Note that it is possible that the newly-generated layer has different
content from the existing layer. For example, when the user drops a
retain_lsn, the compaction could have combined or dropped some records,
therefore creating a smaller layer than the existing one. We discard the
"optimized" layer for now because we cannot deal with such rewrites
within the same generation.


---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-08-01 15:00:06 +01:00
Alex Chi Z.
970f2923b2 storage-scrubber: log version on start (#8571)
Helps us better identify which version of storage scrubber is running.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-08-01 13:52:34 +00:00
John Spray
1678dea20f pageserver: add layer visibility calculation (#8511)
## Problem

We recently added a "visibility" state to layers, but nothing
initializes it.

Part of:
- #8398 

## Summary of changes

- Add a dependency on `range-set-blaze`, which is used as a fast
incrementally updated alternative to KeySpace. We could also use this to
replace the internals of KeySpaceRandomAccum if we wanted to. Writing a
type that does this kind of "BtreeMap & merge overlapping entries" thing
isn't super complicated, but no reason to write this ourselves when
there's a third party impl available.
- Add a function to layermap to calculate visibilities for each layer
- Add a function to Timeline to call into layermap and then apply these
visibilities to the Layer objects.
- Invoke the calculation during startup, after image layer creations,
and when removing branches. Branch removal and image layer creation are
the two ways that a layer can go from Visible to Covered.
- Add unit test & benchmark for the visibility calculation
- Expose `pageserver_visible_physical_size` metric, which should always
be <= `pageserver_remote_physical_size`.
- This metric will feed into the /v1/utilization endpoint later: the
visible size indicates how much space we would like to use on this
pageserver for this tenant.
- When `pageserver_visible_physical_size` is greater than
`pageserver_resident_physical_size`, this is a sign that the tenant has
long-idle branches, which result in layers that are visible in
principle, but not used in practice.

This does not keep visibility hints up to date in all cases:
particularly, when creating a child timeline, any previously covered
layers will not get marked Visible until they are accessed.

Updates after image layer creation could be implemented as more of a
special case, but this would require more new code: the existing depth
calculation code doesn't maintain+yield the list of deltas that would be
covered by an image layer.

## Performance

This operation is done rarely (at startup and at timeline deletion), so
needs to be efficient but not ultra-fast.

There is a new `visibility` bench that measures runtime for a synthetic
100k layers case (`sequential`) and a real layer map (`real_map`) with
~26k layers.

The benchmark shows runtimes of single digit milliseconds (on a ryzen
7950). This confirms that the runtime shouldn't be a problem at startup
(as we already incur S3-level latencies there), but that it's slow
enough that we definitely shouldn't call it more often than necessary,
and it may be worthwhile to optimize further later (things like: when
removing a branch, only bother scanning layers below the branchpoint)

```
visibility/sequential   time:   [4.5087 ms 4.5894 ms 4.6775 ms]
                        change: [+2.0826% +3.9097% +5.8995%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 24 outliers among 100 measurements (24.00%)
  2 (2.00%) high mild
  22 (22.00%) high severe
min: 0/1696070, max: 93/1C0887F0
visibility/real_map     time:   [7.0796 ms 7.0832 ms 7.0871 ms]
                        change: [+0.3900% +0.4505% +0.5164%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe
min: 0/1696070, max: 93/1C0887F0
visibility/real_map_many_branches
                        time:   [4.5285 ms 4.5355 ms 4.5434 ms]
                        change: [-1.0012% -0.8004% -0.5969%] (p = 0.00 < 0.05)
                        Change within noise threshold.
```
2024-08-01 09:25:35 +00:00
Arpad Müller
163f2eaf79 Reduce linux-raw-sys duplication (#8577)
Before, we had four versions of linux-raw-sys in our dependency graph:

```
  linux-raw-sys@0.1.4
  linux-raw-sys@0.3.8
  linux-raw-sys@0.4.13
  linux-raw-sys@0.6.4
```

now it's only two:

```
  linux-raw-sys@0.4.13
  linux-raw-sys@0.6.4
```

The changes in this PR are minimal. In order to get to its state one
only has to update procfs in Cargo.toml to 0.16 and do `cargo update -p
tempfile -p is-terminal -p prometheus`.
2024-08-01 08:22:21 +00:00
Christian Schwarz
980d506bda pageserver: shutdown all walredo managers 8s into shutdown (#8572)
# Motivation

The working theory for hung systemd during PS deploy
(https://github.com/neondatabase/cloud/issues/11387) is that leftover
walredo processes trigger a race condition.

In https://github.com/neondatabase/neon/pull/8150 I arranged that a
clean Tenant shutdown does actually kill its walredo processes.

But many prod machines don't manage to shut down all their tenants until
the 10s systemd timeout hits and, presumably, triggers the race
condition in systemd / the Linux kernel that causes the frozen systemd

# Solution

This PR bolts on a rather ugly mechanism to shut down tenant managers
out of order 8s after we've received the SIGTERM from systemd.

# Changes

- add a global registry of `Weak<WalRedoManager>`
- add a special thread spawned during `shutdown_pageserver` that sleeps
for 8s, then shuts down all redo managers in the registry and prevents
new redo managers from being created
- propagate the new failure mode of tenant spawning throughout the code
base
- make sure shut down tenant manager results in
PageReconstructError::Cancelled so that if Timeline::get calls come in
after the shutdown, they do the right thing
2024-08-01 07:57:09 +02:00
Alex Chi Z.
d6c79b77df test(pageserver): add test_gc_feedback_with_snapshots (#8474)
should be working after https://github.com/neondatabase/neon/pull/8328
gets merged. Part of https://github.com/neondatabase/neon/issues/8002

adds a new perf benchmark case that ensures garbages can be collected
with branches

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-07-31 17:55:19 -04:00
Alexander Bayandin
3350daeb9a CI(create-test-report): fix missing benchmark results in Allure report (#8540)
## Problem

In https://github.com/neondatabase/neon/pull/8241 I've accidentally
removed `create-test-report` dependency on `benchmarks` job

## Summary of changes
- Run `create-test-report` after `benchmarks` job
2024-07-31 19:47:59 +01:00
Arpad Müller
939d50a41c storage_scrubber: migrate FindGarbage to remote_storage (#8548)
Uses the newly added APIs from #8541 named `stream_tenants_generic` and
`stream_objects_with_retries` and extends them with
`list_objects_with_retries_generic` and
`stream_tenant_timelines_generic` to migrate the `find-garbage` command
of the scrubber to `GenericRemoteStorage`.

Part of https://github.com/neondatabase/neon/issues/7547
2024-07-31 18:24:42 +00:00
John Spray
2f9ada13c4 controller: simplify reconciler generation increment logic (#8560)
## Problem

This code was confusing, untested and covered:
- an impossible case, where intent state is AttacheStale (we never do
this)
- a rare edge case (going from AttachedMulti to Attached), which we were
not testing, and in any case the pageserver internally does the same
Tenant reset in this transition as it would do if we incremented
generation.

Closes: https://github.com/neondatabase/neon/issues/8367

## Summary of changes

- Simplify the logic to only skip incrementing the generation if the
location already has the expected generation and the exact same mode.
2024-07-31 18:37:47 +01:00
Cihan Demirci
ff51b565d3 cicd: change Azure storage details [2/2] (#8562)
Change Azure storage configuration to point to updated variables/secrets.

Also update subscription id variable.
2024-07-31 17:42:10 +01:00
Tristan Partin
5e0409de95 Fix negative replication delay metric
In some cases, we can get a negative metric for replication_delay_bytes.
My best guess from all the research I've done is that we evaluate
pg_last_wal_receive_lsn() before pg_last_wal_replay_lsn(), and that by
the time everything is said and done, the replay LSN has advanced past
the receive LSN. In this case, our lag can effectively be modeled as
0 due to the speed of the WAL reception and replay.
2024-07-31 10:16:58 -05:00
Christian Schwarz
4e3b70e308 refactor(page_service): Timeline gate guard holding + cancellation + shutdown (#8339)
Since the introduction of sharding, the protocol handling loop in
`handle_pagerequests` cannot know anymore which concrete
`Tenant`/`Timeline` object any of the incoming `PagestreamFeMessage`
resolves to.
In fact, one message might resolve to one `Tenant`/`Timeline` while
the next one may resolve to another one.

To avoid going to tenant manager, we added the `shard_timelines` which
acted as an ever-growing cache that held timeline gate guards open for
the lifetime of the connection.
The consequence of holding the gate guards open was that we had to be
sensitive to every cached `Timeline::cancel` on each interaction with
the network connection, so that Timeline shutdown would not have to wait
for network connection interaction.

We can do better than that, meaning more efficiency & better
abstraction.
I proposed a sketch for it in

* https://github.com/neondatabase/neon/pull/8286

and this PR implements an evolution of that sketch.

The main idea is is that `mod page_service` shall be solely concerned
with the following:
1. receiving requests by speaking the protocol / pagestream subprotocol
2. dispatching the request to a corresponding method on the correct
shard/`Timeline` object
3. sending response by speaking the protocol / pagestream subprotocol.

The cancellation sensitivity responsibilities are clear cut:
* while in `page_service` code, sensitivity to page_service cancellation
is sufficient
* while in `Timeline` code, sensitivity to `Timeline::cancel` is
sufficient

To enforce these responsibilities, we introduce the notion of a
`timeline::handle::Handle` to a `Timeline` object that is checked out
from a `timeline::handle::Cache` for **each request**.
The `Handle` derefs to `Timeline` and is supposed to be used for a
single async method invocation on `Timeline`.
See the lengthy doc comment in `mod handle` for details of the design.
2024-07-31 17:05:45 +02:00
Alex Chi Z.
61a65f61f3 feat(pageserver): support btm-gc-compaction for child branches (#8519)
part of https://github.com/neondatabase/neon/issues/8002

For child branches, we will pull the image of the modified keys from the
parant into the child branch, which creates a full history for
generating key retention. If there are not enough delta keys, the image
won't be wrote eventually, and we will only keep the deltas inside the
child branch. We could avoid the wasteful work to pull the image from
the parent if we can know the number of deltas in advance, in the future
(currently we always pull image for all modified keys in the child
branch)


---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-07-31 15:48:48 +01:00
Alexander Bayandin
d21246c8bd CI(regress-tests): run less regression tests (#8561)
## Problem
We run regression tests on `release` & `debug` builds for each of the
three supported Postgres versions (6 in total).
With upcoming ARM support and Postgres 17, the number of jobs will jump
to 16, which is a lot.

See the internal discussion here:
https://neondb.slack.com/archives/C033A2WE6BZ/p1722365908404329

## Summary of changes
- Run `regress-tests` job in debug builds only with the latest Postgres
version
- Do not do `debug` builds on release branches
2024-07-31 15:10:27 +01:00
Christian Schwarz
4825b0fec3 compaction_level0_phase1: bypass PS PageCache for data blocks (#8543)
part of https://github.com/neondatabase/neon/issues/8184

# Problem

We want to bypass PS PageCache for all data block reads, but
`compact_level0_phase1` currently uses `ValueRef::load` to load the WAL
records from delta layers.
Internally, that maps to `FileBlockReader:read_blk` which hits the
PageCache
[here](e78341e1c2/pageserver/src/tenant/block_io.rs (L229-L236)).

# Solution

This PR adds a mode for `compact_level0_phase1` that uses the
`MergeIterator` for reading the `Value`s from the delta layer files.

`MergeIterator` is a streaming k-merge that uses vectored blob_io under
the hood, which bypasses the PS PageCache for data blocks.

Other notable changes:
* change the `DiskBtreeReader::into_stream` to buffer the node, instead
of holding a `PageCache` `PageReadGuard`.
* Without this, we run out of page cache slots in
`test_pageserver_compaction_smoke`.
* Generally, `PageReadGuard`s aren't supposed to be held across await
points, so, this is a general bugfix.

# Testing / Validation / Performance

`MergeIterator` has not yet been used in production; it's being
developed as part of
* https://github.com/neondatabase/neon/issues/8002

Therefore, this PR adds a validation mode that compares the existing
approach's value iterator with the new approach's stream output, item by
item.
If they're not identical, we log a warning / fail the unit/regression
test.
To avoid flooding the logs, we apply a global rate limit of once per 10
seconds.
In any case, we use the existing approach's value.

Expected performance impact that will be monitored in staging / nightly
benchmarks / eventually pre-prod:
* with validation:
  * increased CPU usage
  * ~doubled VirtualFile read bytes/second metric
* no change in disk IO usage because the kernel page cache will likely
have the pages buffered on the second read
* without validation:
* slightly higher DRAM usage because each iterator participating in the
k-merge has a dedicated buffer (as opposed to before, where compactions
would rely on the PS PageCaceh as a shared evicting buffer)
* less disk IO if previously there were repeat PageCache misses (likely
case on a busy production Pageserver)
* lower CPU usage: PageCache out of the picture, fewer syscalls are made
(vectored blob io batches reads)

# Rollout

The new code is used with validation mode enabled-by-default.
This gets us validation everywhere by default, specifically in
- Rust unit tests
- Python tests
- Nightly pagebench (shouldn't really matter)
- Staging

Before the next release, I'll merge the following aws.git PR that
configures prod to continue using the existing behavior:

* https://github.com/neondatabase/aws/pull/1663

# Interactions With Other Features

This work & rollout should complete before Direct IO is enabled because
Direct IO would double the IOPS & latency for each compaction read
(#8240).

# Future Work

The streaming k-merge's memory usage is proportional to the amount of
memory per participating layer.

But `compact_level0_phase1` still loads all keys into memory for
`all_keys_iter`.
Thus, it continues to have active memory usage proportional to the
number of keys involved in the compaction.

Future work should replace `all_keys_iter` with a streaming keys
iterator.
This PR has a draft in its first commit, which I later reverted because
it's not necessary to achieve the goal of this PR / issue #8184.
2024-07-31 14:17:59 +02:00
Cihan Demirci
a4df3c8488 cicd: change Azure storage details [1/2] (#8553)
Change Azure storage configuration to point to new variables/secrets. They have
the `_NEW` suffix in order not to disrupt any tests while we complete the
switch.
2024-07-30 19:34:15 +00:00
80 changed files with 4629 additions and 1428 deletions

View File

@@ -19,6 +19,10 @@ on:
description: 'debug or release'
required: true
type: string
pg-versions:
description: 'a json array of postgres versions to run regression tests on'
required: true
type: string
defaults:
run:
@@ -254,7 +258,7 @@ jobs:
strategy:
fail-fast: false
matrix:
pg_version: [ v14, v15, v16 ]
pg_version: ${{ fromJson(inputs.pg-versions) }}
steps:
- uses: actions/checkout@v4
with:
@@ -284,5 +288,5 @@ jobs:
- name: Merge and upload coverage data
if: |
false &&
inputs.build-type == 'debug' && matrix.pg_version == 'v14'
inputs.build-type == 'debug' && matrix.pg_version == 'v16'
uses: ./.github/actions/save-coverage-data

View File

@@ -203,7 +203,8 @@ jobs:
fail-fast: false
matrix:
arch: [ x64 ]
build-type: [ debug, release ]
# Do not build or run tests in debug for release branches
build-type: ${{ fromJson((startsWith(github.ref_name, 'release' && github.event_name == 'push')) && '["release"]' || '["debug", "release"]') }}
include:
- build-type: release
arch: arm64
@@ -213,6 +214,8 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
# Run tests on all Postgres versions in release builds and only on the latest version in debug builds
pg-versions: ${{ matrix.build-type == 'release' && '["v14", "v15", "v16"]' || '["v16"]' }}
secrets: inherit
# Keep `benchmarks` job outside of `build-and-test-locally` workflow to make job failures non-blocking
@@ -306,7 +309,7 @@ jobs:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
create-test-report:
needs: [ check-permissions, build-and-test-locally, coverage-report, build-build-tools-image ]
needs: [ check-permissions, build-and-test-locally, coverage-report, build-build-tools-image, benchmarks ]
if: ${{ !cancelled() && contains(fromJSON('["skipped", "success"]'), needs.check-permissions.result) }}
outputs:
report-url: ${{ steps.create-allure-report.outputs.report-url }}
@@ -868,7 +871,7 @@ jobs:
with:
client-id: ${{ secrets.AZURE_DEV_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}
subscription-id: ${{ secrets.AZURE_DEV_SUBSCRIPTION_ID }}
- name: Login to ACR
if: github.ref_name == 'main'

View File

@@ -10,11 +10,13 @@ defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
env:
# A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix
E2E_CONCURRENCY_GROUP: ${{ github.repository }}-e2e-tests-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
jobs:
cancel-previous-e2e-tests:
@@ -64,19 +66,35 @@ jobs:
needs: [ tag ]
runs-on: ubuntu-22.04
env:
EVENT_ACTION: ${{ github.event.action }}
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
TAG: ${{ needs.tag.outputs.build-tag }}
steps:
- name: check if ecr image are present
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
- name: Wait for `promote-images` job to finish
# It's important to have a timeout here, the script in the step can run infinitely
timeout-minutes: 60
run: |
for REPO in neon compute-tools compute-node-v14 vm-compute-node-v14 compute-node-v15 vm-compute-node-v15 compute-node-v16 vm-compute-node-v16; do
OUTPUT=$(aws ecr describe-images --repository-name ${REPO} --region eu-central-1 --query "imageDetails[?imageTags[?contains(@, '${TAG}')]]" --output text)
if [ "$OUTPUT" == "" ]; then
echo "$REPO with image tag $TAG not found" >> $GITHUB_OUTPUT
exit 1
fi
if [ "${GITHUB_EVENT_NAME}" != "pull_request" ] || [ "${EVENT_ACTION}" != "ready_for_review" ]; then
exit 0
fi
# For PRs we use the run id as the tag
BUILD_AND_TEST_RUN_ID=${TAG}
while true; do
conclusion=$(gh run --repo ${GITHUB_REPOSITORY} view ${BUILD_AND_TEST_RUN_ID} --json jobs --jq '.jobs[] | select(.name == "promote-images") | .conclusion')
case "$conclusion" in
success)
break
;;
failure | cancelled | skipped)
echo "The 'promote-images' job didn't succeed: '${conclusion}'. Exiting..."
exit 1
;;
*)
echo "The 'promote-images' hasn't succeed yet. Waiting..."
sleep 60
;;
esac
done
- name: Set e2e-platforms

199
Cargo.lock generated
View File

@@ -1418,7 +1418,7 @@ dependencies = [
"clap",
"criterion-plot",
"is-terminal",
"itertools",
"itertools 0.10.5",
"num-traits",
"once_cell",
"oorandom",
@@ -1439,7 +1439,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1"
dependencies = [
"cast",
"itertools",
"itertools 0.10.5",
]
[[package]]
@@ -1744,18 +1744,6 @@ dependencies = [
"const-random",
]
[[package]]
name = "dns-lookup"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc"
dependencies = [
"cfg-if",
"libc",
"socket2 0.5.5",
"windows-sys 0.48.0",
]
[[package]]
name = "dsl_auto_type"
version = "0.1.1"
@@ -2146,6 +2134,12 @@ dependencies = [
"slab",
]
[[package]]
name = "gen_ops"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "304de19db7028420975a296ab0fcbbc8e69438c4ed254a1e41e2a7f37d5f0e0a"
[[package]]
name = "generic-array"
version = "0.14.7"
@@ -2722,17 +2716,6 @@ version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
[[package]]
name = "io-lifetimes"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi",
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "io-uring"
version = "0.6.2"
@@ -2751,14 +2734,13 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]]
name = "is-terminal"
version = "0.4.7"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b"
dependencies = [
"hermit-abi",
"io-lifetimes",
"rustix 0.37.25",
"windows-sys 0.48.0",
"libc",
"windows-sys 0.52.0",
]
[[package]]
@@ -2770,6 +2752,15 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.6"
@@ -2884,18 +2875,6 @@ version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
[[package]]
name = "linux-raw-sys"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4"
[[package]]
name = "linux-raw-sys"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "linux-raw-sys"
version = "0.4.13"
@@ -3013,7 +2992,7 @@ checksum = "7c4b80445aeb08e832d87bf1830049a924cdc1d6b7ef40b6b9b365bff17bf8ec"
dependencies = [
"libc",
"measured",
"procfs 0.16.0",
"procfs",
]
[[package]]
@@ -3058,7 +3037,7 @@ dependencies = [
"measured",
"measured-process",
"once_cell",
"procfs 0.14.2",
"procfs",
"prometheus",
"rand 0.8.5",
"rand_distr",
@@ -3587,7 +3566,7 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper 0.14.26",
"itertools",
"itertools 0.10.5",
"leaky-bucket",
"md5",
"metrics",
@@ -3605,8 +3584,9 @@ dependencies = [
"postgres_connection",
"postgres_ffi",
"pq_proto",
"procfs 0.14.2",
"procfs",
"rand 0.8.5",
"range-set-blaze",
"regex",
"remote_storage",
"reqwest 0.12.4",
@@ -3657,7 +3637,7 @@ dependencies = [
"hex",
"humantime",
"humantime-serde",
"itertools",
"itertools 0.10.5",
"postgres_ffi",
"rand 0.8.5",
"serde",
@@ -3715,7 +3695,7 @@ dependencies = [
"hex-literal",
"humantime",
"humantime-serde",
"itertools",
"itertools 0.10.5",
"metrics",
"once_cell",
"pageserver_api",
@@ -4047,7 +4027,7 @@ name = "postgres_connection"
version = "0.1.0"
dependencies = [
"anyhow",
"itertools",
"itertools 0.10.5",
"once_cell",
"postgres",
"tokio-postgres",
@@ -4105,7 +4085,7 @@ version = "0.1.0"
dependencies = [
"byteorder",
"bytes",
"itertools",
"itertools 0.10.5",
"pin-project-lite",
"postgres-protocol",
"rand 0.8.5",
@@ -4151,21 +4131,6 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "procfs"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1de8dacb0873f77e6aefc6d71e044761fcc68060290f5b1089fcdf84626bb69"
dependencies = [
"bitflags 1.3.2",
"byteorder",
"chrono",
"flate2",
"hex",
"lazy_static",
"rustix 0.36.16",
]
[[package]]
name = "procfs"
version = "0.16.0"
@@ -4173,10 +4138,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4"
dependencies = [
"bitflags 2.4.1",
"chrono",
"flate2",
"hex",
"lazy_static",
"procfs-core",
"rustix 0.38.28",
"rustix",
]
[[package]]
@@ -4186,14 +4153,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29"
dependencies = [
"bitflags 2.4.1",
"chrono",
"hex",
]
[[package]]
name = "prometheus"
version = "0.13.3"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c"
checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
dependencies = [
"cfg-if",
"fnv",
@@ -4201,7 +4169,7 @@ dependencies = [
"libc",
"memchr",
"parking_lot 0.12.1",
"procfs 0.14.2",
"procfs",
"thiserror",
]
@@ -4223,7 +4191,7 @@ checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270"
dependencies = [
"bytes",
"heck 0.4.1",
"itertools",
"itertools 0.10.5",
"lazy_static",
"log",
"multimap",
@@ -4244,7 +4212,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
dependencies = [
"anyhow",
"itertools",
"itertools 0.10.5",
"proc-macro2",
"quote",
"syn 1.0.109",
@@ -4301,7 +4269,7 @@ dependencies = [
"hyper-util",
"indexmap 2.0.1",
"ipnet",
"itertools",
"itertools 0.10.5",
"lasso",
"md5",
"measured",
@@ -4477,6 +4445,18 @@ dependencies = [
"rand_core 0.5.1",
]
[[package]]
name = "range-set-blaze"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8421b5d459262eabbe49048d362897ff3e3830b44eac6cfe341d6acb2f0f13d2"
dependencies = [
"gen_ops",
"itertools 0.12.1",
"num-integer",
"num-traits",
]
[[package]]
name = "rayon"
version = "1.7.0"
@@ -4645,7 +4625,7 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper 0.14.26",
"itertools",
"itertools 0.10.5",
"metrics",
"once_cell",
"pin-project-lite",
@@ -4955,34 +4935,6 @@ dependencies = [
"nom",
]
[[package]]
name = "rustix"
version = "0.36.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6da3636faa25820d8648e0e31c5d519bbb01f72fdf57131f0f5f7da5fed36eab"
dependencies = [
"bitflags 1.3.2",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys 0.1.4",
"windows-sys 0.45.0",
]
[[package]]
name = "rustix"
version = "0.37.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4eb579851244c2c03e7c24f501c3432bed80b8f720af1d6e5b0e0f01555a035"
dependencies = [
"bitflags 1.3.2",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys 0.3.8",
"windows-sys 0.48.0",
]
[[package]]
name = "rustix"
version = "0.38.28"
@@ -5736,14 +5688,13 @@ dependencies = [
"control_plane",
"diesel",
"diesel_migrations",
"dns-lookup",
"fail",
"futures",
"git-version",
"hex",
"humantime",
"hyper 0.14.26",
"itertools",
"itertools 0.10.5",
"lasso",
"measured",
"metrics",
@@ -5752,6 +5703,7 @@ dependencies = [
"pageserver_client",
"postgres_connection",
"r2d2",
"rand 0.8.5",
"reqwest 0.12.4",
"routerify",
"scopeguard",
@@ -5807,9 +5759,10 @@ dependencies = [
"either",
"futures",
"futures-util",
"git-version",
"hex",
"humantime",
"itertools",
"itertools 0.10.5",
"once_cell",
"pageserver",
"pageserver_api",
@@ -5986,15 +5939,15 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.5.0"
version = "3.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9fbec84f381d5795b08656e4912bec604d162bff9291d6189a78f4c8ab87998"
checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa"
dependencies = [
"cfg-if",
"fastrand 1.9.0",
"redox_syscall 0.3.5",
"rustix 0.37.25",
"windows-sys 0.45.0",
"fastrand 2.0.0",
"redox_syscall 0.4.1",
"rustix",
"windows-sys 0.52.0",
]
[[package]]
@@ -7191,15 +7144,6 @@ dependencies = [
"windows_x86_64_msvc 0.42.2",
]
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets 0.42.2",
]
[[package]]
name = "windows-sys"
version = "0.48.0"
@@ -7218,21 +7162,6 @@ dependencies = [
"windows-targets 0.52.4",
]
[[package]]
name = "windows-targets"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
]
[[package]]
name = "windows-targets"
version = "0.48.0"
@@ -7462,7 +7391,7 @@ dependencies = [
"hmac",
"hyper 0.14.26",
"indexmap 1.9.3",
"itertools",
"itertools 0.10.5",
"libc",
"log",
"memchr",

View File

@@ -126,7 +126,7 @@ parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
parquet_derive = "51.0.0"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2"
procfs = "0.14"
procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.11"
rand = "0.8"

View File

@@ -4,6 +4,11 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
# Enables test specific features.
testing = []
[dependencies]
anyhow.workspace = true
async-compression.workspace = true

View File

@@ -400,7 +400,15 @@ impl ComputeNode {
pub fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let mut retry_period_ms = 500.0;
let mut attempts = 0;
let max_attempts = 10;
const DEFAULT_ATTEMPTS: u16 = 10;
#[cfg(feature = "testing")]
let max_attempts = if let Ok(v) = env::var("NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES") {
u16::from_str(&v).unwrap()
} else {
DEFAULT_ATTEMPTS
};
#[cfg(not(feature = "testing"))]
let max_attempts = DEFAULT_ATTEMPTS;
loop {
let result = self.try_get_basebackup(compute_state, lsn);
match result {

View File

@@ -289,7 +289,7 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command {
for (var, val) in std::env::vars() {
if var.starts_with("NEON_PAGESERVER_") {
if var.starts_with("NEON_") {
cmd = cmd.env(var, val);
}
}

View File

@@ -203,8 +203,9 @@ pub const XLR_BLOCK_ID_DATA_LONG: u8 = 254;
pub const XLR_BLOCK_ID_ORIGIN: u8 = 253;
pub const XLR_BLOCK_ID_TOPLEVEL_XID: u8 = 252;
pub const BKPBLOCK_FORK_MASK: u8 = 0x0F;
pub const _BKPBLOCK_FLAG_MASK: u8 = 0xF0;
pub const BKPBLOCK_FORK_MASK: u8 = 0x07;
pub const BKPBLOCK_FLAG_MASK: u8 = 0xF8;
pub const BKPBLOCK_OPAQUE: u8 = 0x08; /* page has no page header */
pub const BKPBLOCK_HAS_IMAGE: u8 = 0x10; /* block data is an XLogRecordBlockImage */
pub const BKPBLOCK_HAS_DATA: u8 = 0x20;
pub const BKPBLOCK_WILL_INIT: u8 = 0x40; /* redo will re-init the page */

View File

@@ -144,6 +144,7 @@ impl RemotePath {
///
/// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
/// NoDelimiter mode will only populate `keys`.
#[derive(Copy, Clone)]
pub enum ListingMode {
WithDelimiter,
NoDelimiter,

View File

@@ -49,6 +49,7 @@ postgres_backend.workspace = true
postgres-protocol.workspace = true
postgres-types.workspace = true
rand.workspace = true
range-set-blaze = { version = "0.1.16", features = ["alloc"] }
regex.workspace = true
scopeguard.workspace = true
serde.workspace = true

View File

@@ -1,3 +1,4 @@
use criterion::measurement::WallTime;
use pageserver::keyspace::{KeyPartitioning, KeySpace};
use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap;
@@ -15,7 +16,11 @@ use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion};
fn fixture_path(relative: &str) -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(relative)
}
fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
let mut layer_map = LayerMap::default();
@@ -109,7 +114,7 @@ fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning
// between each test run.
fn bench_from_captest_env(c: &mut Criterion) {
// TODO consider compressing this file
let layer_map = build_layer_map(PathBuf::from("benches/odd-brook-layernames.txt"));
let layer_map = build_layer_map(fixture_path("benches/odd-brook-layernames.txt"));
let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map);
// Test with uniform query pattern
@@ -139,7 +144,7 @@ fn bench_from_captest_env(c: &mut Criterion) {
fn bench_from_real_project(c: &mut Criterion) {
// Init layer map
let now = Instant::now();
let layer_map = build_layer_map(PathBuf::from("benches/odd-brook-layernames.txt"));
let layer_map = build_layer_map(fixture_path("benches/odd-brook-layernames.txt"));
println!("Finished layer map init in {:?}", now.elapsed());
// Choose uniformly distributed queries
@@ -242,7 +247,72 @@ fn bench_sequential(c: &mut Criterion) {
group.finish();
}
fn bench_visibility_with_map(
group: &mut BenchmarkGroup<WallTime>,
layer_map: LayerMap,
read_points: Vec<Lsn>,
bench_name: &str,
) {
group.bench_function(bench_name, |b| {
b.iter(|| black_box(layer_map.get_visibility(read_points.clone())));
});
}
// Benchmark using synthetic data. Arrange image layers on stacked diagonal lines.
fn bench_visibility(c: &mut Criterion) {
let mut group = c.benchmark_group("visibility");
{
// Init layer map. Create 100_000 layers arranged in 1000 diagonal lines.
let now = Instant::now();
let mut layer_map = LayerMap::default();
let mut updates = layer_map.batch_update();
for i in 0..100_000 {
let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = PersistentLayerDesc::new_img(
TenantShardId::unsharded(TenantId::generate()),
TimelineId::generate(),
zero.add(10 * i32)..zero.add(10 * i32 + 1),
Lsn(i),
0,
);
updates.insert_historic(layer);
}
updates.flush();
println!("Finished layer map init in {:?}", now.elapsed());
let mut read_points = Vec::new();
for i in (0..100_000).step_by(1000) {
read_points.push(Lsn(i));
}
bench_visibility_with_map(&mut group, layer_map, read_points, "sequential");
}
{
let layer_map = build_layer_map(fixture_path("benches/odd-brook-layernames.txt"));
let read_points = vec![Lsn(0x1C760FA190)];
bench_visibility_with_map(&mut group, layer_map, read_points, "real_map");
let layer_map = build_layer_map(fixture_path("benches/odd-brook-layernames.txt"));
let read_points = vec![
Lsn(0x1C760FA190),
Lsn(0x000000931BEAD539),
Lsn(0x000000931BF63011),
Lsn(0x000000931B33AE68),
Lsn(0x00000038E67ABFA0),
Lsn(0x000000931B33AE68),
Lsn(0x000000914E3F38F0),
Lsn(0x000000931B33AE68),
];
bench_visibility_with_map(&mut group, layer_map, read_points, "real_map_many_branches");
}
group.finish();
}
criterion_group!(group_1, bench_from_captest_env);
criterion_group!(group_2, bench_from_real_project);
criterion_group!(group_3, bench_sequential);
criterion_main!(group_1, group_2, group_3);
criterion_group!(group_4, bench_visibility);
criterion_main!(group_1, group_2, group_3, group_4);

View File

@@ -17,11 +17,9 @@ use pageserver::config::PageserverIdentity;
use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::task_mgr::{COMPUTE_REQUEST_RUNTIME, WALRECEIVER_RUNTIME};
use pageserver::tenant::{secondary, TenantSharedResources};
use pageserver::{
CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, LibpqEndpointListener,
};
use pageserver::{CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener};
use remote_storage::GenericRemoteStorage;
use tokio::signal::unix::SignalKind;
use tokio::time::Instant;
@@ -31,11 +29,9 @@ use tracing::*;
use metrics::set_build_info_metric;
use pageserver::{
config::PageServerConf,
context::{DownloadBehavior, RequestContext},
deletion_queue::DeletionQueue,
http, page_cache, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
task_mgr::{BACKGROUND_RUNTIME, MGMT_REQUEST_RUNTIME},
tenant::mgr,
virtual_file,
};
@@ -129,6 +125,7 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.get_impl, "starting with get page implementation");
info!(?conf.get_vectored_impl, "starting with vectored get page implementation");
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");
let tenants_path = conf.tenants_path();
if !tenants_path.exists() {
@@ -593,30 +590,13 @@ fn start_pageserver(
// Spawn a task to listen for libpq connections. It will spawn further tasks
// for each connection. We created the listener earlier already.
let libpq_listener = {
let cancel = CancellationToken::new();
let libpq_ctx = RequestContext::todo_child(
TaskKind::LibpqEndpointListener,
// listener task shouldn't need to download anything. (We will
// create a separate sub-contexts for each connection, with their
// own download behavior. This context is used only to listen and
// accept connections.)
DownloadBehavior::Error,
);
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"libpq listener",
page_service::libpq_listener_main(
tenant_manager.clone(),
pg_auth,
pageserver_listener,
conf.pg_auth_type,
libpq_ctx,
cancel.clone(),
),
));
LibpqEndpointListener(CancellableTask { task, cancel })
};
let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, {
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
pageserver_listener
.set_nonblocking(true)
.context("set listener to nonblocking")?;
tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
});
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
@@ -644,7 +624,7 @@ fn start_pageserver(
shutdown_pageserver.take();
pageserver::shutdown_pageserver(
http_endpoint_listener,
libpq_listener,
page_service,
consumption_metrics_tasks,
disk_usage_eviction_task,
&tenant_manager,

View File

@@ -29,6 +29,7 @@ use utils::{
logging::LogFormat,
};
use crate::tenant::timeline::compaction::CompactL0Phase1ValueAccess;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
@@ -295,6 +296,10 @@ pub struct PageServerConf {
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: L0FlushConfig,
/// This flag is temporary and will be removed after gradual rollout.
/// See <https://github.com/neondatabase/neon/issues/8184>.
pub compact_level0_phase1_value_access: CompactL0Phase1ValueAccess,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -401,6 +406,8 @@ struct PageServerConfigBuilder {
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
l0_flush: BuilderValue<L0FlushConfig>,
compact_level0_phase1_value_access: BuilderValue<CompactL0Phase1ValueAccess>,
}
impl PageServerConfigBuilder {
@@ -490,6 +497,7 @@ impl PageServerConfigBuilder {
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: Set(L0FlushConfig::default()),
compact_level0_phase1_value_access: Set(CompactL0Phase1ValueAccess::default()),
}
}
}
@@ -673,6 +681,10 @@ impl PageServerConfigBuilder {
self.l0_flush = BuilderValue::Set(value);
}
pub fn compact_level0_phase1_value_access(&mut self, value: CompactL0Phase1ValueAccess) {
self.compact_level0_phase1_value_access = BuilderValue::Set(value);
}
pub fn build(self, id: NodeId) -> anyhow::Result<PageServerConf> {
let default = Self::default_values();
@@ -730,6 +742,7 @@ impl PageServerConfigBuilder {
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
compact_level0_phase1_value_access,
}
CUSTOM LOGIC
{
@@ -1002,6 +1015,9 @@ impl PageServerConf {
"l0_flush" => {
builder.l0_flush(utils::toml_edit_ext::deserialize_item(item).context("l0_flush")?)
}
"compact_level0_phase1_value_access" => {
builder.compact_level0_phase1_value_access(utils::toml_edit_ext::deserialize_item(item).context("compact_level0_phase1_value_access")?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -1086,6 +1102,7 @@ impl PageServerConf {
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
}
}
}
@@ -1327,6 +1344,7 @@ background_task_maximum_delay = '334 s'
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -1401,6 +1419,7 @@ background_task_maximum_delay = '334 s'
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -296,6 +296,11 @@ impl From<GetActiveTenantError> for ApiError {
GetActiveTenantError::WaitForActiveTimeout { .. } => {
ApiError::ResourceUnavailable(format!("{}", e).into())
}
GetActiveTenantError::SwitchedTenant => {
// in our HTTP handlers, this error doesn't happen
// TODO: separate error types
ApiError::ResourceUnavailable("switched tenant".into())
}
}
}
}

View File

@@ -12,6 +12,8 @@ pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;
use futures::{stream::FuturesUnordered, StreamExt};
pub use pageserver_api::keyspace;
use tokio_util::sync::CancellationToken;
pub mod aux_file;
@@ -30,14 +32,13 @@ pub mod walingest;
pub mod walrecord;
pub mod walredo;
use crate::task_mgr::TaskKind;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tenant::{
mgr::{BackgroundPurges, TenantManager},
secondary,
};
use tracing::info;
use tracing::{info, info_span};
/// Current storage format version
///
@@ -63,7 +64,6 @@ pub struct CancellableTask {
pub cancel: CancellationToken,
}
pub struct HttpEndpointListener(pub CancellableTask);
pub struct LibpqEndpointListener(pub CancellableTask);
pub struct ConsumptionMetricsTasks(pub CancellableTask);
pub struct DiskUsageEvictionTask(pub CancellableTask);
impl CancellableTask {
@@ -77,7 +77,7 @@ impl CancellableTask {
#[allow(clippy::too_many_arguments)]
pub async fn shutdown_pageserver(
http_listener: HttpEndpointListener,
libpq_listener: LibpqEndpointListener,
page_service: page_service::Listener,
consumption_metrics_worker: ConsumptionMetricsTasks,
disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
tenant_manager: &TenantManager,
@@ -87,10 +87,83 @@ pub async fn shutdown_pageserver(
exit_code: i32,
) {
use std::time::Duration;
// If the orderly shutdown below takes too long, we still want to make
// sure that all walredo processes are killed and wait()ed on by us, not systemd.
//
// (Leftover walredo processes are the hypothesized trigger for the systemd freezes
// that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.
//
// We use a thread instead of a tokio task because the background runtime is likely busy
// with the final flushing / uploads. This activity here has priority, and due to lack
// of scheduling priority feature sin the tokio scheduler, using a separate thread is
// an effective priority booster.
let walredo_extraordinary_shutdown_thread_span = {
let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
span.follows_from(tracing::Span::current());
span
};
let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
let walredo_extraordinary_shutdown_thread = std::thread::spawn({
let walredo_extraordinary_shutdown_thread_cancel =
walredo_extraordinary_shutdown_thread_cancel.clone();
move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let _entered = rt.enter();
let _entered = walredo_extraordinary_shutdown_thread_span.enter();
if let Ok(()) = rt.block_on(tokio::time::timeout(
Duration::from_secs(8),
walredo_extraordinary_shutdown_thread_cancel.cancelled(),
)) {
info!("cancellation requested");
return;
}
let managers = tenant::WALREDO_MANAGERS
.lock()
.unwrap()
// prevents new walredo managers from being inserted
.take()
.expect("only we take()");
// Use FuturesUnordered to get in queue early for each manager's
// heavier_once_cell semaphore wait list.
// Also, for idle tenants that for some reason haven't
// shut down yet, it's quite likely that we're not going
// to get Poll::Pending once.
let mut futs: FuturesUnordered<_> = managers
.into_iter()
.filter_map(|(_, mgr)| mgr.upgrade())
.map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
.collect();
info!(count=%futs.len(), "built FuturesUnordered");
let mut last_log_at = std::time::Instant::now();
#[derive(Debug, Default)]
struct Results {
initiated: u64,
already: u64,
}
let mut results = Results::default();
while let Some(we_initiated) = rt.block_on(futs.next()) {
if we_initiated {
results.initiated += 1;
} else {
results.already += 1;
}
if last_log_at.elapsed() > Duration::from_millis(100) {
info!(remaining=%futs.len(), ?results, "progress");
last_log_at = std::time::Instant::now();
}
}
info!(?results, "done");
}
});
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.
timed(
libpq_listener.0.shutdown(),
let remaining_connections = timed(
page_service.stop_accepting(),
"shutdown LibpqEndpointListener",
Duration::from_secs(1),
)
@@ -108,7 +181,7 @@ pub async fn shutdown_pageserver(
// Shut down any page service tasks: any in-progress work for particular timelines or tenants
// should already have been canclled via mgr::shutdown_all_tenants
timed(
task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
remaining_connections.shutdown(),
"shutdown PageRequestHandlers",
Duration::from_secs(1),
)
@@ -162,6 +235,12 @@ pub async fn shutdown_pageserver(
Duration::from_secs(1),
)
.await;
info!("cancel & join walredo_extraordinary_shutdown_thread");
walredo_extraordinary_shutdown_thread_cancel.cancel();
walredo_extraordinary_shutdown_thread.join().unwrap();
info!("walredo_extraordinary_shutdown_thread done");
info!("Shut down successfully completed");
std::process::exit(exit_code);
}

View File

@@ -525,6 +525,15 @@ static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static VISIBLE_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_visible_physical_size",
"The size of the layer files present in the pageserver's filesystem.",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
pub(crate) static RESIDENT_PHYSICAL_SIZE_GLOBAL: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_resident_physical_size_global",
@@ -2204,6 +2213,7 @@ pub(crate) struct TimelineMetrics {
pub(crate) layer_count_delta: UIntGauge,
pub standby_horizon_gauge: IntGauge,
pub resident_physical_size_gauge: UIntGauge,
pub visible_physical_size_gauge: UIntGauge,
/// copy of LayeredTimeline.current_logical_size
pub current_logical_size_gauge: UIntGauge,
pub aux_file_size_gauge: IntGauge,
@@ -2326,6 +2336,9 @@ impl TimelineMetrics {
let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let visible_physical_size_gauge = VISIBLE_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
// TODO: we shouldn't expose this metric
let current_logical_size_gauge = CURRENT_LOGICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
@@ -2380,6 +2393,7 @@ impl TimelineMetrics {
layer_count_delta,
standby_horizon_gauge,
resident_physical_size_gauge,
visible_physical_size_gauge,
current_logical_size_gauge,
aux_file_size_gauge,
directory_entries_count_gauge,
@@ -2431,6 +2445,7 @@ impl TimelineMetrics {
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
}
let _ = VISIBLE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
if let Some(metric) = Lazy::get(&DIRECTORY_ENTRIES_COUNT) {
let _ = metric.remove_label_values(&[tenant_id, shard_id, timeline_id]);

File diff suppressed because it is too large Load Diff

View File

@@ -8,8 +8,7 @@ use std::time::Duration;
pub use pageserver_api::key::{Key, KEY_SIZE};
/// A 'value' stored for a one Key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum Value {
/// An Image value contains a full copy of the value
Image(Bytes),

View File

@@ -33,6 +33,7 @@ use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
@@ -312,14 +313,66 @@ impl std::fmt::Debug for Tenant {
}
pub(crate) enum WalRedoManager {
Prod(PostgresRedoManager),
Prod(WalredoManagerId, PostgresRedoManager),
#[cfg(test)]
Test(harness::TestRedoManager),
}
impl From<PostgresRedoManager> for WalRedoManager {
fn from(mgr: PostgresRedoManager) -> Self {
Self::Prod(mgr)
#[derive(thiserror::Error, Debug)]
#[error("pageserver is shutting down")]
pub(crate) struct GlobalShutDown;
impl WalRedoManager {
pub(crate) fn new(mgr: PostgresRedoManager) -> Result<Arc<Self>, GlobalShutDown> {
let id = WalredoManagerId::next();
let arc = Arc::new(Self::Prod(id, mgr));
let mut guard = WALREDO_MANAGERS.lock().unwrap();
match &mut *guard {
Some(map) => {
map.insert(id, Arc::downgrade(&arc));
Ok(arc)
}
None => Err(GlobalShutDown),
}
}
}
impl Drop for WalRedoManager {
fn drop(&mut self) {
match self {
Self::Prod(id, _) => {
let mut guard = WALREDO_MANAGERS.lock().unwrap();
if let Some(map) = &mut *guard {
map.remove(id).expect("new() registers, drop() unregisters");
}
}
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
}
}
}
}
/// Global registry of all walredo managers so that [`crate::shutdown_pageserver`] can shut down
/// the walredo processes outside of the regular order.
///
/// This is necessary to work around a systemd bug where it freezes if there are
/// walredo processes left => <https://github.com/neondatabase/cloud/issues/11387>
#[allow(clippy::type_complexity)]
pub(crate) static WALREDO_MANAGERS: once_cell::sync::Lazy<
Mutex<Option<HashMap<WalredoManagerId, Weak<WalRedoManager>>>>,
> = once_cell::sync::Lazy::new(|| Mutex::new(Some(HashMap::new())));
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub(crate) struct WalredoManagerId(u64);
impl WalredoManagerId {
pub fn next() -> Self {
static NEXT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
let id = NEXT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if id == 0 {
panic!("WalredoManagerId::new() returned 0, indicating wraparound, risking it's no longer unique");
}
Self(id)
}
}
@@ -331,19 +384,20 @@ impl From<harness::TestRedoManager> for WalRedoManager {
}
impl WalRedoManager {
pub(crate) async fn shutdown(&self) {
pub(crate) async fn shutdown(&self) -> bool {
match self {
Self::Prod(mgr) => mgr.shutdown().await,
Self::Prod(_, mgr) => mgr.shutdown().await,
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
true
}
}
}
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
match self {
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
Self::Prod(_, mgr) => mgr.maybe_quiesce(idle_timeout),
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
@@ -363,7 +417,7 @@ impl WalRedoManager {
pg_version: u32,
) -> Result<bytes::Bytes, walredo::Error> {
match self {
Self::Prod(mgr) => {
Self::Prod(_, mgr) => {
mgr.request_redo(key, lsn, base_img, records, pg_version)
.await
}
@@ -377,7 +431,7 @@ impl WalRedoManager {
pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
match self {
WalRedoManager::Prod(m) => Some(m.status()),
WalRedoManager::Prod(_, m) => Some(m.status()),
#[cfg(test)]
WalRedoManager::Test(_) => None,
}
@@ -386,6 +440,8 @@ impl WalRedoManager {
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum GetTimelineError {
#[error("Timeline is shutting down")]
ShuttingDown,
#[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
NotActive {
tenant_id: TenantShardId,
@@ -675,11 +731,9 @@ impl Tenant {
init_order: Option<InitializationOrder>,
mode: SpawnMode,
ctx: &RequestContext,
) -> Arc<Tenant> {
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
)));
) -> Result<Arc<Tenant>, GlobalShutDown> {
let wal_redo_manager =
WalRedoManager::new(PostgresRedoManager::new(conf, tenant_shard_id))?;
let TenantSharedResources {
broker_client,
@@ -878,7 +932,7 @@ impl Tenant {
}
.instrument(tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), gen=?generation)),
);
tenant
Ok(tenant)
}
#[instrument(skip_all)]
@@ -1580,7 +1634,7 @@ impl Tenant {
self: Arc<Self>,
timeline_id: TimelineId,
) -> Result<(), DeleteTimelineError> {
DeleteTimelineFlow::run(&self, timeline_id, false).await?;
DeleteTimelineFlow::run(&self, timeline_id).await?;
Ok(())
}
@@ -6909,7 +6963,11 @@ mod tests {
vec![
// Image layer at GC horizon
PersistentLayerKey {
key_range: Key::MIN..Key::MAX,
key_range: {
let mut key = Key::MAX;
key.field6 -= 1;
Key::MIN..key
},
lsn_range: Lsn(0x30)..Lsn(0x31),
is_delta: false
},
@@ -6928,6 +6986,15 @@ mod tests {
]
);
// increase GC horizon and compact again
{
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
guard.cutoffs.space = Lsn(0x40);
}
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
Ok(())
}
@@ -7279,6 +7346,15 @@ mod tests {
);
}
// increase GC horizon and compact again
{
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
guard.cutoffs.space = Lsn(0x40);
}
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
Ok(())
}
@@ -7347,6 +7423,7 @@ mod tests {
Lsn(0x60),
&[Lsn(0x20), Lsn(0x40), Lsn(0x50)],
3,
None,
)
.await
.unwrap();
@@ -7471,7 +7548,7 @@ mod tests {
),
];
let res = tline
.generate_key_retention(key, &history, Lsn(0x60), &[Lsn(0x40), Lsn(0x50)], 3)
.generate_key_retention(key, &history, Lsn(0x60), &[Lsn(0x40), Lsn(0x50)], 3, None)
.await
.unwrap();
let expected_res = KeyHistoryRetention {
@@ -7517,6 +7594,114 @@ mod tests {
};
assert_eq!(res, expected_res);
// In case of branch compaction, the branch itself does not have the full history, and we need to provide
// the ancestor image in the test case.
let history = vec![
(
key,
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(";0x20")),
),
(
key,
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(";0x30")),
),
(
key,
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append(";0x40")),
),
(
key,
Lsn(0x70),
Value::WalRecord(NeonWalRecord::wal_append(";0x70")),
),
];
let res = tline
.generate_key_retention(
key,
&history,
Lsn(0x60),
&[],
3,
Some((key, Lsn(0x10), Bytes::copy_from_slice(b"0x10"))),
)
.await
.unwrap();
let expected_res = KeyHistoryRetention {
below_horizon: vec![(
Lsn(0x60),
KeyLogAtLsn(vec![(
Lsn(0x60),
Value::Image(Bytes::copy_from_slice(b"0x10;0x20;0x30;0x40")), // use the ancestor image to reconstruct the page
)]),
)],
above_horizon: KeyLogAtLsn(vec![(
Lsn(0x70),
Value::WalRecord(NeonWalRecord::wal_append(";0x70")),
)]),
};
assert_eq!(res, expected_res);
let history = vec![
(
key,
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(";0x20")),
),
(
key,
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append(";0x40")),
),
(
key,
Lsn(0x60),
Value::WalRecord(NeonWalRecord::wal_append(";0x60")),
),
(
key,
Lsn(0x70),
Value::WalRecord(NeonWalRecord::wal_append(";0x70")),
),
];
let res = tline
.generate_key_retention(
key,
&history,
Lsn(0x60),
&[Lsn(0x30)],
3,
Some((key, Lsn(0x10), Bytes::copy_from_slice(b"0x10"))),
)
.await
.unwrap();
let expected_res = KeyHistoryRetention {
below_horizon: vec![
(
Lsn(0x30),
KeyLogAtLsn(vec![(
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(";0x20")),
)]),
),
(
Lsn(0x60),
KeyLogAtLsn(vec![(
Lsn(0x60),
Value::Image(Bytes::copy_from_slice(b"0x10;0x20;0x40;0x60")),
)]),
),
],
above_horizon: KeyLogAtLsn(vec![(
Lsn(0x70),
Value::WalRecord(NeonWalRecord::wal_append(";0x70")),
)]),
};
assert_eq!(res, expected_res);
Ok(())
}
@@ -7674,6 +7859,10 @@ mod tests {
];
let verify_result = || async {
let gc_horizon = {
let gc_info = tline.gc_info.read().unwrap();
gc_info.cutoffs.time
};
for idx in 0..10 {
assert_eq!(
tline
@@ -7684,7 +7873,7 @@ mod tests {
);
assert_eq!(
tline
.get(get_key(idx as u32), Lsn(0x30), &ctx)
.get(get_key(idx as u32), gc_horizon, &ctx)
.await
.unwrap(),
&expected_result_at_gc_horizon[idx]
@@ -7710,6 +7899,205 @@ mod tests {
let cancel = CancellationToken::new();
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
verify_result().await;
// compact again
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
verify_result().await;
// increase GC horizon and compact again
{
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x38);
guard.cutoffs.space = Lsn(0x38);
}
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
verify_result().await; // no wals between 0x30 and 0x38, so we should obtain the same result
// not increasing the GC horizon and compact again
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
verify_result().await;
Ok(())
}
#[tokio::test]
async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
let img_layer = (0..10)
.map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10"))))
.collect_vec();
let delta1 = vec![
(
get_key(1),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
),
(
get_key(2),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append("@0x30")),
),
(
get_key(3),
Lsn(0x28),
Value::WalRecord(NeonWalRecord::wal_append("@0x28")),
),
(
get_key(3),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append("@0x30")),
),
(
get_key(3),
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append("@0x40")),
),
];
let delta2 = vec![
(
get_key(5),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
),
(
get_key(6),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
),
];
let delta3 = vec![
(
get_key(8),
Lsn(0x48),
Value::WalRecord(NeonWalRecord::wal_append("@0x48")),
),
(
get_key(9),
Lsn(0x48),
Value::WalRecord(NeonWalRecord::wal_append("@0x48")),
),
];
let parent_tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![], // delta layers
vec![(Lsn(0x18), img_layer)], // image layers
Lsn(0x18),
)
.await?;
parent_tline.add_extra_test_dense_keyspace(KeySpace::single(get_key(0)..get_key(10)));
let branch_tline = tenant
.branch_timeline_test_with_layers(
&parent_tline,
NEW_TIMELINE_ID,
Some(Lsn(0x18)),
&ctx,
vec![
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta1),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta2),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x48)..Lsn(0x50), delta3),
], // delta layers
vec![], // image layers
Lsn(0x50),
)
.await?;
branch_tline.add_extra_test_dense_keyspace(KeySpace::single(get_key(0)..get_key(10)));
{
// Update GC info
let mut guard = parent_tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x18), branch_tline.timeline_id)],
cutoffs: GcCutoffs {
time: Lsn(0x10),
space: Lsn(0x10),
},
leases: Default::default(),
within_ancestor_pitr: false,
};
}
{
// Update GC info
let mut guard = branch_tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x40), branch_tline.timeline_id)],
cutoffs: GcCutoffs {
time: Lsn(0x50),
space: Lsn(0x50),
},
leases: Default::default(),
within_ancestor_pitr: false,
};
}
let expected_result_at_gc_horizon = [
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x10@0x20"),
Bytes::from_static(b"value 2@0x10@0x30"),
Bytes::from_static(b"value 3@0x10@0x28@0x30@0x40"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x10@0x20"),
Bytes::from_static(b"value 6@0x10@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x10@0x48"),
Bytes::from_static(b"value 9@0x10@0x48"),
];
let expected_result_at_lsn_40 = [
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x10@0x20"),
Bytes::from_static(b"value 2@0x10@0x30"),
Bytes::from_static(b"value 3@0x10@0x28@0x30@0x40"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x10@0x20"),
Bytes::from_static(b"value 6@0x10@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x10"),
Bytes::from_static(b"value 9@0x10"),
];
let verify_result = || async {
for idx in 0..10 {
assert_eq!(
branch_tline
.get(get_key(idx as u32), Lsn(0x50), &ctx)
.await
.unwrap(),
&expected_result_at_gc_horizon[idx]
);
assert_eq!(
branch_tline
.get(get_key(idx as u32), Lsn(0x40), &ctx)
.await
.unwrap(),
&expected_result_at_lsn_40[idx]
);
}
};
verify_result().await;
let cancel = CancellationToken::new();
branch_tline.compact_with_gc(&cancel, &ctx).await.unwrap();
verify_result().await;

View File

@@ -296,13 +296,19 @@ where
let mut stack = Vec::new();
stack.push((self.root_blk, None));
let block_cursor = self.reader.block_cursor();
let mut node_buf = [0_u8; PAGE_SZ];
while let Some((node_blknum, opt_iter)) = stack.pop() {
// Locate the node.
let node_buf = block_cursor
// Read the node, through the PS PageCache, into local variable `node_buf`.
// We could keep the page cache read guard alive, but, at the time of writing,
// we run quite small PS PageCache s => can't risk running out of
// PageCache space because this stream isn't consumed fast enough.
let page_read_guard = block_cursor
.read_blk(self.start_blk + node_blknum, ctx)
.await?;
node_buf.copy_from_slice(page_read_guard.as_ref());
drop(page_read_guard); // drop page cache read guard early
let node = OnDiskNode::deparse(node_buf.as_ref())?;
let node = OnDiskNode::deparse(&node_buf)?;
let prefix_len = node.prefix_len as usize;
let suffix_len = node.suffix_len as usize;
@@ -345,6 +351,7 @@ where
Either::Left(idx..node.num_children.into())
};
// idx points to the first match now. Keep going from there
while let Some(idx) = iter.next() {
let key_off = idx * suffix_len;

View File

@@ -51,7 +51,8 @@ use crate::keyspace::KeyPartitioning;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use anyhow::Result;
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::keyspace::{KeySpace, KeySpaceAccum};
use range_set_blaze::{CheckSortedDisjoint, RangeSetBlaze};
use std::collections::{HashMap, VecDeque};
use std::iter::Peekable;
use std::ops::Range;
@@ -61,7 +62,7 @@ use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::LayerKey;
use super::storage_layer::PersistentLayerDesc;
use super::storage_layer::{LayerVisibilityHint, PersistentLayerDesc};
///
/// LayerMap tracks what layers exist on a timeline.
@@ -871,11 +872,183 @@ impl LayerMap {
println!("End dump LayerMap");
Ok(())
}
/// `read_points` represent the tip of a timeline and any branch points, i.e. the places
/// where we expect to serve reads.
///
/// This function is O(N) and should be called infrequently. The caller is responsible for
/// looking up and updating the Layer objects for these layer descriptors.
pub fn get_visibility(
&self,
mut read_points: Vec<Lsn>,
) -> (
Vec<(Arc<PersistentLayerDesc>, LayerVisibilityHint)>,
KeySpace,
) {
// This is like a KeySpace, but this type is intended for efficient unions with image layer ranges, whereas
// KeySpace is intended to be composed statically and iterated over.
struct KeyShadow {
// Map of range start to range end
inner: RangeSetBlaze<i128>,
}
impl KeyShadow {
fn new() -> Self {
Self {
inner: Default::default(),
}
}
fn contains(&self, range: Range<Key>) -> bool {
let range_incl = range.start.to_i128()..=range.end.to_i128() - 1;
self.inner.is_superset(&RangeSetBlaze::from_sorted_disjoint(
CheckSortedDisjoint::from([range_incl]),
))
}
/// Add the input range to the keys covered by self.
///
/// Return true if inserting this range covered some keys that were previously not covered
fn cover(&mut self, insert: Range<Key>) -> bool {
let range_incl = insert.start.to_i128()..=insert.end.to_i128() - 1;
self.inner.ranges_insert(range_incl)
}
fn reset(&mut self) {
self.inner = Default::default();
}
fn to_keyspace(&self) -> KeySpace {
let mut accum = KeySpaceAccum::new();
for range_incl in self.inner.ranges() {
let range = Range {
start: Key::from_i128(*range_incl.start()),
end: Key::from_i128(range_incl.end() + 1),
};
accum.add_range(range)
}
accum.to_keyspace()
}
}
// The 'shadow' will be updated as we sweep through the layers: an image layer subtracts from the shadow,
// and a ReadPoint
read_points.sort_by_key(|rp| rp.0);
let mut shadow = KeyShadow::new();
// We will interleave all our read points and layers into a sorted collection
enum Item {
ReadPoint { lsn: Lsn },
Layer(Arc<PersistentLayerDesc>),
}
let mut items = Vec::with_capacity(self.historic.len() + read_points.len());
items.extend(self.iter_historic_layers().map(Item::Layer));
items.extend(
read_points
.into_iter()
.map(|rp| Item::ReadPoint { lsn: rp }),
);
// Ordering: we want to iterate like this:
// 1. Highest LSNs first
// 2. Consider images before deltas if they end at the same LSNs (images cover deltas)
// 3. Consider ReadPoints before image layers if they're at the same LSN (readpoints make that image visible)
items.sort_by_key(|item| {
std::cmp::Reverse(match item {
Item::Layer(layer) => {
if layer.is_delta() {
(Lsn(layer.get_lsn_range().end.0 - 1), 0)
} else {
(layer.image_layer_lsn(), 1)
}
}
Item::ReadPoint { lsn } => (*lsn, 2),
})
});
let mut results = Vec::with_capacity(self.historic.len());
let mut maybe_covered_deltas: Vec<Arc<PersistentLayerDesc>> = Vec::new();
for item in items {
let (reached_lsn, is_readpoint) = match &item {
Item::ReadPoint { lsn } => (lsn, true),
Item::Layer(layer) => (&layer.lsn_range.start, false),
};
maybe_covered_deltas.retain(|d| {
if *reached_lsn >= d.lsn_range.start && is_readpoint {
// We encountered a readpoint within the delta layer: it is visible
results.push((d.clone(), LayerVisibilityHint::Visible));
false
} else if *reached_lsn < d.lsn_range.start {
// We passed the layer's range without encountering a read point: it is not visible
results.push((d.clone(), LayerVisibilityHint::Covered));
false
} else {
// We're still in the delta layer: continue iterating
true
}
});
match item {
Item::ReadPoint { lsn: _lsn } => {
// TODO: propagate the child timeline's shadow from their own run of this function, so that we don't have
// to assume that the whole key range is visible at the branch point.
shadow.reset();
}
Item::Layer(layer) => {
let visibility = if layer.is_delta() {
if shadow.contains(layer.get_key_range()) {
// If a layer isn't visible based on current state, we must defer deciding whether
// it is truly not visible until we have advanced past the delta's range: we might
// encounter another branch point within this delta layer's LSN range.
maybe_covered_deltas.push(layer);
continue;
} else {
LayerVisibilityHint::Visible
}
} else {
let modified = shadow.cover(layer.get_key_range());
if modified {
// An image layer in a region which wasn't fully covered yet: this layer is visible, but layers below it will be covered
LayerVisibilityHint::Visible
} else {
// An image layer in a region that was already covered
LayerVisibilityHint::Covered
}
};
results.push((layer, visibility));
}
}
}
// Drain any remaining maybe_covered deltas
results.extend(
maybe_covered_deltas
.into_iter()
.map(|d| (d, LayerVisibilityHint::Covered)),
);
(results, shadow.to_keyspace())
}
}
#[cfg(test)]
mod tests {
use pageserver_api::keyspace::KeySpace;
use crate::tenant::{storage_layer::LayerName, IndexPart};
use pageserver_api::{
key::DBDIR_KEY,
keyspace::{KeySpace, KeySpaceRandomAccum},
};
use std::{collections::HashMap, path::PathBuf};
use utils::{
id::{TenantId, TimelineId},
shard::TenantShardId,
};
use super::*;
@@ -1002,4 +1175,299 @@ mod tests {
}
}
}
#[test]
fn layer_visibility_basic() {
// A simple synthetic input, as a smoke test.
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
let timeline_id = TimelineId::generate();
let mut layer_map = LayerMap::default();
let mut updates = layer_map.batch_update();
const FAKE_LAYER_SIZE: u64 = 1024;
let inject_delta = |updates: &mut BatchedUpdates,
key_start: i128,
key_end: i128,
lsn_start: u64,
lsn_end: u64| {
let desc = PersistentLayerDesc::new_delta(
tenant_shard_id,
timeline_id,
Range {
start: Key::from_i128(key_start),
end: Key::from_i128(key_end),
},
Range {
start: Lsn(lsn_start),
end: Lsn(lsn_end),
},
1024,
);
updates.insert_historic(desc.clone());
desc
};
let inject_image =
|updates: &mut BatchedUpdates, key_start: i128, key_end: i128, lsn: u64| {
let desc = PersistentLayerDesc::new_img(
tenant_shard_id,
timeline_id,
Range {
start: Key::from_i128(key_start),
end: Key::from_i128(key_end),
},
Lsn(lsn),
FAKE_LAYER_SIZE,
);
updates.insert_historic(desc.clone());
desc
};
//
// Construct our scenario: the following lines go in backward-LSN order, constructing the various scenarios
// we expect to handle. You can follow these examples through in the same order as they would be processed
// by the function under test.
//
let mut read_points = vec![Lsn(1000)];
// A delta ahead of any image layer
let ahead_layer = inject_delta(&mut updates, 10, 20, 101, 110);
// An image layer is visible and covers some layers beneath itself
let visible_covering_img = inject_image(&mut updates, 5, 25, 99);
// A delta layer covered by the image layer: should be covered
let covered_delta = inject_delta(&mut updates, 10, 20, 90, 100);
// A delta layer partially covered by an image layer: should be visible
let partially_covered_delta = inject_delta(&mut updates, 1, 7, 90, 100);
// A delta layer not covered by an image layer: should be visible
let not_covered_delta = inject_delta(&mut updates, 1, 4, 90, 100);
// An image layer covered by the image layer above: should be covered
let covered_image = inject_image(&mut updates, 10, 20, 89);
// An image layer partially covered by an image layer: should be visible
let partially_covered_image = inject_image(&mut updates, 1, 7, 89);
// An image layer not covered by an image layer: should be visible
let not_covered_image = inject_image(&mut updates, 1, 4, 89);
// A read point: this will make subsequent layers below here visible, even if there are
// more recent layers covering them.
read_points.push(Lsn(80));
// A delta layer covered by an earlier image layer, but visible to a readpoint below that covering layer
let covered_delta_below_read_point = inject_delta(&mut updates, 10, 20, 70, 79);
// A delta layer whose end LSN is covered, but where a read point is present partway through its LSN range:
// the read point should make it visible, even though its end LSN is covered
let covering_img_between_read_points = inject_image(&mut updates, 10, 20, 69);
let covered_delta_between_read_points = inject_delta(&mut updates, 10, 15, 67, 69);
read_points.push(Lsn(65));
let covered_delta_intersects_read_point = inject_delta(&mut updates, 15, 20, 60, 69);
let visible_img_after_last_read_point = inject_image(&mut updates, 10, 20, 65);
updates.flush();
let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();
assert_eq!(
layer_visibilities.get(&ahead_layer),
Some(&LayerVisibilityHint::Visible)
);
assert_eq!(
layer_visibilities.get(&visible_covering_img),
Some(&LayerVisibilityHint::Visible)
);
assert_eq!(
layer_visibilities.get(&covered_delta),
Some(&LayerVisibilityHint::Covered)
);
assert_eq!(
layer_visibilities.get(&partially_covered_delta),
Some(&LayerVisibilityHint::Visible)
);
assert_eq!(
layer_visibilities.get(&not_covered_delta),
Some(&LayerVisibilityHint::Visible)
);
assert_eq!(
layer_visibilities.get(&covered_image),
Some(&LayerVisibilityHint::Covered)
);
assert_eq!(
layer_visibilities.get(&partially_covered_image),
Some(&LayerVisibilityHint::Visible)
);
assert_eq!(
layer_visibilities.get(&not_covered_image),
Some(&LayerVisibilityHint::Visible)
);
assert_eq!(
layer_visibilities.get(&covered_delta_below_read_point),
Some(&LayerVisibilityHint::Visible)
);
assert_eq!(
layer_visibilities.get(&covering_img_between_read_points),
Some(&LayerVisibilityHint::Visible)
);
assert_eq!(
layer_visibilities.get(&covered_delta_between_read_points),
Some(&LayerVisibilityHint::Covered)
);
assert_eq!(
layer_visibilities.get(&covered_delta_intersects_read_point),
Some(&LayerVisibilityHint::Visible)
);
assert_eq!(
layer_visibilities.get(&visible_img_after_last_read_point),
Some(&LayerVisibilityHint::Visible)
);
// Shadow should include all the images below the last read point
let expected_shadow = KeySpace {
ranges: vec![Key::from_i128(10)..Key::from_i128(20)],
};
assert_eq!(shadow, expected_shadow);
}
fn fixture_path(relative: &str) -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(relative)
}
#[test]
fn layer_visibility_realistic() {
// Load a large example layermap
let index_raw = std::fs::read_to_string(fixture_path(
"test_data/indices/mixed_workload/index_part.json",
))
.unwrap();
let index: IndexPart = serde_json::from_str::<IndexPart>(&index_raw).unwrap();
let tenant_id = TenantId::generate();
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let timeline_id = TimelineId::generate();
let mut layer_map = LayerMap::default();
let mut updates = layer_map.batch_update();
for (layer_name, layer_metadata) in index.layer_metadata {
let layer_desc = match layer_name {
LayerName::Image(layer_name) => PersistentLayerDesc {
key_range: layer_name.key_range.clone(),
lsn_range: layer_name.lsn_as_range(),
tenant_shard_id,
timeline_id,
is_delta: false,
file_size: layer_metadata.file_size,
},
LayerName::Delta(layer_name) => PersistentLayerDesc {
key_range: layer_name.key_range,
lsn_range: layer_name.lsn_range,
tenant_shard_id,
timeline_id,
is_delta: true,
file_size: layer_metadata.file_size,
},
};
updates.insert_historic(layer_desc);
}
updates.flush();
let read_points = vec![index.metadata.disk_consistent_lsn()];
let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
for (layer_desc, visibility) in &layer_visibilities {
tracing::info!("{layer_desc:?}: {visibility:?}");
eprintln!("{layer_desc:?}: {visibility:?}");
}
// The shadow should be non-empty, since there were some image layers
assert!(!shadow.ranges.is_empty());
// At least some layers should be marked covered
assert!(layer_visibilities
.iter()
.any(|i| matches!(i.1, LayerVisibilityHint::Covered)));
let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();
// Brute force validation: a layer should be marked covered if and only if there are image layers above it in LSN order which cover it
for (layer_desc, visible) in &layer_visibilities {
let mut coverage = KeySpaceRandomAccum::new();
let mut covered_by = Vec::new();
for other_layer in layer_map.iter_historic_layers() {
if &other_layer == layer_desc {
continue;
}
if !other_layer.is_delta()
&& other_layer.image_layer_lsn() >= Lsn(layer_desc.get_lsn_range().end.0 - 1)
&& other_layer.key_range.start <= layer_desc.key_range.end
&& layer_desc.key_range.start <= other_layer.key_range.end
{
coverage.add_range(other_layer.get_key_range());
covered_by.push((*other_layer).clone());
}
}
let coverage = coverage.to_keyspace();
let expect_visible = if coverage.ranges.len() == 1
&& coverage.contains(&layer_desc.key_range.start)
&& coverage.contains(&Key::from_i128(layer_desc.key_range.end.to_i128() - 1))
{
LayerVisibilityHint::Covered
} else {
LayerVisibilityHint::Visible
};
if expect_visible != *visible {
eprintln!(
"Layer {}..{} @ {}..{} (delta={}) is {visible:?}, should be {expect_visible:?}",
layer_desc.key_range.start,
layer_desc.key_range.end,
layer_desc.lsn_range.start,
layer_desc.lsn_range.end,
layer_desc.is_delta()
);
if expect_visible == LayerVisibilityHint::Covered {
eprintln!("Covered by:");
for other in covered_by {
eprintln!(
" {}..{} @ {}",
other.get_key_range().start,
other.get_key_range().end,
other.image_layer_lsn()
);
}
if let Some(range) = coverage.ranges.first() {
eprintln!(
"Total coverage from contributing layers: {}..{}",
range.start, range.end
);
} else {
eprintln!(
"Total coverage from contributing layers: {:?}",
coverage.ranges
);
}
}
}
assert_eq!(expect_visible, *visible);
}
// Sanity: the layer that holds latest data for the DBDIR key should always be visible
// (just using this key as a key that will always exist for any layermap fixture)
let dbdir_layer = layer_map
.search(DBDIR_KEY, index.metadata.disk_consistent_lsn())
.unwrap();
assert!(matches!(
layer_visibilities.get(&dbdir_layer.layer).unwrap(),
LayerVisibilityHint::Visible
));
}
}

View File

@@ -521,6 +521,10 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
Ok(&self.historic_coverage)
}
pub(crate) fn len(&self) -> usize {
self.layers.len()
}
}
#[test]

View File

@@ -55,7 +55,7 @@ use utils::id::{TenantId, TimelineId};
use super::remote_timeline_client::remote_tenant_path;
use super::secondary::SecondaryTenant;
use super::timeline::detach_ancestor::PreparedTimelineDetach;
use super::TenantSharedResources;
use super::{GlobalShutDown, TenantSharedResources};
/// For a tenant that appears in TenantsMap, it may either be
/// - `Attached`: has a full Tenant object, is elegible to service
@@ -116,8 +116,6 @@ pub(crate) enum ShardSelector {
/// Only return the 0th shard, if it is present. If a non-0th shard is present,
/// ignore it.
Zero,
/// Pick the first shard we find for the TenantId
First,
/// Pick the shard that holds this key
Page(Key),
/// The shard ID is known: pick the given shard
@@ -667,17 +665,20 @@ pub async fn init_tenant_mgr(
let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
let shard_identity = location_conf.shard;
let slot = match location_conf.mode {
LocationMode::Attached(attached_conf) => TenantSlot::Attached(tenant_spawn(
conf,
tenant_shard_id,
&tenant_dir_path,
resources.clone(),
AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
shard_identity,
Some(init_order.clone()),
SpawnMode::Lazy,
&ctx,
)),
LocationMode::Attached(attached_conf) => TenantSlot::Attached(
tenant_spawn(
conf,
tenant_shard_id,
&tenant_dir_path,
resources.clone(),
AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
shard_identity,
Some(init_order.clone()),
SpawnMode::Lazy,
&ctx,
)
.expect("global shutdown during init_tenant_mgr cannot happen"),
),
LocationMode::Secondary(secondary_conf) => {
info!(
tenant_id = %tenant_shard_id.tenant_id,
@@ -725,7 +726,7 @@ fn tenant_spawn(
init_order: Option<InitializationOrder>,
mode: SpawnMode,
ctx: &RequestContext,
) -> Arc<Tenant> {
) -> Result<Arc<Tenant>, GlobalShutDown> {
// All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
// path, and contains a configuration file. Assertions that do synchronous I/O are limited to debug mode
// to avoid impacting prod runtime performance.
@@ -1192,7 +1193,10 @@ impl TenantManager {
None,
spawn_mode,
ctx,
);
)
.map_err(|_: GlobalShutDown| {
UpsertLocationError::Unavailable(TenantMapError::ShuttingDown)
})?;
TenantSlot::Attached(tenant)
}
@@ -1313,7 +1317,7 @@ impl TenantManager {
None,
SpawnMode::Eager,
ctx,
);
)?;
slot_guard.upsert(TenantSlot::Attached(tenant))?;
@@ -2047,7 +2051,7 @@ impl TenantManager {
None,
SpawnMode::Eager,
ctx,
);
)?;
slot_guard.upsert(TenantSlot::Attached(tenant))?;
@@ -2088,7 +2092,6 @@ impl TenantManager {
};
match selector {
ShardSelector::First => return ShardResolveResult::Found(tenant.clone()),
ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
return ShardResolveResult::Found(tenant.clone())
}
@@ -2170,6 +2173,9 @@ pub(crate) enum GetActiveTenantError {
/// never happen.
#[error("Tenant is broken: {0}")]
Broken(String),
#[error("reconnect to switch tenant id")]
SwitchedTenant,
}
#[derive(Debug, thiserror::Error)]

View File

@@ -1378,6 +1378,18 @@ impl RemoteTimelineClient {
.dirty
.layer_metadata
.drain()
.filter(|(_file_name, meta)| {
// Filter out layers that belonged to an ancestor shard. Since we are deleting the whole timeline from
// all shards anyway, we _could_ delete these, but
// - it creates a potential race if other shards are still
// using the layers while this shard deletes them.
// - it means that if we rolled back the shard split, the ancestor shards would be in a state where
// these timelines are present but corrupt (their index exists but some layers don't)
//
// These layers will eventually be cleaned up by the scrubber when it does physical GC.
meta.shard.shard_number == self.tenant_shard_id.shard_number
&& meta.shard.shard_count == self.tenant_shard_id.shard_count
})
.map(|(file_name, meta)| {
remote_layer_path(
&self.tenant_shard_id.tenant_id,

View File

@@ -8,6 +8,9 @@ mod layer_desc;
mod layer_name;
pub mod merge_iterator;
#[cfg(test)]
pub mod split_writer;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;
use crate::walrecord::NeonWalRecord;
@@ -451,20 +454,14 @@ pub enum ValueReconstructResult {
/// than an authoritative value, so that we do not have to update it synchronously when changing the visibility
/// of layers (for example when creating a branch that makes some previously covered layers visible). It should
/// be used for cache management but not for correctness-critical checks.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub(crate) enum LayerVisibilityHint {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LayerVisibilityHint {
/// A Visible layer might be read while serving a read, because there is not an image layer between it
/// and a readable LSN (the tip of the branch or a child's branch point)
Visible,
/// A Covered layer probably won't be read right now, but _can_ be read in future if someone creates
/// a branch or ephemeral endpoint at an LSN below the layer that covers this.
#[allow(unused)]
Covered,
/// Calculating layer visibilty requires I/O, so until this has happened layers are loaded
/// in this state. Note that newly written layers may be called Visible immediately, this uninitialized
/// state is for when existing layers are constructed while loading a timeline.
#[default]
Uninitialized,
}
pub(crate) struct LayerAccessStats(std::sync::atomic::AtomicU64);
@@ -626,23 +623,30 @@ impl LayerAccessStats {
}
}
pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) {
let value = match visibility {
LayerVisibilityHint::Visible => 0x1 << Self::VISIBILITY_SHIFT,
LayerVisibilityHint::Covered | LayerVisibilityHint::Uninitialized => 0x0,
};
self.write_bits(0x1 << Self::VISIBILITY_SHIFT, value);
}
pub(crate) fn visibility(&self) -> LayerVisibilityHint {
let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
match (read >> Self::VISIBILITY_SHIFT) & 0x1 {
/// Helper for extracting the visibility hint from the literal value of our inner u64
fn decode_visibility(&self, bits: u64) -> LayerVisibilityHint {
match (bits >> Self::VISIBILITY_SHIFT) & 0x1 {
1 => LayerVisibilityHint::Visible,
0 => LayerVisibilityHint::Covered,
_ => unreachable!(),
}
}
/// Returns the old value which has been replaced
pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) -> LayerVisibilityHint {
let value = match visibility {
LayerVisibilityHint::Visible => 0x1 << Self::VISIBILITY_SHIFT,
LayerVisibilityHint::Covered => 0x0,
};
let old_bits = self.write_bits(0x1 << Self::VISIBILITY_SHIFT, value);
self.decode_visibility(old_bits)
}
pub(crate) fn visibility(&self) -> LayerVisibilityHint {
let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
self.decode_visibility(read)
}
}
/// Get a layer descriptor from a layer.

View File

@@ -384,6 +384,9 @@ struct DeltaLayerWriterInner {
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
blob_writer: BlobWriter<true>,
// Number of key-lsns in the layer.
num_keys: usize,
}
impl DeltaLayerWriterInner {
@@ -425,6 +428,7 @@ impl DeltaLayerWriterInner {
lsn_range,
tree: tree_builder,
blob_writer,
num_keys: 0,
})
}
@@ -475,6 +479,9 @@ impl DeltaLayerWriterInner {
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
let res = self.tree.append(&delta_key.0, blob_ref.0);
self.num_keys += 1;
(val, res.map_err(|e| anyhow::anyhow!(e)))
}
@@ -686,6 +693,17 @@ impl DeltaLayerWriter {
.finish(key_end, timeline, ctx)
.await
}
#[cfg(test)]
pub(crate) fn num_keys(&self) -> usize {
self.inner.as_ref().unwrap().num_keys
}
#[cfg(test)]
pub(crate) fn estimated_size(&self) -> u64 {
let inner = self.inner.as_ref().unwrap();
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
}
}
impl Drop for DeltaLayerWriter {

View File

@@ -742,8 +742,14 @@ struct ImageLayerWriterInner {
// where we have chosen their compressed form
uncompressed_bytes_chosen: u64,
// Number of keys in the layer.
num_keys: usize,
blob_writer: BlobWriter<false>,
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
#[cfg_attr(not(feature = "testing"), allow(dead_code))]
last_written_key: Key,
}
impl ImageLayerWriterInner {
@@ -800,6 +806,8 @@ impl ImageLayerWriterInner {
uncompressed_bytes: 0,
uncompressed_bytes_eligible: 0,
uncompressed_bytes_chosen: 0,
num_keys: 0,
last_written_key: Key::MIN,
};
Ok(writer)
@@ -820,6 +828,7 @@ impl ImageLayerWriterInner {
let compression = self.conf.image_compression;
let uncompressed_len = img.len() as u64;
self.uncompressed_bytes += uncompressed_len;
self.num_keys += 1;
let (_img, res) = self
.blob_writer
.write_blob_maybe_compressed(img, ctx, compression)
@@ -839,6 +848,11 @@ impl ImageLayerWriterInner {
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;
#[cfg(feature = "testing")]
{
self.last_written_key = key;
}
Ok(())
}
@@ -849,6 +863,7 @@ impl ImageLayerWriterInner {
self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<ResidentLayer> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -899,11 +914,23 @@ impl ImageLayerWriterInner {
let desc = PersistentLayerDesc::new_img(
self.tenant_shard_id,
self.timeline_id,
self.key_range.clone(),
if let Some(end_key) = end_key {
self.key_range.start..end_key
} else {
self.key_range.clone()
},
self.lsn,
metadata.len(),
);
#[cfg(feature = "testing")]
if let Some(end_key) = end_key {
assert!(
self.last_written_key < end_key,
"written key violates end_key range"
);
}
// Note: Because we open the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
// set inner.file here. The first read will have to re-open it.
@@ -980,6 +1007,18 @@ impl ImageLayerWriter {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}
#[cfg(test)]
/// Estimated size of the image layer.
pub(crate) fn estimated_size(&self) -> u64 {
let inner = self.inner.as_ref().unwrap();
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
}
#[cfg(test)]
pub(crate) fn num_keys(&self) -> usize {
self.inner.as_ref().unwrap().num_keys
}
///
/// Finish writing the image layer.
///
@@ -988,7 +1027,22 @@ impl ImageLayerWriter {
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<super::ResidentLayer> {
self.inner.take().unwrap().finish(timeline, ctx).await
self.inner.take().unwrap().finish(timeline, ctx, None).await
}
#[cfg(test)]
/// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
pub(super) async fn finish_with_end_key(
mut self,
timeline: &Arc<Timeline>,
end_key: Key,
ctx: &RequestContext,
) -> anyhow::Result<super::ResidentLayer> {
self.inner
.take()
.unwrap()
.finish(timeline, ctx, Some(end_key))
.await
}
}

View File

@@ -24,7 +24,8 @@ use super::delta_layer::{self, DeltaEntry};
use super::image_layer::{self};
use super::{
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
PersistentLayerDesc, ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
LayerVisibilityHint, PersistentLayerDesc, ValueReconstructResult, ValueReconstructState,
ValuesReconstructState,
};
use utils::generation::Generation;
@@ -246,7 +247,7 @@ impl Layer {
&timeline.generation,
);
let layer = LayerInner::new(
LayerInner::new(
conf,
timeline,
local_path,
@@ -254,14 +255,7 @@ impl Layer {
Some(inner),
timeline.generation,
timeline.get_shard_index(),
);
// Newly created layers are marked visible by default: the usual case is that they were created to be read.
layer
.access_stats
.set_visibility(super::LayerVisibilityHint::Visible);
layer
)
}));
let downloaded = resident.expect("just initialized");
@@ -493,6 +487,32 @@ impl Layer {
}
}
}
pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) {
let old_visibility = self.access_stats().set_visibility(visibility.clone());
use LayerVisibilityHint::*;
match (old_visibility, visibility) {
(Visible, Covered) => {
// Subtract this layer's contribution to the visible size metric
if let Some(tl) = self.0.timeline.upgrade() {
tl.metrics
.visible_physical_size_gauge
.sub(self.0.desc.file_size)
}
}
(Covered, Visible) => {
// Add this layer's contribution to the visible size metric
if let Some(tl) = self.0.timeline.upgrade() {
tl.metrics
.visible_physical_size_gauge
.add(self.0.desc.file_size)
}
}
(Covered, Covered) | (Visible, Visible) => {
// no change
}
}
}
}
/// The download-ness ([`DownloadedLayer`]) can be either resident or wanted evicted.
@@ -693,6 +713,13 @@ impl Drop for LayerInner {
timeline.metrics.layer_count_image.dec();
timeline.metrics.layer_size_image.sub(self.desc.file_size);
}
if matches!(self.access_stats.visibility(), LayerVisibilityHint::Visible) {
timeline
.metrics
.visible_physical_size_gauge
.sub(self.desc.file_size);
}
}
if !*self.wanted_deleted.get_mut() {
@@ -801,6 +828,12 @@ impl LayerInner {
timeline.metrics.layer_size_image.add(desc.file_size);
}
// New layers are visible by default. This metric is later updated on drop or in set_visibility
timeline
.metrics
.visible_physical_size_gauge
.add(desc.file_size);
LayerInner {
conf,
debug_str: {

View File

@@ -41,6 +41,20 @@ pub struct PersistentLayerKey {
pub is_delta: bool,
}
impl std::fmt::Display for PersistentLayerKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}..{} {}..{} is_delta={}",
self.key_range.start,
self.key_range.end,
self.lsn_range.start,
self.lsn_range.end,
self.is_delta
)
}
}
impl PersistentLayerDesc {
pub fn key(&self) -> PersistentLayerKey {
PersistentLayerKey {

View File

@@ -0,0 +1,449 @@
use std::{ops::Range, sync::Arc};
use bytes::Bytes;
use pageserver_api::key::{Key, KEY_SIZE};
use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer};
/// An image writer that takes images and produces multiple image layers. The interface does not
/// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
/// to be cleaned up)
#[must_use]
pub struct SplitImageLayerWriter {
inner: ImageLayerWriter,
target_layer_size: u64,
generated_layers: Vec<ResidentLayer>,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn: Lsn,
}
impl SplitImageLayerWriter {
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_key: Key,
lsn: Lsn,
target_layer_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
inner: ImageLayerWriter::new(
conf,
timeline_id,
tenant_shard_id,
&(start_key..Key::MAX),
lsn,
ctx,
)
.await?,
generated_layers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
lsn,
})
}
pub async fn put_image(
&mut self,
key: Key,
img: Bytes,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
if self.inner.num_keys() >= 1
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
{
let next_image_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&(key..Key::MAX),
self.lsn,
ctx,
)
.await?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.generated_layers.push(
prev_image_writer
.finish_with_end_key(tline, key, ctx)
.await?,
);
}
self.inner.put_image(key, img, ctx).await
}
pub(crate) async fn finish(
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<ResidentLayer>> {
let Self {
mut generated_layers,
inner,
..
} = self;
generated_layers.push(inner.finish_with_end_key(tline, end_key, ctx).await?);
Ok(generated_layers)
}
/// When split writer fails, the caller should call this function and handle partially generated layers.
#[allow(dead_code)]
pub(crate) async fn take(self) -> anyhow::Result<(Vec<ResidentLayer>, ImageLayerWriter)> {
Ok((self.generated_layers, self.inner))
}
}
/// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not
/// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files
/// to be cleaned up).
#[must_use]
pub struct SplitDeltaLayerWriter {
inner: DeltaLayerWriter,
target_layer_size: u64,
generated_layers: Vec<ResidentLayer>,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
}
impl SplitDeltaLayerWriter {
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_key: Key,
lsn_range: Range<Lsn>,
target_layer_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
inner: DeltaLayerWriter::new(
conf,
timeline_id,
tenant_shard_id,
start_key,
lsn_range.clone(),
ctx,
)
.await?,
generated_layers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
lsn_range,
})
}
pub async fn put_value(
&mut self,
key: Key,
lsn: Lsn,
val: Value,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
if self.inner.num_keys() >= 1
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
{
let next_delta_writer = DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
key,
self.lsn_range.clone(),
ctx,
)
.await?;
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
self.generated_layers
.push(prev_delta_writer.finish(key, tline, ctx).await?);
}
self.inner.put_value(key, lsn, val, ctx).await
}
pub(crate) async fn finish(
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<ResidentLayer>> {
let Self {
mut generated_layers,
inner,
..
} = self;
generated_layers.push(inner.finish(end_key, tline, ctx).await?);
Ok(generated_layers)
}
/// When split writer fails, the caller should call this function and handle partially generated layers.
#[allow(dead_code)]
pub(crate) async fn take(self) -> anyhow::Result<(Vec<ResidentLayer>, DeltaLayerWriter)> {
Ok((self.generated_layers, self.inner))
}
}
#[cfg(test)]
mod tests {
use crate::{
tenant::{
harness::{TenantHarness, TIMELINE_ID},
storage_layer::AsLayerDesc,
},
DEFAULT_PG_VERSION,
};
use super::*;
fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
fn get_img(id: u32) -> Bytes {
format!("{id:064}").into()
}
fn get_large_img() -> Bytes {
vec![0; 8192].into()
}
#[tokio::test]
async fn write_one_image() {
let harness = TenantHarness::create("split_writer_write_one_image")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
let mut image_writer = SplitImageLayerWriter::new(
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
let mut delta_writer = SplitDeltaLayerWriter::new(
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
image_writer
.put_image(get_key(0), get_img(0), &tline, &ctx)
.await
.unwrap();
let layers = image_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
assert_eq!(layers.len(), 1);
delta_writer
.put_value(
get_key(0),
Lsn(0x18),
Value::Image(get_img(0)),
&tline,
&ctx,
)
.await
.unwrap();
let layers = delta_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
assert_eq!(layers.len(), 1);
}
#[tokio::test]
async fn write_split() {
let harness = TenantHarness::create("split_writer_write_split")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
let mut image_writer = SplitImageLayerWriter::new(
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
let mut delta_writer = SplitDeltaLayerWriter::new(
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
const N: usize = 2000;
for i in 0..N {
let i = i as u32;
image_writer
.put_image(get_key(i), get_large_img(), &tline, &ctx)
.await
.unwrap();
delta_writer
.put_value(
get_key(i),
Lsn(0x20),
Value::Image(get_large_img()),
&tline,
&ctx,
)
.await
.unwrap();
}
let image_layers = image_writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
let delta_layers = delta_writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
assert_eq!(image_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.len(), N / 512 + 1);
for idx in 0..image_layers.len() {
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
if idx > 0 {
assert_eq!(
image_layers[idx - 1].layer_desc().key_range.end,
image_layers[idx].layer_desc().key_range.start
);
assert_eq!(
delta_layers[idx - 1].layer_desc().key_range.end,
delta_layers[idx].layer_desc().key_range.start
);
}
}
}
#[tokio::test]
async fn write_large_img() {
let harness = TenantHarness::create("split_writer_write_large_img")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
let mut image_writer = SplitImageLayerWriter::new(
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18),
4 * 1024,
&ctx,
)
.await
.unwrap();
let mut delta_writer = SplitDeltaLayerWriter::new(
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024,
&ctx,
)
.await
.unwrap();
image_writer
.put_image(get_key(0), get_img(0), &tline, &ctx)
.await
.unwrap();
image_writer
.put_image(get_key(1), get_large_img(), &tline, &ctx)
.await
.unwrap();
let layers = image_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
assert_eq!(layers.len(), 2);
delta_writer
.put_value(
get_key(0),
Lsn(0x18),
Value::Image(get_img(0)),
&tline,
&ctx,
)
.await
.unwrap();
delta_writer
.put_value(
get_key(1),
Lsn(0x1A),
Value::Image(get_large_img()),
&tline,
&ctx,
)
.await
.unwrap();
let layers = delta_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
assert_eq!(layers.len(), 2);
}
}

View File

@@ -3,6 +3,7 @@ pub(crate) mod compaction;
pub mod delete;
pub(crate) mod detach_ancestor;
mod eviction_task;
pub(crate) mod handle;
mod init;
pub mod layer_manager;
pub(crate) mod logical_size;
@@ -17,6 +18,7 @@ use camino::Utf8Path;
use chrono::{DateTime, Utc};
use enumset::EnumSet;
use fail::fail_point;
use handle::ShardTimelineId;
use once_cell::sync::Lazy;
use pageserver_api::{
key::{
@@ -74,6 +76,7 @@ use crate::{
metadata::TimelineMetadata,
storage_layer::PersistentLayerDesc,
},
walredo,
};
use crate::{
context::{DownloadBehavior, RequestContext},
@@ -140,7 +143,10 @@ use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::{config::TenantConf, upload_queue::NotInitialized};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
use super::{
remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError,
storage_layer::ReadableLayer,
};
use super::{
secondary::heatmap::{HeatMapLayer, HeatMapTimeline},
GcError,
@@ -424,6 +430,8 @@ pub struct Timeline {
pub(crate) extra_test_dense_keyspace: ArcSwap<KeySpace>,
pub(crate) l0_flush_global_state: L0FlushGlobalState,
pub(crate) handles: handle::PerTimelineState<crate::page_service::TenantManagerTypes>,
}
pub struct WalReceiverInfo {
@@ -529,7 +537,6 @@ impl GetVectoredError {
}
}
#[derive(Debug)]
pub struct MissingKeyError {
key: Key,
shard: ShardNumber,
@@ -540,6 +547,12 @@ pub struct MissingKeyError {
backtrace: Option<std::backtrace::Backtrace>,
}
impl std::fmt::Debug for MissingKeyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
}
impl std::fmt::Display for MissingKeyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
@@ -991,7 +1004,10 @@ impl Timeline {
.for_get_kind(GetKind::Singular)
.observe(elapsed.as_secs_f64());
if cfg!(feature = "testing") && res.is_err() {
if cfg!(feature = "testing")
&& res.is_err()
&& !matches!(res, Err(PageReconstructError::Cancelled))
{
// it can only be walredo issue
use std::fmt::Write;
@@ -1910,6 +1926,9 @@ impl Timeline {
tracing::debug!("Cancelling CancellationToken");
self.cancel.cancel();
// Ensure Prevent new page service requests from starting.
self.handles.shutdown();
// Transition the remote_client into a state where it's only useful for timeline deletion.
// (The deletion use case is why we can't just hook up remote_client to Self::cancel).)
self.remote_client.stop();
@@ -2435,6 +2454,8 @@ impl Timeline {
extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())),
l0_flush_global_state: resources.l0_flush_global_state,
handles: Default::default(),
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -2718,6 +2739,10 @@ impl Timeline {
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
// Now that we have the full layer map, we may calculate the visibility of layers within it (a global scan)
drop(guard); // drop write lock, update_layer_visibility will take a read lock.
self.update_layer_visibility().await;
info!(
"loaded layer map with {} layers at {}, total physical size: {}",
num_layers, disk_consistent_lsn, total_physical_size
@@ -3704,6 +3729,17 @@ impl Timeline {
&self.shard_identity
}
#[inline(always)]
pub(crate) fn shard_timeline_id(&self) -> ShardTimelineId {
ShardTimelineId {
shard_index: ShardIndex {
shard_number: self.shard_identity.number,
shard_count: self.shard_identity.count,
},
timeline_id: self.timeline_id,
}
}
///
/// Get a handle to the latest layer for appending.
///
@@ -4056,6 +4092,21 @@ impl Timeline {
// release lock on 'layers'
};
// Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files.
// This makes us refuse ingest until the new layers have been persisted to the remote.
self.remote_client
.wait_completion()
.await
.map_err(|e| match e {
WaitCompletionError::UploadQueueShutDownOrStopped
| WaitCompletionError::NotInitialized(
NotInitialized::ShuttingDown | NotInitialized::Stopped,
) => FlushLayerError::Cancelled,
WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => {
FlushLayerError::Other(anyhow!(e).into())
}
})?;
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
// a compaction can delete the file and then it won't be available for uploads any more.
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
@@ -4648,27 +4699,6 @@ impl Timeline {
}
}
// The writer.finish() above already did the fsync of the inodes.
// We just need to fsync the directory in which these inodes are linked,
// which we know to be the timeline directory.
if !image_layers.is_empty() {
// We use fatal_err() below because the after writer.finish() returns with success,
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
let timeline_dir = VirtualFile::open(
&self
.conf
.timeline_path(&self.tenant_shard_id, &self.timeline_id),
ctx,
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
}
let mut guard = self.layers.write().await;
// FIXME: we could add the images to be uploaded *before* returning from here, but right
@@ -4677,6 +4707,9 @@ impl Timeline {
drop_wlock(guard);
timer.stop_and_record();
// Creating image layers may have caused some previously visible layers to be covered
self.update_layer_visibility().await;
Ok(image_layers)
}
@@ -5441,20 +5474,22 @@ impl Timeline {
} else {
trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
};
let img = match self
let res = self
.walredo_mgr
.as_ref()
.context("timeline has no walredo manager")
.map_err(PageReconstructError::WalRedo)?
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
.await
.context("reconstruct a page image")
{
.await;
let img = match res {
Ok(img) => img,
Err(e) => return Err(PageReconstructError::WalRedo(e)),
Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
Err(walredo::Error::Other(e)) => {
return Err(PageReconstructError::WalRedo(
e.context("reconstruct a page image"),
))
}
};
Ok(img)
}
}

View File

@@ -4,7 +4,7 @@
//!
//! The old legacy algorithm is implemented directly in `timeline.rs`.
use std::collections::BinaryHeap;
use std::collections::{BinaryHeap, HashSet};
use std::ops::{Deref, Range};
use std::sync::Arc;
@@ -15,6 +15,7 @@ use super::{
};
use anyhow::{anyhow, Context};
use bytes::Bytes;
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
@@ -29,7 +30,9 @@ use crate::page_cache;
use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc, ValueReconstructState};
use crate::tenant::storage_layer::{
AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
};
use crate::tenant::timeline::ImageLayerCreationOutcome;
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
@@ -69,17 +72,21 @@ impl KeyHistoryRetention {
self,
key: Key,
delta_writer: &mut Vec<(Key, Lsn, Value)>,
image_writer: &mut ImageLayerWriter,
mut image_writer: Option<&mut ImageLayerWriter>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut first_batch = true;
for (_, KeyLogAtLsn(logs)) in self.below_horizon {
for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
if first_batch {
if logs.len() == 1 && logs[0].1.is_image() {
let Value::Image(img) = &logs[0].1 else {
unreachable!()
};
image_writer.put_image(key, img.clone(), ctx).await?;
if let Some(image_writer) = image_writer.as_mut() {
image_writer.put_image(key, img.clone(), ctx).await?;
} else {
delta_writer.push((key, cutoff_lsn, Value::Image(img.clone())));
}
} else {
for (lsn, val) in logs {
delta_writer.push((key, lsn, val));
@@ -438,6 +445,45 @@ impl Timeline {
Ok(())
}
/// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
/// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
/// purpose of the visibility hint is to record which layers need to be available to service reads.
///
/// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
/// that we know won't be needed for reads.
pub(super) async fn update_layer_visibility(&self) {
let head_lsn = self.get_last_record_lsn();
// We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
// are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
// Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
// they will be subject to L0->L1 compaction in the near future.
let layer_manager = self.layers.read().await;
let layer_map = layer_manager.layer_map();
let readable_points = {
let children = self.gc_info.read().unwrap().retain_lsns.clone();
let mut readable_points = Vec::with_capacity(children.len() + 1);
for (child_lsn, _child_timeline_id) in &children {
readable_points.push(*child_lsn);
}
readable_points.push(head_lsn);
readable_points
};
let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
for (layer_desc, visibility) in layer_visibility {
// FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
let layer = layer_manager.get_from_desc(&layer_desc);
layer.set_visibility(visibility);
}
// TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
// avoid assuming that everything at a branch point is visible.
drop(covered);
}
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
/// as Level 1 files. Returns whether the L0 layers are fully compacted.
async fn compact_level0(
@@ -698,7 +744,140 @@ impl Timeline {
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter = all_keys.iter();
// If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
// then the Value::Image is ordered before Value::WalRecord.
//
// TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
// option and validation code once we've reached confidence.
enum AllValuesIter<'a> {
PageCachedBlobIo {
all_keys_iter: VecIter<'a>,
},
StreamingKmergeBypassingPageCache {
merge_iter: MergeIterator<'a>,
},
ValidatingStreamingKmergeBypassingPageCache {
mode: CompactL0BypassPageCacheValidation,
merge_iter: MergeIterator<'a>,
all_keys_iter: VecIter<'a>,
},
}
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
impl AllValuesIter<'_> {
async fn next_all_keys_iter(
iter: &mut VecIter<'_>,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
let Some(DeltaEntry {
key,
lsn,
val: value_ref,
..
}) = iter.next()
else {
return Ok(None);
};
let value = value_ref.load(ctx).await?;
Ok(Some((*key, *lsn, value)))
}
async fn next(
&mut self,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
match self {
AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
Self::next_all_keys_iter(iter, ctx).await
}
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
// advance both iterators
let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
let merge_iter_item = merge_iter.next().await;
// compare results & log warnings as needed
macro_rules! rate_limited_warn {
($($arg:tt)*) => {{
if cfg!(debug_assertions) || cfg!(feature = "testing") {
warn!($($arg)*);
panic!("CompactL0BypassPageCacheValidation failure, check logs");
}
use once_cell::sync::Lazy;
use utils::rate_limit::RateLimit;
use std::sync::Mutex;
use std::time::Duration;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!($($arg)*);
});
}}
}
match (&all_keys_iter_item, &merge_iter_item) {
(Err(_), Err(_)) => {
// don't bother asserting equivality of the errors
}
(Err(all_keys), Ok(merge)) => {
rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
},
(Ok(all_keys), Err(merge)) => {
rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
},
(Ok(None), Ok(None)) => { }
(Ok(Some(all_keys)), Ok(None)) => {
rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
}
(Ok(None), Ok(Some(merge))) => {
rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
}
(Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
match mode {
// TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
CompactL0BypassPageCacheValidation::KeyLsn => {
let all_keys = (all_keys_key, all_keys_lsn);
let merge = (merge_key, merge_lsn);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
}
}
CompactL0BypassPageCacheValidation::KeyLsnValue => {
let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
let merge = (merge_key, merge_lsn, merge_value);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
}
}
}
}
}
// in case of mismatch, trust the legacy all_keys_iter_item
all_keys_iter_item
}.instrument(info_span!("next")).await
}
}
}
let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
all_keys_iter: all_keys.iter(),
},
CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
let merge_iter = {
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact.iter() {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create(&deltas, &[], ctx)
};
match validate {
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
mode: validate.clone(),
merge_iter,
all_keys_iter: all_keys.iter(),
},
}
}
};
// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter = all_keys
@@ -771,11 +950,11 @@ impl Timeline {
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
let mut next_hole = 0; // index of next hole in holes vector
for &DeltaEntry {
key, lsn, ref val, ..
} in all_values_iter
while let Some((key, lsn, value)) = all_values_iter
.next(ctx)
.await
.map_err(CompactionError::Other)?
{
let value = val.load(ctx).await.map_err(CompactionError::Other)?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
@@ -960,6 +1139,10 @@ impl Timeline {
}
}
// Without this, rustc complains about deltas_to_compact still
// being borrowed when we `.into_iter()` below.
drop(all_values_iter);
Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact: deltas_to_compact
@@ -1067,6 +1250,43 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
}
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum CompactL0Phase1ValueAccess {
/// The old way.
PageCachedBlobIo,
/// The new way.
StreamingKmerge {
/// If set, we run both the old way and the new way, validate that
/// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
/// and if the validation fails,
/// - in tests: fail them with a panic or
/// - in prod, log a rate-limited warning and use the old way's results.
///
/// If not set, we only run the new way and trust its results.
validate: Option<CompactL0BypassPageCacheValidation>,
},
}
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum CompactL0BypassPageCacheValidation {
/// Validate that the series of (key, lsn) pairs are the same.
KeyLsn,
/// Validate that the entire output of old and new way is identical.
KeyLsnValue,
}
impl Default for CompactL0Phase1ValueAccess {
fn default() -> Self {
CompactL0Phase1ValueAccess::StreamingKmerge {
// TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
}
}
}
impl Timeline {
/// Entry point for new tiered compaction algorithm.
///
@@ -1150,21 +1370,22 @@ impl Timeline {
pub(crate) async fn generate_key_retention(
self: &Arc<Timeline>,
key: Key,
history: &[(Key, Lsn, Value)],
full_history: &[(Key, Lsn, Value)],
horizon: Lsn,
retain_lsn_below_horizon: &[Lsn],
delta_threshold_cnt: usize,
base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
) -> anyhow::Result<KeyHistoryRetention> {
// Pre-checks for the invariants
if cfg!(debug_assertions) {
for (log_key, _, _) in history {
for (log_key, _, _) in full_history {
assert_eq!(log_key, &key, "mismatched key");
}
for i in 1..history.len() {
assert!(history[i - 1].1 <= history[i].1, "unordered LSN");
if history[i - 1].1 == history[i].1 {
for i in 1..full_history.len() {
assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
if full_history[i - 1].1 == full_history[i].1 {
assert!(
matches!(history[i - 1].2, Value::Image(_)),
matches!(full_history[i - 1].2, Value::Image(_)),
"unordered delta/image, or duplicated delta"
);
}
@@ -1183,6 +1404,7 @@ impl Timeline {
);
}
}
let has_ancestor = base_img_from_ancestor.is_some();
// Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
// and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
let (mut split_history, lsn_split_points) = {
@@ -1194,7 +1416,7 @@ impl Timeline {
}
lsn_split_points.push(horizon);
let mut current_idx = 0;
for item @ (_, lsn, _) in history {
for item @ (_, lsn, _) in full_history {
while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
current_idx += 1;
}
@@ -1216,6 +1438,9 @@ impl Timeline {
// For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
// keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
// dropped.
//
// TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
// threshold, we could have kept delta instead to save space. This is an optimization for the future.
continue;
}
}
@@ -1233,9 +1458,75 @@ impl Timeline {
"should have at least below + above horizon batches"
);
let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
if let Some((key, lsn, img)) = base_img_from_ancestor {
replay_history.push((key, lsn, Value::Image(img)));
}
/// Generate debug information for the replay history
fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
use std::fmt::Write;
let mut output = String::new();
if let Some((key, _, _)) = replay_history.first() {
write!(output, "key={} ", key).unwrap();
let mut cnt = 0;
for (_, lsn, val) in replay_history {
if val.is_image() {
write!(output, "i@{} ", lsn).unwrap();
} else if val.will_init() {
write!(output, "di@{} ", lsn).unwrap();
} else {
write!(output, "d@{} ", lsn).unwrap();
}
cnt += 1;
if cnt >= 128 {
write!(output, "... and more").unwrap();
break;
}
}
} else {
write!(output, "<no history>").unwrap();
}
output
}
fn generate_debug_trace(
replay_history: Option<&[(Key, Lsn, Value)]>,
full_history: &[(Key, Lsn, Value)],
lsns: &[Lsn],
horizon: Lsn,
) -> String {
use std::fmt::Write;
let mut output = String::new();
if let Some(replay_history) = replay_history {
writeln!(
output,
"replay_history: {}",
generate_history_trace(replay_history)
)
.unwrap();
} else {
writeln!(output, "replay_history: <disabled>",).unwrap();
}
writeln!(
output,
"full_history: {}",
generate_history_trace(full_history)
)
.unwrap();
writeln!(
output,
"when processing: [{}] horizon={}",
lsns.iter().map(|l| format!("{l}")).join(","),
horizon
)
.unwrap();
output
}
for (i, split_for_lsn) in split_history.into_iter().enumerate() {
// TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
records_since_last_image += split_for_lsn.len();
let generate_image = if i == 0 {
let generate_image = if i == 0 && !has_ancestor {
// We always generate images for the first batch (below horizon / lowest retain_lsn)
true
} else if i == batch_cnt - 1 {
@@ -1256,10 +1547,27 @@ impl Timeline {
}
}
if let Some((_, _, val)) = replay_history.first() {
assert!(val.will_init(), "invalid history, no base image");
if !val.will_init() {
return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
|| {
generate_debug_trace(
Some(&replay_history),
full_history,
retain_lsn_below_horizon,
horizon,
)
},
);
}
}
if generate_image && records_since_last_image > 0 {
records_since_last_image = 0;
let replay_history_for_debug = if cfg!(debug_assertions) {
Some(replay_history.clone())
} else {
None
};
let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
let history = std::mem::take(&mut replay_history);
let mut img = None;
let mut records = Vec::with_capacity(history.len());
@@ -1267,14 +1575,30 @@ impl Timeline {
img = Some((*lsn, val.clone()));
for (_, lsn, val) in history.into_iter().skip(1) {
let Value::WalRecord(rec) = val else {
panic!("invalid record")
return Err(anyhow::anyhow!(
"invalid record, first record is image, expect walrecords"
))
.with_context(|| {
generate_debug_trace(
replay_history_for_debug_ref,
full_history,
retain_lsn_below_horizon,
horizon,
)
});
};
records.push((lsn, rec));
}
} else {
for (_, lsn, val) in history.into_iter() {
let Value::WalRecord(rec) = val else {
panic!("invalid record")
return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
.with_context(|| generate_debug_trace(
replay_history_for_debug_ref,
full_history,
retain_lsn_below_horizon,
horizon,
));
};
records.push((lsn, rec));
}
@@ -1286,12 +1610,11 @@ impl Timeline {
replay_history.push((key, request_lsn, Value::Image(img.clone())));
retention.push(vec![(request_lsn, Value::Image(img))]);
} else {
retention.push(
split_for_lsn
.iter()
.map(|(_, lsn, value)| (*lsn, value.clone()))
.collect(),
);
let deltas = split_for_lsn
.iter()
.map(|(_, lsn, value)| (*lsn, value.clone()))
.collect_vec();
retention.push(deltas);
}
}
let mut result = Vec::with_capacity(retention.len());
@@ -1306,7 +1629,7 @@ impl Timeline {
result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
}
}
unreachable!()
unreachable!("key retention is empty")
}
/// An experimental compaction building block that combines compaction with garbage collection.
@@ -1317,11 +1640,30 @@ impl Timeline {
/// and create delta layers with all deltas >= gc horizon.
pub(crate) async fn compact_with_gc(
self: &Arc<Self>,
_cancel: &CancellationToken,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<()> {
use std::collections::BTreeSet;
// Block other compaction/GC tasks from running for now. GC-compaction could run along
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
let gc_lock = async {
tokio::select! {
guard = self.gc_lock.lock() => Ok(guard),
// TODO: refactor to CompactionError to correctly pass cancelled error
_ = cancel.cancelled() => Err(anyhow!("cancelled")),
}
};
let gc_lock = crate::timed(
gc_lock,
"acquires gc lock",
std::time::Duration::from_secs(5),
)
.await?;
info!("running enhanced gc bottom-most compaction");
scopeguard::defer! {
@@ -1358,20 +1700,25 @@ impl Timeline {
retain_lsns_below_horizon.sort();
(selected_layers, gc_cutoff, retain_lsns_below_horizon)
};
let lowest_retain_lsn = retain_lsns_below_horizon
.first()
.copied()
.unwrap_or(gc_cutoff);
if cfg!(debug_assertions) {
assert_eq!(
lowest_retain_lsn,
retain_lsns_below_horizon
.iter()
.min()
.copied()
.unwrap_or(gc_cutoff)
);
}
let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
Lsn(self.ancestor_lsn.0 + 1)
} else {
let res = retain_lsns_below_horizon
.first()
.copied()
.unwrap_or(gc_cutoff);
if cfg!(debug_assertions) {
assert_eq!(
res,
retain_lsns_below_horizon
.iter()
.min()
.copied()
.unwrap_or(gc_cutoff)
);
}
res
};
info!(
"picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
layer_selection.len(),
@@ -1412,6 +1759,14 @@ impl Timeline {
let mut accumulated_values = Vec::new();
let mut last_key: Option<Key> = None;
enum FlushDeltaResult {
/// Create a new resident layer
CreateResidentLayer(ResidentLayer),
/// Keep an original delta layer
KeepLayer(PersistentLayerKey),
}
#[allow(clippy::too_many_arguments)]
async fn flush_deltas(
deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
last_key: Key,
@@ -1420,7 +1775,8 @@ impl Timeline {
tline: &Arc<Timeline>,
lowest_retain_lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<Option<ResidentLayer>> {
last_batch: bool,
) -> anyhow::Result<Option<FlushDeltaResult>> {
// Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
// overlapping layers.
//
@@ -1440,40 +1796,155 @@ impl Timeline {
*current_delta_split_point += 1;
need_split = true;
}
if !need_split {
if !need_split && !last_batch {
return Ok(None);
}
let deltas = std::mem::take(deltas);
let deltas: Vec<(Key, Lsn, Value)> = std::mem::take(deltas);
if deltas.is_empty() {
return Ok(None);
}
let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1;
let delta_key = PersistentLayerKey {
key_range: {
let key_start = deltas.first().unwrap().0;
let key_end = deltas.last().unwrap().0.next();
key_start..key_end
},
lsn_range: lowest_retain_lsn..end_lsn,
is_delta: true,
};
{
// Hack: skip delta layer if we need to produce a layer of a same key-lsn.
//
// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
// For example, consider the case where a single delta with range [0x10,0x50) exists.
// And we have branches at LSN 0x10, 0x20, 0x30.
// Then we delete branch @ 0x20.
// Bottom-most compaction may now delete the delta [0x20,0x30).
// And that wouldnt' change the shape of the layer.
//
// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
// That's why it's safe to skip.
let guard = tline.layers.read().await;
if guard.contains_key(&delta_key) {
let layer_generation = guard.get_from_key(&delta_key).metadata().generation;
drop(guard);
if layer_generation == tline.generation {
// TODO: depending on whether we design this compaction process to run along with
// other compactions, there could be layer map modifications after we drop the
// layer guard, and in case it creates duplicated layer key, we will still error
// in the end.
info!(
key=%delta_key,
?layer_generation,
"discard delta layer due to duplicated layer in the same generation"
);
return Ok(Some(FlushDeltaResult::KeepLayer(delta_key)));
}
}
}
let mut delta_layer_writer = DeltaLayerWriter::new(
tline.conf,
tline.timeline_id,
tline.tenant_shard_id,
deltas.first().unwrap().0,
delta_key.key_range.start,
lowest_retain_lsn..end_lsn,
ctx,
)
.await?;
let key_end = deltas.last().unwrap().0.next();
for (key, lsn, val) in deltas {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
let delta_layer = delta_layer_writer.finish(key_end, tline, ctx).await?;
Ok(Some(delta_layer))
let delta_layer = delta_layer_writer
.finish(delta_key.key_range.end, tline, ctx)
.await?;
Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer)))
}
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&(Key::MIN..Key::MAX), // covers the full key range
lowest_retain_lsn,
ctx,
)
.await?;
// Hack the key range to be min..(max-1). Otherwise, the image layer will be
// interpreted as an L0 delta layer.
let hack_image_layer_range = {
let mut end_key = Key::MAX;
end_key.field6 -= 1;
Key::MIN..end_key
};
// Only create image layers when there is no ancestor branches. TODO: create covering image layer
// when some condition meet.
let mut image_layer_writer = if self.ancestor_timeline.is_none() {
Some(
ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&hack_image_layer_range, // covers the full key range
lowest_retain_lsn,
ctx,
)
.await?,
)
} else {
None
};
/// Returns None if there is no ancestor branch. Throw an error when the key is not found.
///
/// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
/// is needed for reconstruction. This should be fixed in the future.
///
/// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
/// images.
async fn get_ancestor_image(
tline: &Arc<Timeline>,
key: Key,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
if tline.ancestor_timeline.is_none() {
return Ok(None);
};
// This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
// as much existing code as possible.
let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
Ok(Some((key, tline.ancestor_lsn, img)))
}
let image_layer_key = PersistentLayerKey {
key_range: hack_image_layer_range,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lowest_retain_lsn),
is_delta: false,
};
// Like with delta layers, it can happen that we re-produce an already existing image layer.
// This could happen when a user triggers force compaction and image generation. In this case,
// it's always safe to rewrite the layer.
let discard_image_layer = {
let guard = self.layers.read().await;
if guard.contains_key(&image_layer_key) {
let layer_generation = guard.get_from_key(&image_layer_key).metadata().generation;
drop(guard);
if layer_generation == self.generation {
// TODO: depending on whether we design this compaction process to run along with
// other compactions, there could be layer map modifications after we drop the
// layer guard, and in case it creates duplicated layer key, we will still error
// in the end.
info!(
key=%image_layer_key,
?layer_generation,
"discard image layer due to duplicated layer key in the same generation",
);
true
} else {
false
}
} else {
false
}
};
// Actually, we can decide not to write to the image layer at all at this point because
// the key and LSN range are determined. However, to keep things simple here, we still
// create this writer, and discard the writer in the end.
let mut delta_values = Vec::new();
let delta_split_points = delta_split_points.into_iter().collect_vec();
@@ -1494,11 +1965,17 @@ impl Timeline {
gc_cutoff,
&retain_lsns_below_horizon,
COMPACTION_DELTA_THRESHOLD,
get_ancestor_image(self, *last_key, ctx).await?,
)
.await?;
// Put the image into the image layer. Currently we have a single big layer for the compaction.
retention
.pipe_to(*last_key, &mut delta_values, &mut image_layer_writer, ctx)
.pipe_to(
*last_key,
&mut delta_values,
image_layer_writer.as_mut(),
ctx,
)
.await?;
delta_layers.extend(
flush_deltas(
@@ -1509,6 +1986,7 @@ impl Timeline {
self,
lowest_retain_lsn,
ctx,
false,
)
.await?,
);
@@ -1527,11 +2005,17 @@ impl Timeline {
gc_cutoff,
&retain_lsns_below_horizon,
COMPACTION_DELTA_THRESHOLD,
get_ancestor_image(self, last_key, ctx).await?,
)
.await?;
// Put the image into the image layer. Currently we have a single big layer for the compaction.
retention
.pipe_to(last_key, &mut delta_values, &mut image_layer_writer, ctx)
.pipe_to(
last_key,
&mut delta_values,
image_layer_writer.as_mut(),
ctx,
)
.await?;
delta_layers.extend(
flush_deltas(
@@ -1542,27 +2026,52 @@ impl Timeline {
self,
lowest_retain_lsn,
ctx,
true,
)
.await?,
);
assert!(delta_values.is_empty(), "unprocessed keys");
let image_layer = image_layer_writer.finish(self, ctx).await?;
let image_layer = if discard_image_layer {
None
} else if let Some(writer) = image_layer_writer {
Some(writer.finish(self, ctx).await?)
} else {
None
};
info!(
"produced {} delta layers and {} image layers",
delta_layers.len(),
1
if image_layer.is_some() { 1 } else { 0 }
);
let mut compact_to = Vec::new();
compact_to.extend(delta_layers);
compact_to.push(image_layer);
let mut keep_layers = HashSet::new();
for action in delta_layers {
match action {
FlushDeltaResult::CreateResidentLayer(layer) => {
compact_to.push(layer);
}
FlushDeltaResult::KeepLayer(l) => {
keep_layers.insert(l);
}
}
}
if discard_image_layer {
keep_layers.insert(image_layer_key);
}
let mut layer_selection = layer_selection;
layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
compact_to.extend(image_layer);
// Step 3: Place back to the layer map.
{
let mut guard = self.layers.write().await;
guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
};
self.remote_client
.schedule_compaction_update(&layer_selection, &compact_to)?;
drop(gc_lock);
Ok(())
}
}

View File

@@ -63,10 +63,19 @@ pub(super) async fn delete_local_timeline_directory(
tenant_shard_id: TenantShardId,
timeline: &Timeline,
) -> anyhow::Result<()> {
let guards = async { tokio::join!(timeline.gc_lock.lock(), timeline.compaction_lock.lock()) };
let guards = crate::timed(
guards,
"acquire gc and compaction locks",
// Always ensure the lock order is compaction -> gc.
let compaction_lock = timeline.compaction_lock.lock();
let compaction_lock = crate::timed(
compaction_lock,
"acquires compaction lock",
std::time::Duration::from_secs(5),
)
.await;
let gc_lock = timeline.gc_lock.lock();
let gc_lock = crate::timed(
gc_lock,
"acquires gc lock",
std::time::Duration::from_secs(5),
)
.await;
@@ -107,7 +116,8 @@ pub(super) async fn delete_local_timeline_directory(
.context("fsync_pre_mark_remove")?;
info!("finished deleting layer files, releasing locks");
drop(guards);
drop(gc_lock);
drop(compaction_lock);
fail::fail_point!("timeline-delete-after-rm", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
@@ -206,11 +216,10 @@ impl DeleteTimelineFlow {
// NB: If this fails half-way through, and is retried, the retry will go through
// all the same steps again. Make sure the code here is idempotent, and don't
// error out if some of the shutdown tasks have already been completed!
#[instrument(skip_all, fields(%inplace))]
#[instrument(skip_all)]
pub async fn run(
tenant: &Arc<Tenant>,
timeline_id: TimelineId,
inplace: bool,
) -> Result<(), DeleteTimelineError> {
super::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -235,11 +244,7 @@ impl DeleteTimelineFlow {
))?
});
if inplace {
Self::background(guard, tenant.conf, tenant, &timeline).await?
} else {
Self::schedule_background(guard, tenant.conf, Arc::clone(tenant), timeline);
}
Self::schedule_background(guard, tenant.conf, Arc::clone(tenant), timeline);
Ok(())
}

View File

@@ -0,0 +1,967 @@
//! An efficient way to keep the timeline gate open without preventing
//! timeline shutdown for longer than a single call to a timeline method.
//!
//! # Motivation
//!
//! On a single page service connection, we're typically serving a single TenantTimelineId.
//!
//! Without sharding, there is a single Timeline object to which we dispatch
//! all requests. For example, a getpage request gets dispatched to the
//! Timeline::get method of the Timeline object that represents the
//! (tenant,timeline) of that connection.
//!
//! With sharding, for each request that comes in on the connection,
//! we first have to perform shard routing based on the requested key (=~ page number).
//! The result of shard routing is a Timeline object.
//! We then dispatch the request to that Timeline object.
//!
//! Regardless of whether the tenant is sharded or not, we want to ensure that
//! we hold the Timeline gate open while we're invoking the method on the
//! Timeline object.
//!
//! However, we want to avoid the overhead of entering the gate for every
//! method invocation.
//!
//! Further, for shard routing, we want to avoid calling the tenant manager to
//! resolve the shard for every request. Instead, we want to cache the
//! routing result so we can bypass the tenant manager for all subsequent requests
//! that get routed to that shard.
//!
//! Regardless of how we accomplish the above, it should not
//! prevent the Timeline from shutting down promptly.
//!
//! # Design
//!
//! There are three user-facing data structures:
//! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
//! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
//! - `Handle`: a smart pointer that holds the Timeline gate open and derefs to `&Timeline`.
//! Lifetime: for a single request dispatch on the Timeline (i.e., one getpage request)
//!
//! The `Handle` is just a wrapper around an `Arc<HandleInner>`.
//!
//! There is one long-lived `Arc<HandleInner>`, which is stored in the `PerTimelineState`.
//! The `Cache` stores a `Weak<HandleInner>` for each cached Timeline.
//!
//! To dispatch a request, the page service connection calls `Cache::get`.
//!
//! A cache miss means we consult the tenant manager for shard routing,
//! resulting in an `Arc<Timeline>`. We enter its gate _once_ and construct an
//! `Arc<HandleInner>`. We store a `Weak<HandleInner>` in the cache
//! and the `Arc<HandleInner>` in the `PerTimelineState`.
//!
//! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
//! and find the `Weak<HandleInner>` in the cache.
//! We upgrade the `Weak<HandleInner>` to an `Arc<HandleInner>` and wrap it in the user-facing `Handle` type.
//!
//! The request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
//! It then drops the `Handle`, which drops the `Arc<HandleInner>`.
//!
//! # Memory Management / How The Reference Cycle Is Broken
//!
//! The attentive reader may have noticed the strong reference cycle
//! from `Arc<HandleInner>` to `PerTimelineState` to `Arc<Timeline>`.
//!
//! This cycle is intentional: while it exists, the `Cache` can upgrade its
//! `Weak<HandleInner>` to an `Arc<HandleInner>` in a single atomic operation.
//!
//! The cycle is broken by either
//! - `PerTimelineState::shutdown` or
//! - dropping the `Cache`.
//!
//! Concurrently existing `Handle`s will extend the existence of the cycle.
//! However, since `Handle`s are short-lived and new `Handle`s are not
//! handed out after either `PerTimelineState::shutdown` or `Cache` drop,
//! that extension of the cycle is bounded.
//!
//! # Fast Path for Shard Routing
//!
//! The `Cache` has a fast path for shard routing to avoid calling into
//! the tenant manager for every request.
//!
//! The `Cache` maintains a hash map of `ShardTimelineId` to `Weak<HandleInner>`.
//!
//! The current implementation uses the first entry in the hash map
//! to determine the `ShardParameters` and derive the correct
//! `ShardIndex` for the requested key.
//!
//! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`.
//!
//! If the lookup is successful and the `Weak<HandleInner>` can be upgraded,
//! it's a hit.
//!
//! ## Cache invalidation
//!
//! The insight is that cache invalidation is sufficient and most efficiently done lazily.
//! The only reasons why an entry in the cache can become stale are:
//! 1. The `PerTimelineState` / Timeline is shutting down e.g. because the shard is
//! being detached, timeline or shard deleted, or pageserver is shutting down.
//! 2. We're doing a shard split and new traffic should be routed to the child shards.
//!
//! Regarding (1), we will eventually fail to upgrade the `Weak<HandleInner>` once the
//! timeline has shut down, and when that happens, we remove the entry from the cache.
//!
//! Regarding (2), the insight is that it is toally fine to keep dispatching requests
//! to the parent shard during a shard split. Eventually, the shard split task will
//! shut down the parent => case (1).
use std::collections::hash_map;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
use pageserver_api::shard::ShardIdentity;
use tracing::instrument;
use tracing::trace;
use utils::id::TimelineId;
use utils::shard::ShardIndex;
use utils::shard::ShardNumber;
use crate::tenant::mgr::ShardSelector;
/// The requirement for Debug is so that #[derive(Debug)] works in some places.
pub(crate) trait Types: Sized + std::fmt::Debug {
type TenantManagerError: Sized + std::fmt::Debug;
type TenantManager: TenantManager<Self> + Sized;
type Timeline: ArcTimeline<Self> + Sized;
}
/// Uniquely identifies a [`Cache`] instance over the lifetime of the process.
/// Required so [`Cache::drop`] can take out the handles from the [`PerTimelineState`].
/// Alternative to this would be to allocate [`Cache`] in a `Box` and identify it by the pointer.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
struct CacheId(u64);
impl CacheId {
fn next() -> Self {
static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if id == 0 {
panic!("CacheId::new() returned 0, overflow");
}
Self(id)
}
}
/// See module-level comment.
pub(crate) struct Cache<T: Types> {
id: CacheId,
map: Map<T>,
}
type Map<T> = HashMap<ShardTimelineId, Weak<HandleInner<T>>>;
impl<T: Types> Default for Cache<T> {
fn default() -> Self {
Self {
id: CacheId::next(),
map: Default::default(),
}
}
}
#[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)]
pub(crate) struct ShardTimelineId {
pub(crate) shard_index: ShardIndex,
pub(crate) timeline_id: TimelineId,
}
/// See module-level comment.
pub(crate) struct Handle<T: Types>(Arc<HandleInner<T>>);
struct HandleInner<T: Types> {
shut_down: AtomicBool,
timeline: T::Timeline,
// The timeline's gate held open.
_gate_guard: utils::sync::gate::GateGuard,
}
/// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
///
/// See module-level comment for details.
pub struct PerTimelineState<T: Types> {
// None = shutting down
handles: Mutex<Option<HashMap<CacheId, Arc<HandleInner<T>>>>>,
}
impl<T: Types> Default for PerTimelineState<T> {
fn default() -> Self {
Self {
handles: Mutex::new(Some(Default::default())),
}
}
}
/// Abstract view of [`crate::tenant::mgr`], for testability.
pub(crate) trait TenantManager<T: Types> {
/// Invoked by [`Cache::get`] to resolve a [`ShardTimelineId`] to a [`Types::Timeline`].
/// Errors are returned as [`GetError::TenantManager`].
async fn resolve(
&self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
) -> Result<T::Timeline, T::TenantManagerError>;
}
/// Abstract view of an [`Arc<Timeline>`], for testability.
pub(crate) trait ArcTimeline<T: Types>: Clone {
fn gate(&self) -> &utils::sync::gate::Gate;
fn shard_timeline_id(&self) -> ShardTimelineId;
fn get_shard_identity(&self) -> &ShardIdentity;
fn per_timeline_state(&self) -> &PerTimelineState<T>;
}
/// Errors returned by [`Cache::get`].
#[derive(Debug)]
pub(crate) enum GetError<T: Types> {
TenantManager(T::TenantManagerError),
TimelineGateClosed,
PerTimelineStateShutDown,
}
/// Internal type used in [`Cache::get`].
enum RoutingResult<T: Types> {
FastPath(Handle<T>),
SlowPath(ShardTimelineId),
NeedConsultTenantManager,
}
impl<T: Types> Cache<T> {
/// See module-level comment for details.
///
/// Does NOT check for the shutdown state of [`Types::Timeline`].
/// Instead, the methods of [`Types::Timeline`] that are invoked through
/// the [`Handle`] are responsible for checking these conditions
/// and if so, return an error that causes the page service to
/// close the connection.
#[instrument(level = "trace", skip_all)]
pub(crate) async fn get(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> {
// terminates because each iteration removes an element from the map
loop {
let handle = self
.get_impl(timeline_id, shard_selector, tenant_manager)
.await?;
if handle.0.shut_down.load(Ordering::Relaxed) {
let removed = self
.map
.remove(&handle.0.timeline.shard_timeline_id())
.expect("invariant of get_impl is that the returned handle is in the map");
assert!(
Weak::ptr_eq(&removed, &Arc::downgrade(&handle.0)),
"shard_timeline_id() incorrect?"
);
} else {
return Ok(handle);
}
}
}
#[instrument(level = "trace", skip_all)]
async fn get_impl(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> {
let miss: ShardSelector = {
let routing_state = self.shard_routing(timeline_id, shard_selector);
match routing_state {
RoutingResult::FastPath(handle) => return Ok(handle),
RoutingResult::SlowPath(key) => match self.map.get(&key) {
Some(cached) => match cached.upgrade() {
Some(upgraded) => return Ok(Handle(upgraded)),
None => {
trace!("handle cache stale");
self.map.remove(&key).unwrap();
ShardSelector::Known(key.shard_index)
}
},
None => ShardSelector::Known(key.shard_index),
},
RoutingResult::NeedConsultTenantManager => shard_selector,
}
};
self.get_miss(timeline_id, miss, tenant_manager).await
}
#[inline(always)]
fn shard_routing(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
) -> RoutingResult<T> {
loop {
// terminates because when every iteration we remove an element from the map
let Some((first_key, first_handle)) = self.map.iter().next() else {
return RoutingResult::NeedConsultTenantManager;
};
let Some(first_handle) = first_handle.upgrade() else {
// TODO: dedup with get()
trace!("handle cache stale");
let first_key_owned = *first_key;
self.map.remove(&first_key_owned).unwrap();
continue;
};
let first_handle_shard_identity = first_handle.timeline.get_shard_identity();
let make_shard_index = |shard_num: ShardNumber| ShardIndex {
shard_number: shard_num,
shard_count: first_handle_shard_identity.count,
};
let need_idx = match shard_selector {
ShardSelector::Page(key) => {
make_shard_index(first_handle_shard_identity.get_shard_number(&key))
}
ShardSelector::Zero => make_shard_index(ShardNumber(0)),
ShardSelector::Known(shard_idx) => shard_idx,
};
let need_shard_timeline_id = ShardTimelineId {
shard_index: need_idx,
timeline_id,
};
let first_handle_shard_timeline_id = ShardTimelineId {
shard_index: first_handle_shard_identity.shard_index(),
timeline_id: first_handle.timeline.shard_timeline_id().timeline_id,
};
if need_shard_timeline_id == first_handle_shard_timeline_id {
return RoutingResult::FastPath(Handle(first_handle));
} else {
return RoutingResult::SlowPath(need_shard_timeline_id);
}
}
}
#[instrument(level = "trace", skip_all)]
#[inline(always)]
async fn get_miss(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> {
match tenant_manager.resolve(timeline_id, shard_selector).await {
Ok(timeline) => {
let key = timeline.shard_timeline_id();
match &shard_selector {
ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)),
ShardSelector::Page(_) => (), // gotta trust tenant_manager
ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
}
let gate_guard = match timeline.gate().enter() {
Ok(guard) => guard,
Err(_) => {
return Err(GetError::TimelineGateClosed);
}
};
trace!("creating new HandleInner");
let handle = Arc::new(
// TODO: global metric that keeps track of the number of live HandlerTimeline instances
// so we can identify reference cycle bugs.
HandleInner {
shut_down: AtomicBool::new(false),
_gate_guard: gate_guard,
timeline: timeline.clone(),
},
);
let handle = {
let mut lock_guard = timeline
.per_timeline_state()
.handles
.lock()
.expect("mutex poisoned");
match &mut *lock_guard {
Some(per_timeline_state) => {
let replaced = per_timeline_state.insert(self.id, Arc::clone(&handle));
assert!(replaced.is_none(), "some earlier code left a stale handle");
match self.map.entry(key) {
hash_map::Entry::Occupied(_o) => {
// This cannot not happen because
// 1. we're the _miss_ handle, i.e., `self.map` didn't contain an entry and
// 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle
// while we were waiting for the tenant manager.
unreachable!()
}
hash_map::Entry::Vacant(v) => {
v.insert(Arc::downgrade(&handle));
handle
}
}
}
None => {
return Err(GetError::PerTimelineStateShutDown);
}
}
};
Ok(Handle(handle))
}
Err(e) => Err(GetError::TenantManager(e)),
}
}
}
impl<T: Types> PerTimelineState<T> {
/// After this method returns, [`Cache::get`] will never again return a [`Handle`]
/// to the [`Types::Timeline`] that embeds this per-timeline state.
/// Even if [`TenantManager::resolve`] would still resolve to it.
///
/// Already-alive [`Handle`]s for will remain open, usable, and keeping the [`ArcTimeline`] alive.
/// That's ok because they're short-lived. See module-level comment for details.
#[instrument(level = "trace", skip_all)]
pub(super) fn shutdown(&self) {
let handles = self
.handles
.lock()
.expect("mutex poisoned")
// NB: this .take() sets locked to None.
// That's what makes future `Cache::get` misses fail.
// Cache hits are taken care of below.
.take();
let Some(handles) = handles else {
trace!("already shut down");
return;
};
for handle in handles.values() {
// Make hits fail.
handle.shut_down.store(true, Ordering::Relaxed);
}
drop(handles);
}
}
impl<T: Types> std::ops::Deref for Handle<T> {
type Target = T::Timeline;
fn deref(&self) -> &Self::Target {
&self.0.timeline
}
}
#[cfg(test)]
impl<T: Types> Drop for HandleInner<T> {
fn drop(&mut self) {
trace!("HandleInner dropped");
}
}
// When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
impl<T: Types> Drop for Cache<T> {
fn drop(&mut self) {
for (_, weak) in self.map.drain() {
if let Some(strong) = weak.upgrade() {
// handle is still being kept alive in PerTimelineState
let timeline = strong.timeline.per_timeline_state();
let mut handles = timeline.handles.lock().expect("mutex poisoned");
if let Some(handles) = &mut *handles {
let Some(removed) = handles.remove(&self.id) else {
// There could have been a shutdown inbetween us upgrading the weak and locking the mutex.
continue;
};
assert!(Arc::ptr_eq(&removed, &strong));
}
}
}
}
}
#[cfg(test)]
mod tests {
use pageserver_api::{
key::{rel_block_to_key, Key, DBDIR_KEY},
models::ShardParameters,
reltag::RelTag,
shard::ShardStripeSize,
};
use utils::shard::ShardCount;
use super::*;
const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
#[derive(Debug)]
struct TestTypes;
impl Types for TestTypes {
type TenantManagerError = anyhow::Error;
type TenantManager = StubManager;
type Timeline = Arc<StubTimeline>;
}
struct StubManager {
shards: Vec<Arc<StubTimeline>>,
}
struct StubTimeline {
gate: utils::sync::gate::Gate,
id: TimelineId,
shard: ShardIdentity,
per_timeline_state: PerTimelineState<TestTypes>,
myself: Weak<StubTimeline>,
}
impl StubTimeline {
fn getpage(&self) {
// do nothing
}
}
impl ArcTimeline<TestTypes> for Arc<StubTimeline> {
fn gate(&self) -> &utils::sync::gate::Gate {
&self.gate
}
fn shard_timeline_id(&self) -> ShardTimelineId {
ShardTimelineId {
shard_index: self.shard.shard_index(),
timeline_id: self.id,
}
}
fn get_shard_identity(&self) -> &ShardIdentity {
&self.shard
}
fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
&self.per_timeline_state
}
}
impl TenantManager<TestTypes> for StubManager {
async fn resolve(
&self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
) -> anyhow::Result<Arc<StubTimeline>> {
for timeline in &self.shards {
if timeline.id == timeline_id {
match &shard_selector {
ShardSelector::Zero if timeline.shard.is_shard_zero() => {
return Ok(Arc::clone(timeline));
}
ShardSelector::Zero => continue,
ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
return Ok(Arc::clone(timeline));
}
ShardSelector::Page(_) => continue,
ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
return Ok(Arc::clone(timeline));
}
ShardSelector::Known(_) => continue,
}
}
}
anyhow::bail!("not found")
}
}
#[tokio::test(start_paused = true)]
async fn test_timeline_shutdown() {
crate::tenant::harness::setup_logging();
let timeline_id = TimelineId::generate();
let shard0 = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_id,
shard: ShardIdentity::unsharded(),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let mgr = StubManager {
shards: vec![shard0.clone()],
};
let key = DBDIR_KEY;
let mut cache = Cache::<TestTypes>::default();
//
// fill the cache
//
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(2, 1),
"strong: shard0, mgr; weak: myself"
);
let handle: Handle<_> = cache
.get(timeline_id, ShardSelector::Page(key), &mgr)
.await
.expect("we have the timeline");
let handle_inner_weak = Arc::downgrade(&handle.0);
assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
assert_eq!(
(
Weak::strong_count(&handle_inner_weak),
Weak::weak_count(&handle_inner_weak)
),
(2, 2),
"strong: handle, per_timeline_state, weak: handle_inner_weak, cache"
);
assert_eq!(cache.map.len(), 1);
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(3, 1),
"strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
);
drop(handle);
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(3, 1),
"strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
);
//
// demonstrate that Handle holds up gate closure
// but shutdown prevents new handles from being handed out
//
tokio::select! {
_ = shard0.gate.close() => {
panic!("cache and per-timeline handler state keep cache open");
}
_ = tokio::time::sleep(FOREVER) => {
// NB: first poll of close() makes it enter closing state
}
}
let handle = cache
.get(timeline_id, ShardSelector::Page(key), &mgr)
.await
.expect("we have the timeline");
assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
// SHUTDOWN
shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
assert_eq!(
1,
Weak::strong_count(&handle_inner_weak),
"through local var handle"
);
assert_eq!(
cache.map.len(),
1,
"this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
);
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(3, 1),
"strong: handleinner(via handle), shard0, mgr; weak: myself"
);
// this handle is perfectly usable
handle.getpage();
cache
.get(timeline_id, ShardSelector::Page(key), &mgr)
.await
.err()
.expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
assert_eq!(
cache.map.len(),
0,
"first access after shutdown cleans up the Weak's from the cache"
);
tokio::select! {
_ = shard0.gate.close() => {
panic!("handle is keeping gate open");
}
_ = tokio::time::sleep(FOREVER) => { }
}
drop(handle);
assert_eq!(
0,
Weak::strong_count(&handle_inner_weak),
"the HandleInner destructor already ran"
);
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(2, 1),
"strong: shard0, mgr; weak: myself"
);
// closing gate succeeds after dropping handle
tokio::select! {
_ = shard0.gate.close() => { }
_ = tokio::time::sleep(FOREVER) => {
panic!("handle is dropped, no other gate holders exist")
}
}
// map gets cleaned on next lookup
cache
.get(timeline_id, ShardSelector::Page(key), &mgr)
.await
.err()
.expect("documented behavior: can't get new handle after shutdown");
assert_eq!(cache.map.len(), 0);
// ensure all refs to shard0 are gone and we're not leaking anything
let myself = Weak::clone(&shard0.myself);
drop(shard0);
drop(mgr);
assert_eq!(Weak::strong_count(&myself), 0);
}
#[tokio::test]
async fn test_multiple_timelines_and_deletion() {
crate::tenant::harness::setup_logging();
let timeline_a = TimelineId::generate();
let timeline_b = TimelineId::generate();
assert_ne!(timeline_a, timeline_b);
let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_a,
shard: ShardIdentity::unsharded(),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_b,
shard: ShardIdentity::unsharded(),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let mut mgr = StubManager {
shards: vec![timeline_a.clone(), timeline_b.clone()],
};
let key = DBDIR_KEY;
let mut cache = Cache::<TestTypes>::default();
cache
.get(timeline_a.id, ShardSelector::Page(key), &mgr)
.await
.expect("we have it");
cache
.get(timeline_b.id, ShardSelector::Page(key), &mgr)
.await
.expect("we have it");
assert_eq!(cache.map.len(), 2);
// delete timeline A
timeline_a.per_timeline_state.shutdown();
mgr.shards.retain(|t| t.id != timeline_a.id);
assert!(
mgr.resolve(timeline_a.id, ShardSelector::Page(key))
.await
.is_err(),
"broken StubManager implementation"
);
assert_eq!(
cache.map.len(),
2,
"cache still has a Weak handle to Timeline A"
);
cache
.get(timeline_a.id, ShardSelector::Page(key), &mgr)
.await
.err()
.expect("documented behavior: can't get new handle after shutdown");
assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
cache
.get(timeline_b.id, ShardSelector::Page(key), &mgr)
.await
.expect("we still have it");
}
fn make_relation_key_for_shard(shard: ShardNumber, params: &ShardParameters) -> Key {
rel_block_to_key(
RelTag {
spcnode: 1663,
dbnode: 208101,
relnode: 2620,
forknum: 0,
},
shard.0 as u32 * params.stripe_size.0,
)
}
#[tokio::test(start_paused = true)]
async fn test_shard_split() {
crate::tenant::harness::setup_logging();
let timeline_id = TimelineId::generate();
let parent = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_id,
shard: ShardIdentity::unsharded(),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let child_params = ShardParameters {
count: ShardCount(2),
stripe_size: ShardStripeSize::default(),
};
let child0 = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_id,
shard: ShardIdentity::from_params(ShardNumber(0), &child_params),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let child1 = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_id,
shard: ShardIdentity::from_params(ShardNumber(1), &child_params),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let child_shards_by_shard_number = [child0.clone(), child1.clone()];
let mut cache = Cache::<TestTypes>::default();
// fill the cache with the parent
for i in 0..2 {
let handle = cache
.get(
timeline_id,
ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
&StubManager {
shards: vec![parent.clone()],
},
)
.await
.expect("we have it");
assert!(
Weak::ptr_eq(&handle.myself, &parent.myself),
"mgr returns parent first"
);
drop(handle);
}
//
// SHARD SPLIT: tenant manager changes, but the cache isn't informed
//
// while we haven't shut down the parent, the cache will return the cached parent, even
// if the tenant manager returns the child
for i in 0..2 {
let handle = cache
.get(
timeline_id,
ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
&StubManager {
shards: vec![], // doesn't matter what's in here, the cache is fully loaded
},
)
.await
.expect("we have it");
assert!(
Weak::ptr_eq(&handle.myself, &parent.myself),
"mgr returns parent"
);
drop(handle);
}
let parent_handle = cache
.get(
timeline_id,
ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), &child_params)),
&StubManager {
shards: vec![parent.clone()],
},
)
.await
.expect("we have it");
assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
// invalidate the cache
parent.per_timeline_state.shutdown();
// the cache will now return the child, even though the parent handle still exists
for i in 0..2 {
let handle = cache
.get(
timeline_id,
ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
&StubManager {
shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
},
)
.await
.expect("we have it");
assert!(
Weak::ptr_eq(
&handle.myself,
&child_shards_by_shard_number[i as usize].myself
),
"mgr returns child"
);
drop(handle);
}
// all the while the parent handle kept the parent gate open
tokio::select! {
_ = parent_handle.gate.close() => {
panic!("parent handle is keeping gate open");
}
_ = tokio::time::sleep(FOREVER) => { }
}
drop(parent_handle);
tokio::select! {
_ = parent.gate.close() => { }
_ = tokio::time::sleep(FOREVER) => {
panic!("parent handle is dropped, no other gate holders exist")
}
}
}
#[tokio::test(start_paused = true)]
async fn test_connection_handler_exit() {
crate::tenant::harness::setup_logging();
let timeline_id = TimelineId::generate();
let shard0 = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_id,
shard: ShardIdentity::unsharded(),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let mgr = StubManager {
shards: vec![shard0.clone()],
};
let key = DBDIR_KEY;
// Simulate 10 connections that's opened, used, and closed
let mut used_handles = vec![];
for _ in 0..10 {
let mut cache = Cache::<TestTypes>::default();
let handle = {
let handle = cache
.get(timeline_id, ShardSelector::Page(key), &mgr)
.await
.expect("we have the timeline");
assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
handle
};
handle.getpage();
used_handles.push(Arc::downgrade(&handle.0));
}
// No handles exist, thus gates are closed and don't require shutdown
assert!(used_handles
.iter()
.all(|weak| Weak::strong_count(weak) == 0));
// ... thus the gate should close immediately, even without shutdown
tokio::select! {
_ = shard0.gate.close() => { }
_ = tokio::time::sleep(FOREVER) => {
panic!("handle is dropped, no other gate holders exist")
}
}
}
}

View File

@@ -35,6 +35,10 @@ impl LayerManager {
self.layer_fmgr.get_from_desc(desc)
}
pub(crate) fn get_from_key(&self, desc: &PersistentLayerKey) -> Layer {
self.layer_fmgr.get_from_key(desc)
}
/// Get an immutable reference to the layer map.
///
/// We expect users only to be able to get an immutable layer map. If users want to make modifications,
@@ -365,16 +369,20 @@ impl<T> Default for LayerFileManager<T> {
}
impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
fn get_from_key(&self, key: &PersistentLayerKey) -> T {
// The assumption for the `expect()` is that all code maintains the following invariant:
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
self.0
.get(&desc.key())
.with_context(|| format!("get layer from desc: {}", desc.layer_name()))
.get(key)
.with_context(|| format!("get layer from key: {}", key))
.expect("not found")
.clone()
}
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
self.get_from_key(&desc.key())
}
fn contains_key(&self, key: &PersistentLayerKey) -> bool {
self.0.contains_key(key)
}

View File

@@ -535,7 +535,7 @@ impl WalIngest {
// The page may be uninitialized. If so, we can't set the LSN because
// that would corrupt the page.
//
if !page_is_new(&image) {
if !blk.opaque && !page_is_new(&image) {
page_set_lsn(&mut image, lsn)
}
assert_eq!(image.len(), BLCKSZ as usize);

View File

@@ -129,6 +129,7 @@ pub struct DecodedBkpBlock {
pub apply_image: bool,
/* has image that should be restored */
pub will_init: bool,
pub opaque: bool,
/* record doesn't need previous page version to apply */
//char *bkp_image;
pub hole_offset: u16,
@@ -1000,6 +1001,7 @@ pub fn decode_wal_record(
blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
blk.opaque = (fork_flags & pg_constants::BKPBLOCK_OPAQUE) != 0;
blk.data_len = buf.get_u16_le();
/* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */

View File

@@ -241,6 +241,9 @@ impl PostgresRedoManager {
/// Shut down the WAL redo manager.
///
/// Returns `true` if this call was the one that initiated shutdown.
/// `true` may be observed by no caller if the first caller stops polling.
///
/// After this future completes
/// - no redo process is running
/// - no new redo process will be spawned
@@ -250,22 +253,32 @@ impl PostgresRedoManager {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn shutdown(&self) {
pub async fn shutdown(&self) -> bool {
// prevent new processes from being spawned
let permit = match self.redo_process.get_or_init_detached().await {
let maybe_permit = match self.redo_process.get_or_init_detached().await {
Ok(guard) => {
let (proc, permit) = guard.take_and_deinit();
drop(proc); // this just drops the Arc, its refcount may not be zero yet
permit
if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
None
} else {
let (proc, permit) = guard.take_and_deinit();
drop(proc); // this just drops the Arc, its refcount may not be zero yet
Some(permit)
}
}
Err(permit) => permit,
Err(permit) => Some(permit),
};
let it_was_us = if let Some(permit) = maybe_permit {
self.redo_process
.set(ProcessOnceCell::ManagerShutDown, permit);
true
} else {
false
};
self.redo_process
.set(ProcessOnceCell::ManagerShutDown, permit);
// wait for ongoing requests to drain and the refcounts of all Arc<WalRedoProcess> that
// we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s
// for the underlying process.
self.launched_processes.close().await;
it_was_us
}
/// This type doesn't have its own background task to check for idleness: we

View File

@@ -0,0 +1,7 @@
# This was captured from one shard of a large tenant in staging.
# It has a mixture of deltas and image layers, >1000 layers in total.
# This is suitable for general smoke tests that want an index which is not
# trivially small, but doesn't contain weird/pathological cases.

File diff suppressed because one or more lines are too long

View File

@@ -170,11 +170,6 @@ struct Args {
/// still needed for existing replication connection.
#[arg(long)]
walsenders_keep_horizon: bool,
/// Enable partial backup. If disabled, safekeeper will not upload partial
/// segments to remote storage.
/// TODO: now partial backup is always enabled, remove this flag.
#[arg(long)]
partial_backup_enabled: bool,
/// Controls how long backup will wait until uploading the partial segment.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_PARTIAL_BACKUP_TIMEOUT, verbatim_doc_comment)]
partial_backup_timeout: Duration,
@@ -347,7 +342,6 @@ async fn main() -> anyhow::Result<()> {
sk_auth_token,
current_thread_runtime: args.current_thread_runtime,
walsenders_keep_horizon: args.walsenders_keep_horizon,
partial_backup_enabled: true,
partial_backup_timeout: args.partial_backup_timeout,
disable_periodic_broker_push: args.disable_periodic_broker_push,
enable_offload: args.enable_offload,

View File

@@ -21,6 +21,7 @@ pub mod json_ctrl;
pub mod metrics;
pub mod patch_control_file;
pub mod pull_timeline;
pub mod rate_limit;
pub mod receive_wal;
pub mod recovery;
pub mod remove_wal;
@@ -53,6 +54,7 @@ pub mod defaults {
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
pub const DEFAULT_EVICTION_CONCURRENCY: usize = 2;
// By default, our required residency before eviction is the same as the period that passes
// before uploading a partial segment, so that in normal operation the eviction can happen
@@ -91,7 +93,6 @@ pub struct SafeKeeperConf {
pub sk_auth_token: Option<SecretString>,
pub current_thread_runtime: bool,
pub walsenders_keep_horizon: bool,
pub partial_backup_enabled: bool,
pub partial_backup_timeout: Duration,
pub disable_periodic_broker_push: bool,
pub enable_offload: bool,
@@ -135,7 +136,6 @@ impl SafeKeeperConf {
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
current_thread_runtime: false,
walsenders_keep_horizon: false,
partial_backup_enabled: false,
partial_backup_timeout: Duration::from_secs(0),
disable_periodic_broker_push: false,
enable_offload: false,

View File

@@ -0,0 +1,49 @@
use std::sync::Arc;
use rand::Rng;
use crate::metrics::MISC_OPERATION_SECONDS;
/// Global rate limiter for background tasks.
#[derive(Clone)]
pub struct RateLimiter {
partial_backup: Arc<tokio::sync::Semaphore>,
eviction: Arc<tokio::sync::Semaphore>,
}
impl RateLimiter {
/// Create a new rate limiter.
/// - `partial_backup_max`: maximum number of concurrent partial backups.
/// - `eviction_max`: maximum number of concurrent timeline evictions.
pub fn new(partial_backup_max: usize, eviction_max: usize) -> Self {
Self {
partial_backup: Arc::new(tokio::sync::Semaphore::new(partial_backup_max)),
eviction: Arc::new(tokio::sync::Semaphore::new(eviction_max)),
}
}
/// Get a permit for partial backup. This will block if the maximum number of concurrent
/// partial backups is reached.
pub async fn acquire_partial_backup(&self) -> tokio::sync::OwnedSemaphorePermit {
let _timer = MISC_OPERATION_SECONDS
.with_label_values(&["partial_permit_acquire"])
.start_timer();
self.partial_backup
.clone()
.acquire_owned()
.await
.expect("semaphore is closed")
}
/// Try to get a permit for timeline eviction. This will return None if the maximum number of
/// concurrent timeline evictions is reached.
pub fn try_acquire_eviction(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
self.eviction.clone().try_acquire_owned().ok()
}
}
/// Generate a random duration that is a fraction of the given duration.
pub fn rand_duration(duration: &std::time::Duration) -> std::time::Duration {
let randf64 = rand::thread_rng().gen_range(0.0..1.0);
duration.mul_f64(randf64)
}

View File

@@ -25,6 +25,7 @@ use utils::{
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use crate::rate_limit::RateLimiter;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
@@ -36,7 +37,7 @@ use crate::timeline_guard::ResidenceGuard;
use crate::timeline_manager::{AtomicStatus, ManagerCtl};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::{self};
use crate::wal_backup_partial::{PartialRemoteSegment, RateLimiter};
use crate::wal_backup_partial::PartialRemoteSegment;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};

View File

@@ -5,7 +5,6 @@
use anyhow::Context;
use camino::Utf8PathBuf;
use remote_storage::RemotePath;
use std::time::Instant;
use tokio::{
fs::File,
io::{AsyncRead, AsyncWriteExt},
@@ -15,6 +14,7 @@ use utils::crashsafe::durable_rename;
use crate::{
metrics::{EvictionEvent, EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED},
rate_limit::rand_duration,
timeline_manager::{Manager, StateSnapshot},
wal_backup,
wal_backup_partial::{self, PartialRemoteSegment},
@@ -50,7 +50,6 @@ impl Manager {
.flush_lsn
.segment_number(self.wal_seg_size)
== self.last_removed_segno + 1
&& self.resident_since.elapsed() >= self.conf.eviction_min_resident
}
/// Evict the timeline to remote storage.
@@ -112,7 +111,8 @@ impl Manager {
return;
}
self.resident_since = Instant::now();
self.evict_not_before =
tokio::time::Instant::now() + rand_duration(&self.conf.eviction_min_resident);
info!("successfully restored evicted timeline");
}

View File

@@ -23,6 +23,7 @@ use utils::lsn::Lsn;
use crate::{
control_file::{FileStorage, Storage},
metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS},
rate_limit::{rand_duration, RateLimiter},
recovery::recovery_main,
remove_wal::calc_horizon_lsn,
safekeeper::Term,
@@ -32,7 +33,7 @@ use crate::{
timeline_guard::{AccessService, GuardId, ResidenceGuard},
timelines_set::{TimelineSetGuard, TimelinesSet},
wal_backup::{self, WalBackupTaskHandle},
wal_backup_partial::{self, PartialRemoteSegment, RateLimiter},
wal_backup_partial::{self, PartialRemoteSegment},
SafeKeeperConf,
};
@@ -185,11 +186,11 @@ pub(crate) struct Manager {
// misc
pub(crate) access_service: AccessService,
pub(crate) partial_backup_rate_limiter: RateLimiter,
pub(crate) global_rate_limiter: RateLimiter,
// Anti-flapping state: we evict timelines eagerly if they are inactive, but should not
// evict them if they go inactive very soon after being restored.
pub(crate) resident_since: std::time::Instant,
pub(crate) evict_not_before: Instant,
}
/// This task gets spawned alongside each timeline and is responsible for managing the timeline's
@@ -202,7 +203,7 @@ pub async fn main_task(
broker_active_set: Arc<TimelinesSet>,
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
partial_backup_rate_limiter: RateLimiter,
global_rate_limiter: RateLimiter,
) {
tli.set_status(Status::Started);
@@ -220,7 +221,7 @@ pub async fn main_task(
conf,
broker_active_set,
manager_tx,
partial_backup_rate_limiter,
global_rate_limiter,
)
.await;
@@ -254,9 +255,29 @@ pub async fn main_task(
mgr.set_status(Status::UpdatePartialBackup);
mgr.update_partial_backup(&state_snapshot).await;
if mgr.conf.enable_offload && mgr.ready_for_eviction(&next_event, &state_snapshot) {
mgr.set_status(Status::EvictTimeline);
mgr.evict_timeline().await;
let now = Instant::now();
if mgr.evict_not_before > now {
// we should wait until evict_not_before
update_next_event(&mut next_event, mgr.evict_not_before);
}
if mgr.conf.enable_offload
&& mgr.evict_not_before <= now
&& mgr.ready_for_eviction(&next_event, &state_snapshot)
{
// check rate limiter and evict timeline if possible
match mgr.global_rate_limiter.try_acquire_eviction() {
Some(_permit) => {
mgr.set_status(Status::EvictTimeline);
mgr.evict_timeline().await;
}
None => {
// we can't evict timeline now, will try again later
mgr.evict_not_before =
Instant::now() + rand_duration(&mgr.conf.eviction_min_resident);
update_next_event(&mut next_event, mgr.evict_not_before);
}
}
}
}
@@ -334,11 +355,10 @@ impl Manager {
conf: SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
partial_backup_rate_limiter: RateLimiter,
global_rate_limiter: RateLimiter,
) -> Manager {
let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
Manager {
conf,
wal_seg_size: tli.get_wal_seg_size().await,
walsenders: tli.get_walsenders().clone(),
state_version_rx: tli.get_state_version_rx(),
@@ -353,8 +373,10 @@ impl Manager {
partial_backup_uploaded,
access_service: AccessService::new(manager_tx),
tli,
partial_backup_rate_limiter,
resident_since: std::time::Instant::now(),
global_rate_limiter,
// to smooth out evictions spike after restart
evict_not_before: Instant::now() + rand_duration(&conf.eviction_min_resident),
conf,
}
}
@@ -522,8 +544,8 @@ impl Manager {
/// Spawns partial WAL backup task if needed.
async fn update_partial_backup(&mut self, state: &StateSnapshot) {
// check if partial backup is enabled and should be started
if !self.conf.is_wal_backup_enabled() || !self.conf.partial_backup_enabled {
// check if WAL backup is enabled and should be started
if !self.conf.is_wal_backup_enabled() {
return;
}
@@ -541,7 +563,7 @@ impl Manager {
self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
self.wal_resident_timeline(),
self.conf.clone(),
self.partial_backup_rate_limiter.clone(),
self.global_rate_limiter.clone(),
)));
}

View File

@@ -2,10 +2,11 @@
//! All timelines should always be present in this map, this is done by loading them
//! all from the disk on startup and keeping them in memory.
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
use crate::rate_limit::RateLimiter;
use crate::safekeeper::ServerInfo;
use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup_partial::RateLimiter;
use crate::SafeKeeperConf;
use anyhow::{bail, Context, Result};
use camino::Utf8PathBuf;
@@ -31,7 +32,7 @@ struct GlobalTimelinesState {
conf: Option<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
partial_backup_rate_limiter: RateLimiter,
global_rate_limiter: RateLimiter,
}
// Used to prevent concurrent timeline loading.
@@ -50,7 +51,7 @@ impl GlobalTimelinesState {
(
self.get_conf().clone(),
self.broker_active_set.clone(),
self.partial_backup_rate_limiter.clone(),
self.global_rate_limiter.clone(),
)
}
@@ -85,7 +86,7 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
conf: None,
broker_active_set: Arc::new(TimelinesSet::default()),
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
partial_backup_rate_limiter: RateLimiter::new(1),
global_rate_limiter: RateLimiter::new(1, 1),
})
});
@@ -99,7 +100,10 @@ impl GlobalTimelines {
// lock, so use explicit block
let tenants_dir = {
let mut state = TIMELINES_STATE.lock().unwrap();
state.partial_backup_rate_limiter = RateLimiter::new(conf.partial_backup_concurrency);
state.global_rate_limiter = RateLimiter::new(
conf.partial_backup_concurrency,
DEFAULT_EVICTION_CONCURRENCY,
);
state.conf = Some(conf);
// Iterate through all directories and load tenants for all directories

View File

@@ -18,8 +18,6 @@
//! This way control file stores information about all potentially existing
//! remote partial segments and can clean them up after uploading a newer version.
use std::sync::Arc;
use camino::Utf8PathBuf;
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
@@ -30,6 +28,7 @@ use utils::lsn::Lsn;
use crate::{
metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
rate_limit::{rand_duration, RateLimiter},
safekeeper::Term,
timeline::WalResidentTimeline,
timeline_manager::StateSnapshot,
@@ -37,30 +36,6 @@ use crate::{
SafeKeeperConf,
};
#[derive(Clone)]
pub struct RateLimiter {
semaphore: Arc<tokio::sync::Semaphore>,
}
impl RateLimiter {
pub fn new(permits: usize) -> Self {
Self {
semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
}
}
async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
let _timer = MISC_OPERATION_SECONDS
.with_label_values(&["partial_permit_acquire"])
.start_timer();
self.semaphore
.clone()
.acquire_owned()
.await
.expect("semaphore is closed")
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum UploadStatus {
/// Upload is in progress. This status should be used only for garbage collection,
@@ -352,6 +327,7 @@ pub async fn main_task(
) -> Option<PartialRemoteSegment> {
debug!("started");
let await_duration = conf.partial_backup_timeout;
let mut first_iteration = true;
let (_, persistent_state) = tli.get_state().await;
let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
@@ -419,6 +395,15 @@ pub async fn main_task(
}
}
// smoothing the load after restart, by sleeping for a random time.
// if this is not the first iteration, we will wait for the full await_duration
let await_duration = if first_iteration {
first_iteration = false;
rand_duration(&await_duration)
} else {
await_duration
};
// fixing the segno and waiting some time to prevent reuploading the same segment too often
let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
let timeout = tokio::time::sleep(await_duration);
@@ -454,7 +439,7 @@ pub async fn main_task(
}
// limit concurrent uploads
let _upload_permit = limiter.acquire_owned().await;
let _upload_permit = limiter.acquire_partial_backup().await;
let prepared = backup.prepare_upload().await;
if let Some(seg) = &uploaded_segment {

View File

@@ -181,7 +181,6 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
sk_auth_token: None,
current_thread_runtime: false,
walsenders_keep_horizon: false,
partial_backup_enabled: false,
partial_backup_timeout: Duration::from_secs(0),
disable_periodic_broker_push: false,
enable_offload: false,

View File

@@ -67,6 +67,7 @@ FALLBACK_DURATION = {
"test_runner/performance/test_copy.py::test_copy[neon]": 13.817,
"test_runner/performance/test_copy.py::test_copy[vanilla]": 11.736,
"test_runner/performance/test_gc_feedback.py::test_gc_feedback": 575.735,
"test_runner/performance/test_gc_feedback.py::test_gc_feedback_with_snapshots": 575.735,
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[neon]": 14.868,
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[vanilla]": 14.393,
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[neon-1]": 20.588,

View File

@@ -32,6 +32,7 @@ once_cell.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
postgres_connection.workspace = true
rand.workspace = true
reqwest = { workspace = true, features = ["stream"] }
routerify.workspace = true
serde.workspace = true
@@ -53,7 +54,6 @@ diesel = { version = "2.1.4", features = [
] }
diesel_migrations = { version = "2.1.0" }
r2d2 = { version = "0.8.10" }
dns-lookup = { version = "2.0.4" }
utils = { path = "../libs/utils/" }
metrics = { path = "../libs/metrics/" }

View File

@@ -1 +0,0 @@
DROP TABLE leader;

View File

@@ -1,6 +0,0 @@
CREATE TABLE leader (
hostname VARCHAR NOT NULL,
port INTEGER NOT NULL,
started_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY(hostname, port, started_at)
);

View File

@@ -10,7 +10,6 @@ mod id_lock_map;
pub mod metrics;
mod node;
mod pageserver_client;
mod peer_client;
pub mod persistence;
mod reconciler;
mod scheduler;

View File

@@ -9,12 +9,14 @@ use std::time::Duration;
use storage_controller::http::make_router;
use storage_controller::metrics::preinitialize_metrics;
use storage_controller::persistence::Persistence;
use storage_controller::service::chaos_injector::ChaosInjector;
use storage_controller::service::{
Config, Service, MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
RECONCILER_CONCURRENCY_DEFAULT,
};
use tokio::signal::unix::SignalKind;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::auth::{JwtAuth, SwappableJwtAuth};
use utils::logging::{self, LogFormat};
@@ -81,14 +83,15 @@ struct Cli {
#[arg(long, default_value = "5s")]
db_connect_timeout: humantime::Duration,
#[arg(long, default_value = "false")]
start_as_candidate: bool,
/// `neon_local` sets this to the path of the neon_local repo dir.
/// Only relevant for testing.
// TODO: make `cfg(feature = "testing")`
#[arg(long)]
neon_local_repo_dir: Option<PathBuf>,
/// Chaos testing
#[arg(long)]
chaos_interval: Option<humantime::Duration>,
}
enum StrictMode {
@@ -276,8 +279,6 @@ async fn async_main() -> anyhow::Result<()> {
.unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
split_threshold: args.split_threshold,
neon_local_repo_dir: args.neon_local_repo_dir,
start_as_candidate: args.start_as_candidate,
http_service_port: args.listen.port() as i32,
};
// After loading secrets & config, but before starting anything else, apply database migrations
@@ -314,6 +315,22 @@ async fn async_main() -> anyhow::Result<()> {
tracing::info!("Serving on {0}", args.listen);
let server_task = tokio::task::spawn(server);
let chaos_task = args.chaos_interval.map(|interval| {
let service = service.clone();
let cancel = CancellationToken::new();
let cancel_bg = cancel.clone();
(
tokio::task::spawn(
async move {
let mut chaos_injector = ChaosInjector::new(service, interval.into());
chaos_injector.run(cancel_bg).await
}
.instrument(tracing::info_span!("chaos_injector")),
),
cancel,
)
});
// Wait until we receive a signal
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
@@ -342,6 +359,12 @@ async fn async_main() -> anyhow::Result<()> {
}
}
// If we were injecting chaos, stop that so that we're not calling into Service while it shuts down
if let Some((chaos_jh, chaos_cancel)) = chaos_task {
chaos_cancel.cancel();
chaos_jh.await.ok();
}
service.shutdown().await;
tracing::info!("Service shutdown complete");

View File

@@ -1,104 +0,0 @@
use crate::tenant_shard::ObservedState;
use pageserver_api::shard::TenantShardId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio_util::sync::CancellationToken;
use reqwest::{StatusCode, Url};
use utils::{backoff, http::error::HttpErrorBody};
#[derive(Debug, Clone)]
pub(crate) struct PeerClient {
hostname: String,
port: i32,
jwt: Option<String>,
client: reqwest::Client,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum StorageControllerPeerError {
#[error("failed to deserialize error response with status code {0} at {1}: {2}")]
DeserializationError(StatusCode, Url, reqwest::Error),
#[error("storage controller peer API error ({0}): {1}")]
ApiError(StatusCode, String),
#[error("failed to send HTTP request: {0}")]
SendError(reqwest::Error),
#[error("Cancelled")]
Cancelled,
}
pub(crate) type Result<T> = std::result::Result<T, StorageControllerPeerError>;
pub(crate) trait ResponseErrorMessageExt: Sized {
fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
}
impl ResponseErrorMessageExt for reqwest::Response {
async fn error_from_body(self) -> Result<Self> {
let status = self.status();
if !(status.is_client_error() || status.is_server_error()) {
return Ok(self);
}
let url = self.url().to_owned();
Err(match self.json::<HttpErrorBody>().await {
Ok(HttpErrorBody { msg }) => StorageControllerPeerError::ApiError(status, msg),
Err(err) => StorageControllerPeerError::DeserializationError(status, url, err),
})
}
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
impl PeerClient {
pub(crate) fn new(hostname: String, port: i32, jwt: Option<String>) -> Self {
Self {
hostname,
port,
jwt,
client: reqwest::Client::new(),
}
}
async fn request_step_down(&self) -> Result<GlobalObservedState> {
let uri = format!("{}:{}/control/v1/step_down", self.hostname, self.port);
let req = self.client.put(uri);
let req = if let Some(jwt) = &self.jwt {
req.header(reqwest::header::AUTHORIZATION, format!("Bearer {jwt}"))
} else {
req
};
let res = req
.send()
.await
.map_err(StorageControllerPeerError::SendError)?;
let response = res.error_from_body().await?;
let status = response.status();
let url = response.url().to_owned();
response
.json()
.await
.map_err(|err| StorageControllerPeerError::DeserializationError(status, url, err))
}
pub(crate) async fn step_down(
&self,
cancel: &CancellationToken,
) -> Result<GlobalObservedState> {
backoff::retry(
|| self.request_step_down(),
|_e| false,
4,
8,
"Send step down request",
cancel,
)
.await
.ok_or_else(|| StorageControllerPeerError::Cancelled)
.and_then(|x| x)
}
}

View File

@@ -95,8 +95,6 @@ pub(crate) enum DatabaseOperation {
ListMetadataHealth,
ListMetadataHealthUnhealthy,
ListMetadataHealthOutdated,
GetLeader,
UpdateLeader,
}
#[must_use]
@@ -787,71 +785,6 @@ impl Persistence {
)
.await
}
/// Get the current entry from the `leader` table if one exists.
/// It is an error for the table to contain more than one entry.
pub(crate) async fn get_leader(&self) -> DatabaseResult<Option<LeaderPersistence>> {
let mut leader: Vec<LeaderPersistence> = self
.with_measured_conn(
DatabaseOperation::GetLeader,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::leader::table.load::<LeaderPersistence>(conn)?)
},
)
.await?;
if leader.len() > 1 {
return Err(DatabaseError::Logical(format!(
"More than one entry present in the leader table: {leader:?}"
)));
}
Ok(leader.pop())
}
/// Update the new leader with compare-exchange semantics. If `prev` does not
/// match the current leader entry, then the update is treated as a failure.
/// When `prev` is not specified, the update is forced.
pub(crate) async fn update_leader(
&self,
prev: Option<LeaderPersistence>,
new: LeaderPersistence,
) -> DatabaseResult<()> {
use crate::schema::leader::dsl::*;
let updated = self
.with_measured_conn(
DatabaseOperation::UpdateLeader,
move |conn| -> DatabaseResult<usize> {
let updated = match &prev {
Some(prev) => diesel::update(leader)
.filter(hostname.eq(prev.hostname.clone()))
.filter(port.eq(prev.port))
.filter(started_at.eq(prev.started_at))
.set((
hostname.eq(new.hostname.clone()),
port.eq(new.port),
started_at.eq(new.started_at),
))
.execute(conn)?,
None => diesel::insert_into(leader)
.values(new.clone())
.execute(conn)?,
};
Ok(updated)
},
)
.await?;
if updated == 0 {
return Err(DatabaseError::Logical(
"Leader table update failed".to_string(),
));
}
Ok(())
}
}
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
@@ -977,13 +910,3 @@ impl From<MetadataHealthPersistence> for MetadataHealthRecord {
}
}
}
#[derive(
Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Debug, Clone,
)]
#[diesel(table_name = crate::schema::leader)]
pub(crate) struct LeaderPersistence {
pub(crate) hostname: String,
pub(crate) port: i32,
pub(crate) started_at: chrono::DateTime<chrono::Utc>,
}

View File

@@ -656,11 +656,8 @@ impl Reconciler {
// reconcile this location. This includes locations with different configurations, as well
// as locations with unknown (None) observed state.
// The general case is to increment the generation. However, there are cases
// where this is not necessary:
// - if we are only updating the TenantConf part of the location
// - if we are only changing the attachment mode (e.g. going to attachedmulti or attachedstale)
// and the location was already in the correct generation
// Incrementing generation is the safe general case, but is inefficient for changes that only
// modify some details (e.g. the tenant's config).
let increment_generation = match observed {
None => true,
Some(ObservedStateLocation { conf: None }) => true,
@@ -669,18 +666,11 @@ impl Reconciler {
}) => {
let generations_match = observed.generation == wanted_conf.generation;
use LocationConfigMode::*;
let mode_transition_requires_gen_inc =
match (observed.mode, wanted_conf.mode) {
// Usually the short-lived attachment modes (multi and stale) are only used
// in the case of [`Self::live_migrate`], but it is simple to handle them correctly
// here too. Locations are allowed to go Single->Stale and Multi->Single within the same generation.
(AttachedSingle, AttachedStale) => false,
(AttachedMulti, AttachedSingle) => false,
(lhs, rhs) => lhs != rhs,
};
!generations_match || mode_transition_requires_gen_inc
// We may skip incrementing the generation if the location is already in the expected mode and
// generation. In principle it would also be safe to skip from certain other modes (e.g. AttachedStale),
// but such states are handled inside `live_migrate`, and if we see that state here we're cleaning up
// after a restart/crash, so fall back to the universally safe path of incrementing generation.
!generations_match || (observed.mode != wanted_conf.mode)
}
};

View File

@@ -1,13 +1,5 @@
// @generated automatically by Diesel CLI.
diesel::table! {
leader (hostname, port, started_at) {
hostname -> Varchar,
port -> Int4,
started_at -> Timestamptz,
}
}
diesel::table! {
metadata_health (tenant_id, shard_number, shard_count) {
tenant_id -> Varchar,
@@ -44,4 +36,4 @@ diesel::table! {
}
}
diesel::allow_tables_to_appear_in_same_query!(leader, metadata_health, nodes, tenant_shards,);
diesel::allow_tables_to_appear_in_same_query!(metadata_health, nodes, tenant_shards,);

View File

@@ -16,10 +16,7 @@ use crate::{
compute_hook::NotifyError,
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
metrics::LeadershipStatusGroup,
peer_client::{GlobalObservedState, PeerClient},
persistence::{
AbortShardSplitStatus, LeaderPersistence, MetadataHealthPersistence, TenantFilter,
},
persistence::{AbortShardSplitStatus, MetadataHealthPersistence, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
tenant_shard::{
@@ -85,6 +82,9 @@ use crate::{
ReconcilerWaiter, TenantShard,
},
};
use serde::{Deserialize, Serialize};
pub mod chaos_injector;
// For operations that should be quick, like attaching a new tenant
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
@@ -225,7 +225,6 @@ impl ServiceState {
tenants: BTreeMap<TenantShardId, TenantShard>,
scheduler: Scheduler,
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
initial_leadership_status: LeadershipStatus,
) -> Self {
let status = &crate::metrics::METRICS_REGISTRY
.metrics_group
@@ -233,13 +232,15 @@ impl ServiceState {
status.set(
LeadershipStatusGroup {
status: initial_leadership_status,
status: LeadershipStatus::Leader,
},
1,
);
Self {
leadership_status: initial_leadership_status,
// TODO: Starting up as Leader is a transient state. Once we enable rolling
// upgrades on the k8s side, we should start up as Candidate.
leadership_status: LeadershipStatus::Leader,
tenants,
nodes: Arc::new(nodes),
scheduler,
@@ -288,33 +289,6 @@ impl ServiceState {
0,
);
}
fn become_leader(&mut self) {
self.leadership_status = LeadershipStatus::Leader;
let status = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_leadership_status;
status.set(
LeadershipStatusGroup {
status: LeadershipStatus::Leader,
},
1,
);
status.set(
LeadershipStatusGroup {
status: LeadershipStatus::SteppedDown,
},
0,
);
status.set(
LeadershipStatusGroup {
status: LeadershipStatus::Candidate,
},
0,
);
}
}
#[derive(Clone)]
@@ -351,10 +325,6 @@ pub struct Config {
// TODO: make this cfg(feature = "testing")
pub neon_local_repo_dir: Option<PathBuf>,
pub start_as_candidate: bool,
pub http_service_port: i32,
}
impl From<DatabaseError> for ApiError {
@@ -522,10 +492,9 @@ pub(crate) enum ReconcileResultRequest {
Stop,
}
struct LeaderStepDownState {
observed: GlobalObservedState,
leader: LeaderPersistence,
}
// TODO: move this into the storcon peer client when that gets added
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct GlobalObservedState(HashMap<TenantShardId, ObservedState>);
impl Service {
pub fn get_config(&self) -> &Config {
@@ -537,11 +506,15 @@ impl Service {
#[instrument(skip_all)]
async fn startup_reconcile(
self: &Arc<Service>,
leader_step_down_state: Option<LeaderStepDownState>,
bg_compute_notify_result_tx: tokio::sync::mpsc::Sender<
Result<(), (TenantShardId, NotifyError)>,
>,
) {
// For all tenant shards, a vector of observed states on nodes (where None means
// indeterminate, same as in [`ObservedStateLocation`])
let mut observed: HashMap<TenantShardId, Vec<(NodeId, Option<LocationConfig>)>> =
HashMap::new();
// Startup reconciliation does I/O to other services: whether they
// are responsive or not, we should aim to finish within our deadline, because:
// - If we don't, a k8s readiness hook watching /ready will kill us.
@@ -555,29 +528,26 @@ impl Service {
.checked_add(STARTUP_RECONCILE_TIMEOUT / 2)
.expect("Reconcile timeout is a modest constant");
let (observed, current_leader) = if let Some(state) = leader_step_down_state {
tracing::info!(
"Using observed received from leader at {}:{}",
state.leader.hostname,
state.leader.port
);
(state.observed, Some(state.leader))
} else {
(
self.build_global_observed_state(node_scan_deadline).await,
None,
)
};
// Accumulate a list of any tenant locations that ought to be detached
let mut cleanup = Vec::new();
// Send initial heartbeat requests to all nodes loaded from the database
let all_nodes = {
let locked = self.inner.read().unwrap();
locked.nodes.clone()
};
let nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await;
let node_listings = self.scan_node_locations(node_scan_deadline).await;
// Send initial heartbeat requests to nodes that replied to the location listing above.
let nodes_online = self.initial_heartbeat_round(node_listings.keys()).await;
for (node_id, list_response) in node_listings {
let tenant_shards = list_response.tenant_shards;
tracing::info!(
"Received {} shard statuses from pageserver {}, setting it to Active",
tenant_shards.len(),
node_id
);
for (tenant_shard_id, conf_opt) in tenant_shards {
let shard_observations = observed.entry(tenant_shard_id).or_default();
shard_observations.push((node_id, conf_opt));
}
}
// List of tenants for which we will attempt to notify compute of their location at startup
let mut compute_notifications = Vec::new();
@@ -600,16 +570,17 @@ impl Service {
}
*nodes = Arc::new(new_nodes);
for (tenant_shard_id, observed_state) in observed.0 {
let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
for node_id in observed_state.locations.keys() {
cleanup.push((tenant_shard_id, *node_id));
}
continue;
};
tenant_shard.observed = observed_state;
for (tenant_shard_id, shard_observations) in observed {
for (node_id, observed_loc) in shard_observations {
let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
cleanup.push((tenant_shard_id, node_id));
continue;
};
tenant_shard
.observed
.locations
.insert(node_id, ObservedStateLocation { conf: observed_loc });
}
}
// Populate each tenant's intent state
@@ -643,22 +614,6 @@ impl Service {
tenants.len()
};
// Before making any obeservable changes to the cluster, persist self
// as leader in database and memory.
let proposed_leader = self.get_proposed_leader_info();
if let Err(err) = self
.persistence
.update_leader(current_leader, proposed_leader)
.await
{
tracing::error!("Failed to persist self as leader: {err}. Aborting start-up ...");
std::process::exit(1);
}
self.inner.write().unwrap().become_leader();
// TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
// generation_pageserver in the database.
@@ -824,31 +779,6 @@ impl Service {
node_results
}
async fn build_global_observed_state(&self, deadline: Instant) -> GlobalObservedState {
let node_listings = self.scan_node_locations(deadline).await;
let mut observed = GlobalObservedState::default();
for (node_id, location_confs) in node_listings {
tracing::info!(
"Received {} shard statuses from pageserver {}",
location_confs.tenant_shards.len(),
node_id
);
for (tid, location_conf) in location_confs.tenant_shards {
let entry = observed.0.entry(tid).or_default();
entry.locations.insert(
node_id,
ObservedStateLocation {
conf: location_conf,
},
);
}
}
observed
}
/// Used during [`Self::startup_reconcile`]: detach a list of unknown-to-us tenants from pageservers.
///
/// This is safe to run in the background, because if we don't have this TenantShardId in our map of
@@ -1327,20 +1257,12 @@ impl Service {
config.max_warming_up_interval,
cancel.clone(),
);
let initial_leadership_status = if config.start_as_candidate {
LeadershipStatus::Candidate
} else {
LeadershipStatus::Leader
};
let this = Arc::new(Self {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
nodes,
tenants,
scheduler,
delayed_reconcile_rx,
initial_leadership_status,
))),
config: config.clone(),
persistence,
@@ -1409,16 +1331,7 @@ impl Service {
return;
};
let leadership_status = this.inner.read().unwrap().get_leadership_status();
let peer_observed_state = match leadership_status {
LeadershipStatus::Candidate => this.request_step_down().await,
LeadershipStatus::Leader => None,
LeadershipStatus::SteppedDown => unreachable!(),
};
this.startup_reconcile(peer_observed_state, bg_compute_notify_result_tx)
.await;
this.startup_reconcile(bg_compute_notify_result_tx).await;
drop(startup_completion);
}
});
@@ -6268,88 +6181,4 @@ impl Service {
global_observed
}
/// Collect the details for the current proccess wishing to become the storage controller
/// leader.
///
/// On failures to discover and resolve the hostname the process is killed and we rely on k8s to retry.
fn get_proposed_leader_info(&self) -> LeaderPersistence {
let hostname = match dns_lookup::get_hostname() {
Ok(name) => name,
Err(err) => {
tracing::error!("Failed to discover hostname: {err}. Aborting start-up ...");
std::process::exit(1);
}
};
let mut addrs = match dns_lookup::lookup_host(&hostname) {
Ok(addrs) => addrs,
Err(err) => {
tracing::error!("Failed to resolve hostname: {err}. Aborting start-up ...");
std::process::exit(1);
}
};
let addr = addrs
.pop()
.expect("k8s configured hostname always resolves");
let proposed = LeaderPersistence {
hostname: addr.to_string(),
port: self.get_config().http_service_port,
started_at: chrono::Utc::now(),
};
tracing::info!("Proposed leader details are: {proposed:?}");
proposed
}
/// Request step down from the currently registered leader in the database
///
/// If such an entry is persisted, the success path returns the observed
/// state and details of the leader. Otherwise, None is returned indicating
/// there is no leader currently.
///
/// On failures to query the database or step down error responses the process is killed
/// and we rely on k8s to retry.
async fn request_step_down(&self) -> Option<LeaderStepDownState> {
let leader = match self.persistence.get_leader().await {
Ok(leader) => leader,
Err(err) => {
tracing::error!(
"Failed to query database for current leader: {err}. Aborting start-up ..."
);
std::process::exit(1);
}
};
match leader {
Some(leader) => {
// TODO: jwt token
let client = PeerClient::new(
leader.hostname.to_owned(),
leader.port,
self.config.jwt_token.clone(),
);
let state = client.step_down(&self.cancel).await;
match state {
Ok(state) => Some(LeaderStepDownState {
observed: state,
leader: leader.clone(),
}),
Err(err) => {
tracing::error!(
"Leader ({}:{}) did not respond to step-down request: {}",
leader.hostname,
leader.port,
err
);
None
}
}
}
None => None,
}
}
}

View File

@@ -0,0 +1,71 @@
use std::{sync::Arc, time::Duration};
use rand::seq::SliceRandom;
use rand::thread_rng;
use tokio_util::sync::CancellationToken;
use super::Service;
pub struct ChaosInjector {
service: Arc<Service>,
interval: Duration,
}
impl ChaosInjector {
pub fn new(service: Arc<Service>, interval: Duration) -> Self {
Self { service, interval }
}
pub async fn run(&mut self, cancel: CancellationToken) {
let mut interval = tokio::time::interval(self.interval);
loop {
tokio::select! {
_ = interval.tick() => {}
_ = cancel.cancelled() => {
tracing::info!("Shutting down");
return;
}
}
self.inject_chaos().await;
tracing::info!("Chaos iteration...");
}
}
async fn inject_chaos(&mut self) {
// Pick some shards to interfere with
let batch_size = 128;
let mut inner = self.service.inner.write().unwrap();
let (nodes, tenants, scheduler) = inner.parts_mut();
let tenant_ids = tenants.keys().cloned().collect::<Vec<_>>();
let victims = tenant_ids.choose_multiple(&mut thread_rng(), batch_size);
for victim in victims {
let shard = tenants
.get_mut(victim)
.expect("Held lock between choosing ID and this get");
// Pick a secondary to promote
let Some(new_location) = shard
.intent
.get_secondary()
.choose(&mut thread_rng())
.cloned()
else {
tracing::info!("Skipping shard {victim}: no secondary location, can't migrate");
continue;
};
let Some(old_location) = *shard.intent.get_attached() else {
tracing::info!("Skipping shard {victim}: currently has no attached location");
continue;
};
shard.intent.demote_attached(scheduler, old_location);
shard.intent.promote_attached(scheduler, new_location);
self.service.maybe_reconcile_shard(shard, nodes);
}
}
}

View File

@@ -10,6 +10,7 @@ aws-smithy-async.workspace = true
either.workspace = true
tokio-rustls.workspace = true
anyhow.workspace = true
git-version.workspace = true
hex.workspace = true
humantime.workspace = true
thiserror.workspace = true

View File

@@ -19,8 +19,8 @@ use utils::id::TenantId;
use crate::{
cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
init_remote, init_remote_generic, list_objects_with_retries,
metadata_stream::{stream_tenant_timelines, stream_tenants},
init_remote_generic, list_objects_with_retries_generic,
metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic},
BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
};
@@ -153,7 +153,7 @@ async fn find_garbage_inner(
node_kind: NodeKind,
) -> anyhow::Result<GarbageList> {
// Construct clients for S3 and for Console API
let (s3_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
let (remote_client, target) = init_remote_generic(bucket_config.clone(), node_kind).await?;
let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
// Build a set of console-known tenants, for quickly eliminating known-active tenants without having
@@ -179,7 +179,7 @@ async fn find_garbage_inner(
// Enumerate Tenants in S3, and check if each one exists in Console
tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
let tenants = stream_tenants(&s3_client, &target);
let tenants = stream_tenants_generic(&remote_client, &target);
let tenants_checked = tenants.map_ok(|t| {
let api_client = cloud_admin_api_client.clone();
let console_cache = console_cache.clone();
@@ -237,25 +237,26 @@ async fn find_garbage_inner(
// Special case: If it's missing in console, check for known bugs that would enable us to conclusively
// identify it as purge-able anyway
if console_result.is_none() {
let timelines = stream_tenant_timelines(&s3_client, &target, tenant_shard_id)
.await?
.collect::<Vec<_>>()
.await;
let timelines =
stream_tenant_timelines_generic(&remote_client, &target, tenant_shard_id)
.await?
.collect::<Vec<_>>()
.await;
if timelines.is_empty() {
// No timelines, but a heatmap: the deletion bug where we deleted everything but heatmaps
let tenant_objects = list_objects_with_retries(
&s3_client,
let tenant_objects = list_objects_with_retries_generic(
&remote_client,
ListingMode::WithDelimiter,
&target.tenant_root(&tenant_shard_id),
None,
)
.await?;
let object = tenant_objects.contents.as_ref().unwrap().first().unwrap();
if object.key.as_ref().unwrap().ends_with("heatmap-v1.json") {
let object = tenant_objects.keys.first().unwrap();
if object.key.get_path().as_str().ends_with("heatmap-v1.json") {
tracing::info!("Tenant {tenant_shard_id}: is missing in console and is only a heatmap (known historic deletion bug)");
garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
continue;
} else {
tracing::info!("Tenant {tenant_shard_id} is missing in console and contains one object: {}", object.key.as_ref().unwrap());
tracing::info!("Tenant {tenant_shard_id} is missing in console and contains one object: {}", object.key);
}
} else {
// A console-unknown tenant with timelines: check if these timelines only contain initdb.tar.zst, from the initial
@@ -264,24 +265,18 @@ async fn find_garbage_inner(
for timeline_r in timelines {
let timeline = timeline_r?;
let timeline_objects = list_objects_with_retries(
&s3_client,
let timeline_objects = list_objects_with_retries_generic(
&remote_client,
ListingMode::WithDelimiter,
&target.timeline_root(&timeline),
None,
)
.await?;
if timeline_objects
.common_prefixes
.as_ref()
.map(|v| v.len())
.unwrap_or(0)
> 0
{
if !timeline_objects.prefixes.is_empty() {
// Sub-paths? Unexpected
any_non_initdb = true;
} else {
let object = timeline_objects.contents.as_ref().unwrap().first().unwrap();
if object.key.as_ref().unwrap().ends_with("initdb.tar.zst") {
let object = timeline_objects.keys.first().unwrap();
if object.key.get_path().as_str().ends_with("initdb.tar.zst") {
tracing::info!("Timeline {timeline} contains only initdb.tar.zst");
} else {
any_non_initdb = true;
@@ -336,7 +331,8 @@ async fn find_garbage_inner(
// Construct a stream of all timelines within active tenants
let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, *t));
let timelines =
active_tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, *t));
let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
let timelines = timelines.try_flatten();

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Context};
use aws_config::retry::{RetryConfigBuilder, RetryMode};
use aws_sdk_s3::config::Region;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_sdk_s3::Client;
@@ -314,8 +315,15 @@ pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
}
async fn init_s3_client(bucket_region: Region) -> Client {
let mut retry_config_builder = RetryConfigBuilder::new();
retry_config_builder
.set_max_attempts(Some(3))
.set_mode(Some(RetryMode::Adaptive));
let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
.region(bucket_region)
.retry_config(retry_config_builder.build())
.load()
.await;
Client::new(&config)
@@ -427,6 +435,7 @@ async fn list_objects_with_retries(
Err(anyhow!("unreachable unless MAX_RETRIES==0"))
}
/// Listing possibly large amounts of keys in a streaming fashion.
fn stream_objects_with_retries<'a>(
storage_client: &'a GenericRemoteStorage,
listing_mode: ListingMode,
@@ -465,6 +474,45 @@ fn stream_objects_with_retries<'a>(
}
}
/// If you want to list a bounded amount of prefixes or keys. For larger numbers of keys/prefixes,
/// use [`stream_objects_with_retries`] instead.
async fn list_objects_with_retries_generic(
remote_client: &GenericRemoteStorage,
listing_mode: ListingMode,
s3_target: &S3Target,
) -> anyhow::Result<Listing> {
let cancel = CancellationToken::new();
let prefix_str = &s3_target
.prefix_in_bucket
.strip_prefix("/")
.unwrap_or(&s3_target.prefix_in_bucket);
let prefix = RemotePath::from_string(prefix_str)?;
for trial in 0..MAX_RETRIES {
match remote_client
.list(Some(&prefix), listing_mode, None, &cancel)
.await
{
Ok(response) => return Ok(response),
Err(e) => {
if trial == MAX_RETRIES - 1 {
return Err(e)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
}
error!(
"list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
s3_target.bucket_name,
s3_target.prefix_in_bucket,
s3_target.delimiter,
DisplayErrorContext(e),
);
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
}
}
}
panic!("MAX_RETRIES is not allowed to be 0");
}
async fn download_object_with_retries(
s3_client: &Client,
bucket_name: &str,

View File

@@ -17,6 +17,11 @@ use storage_scrubber::{
use clap::{Parser, Subcommand};
use utils::id::TenantId;
use utils::{project_build_tag, project_git_version};
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(arg_required_else_help(true))]
@@ -101,6 +106,8 @@ enum Command {
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
tracing::info!("version: {}, build_tag {}", GIT_VERSION, BUILD_TAG);
let bucket_config = BucketConfig::from_env()?;
let command_log_name = match &cli.command {

View File

@@ -189,6 +189,63 @@ pub async fn stream_tenant_timelines<'a>(
})
}
/// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
/// using a listing. The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.
pub async fn stream_tenant_timelines_generic<'a>(
remote_client: &'a GenericRemoteStorage,
target: &'a RootTarget,
tenant: TenantShardId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
let timelines_target = target.timelines_root(&tenant);
let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
remote_client,
ListingMode::WithDelimiter,
&timelines_target
));
loop {
tracing::debug!("Listing in {tenant}");
let fetch_response = match objects_stream.next().await {
None => break,
Some(Err(e)) => {
timeline_ids.push(Err(e));
break;
}
Some(Ok(r)) => r,
};
let new_entry_ids = fetch_response
.prefixes
.iter()
.filter_map(|prefix| -> Option<&str> {
prefix
.get_path()
.as_str()
.strip_prefix(&timelines_target.prefix_in_bucket)?
.strip_suffix('/')
})
.map(|entry_id_str| {
entry_id_str
.parse::<TimelineId>()
.with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
});
for i in new_entry_ids {
timeline_ids.push(i);
}
}
tracing::debug!("Yielding for {}", tenant);
Ok(stream! {
for i in timeline_ids {
let id = i?;
yield Ok(TenantShardTimelineId::new(tenant, id));
}
})
}
pub(crate) fn stream_listing<'a>(
s3_client: &'a Client,
target: &'a S3Target,

View File

@@ -150,6 +150,7 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_pitr_history_size",
"pageserver_layer_bytes",
"pageserver_layer_count",
"pageserver_visible_physical_size",
"pageserver_storage_operations_seconds_count_total",
"pageserver_storage_operations_seconds_sum_total",
"pageserver_evictions_total",

View File

@@ -1943,11 +1943,15 @@ class NeonCli(AbstractNeonCli):
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
allow_multiple=False,
basebackup_request_tries: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
"start",
]
extra_env_vars = {}
if basebackup_request_tries is not None:
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
if remote_ext_config is not None:
args.extend(["--remote-ext-config", remote_ext_config])
@@ -1960,7 +1964,7 @@ class NeonCli(AbstractNeonCli):
if allow_multiple:
args.extend(["--allow-multiple"])
res = self.raw_cli(args)
res = self.raw_cli(args, extra_env_vars)
res.check_returncode()
return res
@@ -3812,6 +3816,7 @@ class Endpoint(PgProtocol, LogUtils):
pageserver_id: Optional[int] = None,
safekeepers: Optional[List[int]] = None,
allow_multiple: bool = False,
basebackup_request_tries: Optional[int] = None,
) -> "Endpoint":
"""
Start the Postgres instance.
@@ -3833,6 +3838,7 @@ class Endpoint(PgProtocol, LogUtils):
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
basebackup_request_tries=basebackup_request_tries,
)
self._running.release(1)
@@ -3979,6 +3985,7 @@ class Endpoint(PgProtocol, LogUtils):
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
allow_multiple=False,
basebackup_request_tries: Optional[int] = None,
) -> "Endpoint":
"""
Create an endpoint, apply config, and start Postgres.
@@ -3999,6 +4006,7 @@ class Endpoint(PgProtocol, LogUtils):
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
basebackup_request_tries=basebackup_request_tries,
)
log.info(f"Postgres startup took {time.time() - started_at} seconds")
@@ -4042,6 +4050,7 @@ class EndpointFactory:
config_lines: Optional[List[str]] = None,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
basebackup_request_tries: Optional[int] = None,
) -> Endpoint:
ep = Endpoint(
self.env,
@@ -4060,6 +4069,7 @@ class EndpointFactory:
lsn=lsn,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
basebackup_request_tries=basebackup_request_tries,
)
def create(
@@ -4529,6 +4539,13 @@ def test_output_dir(
yield test_dir
# Allure artifacts creation might involve the creation of `.tar.zst` archives,
# which aren't going to be used if Allure results collection is not enabled
# (i.e. --alluredir is not set).
# Skip `allure_attach_from_dir` in this case
if not request.config.getoption("--alluredir"):
return
preserve_database_files = False
for k, v in request.node.user_properties:
# NB: the neon_env_builder fixture uses this fixture (test_output_dir).

View File

@@ -663,6 +663,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
force_image_layer_creation=False,
wait_until_uploaded=False,
compact: Optional[bool] = None,
**kwargs,
):
self.is_testing_enabled_or_skip()
query = {}
@@ -680,6 +681,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
params=query,
**kwargs,
)
log.info(f"Got checkpoint request response code: {res.status_code}")
self.verbose_error(res)

View File

@@ -6,21 +6,8 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.timeout(10000)
def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
"""
Test that GC is able to collect all old layers even if them are forming
"stairs" and there are not three delta layers since last image layer.
Information about image layers needed to collect old layers should
be propagated by GC to compaction task which should take in in account
when make a decision which new image layers needs to be created.
NB: this test demonstrates the problem. The source tree contained the
`gc_feedback` mechanism for about 9 months, but, there were problems
with it and it wasn't enabled at runtime.
This PR removed the code: https://github.com/neondatabase/neon/pull/6863
"""
def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, mode: str):
assert mode == "normal" or mode == "with_snapshots"
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
@@ -74,6 +61,9 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
physical_size = client.timeline_detail(tenant_id, timeline_id)["current_physical_size"]
log.info(f"Physical storage size {physical_size}")
if mode == "with_snapshots":
if step == n_steps / 2:
env.neon_cli.create_branch("child")
max_num_of_deltas_above_image = 0
max_total_num_of_deltas = 0
@@ -149,3 +139,37 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
log.info(f"Writing layer map to {layer_map_path}")
with layer_map_path.open("w") as f:
f.write(json.dumps(client.timeline_layer_map_info(tenant_id, timeline_id)))
@pytest.mark.timeout(10000)
def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
"""
Test that GC is able to collect all old layers even if them are forming
"stairs" and there are not three delta layers since last image layer.
Information about image layers needed to collect old layers should
be propagated by GC to compaction task which should take in in account
when make a decision which new image layers needs to be created.
NB: this test demonstrates the problem. The source tree contained the
`gc_feedback` mechanism for about 9 months, but, there were problems
with it and it wasn't enabled at runtime.
This PR removed the code: https://github.com/neondatabase/neon/pull/6863
And the bottom-most GC-compaction epic resolves the problem.
https://github.com/neondatabase/neon/issues/8002
"""
gc_feedback_impl(neon_env_builder, zenbenchmark, "normal")
@pytest.mark.timeout(10000)
def test_gc_feedback_with_snapshots(
neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
):
"""
Compared with `test_gc_feedback`, we create a branch without written data (=snapshot) in the middle
of the benchmark, and the bottom-most compaction should collect as much garbage as possible below the GC
horizon. Ideally, there should be images (in an image layer) covering the full range at the branch point,
and images covering the full key range (in a delta layer) at the GC horizon.
"""
gc_feedback_impl(neon_env_builder, zenbenchmark, "with_snapshots")

View File

@@ -18,7 +18,6 @@ from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
from requests.exceptions import RetryError
# Test branch creation
@@ -151,7 +150,7 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE
env.pageserver.allowed_errors.extend(
[
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*",
".*page_service_conn_main.*: query handler for 'basebackup .* is not active, state: Loading",
".*page_service_conn_main.*: query handler for 'basebackup .* ERROR: Not found: Timeline",
]
)
ps_http = env.pageserver.http_client()
@@ -176,10 +175,12 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE
env.neon_cli.map_branch(initial_branch, env.initial_tenant, env.initial_timeline)
with pytest.raises(RuntimeError, match="is not active, state: Loading"):
env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant)
with pytest.raises(RuntimeError, match="ERROR: Not found: Timeline"):
env.endpoints.create_start(
initial_branch, tenant_id=env.initial_tenant, basebackup_request_tries=2
)
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
finally:
# FIXME: paused uploads bother shutdown
env.pageserver.stop(immediate=True)
t.join()
@@ -193,8 +194,11 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
env.pageserver.allowed_errors.extend(
[
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*",
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: .*Cannot branch off the timeline that's not present in pageserver.*",
]
)
ps_http = env.pageserver.http_client()
@@ -216,7 +220,10 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder
branch_id = TimelineId.generate()
with pytest.raises(RetryError, match="too many 503 error responses"):
with pytest.raises(
PageserverApiException,
match="Cannot branch off the timeline that's not present in pageserver",
):
ps_http.timeline_create(
env.pg_version,
env.initial_tenant,

View File

@@ -3,18 +3,15 @@ import re
import shutil
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
import pytest
import toml
from fixtures.common_types import Lsn
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
)
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
@@ -22,7 +19,8 @@ from fixtures.pageserver.utils import (
wait_for_upload,
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind
from fixtures.remote_storage import RemoteStorageKind, S3Storage, s3_storage
from fixtures.workload import Workload
#
# A test suite that help to prevent unintentionally breaking backward or forward compatibility between Neon releases.
@@ -409,3 +407,133 @@ def dump_differs(
break
return differs
@dataclass
class HistoricDataSet:
name: str
tenant_id: TenantId
pg_version: PgVersion
url: str
def __str__(self):
return self.name
HISTORIC_DATA_SETS = [
# From before we enabled image layer compression.
# - IndexPart::LATEST_VERSION 7
# - STORAGE_FORMAT_VERSION 3
HistoricDataSet(
"2024-07-18",
TenantId("17bf64a53509714687664b3a84e9b3ba"),
PgVersion.V16,
"https://neon-github-public-dev.s3.eu-central-1.amazonaws.com/compatibility-data-snapshots/2024-07-18-pgv16.tar.zst",
),
]
@pytest.mark.parametrize("dataset", HISTORIC_DATA_SETS)
@pytest.mark.xdist_group("compatibility")
def test_historic_storage_formats(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_version: PgVersion,
dataset: HistoricDataSet,
):
"""
This test is like test_backward_compatibility, but it looks back further to examples of our storage format from long ago.
"""
ARTIFACT_CACHE_DIR = "./artifact_cache"
import tarfile
from contextlib import closing
import requests
import zstandard
artifact_unpack_path = ARTIFACT_CACHE_DIR / Path("unpacked") / Path(dataset.name)
# Note: we assume that when running across a matrix of PG versions, the matrix includes all the versions needed by
# HISTORIC_DATA_SETS. If we ever remove a PG version from the matrix, then historic datasets built using that version
# will no longer be covered by this test.
if pg_version != dataset.pg_version:
pytest.skip(f"Dataset {dataset} is for different PG version, skipping")
with closing(requests.get(dataset.url, stream=True)) as r:
unzstd = zstandard.ZstdDecompressor()
with unzstd.stream_reader(r.raw) as stream:
with tarfile.open(mode="r|", fileobj=stream) as tf:
tf.extractall(artifact_unpack_path)
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.pg_version = dataset.pg_version
env = neon_env_builder.init_configs()
env.start()
assert isinstance(env.pageserver_remote_storage, S3Storage)
# Link artifact data into test's remote storage. We don't want the whole repo dir, just the remote storage part: we are not testing
# compat of local disk data across releases (test_backward_compat does that), we're testing really long-lived data in S3 like layer files and indices.
#
# The code generating the snapshot uses local_fs, but this test uses S3Storage, so we are copying a tree of files into a bucket. We use
# S3Storage so that the scrubber can run (the scrubber doesn't speak local_fs)
artifact_pageserver_path = (
artifact_unpack_path / Path("repo") / Path("local_fs_remote_storage") / Path("pageserver")
)
for root, _dirs, files in os.walk(artifact_pageserver_path):
for file in files:
local_path = os.path.join(root, file)
remote_key = (
env.pageserver_remote_storage.prefix_in_bucket
+ str(local_path)[len(str(artifact_pageserver_path)) :]
)
log.info(f"Uploading {local_path} -> {remote_key}")
env.pageserver_remote_storage.client.upload_file(
local_path, env.pageserver_remote_storage.bucket_name, remote_key
)
# Check the scrubber handles this old data correctly (can read it and doesn't consider it corrupt)
#
# Do this _before_ importing to the pageserver, as that import may start writing immediately
metadata_summary = env.storage_scrubber.scan_metadata()
assert metadata_summary["tenant_count"] >= 1
assert metadata_summary["timeline_count"] >= 1
assert not metadata_summary["with_errors"]
assert not metadata_summary["with_warnings"]
env.neon_cli.import_tenant(dataset.tenant_id)
# Discover timelines
timelines = env.pageserver.http_client().timeline_list(dataset.tenant_id)
# All our artifacts should contain at least one timeline
assert len(timelines) > 0
# TODO: ensure that the snapshots we're importing contain a sensible variety of content, at the very
# least they should include a mixture of deltas and image layers. Preferably they should also
# contain some "exotic" stuff like aux files from logical replication.
# Check we can start an endpoint and read the SQL that the artifact is meant to contain
reference_sql_dump = artifact_unpack_path / Path("dump.sql")
ep = env.endpoints.create_start("main", tenant_id=dataset.tenant_id)
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
pg_bin.run_capture(
["pg_dumpall", f"--dbname={ep.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
)
assert not dump_differs(
reference_sql_dump,
test_output_dir / "dump.sql",
test_output_dir / "dump.filediff",
)
ep.stop()
# Check we can also do writes to the database
existing_timeline_id = TimelineId(timelines[0]["timeline_id"])
workload = Workload(env, dataset.tenant_id, existing_timeline_id)
workload.init()
workload.write_rows(100)
# Check that compaction works
env.pageserver.http_client().timeline_compact(
dataset.tenant_id, existing_timeline_id, force_image_layer_creation=True
)

View File

@@ -12,7 +12,6 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.common_types import parse_layer_file_name
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
@@ -313,6 +312,7 @@ def test_remote_storage_upload_queue_retries(
def churn_while_failpoints_active(result):
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("c")
# this call will wait for the failpoints to be turned off
client.timeline_checkpoint(tenant_id, timeline_id)
client.timeline_compact(tenant_id, timeline_id)
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("d")
@@ -332,8 +332,8 @@ def test_remote_storage_upload_queue_retries(
# Exponential back-off in upload queue, so, gracious timeouts.
wait_until(30, 1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="upload"), 0))
wait_until(30, 1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 2))
wait_until(30, 1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="delete"), 0))
wait_until(30, 1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 1))
wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0))
# unblock churn operations
configure_storage_sync_failpoints("off")
@@ -769,11 +769,11 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv
create_thread.join()
def test_compaction_waits_for_upload(
def test_paused_upload_stalls_checkpoint(
neon_env_builder: NeonEnvBuilder,
):
"""
This test forces a race between upload and compaction.
This test checks that checkpoints block on uploads to remote storage.
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
@@ -788,6 +788,10 @@ def test_compaction_waits_for_upload(
}
)
env.pageserver.allowed_errors.append(
f".*PUT.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
@@ -808,76 +812,9 @@ def test_compaction_waits_for_upload(
endpoint.safe_psql("CREATE TABLE foo AS SELECT x FROM generate_series(1, 10000) g(x)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
client.timeline_checkpoint(tenant_id, timeline_id)
deltas_at_first = len(client.layer_map_info(tenant_id, timeline_id).delta_layers())
assert (
deltas_at_first == 2
), "are you fixing #5863? just add one more checkpoint after 'CREATE TABLE bar ...' statement."
endpoint.safe_psql("CREATE TABLE bar AS SELECT x FROM generate_series(1, 10000) g(x)")
endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
layers_before_last_checkpoint = client.layer_map_info(tenant_id, timeline_id).historic_by_name()
upload_stuck_layers = layers_before_last_checkpoint - layers_at_creation.historic_by_name()
assert len(upload_stuck_layers) > 0
for name in upload_stuck_layers:
assert env.pageserver.layer_exists(
tenant_id, timeline_id, parse_layer_file_name(name)
), "while uploads are stuck the layers should be present on disk"
# now this will do the L0 => L1 compaction and want to remove
# upload_stuck_layers and the original initdb L0
client.timeline_checkpoint(tenant_id, timeline_id)
# as uploads are paused, the upload_stuck_layers should still be with us
for name in upload_stuck_layers:
assert env.pageserver.layer_exists(
tenant_id, timeline_id, parse_layer_file_name(name)
), "uploads are stuck still over compaction"
compacted_layers = client.layer_map_info(tenant_id, timeline_id).historic_by_name()
overlap = compacted_layers.intersection(upload_stuck_layers)
assert len(overlap) == 0, "none of the L0's should remain after L0 => L1 compaction"
assert (
len(compacted_layers) == 1
), "there should be one L1 after L0 => L1 compaction (without #5863 being fixed)"
def layer_deletes_completed():
m = client.get_metric_value("pageserver_layer_completed_deletes_total")
if m is None:
return 0
return int(m)
# if initdb created an initial delta layer, it might already be gc'd
# because it was uploaded before the failpoint was enabled. however, the
# deletion is not guaranteed to be complete.
assert layer_deletes_completed() <= 1
client.configure_failpoints(("before-upload-layer-pausable", "off"))
# Ensure that this actually terminates
wait_upload_queue_empty(client, tenant_id, timeline_id)
def until_layer_deletes_completed():
deletes = layer_deletes_completed()
log.info(f"layer_deletes: {deletes}")
# ensure that initdb delta layer AND the previously stuck are now deleted
assert deletes >= len(upload_stuck_layers) + 1
wait_until(10, 1, until_layer_deletes_completed)
for name in upload_stuck_layers:
assert not env.pageserver.layer_exists(
tenant_id, timeline_id, parse_layer_file_name(name)
), "l0 should now be removed because of L0 => L1 compaction and completed uploads"
# We should not have hit the error handling path in uploads where a uploaded file is gone
assert not env.pageserver.log_contains(
"File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
)
with pytest.raises(ReadTimeout):
client.timeline_checkpoint(tenant_id, timeline_id, timeout=5)
client.configure_failpoints(("before-upload-layer-pausable", "off"))
def wait_upload_queue_empty(

View File

@@ -13,6 +13,7 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import S3Storage, s3_storage
from fixtures.utils import wait_until
from fixtures.workload import Workload
@@ -265,10 +266,85 @@ def test_scrubber_physical_gc_ancestors(
# attach it, to drop any local state, then check it's still readable.
workload.stop()
drop_local_state(env, tenant_id)
workload.validate()
def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder):
"""
When we delete a timeline after a shard split, the child shards do not directly delete the
layers in the ancestor shards. They rely on the scrubber to clean up.
"""
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(
tenant_id,
timeline_id,
shard_count=None,
conf={
# Small layers and low compaction thresholds, so that when we split we can expect some to
# be dropped by child shards
"checkpoint_distance": f"{1024 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{1024 * 1024}",
"image_creation_threshold": "2",
"image_layer_creation_check_threshold": "0",
# Disable background compaction, we will do it explicitly
"compaction_period": "0s",
# No PITR, so that as soon as child shards generate an image layer, it covers ancestor deltas
# and makes them GC'able
"pitr_interval": "0s",
},
)
# Make sure the original shard has some layers
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(100)
new_shard_count = 4
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=new_shard_count)
# Create a second timeline so that when we delete the first one, child shards still have some content in S3.
#
# This is a limitation of the scrubber: if a shard isn't in S3 (because it has no timelines), then the scrubber
# doesn't know about it, and won't perceive its ancestors as ancestors.
other_timeline_id = TimelineId.generate()
env.storage_controller.pageserver_api().timeline_create(
PgVersion.NOT_SET, tenant_id, other_timeline_id
)
# Write after split so that child shards have some indices in S3
workload.write_rows(100, upload=False)
for shard in shards:
ps = env.get_tenant_pageserver(shard)
log.info(f"Waiting for shard {shard} on pageserver {ps.id}")
ps.http_client().timeline_checkpoint(
shard, timeline_id, compact=False, wait_until_uploaded=True
)
# The timeline still exists in child shards and they reference its layers, so scrubbing
# now shouldn't delete anything.
gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=0, mode="full")
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == 0
assert gc_summary["ancestor_layers_deleted"] == 0
# Delete the timeline
env.storage_controller.pageserver_api().timeline_delete(tenant_id, timeline_id)
# Subsequently doing physical GC should clean up the ancestor layers
gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=0, mode="full")
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == 0
assert gc_summary["ancestor_layers_deleted"] > 0
def test_scrubber_physical_gc_ancestors_split(neon_env_builder: NeonEnvBuilder):
"""
Exercise ancestor GC while a tenant is partly split: this test ensures that if we have some child shards

View File

@@ -1,5 +1,5 @@
{
"v16": ["16.3", "b39f316137fdd29e2da15d2af2fdd1cfd18163be"],
"v15": ["15.7", "035b73a9c5998f9a0ef35cc8df1bae680bf770fc"],
"v14": ["14.12", "dbd0e6428b9274d72a10ac29bd3e3162faf109d4"]
"v16": ["16.3", "60fab0e62ca0150276bf03231cc1339b29d3465c"],
"v15": ["15.7", "9eba7dd382606ffca43aca865f337ec21bcdac73"],
"v14": ["14.12", "7bbe834c8c2dc37802eca8484311599bc47341f6"]
}

View File

@@ -277,8 +277,12 @@ files:
help: 'Bytes between received and replayed LSN'
key_labels:
values: [replication_delay_bytes]
# We use a GREATEST call here because this calculation can be negative.
# The calculation is not atomic, meaning after we've gotten the receive
# LSN, the replay LSN may have advanced past the receive LSN we
# are using for the calculation.
query: |
SELECT pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn()) AS replication_delay_bytes;
SELECT GREATEST(0, pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn())) AS replication_delay_bytes;
- metric_name: replication_delay_seconds
type: gauge