Compare commits

..

16 Commits

Author SHA1 Message Date
Heikki Linnakangas
b37cc3806b Stop using mold linker
There isn't anything wrong with with 'mold' as such, but it also
doesn't give any benefit. Better to stick to the defaults when there's
no reason to deviate.

'mold' might be a little faster, but it's insignificant when doing a
full build. It might make a difference when doing an incremental
build, but docker builds are not incremental.

I didn't remove 'mold' from the build-tools image yet, as I'm not sure
if the image might still be used to build old images that are still
using 'mold', or perhaps to build PRs created earlier. I will remove
'mold' from build-tools later as a separate PR, after some time has
passed.
2025-02-09 16:43:37 +02:00
Erik Grinaker
ac55e2dbe5 pageserver: improve tenant housekeeping task (#10725)
# Problem

walredo shutdown is done in the compaction task. Let's move it to tenant
housekeeping.

# Summary of changes

* Rename "ingest housekeeping" to "tenant housekeeping".
* Move walredo shutdown into tenant housekeeping.
* Add a constant `WALREDO_IDLE_TIMEOUT` set to 3 minutes (previously 10x
compaction threshold).
2025-02-08 12:42:55 +00:00
Erik Grinaker
874accd6ed pageserver: misc task cleanups (#10723)
This patch does a bunch of superficial cleanups of `tenant::tasks` to
avoid noise in subsequent PRs. There are no functional changes.

PS: enable "hide whitespace" when reviewing, due to the unindentation of
large async blocks.
2025-02-08 11:02:13 +00:00
Christian Schwarz
6cd3b501ec fix(page_service / batching): smgr op latency metrics includes the flush time of preceding requests (#10728)
Before this PR, if a batch contains N responses, the smgr op latency
reported for response (N-i) would include the time we spent flushing
the preceding requests.

refs:
- fixup of https://github.com/neondatabase/neon/pull/10042
- fixes https://github.com/neondatabase/neon/issues/10674
2025-02-08 09:28:09 +00:00
Christian Schwarz
bf20d78292 fix(page_service): page reconstruct error log does not include shard_id label (#10680)
# Problem

Before this PR, the `shard_id` field was missing when page_service logs
a reconstruct error.

This was caused by batching-related refactorings.

Example from staging:

```
2025-01-30T07:10:04.346022Z ERROR page_service_conn_main{peer_addr=...}:process_query{tenant_id=... timeline_id=...}:handle_pagerequests:request:handle_get_page_at_lsn_request_batched{req_lsn=FFFFFFFF/FFFFFFFF}: error reading relation or page version: Read error: whole vectored get request failed because one or more of the requested keys were missing: could not find data for key  ...
```

# Changes

Delay creation of the handler-specific span until after shard routing

This also avoids the need for the record() call in the pagestream hot
path.

# Testing

Manual testing with a failpoint that is part of this PR's history but
will be squashed away.


# Refs

- fixes https://github.com/neondatabase/neon/issues/10599
2025-02-07 19:45:39 +00:00
Arpad Müller
2656c713a4 Revert recent AWS SDK update (#10724)
We've been seeing some regressions in staging since the AWS SDK updates:
https://github.com/neondatabase/neon/issues/10695 . We aren't sure the
regression was caused by the SDK update, but the issues do involve S3,
so it's not unlikely. By reverting the SDK update we find out whether it
was really the SDK update, or something else.

Reverts the two PRs:

* https://github.com/neondatabase/neon/pull/10588
* https://github.com/neondatabase/neon/pull/10699

https://neondb.slack.com/archives/C08C2G15M6U/p1738576986047179
2025-02-07 17:37:53 +00:00
John Spray
5e95860e70 tests: wait for manifest persistence in test_timeline_archival_chaos (#10719)
## Problem

This test would sometimes fail its assertion that a timeline does not
revert to active once archived. That's because it was using the
in-memory offload state, not the persistent state, so this was sometimes
lost across a pageserver restart.

Closes: https://github.com/neondatabase/neon/issues/10389

## Summary of changes

- When reading offload status, read from pageserver API _and_ remote
storage before considering the timeline offloaded
2025-02-07 16:27:39 +00:00
Heikki Linnakangas
0abff59e97 compute: Allow postgres user to power off the VM (#10710)
I plan to use this when launching a fast_import job in a VM. There's
currently no good way for an executable running in a NeonVM to exit
gracefully and have the VM shut down. The inittab we use always respawns
the payload command. The idea is that the control plane can use
"fast_import ... && poweroff" as the command, so that when fast_import
completes successfully, the VM is terminated, and the k8s Pod and
VirtualMachine object are marked as completed successfully.

I'm working on bigger changes to how we launch VMs, and will try to come
up with a nicer system for that, but in the meanwhile, this quick hack
allows us to proceed with using VMs for one-off jobs like fast_import.
2025-02-07 16:03:01 +00:00
John Spray
9609f7547e tests: address warnings in timeline shutdown (#10702)
## Problem

There are a couple of log warnings tripping up
`test_timeline_archival_chaos`

- `[stopping left-over name="timeline_delete"
tenant_shard_id=2d526292b67dac0e6425266d7079c253
timeline_id=Some(44ba36bfdee5023672c93778985facd9)
kind=TimelineDeletionWorker\n')](https://neon-github-public-dev.s3.amazonaws.com/reports/pr-10672/13161357302/index.html#/testresult/716b997bb1d8a021)`
- `ignoring attempt to restart exited flush_loop
503d8f401d8887cfaae873040a6cc193/d5eed0673ba37d8992f7ec411363a7e3\n')`

Related: https://github.com/neondatabase/neon/issues/10389

## Summary of changes

- Downgrade the 'ignoring attempt to restart' to info -- there's nothing
in the design that forbids this happening, i.e. someone calling
maybe_spawn_flush_loop concurrently with shutdown()
- Prevent timeline deletion tasks outliving tenants by carrying a
gateguard. This logically makes sense because the deletion process does
call into Tenant to update manifests.
2025-02-07 15:29:34 +00:00
Erik Grinaker
d6e87a3a9c pageserver: add separate, disabled compaction semaphore (#10716)
## Problem

L0 compaction can get starved by other background tasks. It needs to be
responsive to avoid read amp blowing up during heavy write workloads.

Touches #10694.

## Summary of changes

Add a separate semaphore for compaction, configurable via
`use_compaction_semaphore` (disabled by default). This is primarily for
testing in staging; it needs further work (in particular to split
image/L0 compaction jobs) before it can be enabled.
2025-02-07 15:11:31 +00:00
Arpad Müller
f5243992fa safekeeper: make timeline deletions a bit more verbose (#10721)
Make timeline deletion print the sub-steps, so that we can narrow down
some stuck timeline deletion issues we are observing.

https://neondb.slack.com/archives/C08C2G15M6U/p1738930694716009
2025-02-07 15:06:26 +00:00
John Spray
95220ba43e tests: fix flaky endpoint in test_ingest_logical_message (#10700)
## Problem

Endpoint kept running while timeline was deleted, causing forbidden
warnings on the pageserver when the tenant is not found.

## Summary of changes

- Explicitly stop the endpoint before the end of the test, so that it
isn't trying to talk to the pageserver in the background while things
are torn down
2025-02-07 14:51:36 +00:00
John Spray
08f92bb916 pageserver: clean up DeletionQueue push_layers_sync (#10701)
## Problem

This is tech debt. While we introduced generations for tenants, some
legacy situations without generations needed to delete things inline
(async operation) instead of enqueing them (sync operation).

## Summary of changes

- Remove the async code, replace calls with the sync variant, and assert
that the generation is always set
2025-02-07 13:03:01 +00:00
Fedor Dikarev
8f651f9582 switch from localtest.me to local.neon.build (#10714)
## Problem
Ref: https://github.com/neondatabase/neon/issues/10632

We use dns named `*.localtest.me` in our test, and that domain is
well-known and widely used for that, with all the records there resolve
to the localhost, both IPv4 and IPv6: `127.0.0.1` and `::1`

In some cases on our runners these addresses resolves only to `IPv6`,
and so components fail to connect when runner doesn't have `IPv6`
address. We suspect issue in systemd-resolved here
(https://github.com/systemd/systemd/issues/17745)
To workaround that and improve test stability, we introduced our own
domain `*.local.neon.build` with IPv4 address `127.0.0.1` only

See full details and troubleshoot log in referred issue.

p.s.
If you're FritzBox user, don't forget to add that domain
`local.neon.build` to the `DNS Rebind Protection` section under `Home
Network -> Network -> Network Settings`, otherwise FritzBox will block
addresses, resolving to the local addresses.
For other devices/vendors, please check corresponding documentation, if
resolving `local.neon.build` will produce empty answer for you.

## Summary of changes
Replace all the occurrences of `localtest.me` with `local.neon.build`
2025-02-07 12:25:16 +00:00
Arseny Sher
b5a239c4ae Add reconciliation details to sk membership change rfc (#10514)
## Problem

RFC pointed out the need of reconciliation, but wasn't detailed how it
can be done.

## Summary of changes

Add these details.
2025-02-07 11:20:49 +00:00
Alexander Lakhin
de05258419 Adjust diesel schema check for build with sanitizers (#10711)
We need to disable the detection of memory leaks when running
``neon_local init` for build with sanitizers to avoid an error thrown by
AddressSanitizer.
2025-02-07 08:56:39 +00:00
32 changed files with 832 additions and 814 deletions

View File

@@ -145,32 +145,32 @@ jobs:
- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v14 -j$(nproc)
run: make ${make_vars} postgres-v14 -j$(nproc)
- name: Build postgres v15
if: steps.cache_pg_15.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v15 -j$(nproc)
run: make ${make_vars} postgres-v15 -j$(nproc)
- name: Build postgres v16
if: steps.cache_pg_16.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v16 -j$(nproc)
run: make ${make_vars} postgres-v16 -j$(nproc)
- name: Build postgres v17
if: steps.cache_pg_17.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v17 -j$(nproc)
run: make ${make_vars} postgres-v17 -j$(nproc)
- name: Build neon extensions
run: mold -run make ${make_vars} neon-pg-ext -j$(nproc)
run: make ${make_vars} neon-pg-ext -j$(nproc)
- name: Build walproposer-lib
run: mold -run make ${make_vars} walproposer-lib -j$(nproc)
run: make ${make_vars} walproposer-lib -j$(nproc)
- name: Run cargo build
env:
WITH_TESTS: ${{ matrix.sanitizers != 'enabled' && '--tests' || '' }}
run: |
export ASAN_OPTIONS=detect_leaks=0
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins ${WITH_TESTS}
${cov_prefix} cargo build $CARGO_FLAGS $CARGO_FEATURES --bins ${WITH_TESTS}
# Do install *before* running rust tests because they might recompile the
# binaries with different features/flags.
@@ -287,6 +287,7 @@ jobs:
DATABASE_URL: postgresql://localhost:1235/storage_controller
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
run: |
export ASAN_OPTIONS=detect_leaks=0
/tmp/neon/bin/neon_local init
/tmp/neon/bin/neon_local storage_controller start

View File

@@ -24,7 +24,7 @@ COPY --chown=nonroot scripts/ninstall.sh scripts/ninstall.sh
ENV BUILD_TYPE=release
RUN set -e \
&& mold -run make -j $(nproc) -s neon-pg-ext \
&& make -j $(nproc) -s neon-pg-ext \
&& rm -rf pg_install/build \
&& tar -C pg_install -czf /home/nonroot/postgres_install.tar.gz .
@@ -45,7 +45,7 @@ COPY --chown=nonroot . .
ARG ADDITIONAL_RUSTFLAGS
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
&& RUSTFLAGS="-Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \

View File

@@ -1578,7 +1578,7 @@ ENV BUILD_TAG=$BUILD_TAG
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy
RUN cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy
#########################################################################################
#

View File

@@ -47,7 +47,9 @@ files:
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
# regardless of hostname (ALL)
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota
#
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes

View File

@@ -285,10 +285,10 @@ To summarize, list of cplane changes:
### storage_controller implementation
Current 'load everything on startup and keep in memory' easy design is fine.
Single timeline shouldn't take more than 100 bytes (it's 16 byte tenant_id, 16
byte timeline_id, int generation, vec of ~3 safekeeper ids plus some flags), so
10^6 of timelines shouldn't take more than 100MB.
If desired, we may continue using current 'load everything on startup and keep
in memory' approach: single timeline shouldn't take more than 100 bytes (it's 16
byte tenant_id, 16 byte timeline_id, int generation, vec of ~3 safekeeper ids
plus some flags), so 10^6 of timelines shouldn't take more than 100MB.
Similar to pageserver attachment Intents storage_controller would have in-memory
`MigrationRequest` (or its absense) for each timeline and pool of tasks trying
@@ -296,7 +296,7 @@ to make these request reality; this ensures one instance of storage_controller
won't do several migrations on the same timeline concurrently. In the first
version it is simpler to have more manual control and no retries, i.e. migration
failure removes the request. Later we can build retries and automatic
scheduling/migration. `MigrationRequest` is
scheduling/migration around. `MigrationRequest` is
```
enum MigrationRequest {
To(Vec<NodeId>),
@@ -313,9 +313,9 @@ similarly, in the first version it is ok to trigger it manually).
#### Schema
`safekeepers` table mirroring current `nodes` should be added, except that for
`scheduling_policy` field (seems like `status` is a better name for it): it is enough
to have at least in the beginning only 3 fields: 1) `active` 2) `offline` 3)
`decomissioned`.
`scheduling_policy`: it is enough to have at least in the beginning only 3
fields: 1) `active` 2) `paused` (initially means only not assign new tlis there
3) `decomissioned` (node is removed).
`timelines` table:
```
@@ -324,18 +324,24 @@ table! {
timelines (tenant_id, timeline_id) {
timeline_id -> Varchar,
tenant_id -> Varchar,
start_lsn -> pg_lsn,
generation -> Int4,
sk_set -> Array<Int4>, // list of safekeeper ids
new_sk_set -> Nullable<Array<Int4>>, // list of safekeeper ids, null if not joint conf
new_sk_set -> Nullable<Array<Int8>>, // list of safekeeper ids, null if not joint conf
cplane_notified_generation -> Int4,
deleted_at -> Nullable<Timestamptz>,
}
}
```
`start_lsn` is needed to create timeline on safekeepers properly, see below. We
might also want to add ancestor_timeline_id to preserve the hierarchy, but for
this RFC it is not needed.
#### API
Node management is similar to pageserver:
1) POST `/control/v1/safekeepers` upserts safekeeper.
1) POST `/control/v1/safekeepers` inserts safekeeper.
2) GET `/control/v1/safekeepers` lists safekeepers.
3) GET `/control/v1/safekeepers/:node_id` gets safekeeper.
4) PUT `/control/v1/safekepers/:node_id/status` changes status to e.g.
@@ -345,25 +351,15 @@ Node management is similar to pageserver:
Safekeeper deploy scripts should register safekeeper at storage_contorller as
they currently do with cplane, under the same id.
Timeline creation/deletion: already existing POST `tenant/:tenant_id/timeline`
would 1) choose initial set of safekeepers; 2) write to the db initial
`Configuration` with `INSERT ON CONFLICT DO NOTHING` returning existing row in
case of conflict; 3) create timeline on the majority of safekeepers (already
created is ok).
Timeline creation/deletion will work through already existing POST and DELETE
`tenant/:tenant_id/timeline`. Cplane is expected to retry both until they
succeed. See next section on the implementation details.
We don't want to block timeline creation when one safekeeper is down. Currently
this is solved by compute implicitly creating timeline on any safekeeper it is
connected to. This creates ugly timeline state on safekeeper when timeline is
created, but start LSN is not defined yet. It would be nice to remove this; to
do that, controller can in the background retry to create timeline on
safekeeper(s) which missed that during initial creation call. It can do that
through `pull_timeline` from majority so it doesn't need to remember
`parent_lsn` in its db.
Timeline deletion removes the row from the db and forwards deletion to the
current configuration members. Without additional actions deletions might leak,
see below on this; initially let's ignore these, reporting to cplane success if
at least one safekeeper deleted the timeline (this will remove s3 data).
We don't want to block timeline creation/deletion when one safekeeper is down.
Currently this is crutched by compute implicitly creating timeline on any
safekeeper it is connected to. This creates ugly timeline state on safekeeper
when timeline is created, but start LSN is not defined yet. Next section
describes dealing with this.
Tenant deletion repeats timeline deletion for all timelines.
@@ -395,26 +391,6 @@ Similar call should be added for the tenant.
It would be great to have some way of subscribing to the results (apart from
looking at logs/metrics).
Migration is executed as described above. One subtlety is that (local) deletion on
source safekeeper might fail, which is not a problem if we are going to
decomission the node but leaves garbage otherwise. I'd propose in the first version
1) Don't attempt deletion at all if node status is `offline`.
2) If it failed, just issue warning.
And add PUT `/control/v1/safekeepers/:node_id/scrub` endpoint which would find and
remove garbage timelines for manual use. It will 1) list all timelines on the
safekeeper 2) compare each one against configuration storage: if timeline
doesn't exist at all (had been deleted), it can be deleted. Otherwise, it can
be deleted under generation number if node is not member of current generation.
Automating this is untrivial; we'd need to register all potential missing
deletions <tenant_id, timeline_id, generation, node_id> in the same transaction
which switches configurations. Similarly when timeline is fully deleted to
prevent cplane operation from blocking when some safekeeper is not available
deletion should be also registered.
One more task pool should infinitely retry notifying control plane about changed
safekeeper sets.
3) GET `/control/v1/tenant/:tenant_id/timeline/:timeline_id/` should return
current in memory state of the timeline and pending `MigrationRequest`,
if any.
@@ -423,12 +399,153 @@ safekeeper sets.
migration by switching configuration from the joint to the one with (previous) `sk_set` under CAS
(incrementing generation as always).
#### API implementation and reconciliation
For timeline creation/deletion we want to preserve the basic assumption that
unreachable minority (1 sk of 3) doesn't block their completion, but eventually
we want to finish creation/deletion on nodes which missed it (unless they are
removed). Similarly for migration; it may and should finish even though excluded
members missed their exclusion. And of course e.g. such pending exclusion on
node C after migration ABC -> ABD must not prevent next migration ABD -> ABE. As
another example, if some node missed timeline creation it clearly must not block
migration from it. Hence it is natural to have per safekeeper background
reconciler which retries these ops until they succeed. There are 3 possible
operation types, and the type is defined by timeline state (membership
configuration and whether it is deleted) and safekeeper id: we may need to
create timeline on sk (node added), locally delete it (node excluded, somewhat
similar to detach) or globally delete it (timeline is deleted).
Next, on storage controller restart in principle these pending operations can be
figured out by comparing safekeepers state against storcon state. But it seems
better to me to materialize them in the database; it is not expensive, avoids
these startup scans which themselves can fail etc and makes it very easy to see
outstanding work directly at the source of truth -- the db. So we can add table
`safekeeper_timeline_pending_ops`
```
table! {
// timeline_id, sk_id is primary key
safekeeper_timeline_pending_ops (sk_id, tenant_id, timeline_id) {
sk_id -> int8,
tenant_id -> Varchar,
timeline_id -> Varchar,
generation -> Int4,
op_type -> Varchar,
}
}
```
`op_type` can be `include` (seed from peers and ensure generation is up to
date), `exclude` (remove locally) and `delete`. Field is actually not strictly
needed as it can be computed from current configuration, but gives more explicit
observability.
`generation` is necessary there because after op is done reconciler must remove
it and not remove another row with higher gen which in theory might appear.
Any insert of row should overwrite (remove) all rows with the same sk and
timeline id but lower `generation` as next op makes previous obsolete. Insertion
of `op_type` `delete` overwrites all rows.
About `exclude`: rather than adding explicit safekeeper http endpoint, it is
reasonable to reuse membership switch endpoint: if safekeeper is not member
of the configuration it locally removes the timeline on the switch. In this case
404 should also be considered an 'ok' answer by the caller.
So, main loop of per sk reconcile reads `safekeeper_timeline_pending_ops`
joined with timeline configuration to get current conf (with generation `n`)
for the safekeeper and does the jobs, infinitely retrying failures:
1) If node is member (`include`):
- Check if timeline exists on it, if not, call pull_timeline on it from
other members
- Call switch configuration to the current
2) If node is not member (`exclude`):
- Call switch configuration to the current, 404 is ok.
3) If timeline is deleted (`delete`), call delete.
In cases 1 and 2 remove `safekeeper_timeline_pending_ops` for the sk and
timeline with generation <= `n` if `op_type` is not `delete`.
In case 3 also remove `safekeeper_timeline_pending_ops`
entry + remove `timelines` entry if there is nothing left in `safekeeper_timeline_pending_ops` for the timeline.
Let's consider in details how APIs can be implemented from this angle.
Timeline creation. It is assumed that cplane retries it until success, so all
actions must be idempotent. Now, a tricky point here is timeline start LSN. For
the initial (tenant creation) call cplane doesn't know it. However, setting
start_lsn on safekeepers during creation is a good thing -- it provides a
guarantee that walproposer can always find a common point in WAL histories of
safekeeper and its own, and so absense of it would be a clear sign of
corruption. The following sequence works:
1) Create timeline (or observe that it exists) on pageserver,
figuring out last_record_lsn in response.
2) Choose safekeepers and insert (ON CONFLICT DO NOTHING) timeline row into the
db. Note that last_record_lsn returned on the previous step is movable as it
changes once ingestion starts, insert must not overwrite it (as well as other
fields like membership conf). On the contrary, start_lsn used in the next
step must be set to the value in the db. cplane_notified_generation can be set
to 1 (initial generation) in insert to avoid notifying cplane about initial
conf as cplane will receive it in timeline creation request anyway.
3) Issue timeline creation calls to at least majority of safekeepers. Using
majority here is not necessary but handy because it guarantees that any live
majority will have at least one sk with created timeline and so
reconciliation task can use pull_timeline shared with migration instead of
create timeline special init case. OFC if timeline is already exists call is
ignored.
4) For minority of safekeepers which could have missed creation insert
entries to `safekeeper_timeline_pending_ops`. We won't miss this insertion
because response to cplane is sent only after it has happened, and cplane
retries the call until 200 response.
There is a small question how request handler (timeline creation in this
case) would interact with per sk reconciler. As always I prefer to do the
simplest possible thing and here it seems to be just waking it up so it
re-reads the db for work to do. Passing work in memory is faster, but
that shouldn't matter, and path to scan db for work will exist anyway,
simpler to reuse it.
For pg version / wal segment size: while we may persist them in `timelines`
table, it is not necessary as initial creation at step 3 can take them from
pageserver or cplane creation call and later pull_timeline will carry them
around.
Timeline migration.
1) CAS to the db to create joint conf, and in the same transaction create
`safekeeper_timeline_pending_ops` `include` entries to initialize new members
as well as deliver this conf to current ones; poke per sk reconcilers to work
on it. Also any conf change should also poke cplane notifier task(s).
2) Once it becomes possible per alg description above, get out of joint conf
with another CAS. Task should get wakeups from per sk reconcilers because
conf switch is required for advancement; however retries should be sleep
based as well as LSN advancement might be needed, though in happy path
it isn't. To see whether further transition is possible on wakup migration
executor polls safekeepers per the algorithm. CAS creating new conf with only
new members should again insert entries to `safekeeper_timeline_pending_ops`
to switch them there, as well as `exclude` rows to remove timeline from
old members.
Timeline deletion: just set `deleted_at` on the timeline row and insert
`safekeeper_timeline_pending_ops` entries in the same xact, the rest is done by
per sk reconcilers.
When node is removed (set to `decomissioned`), `safekeeper_timeline_pending_ops`
for it must be cleared in the same transaction.
One more task pool should infinitely retry notifying control plane about changed
safekeeper sets (trying making `cplane_notified_generation` equal `generation`).
#### Dealing with multiple instances of storage_controller
Operations described above executed concurrently might create some errors but do
not prevent progress, so while we normally don't want to run multiple instances
of storage_controller it is fine to have it temporarily, e.g. during redeploy.
To harden against some controller instance creating some work in
`safekeeper_timeline_pending_ops` and then disappearing without anyone pickup up
the job per sk reconcilers apart from explicit wakups should scan for work
periodically. It is possible to remove that though if all db updates are
protected with leadership token/term -- then such scans are needed only after
leadership is acquired.
Any interactions with db update in-memory controller state, e.g. if migration
request failed because different one is in progress, controller remembers that
and tries to finish it.
@@ -545,7 +662,7 @@ Aurora does this but similarly I don't think this is needed.
We should use Compute <-> safekeeper protocol change to include other (long
yearned) modifications:
- send data in network order to make arm work.
- send data in network order without putting whole structs to be arch independent
- remove term_start_lsn from AppendRequest
- add horizon to TermHistory
- add to ProposerGreeting number of connection from this wp to sk

View File

@@ -94,6 +94,7 @@ pub struct ConfigToml {
pub ondemand_download_behavior_treat_error_as_warn: bool,
#[serde(with = "humantime_serde")]
pub background_task_maximum_delay: Duration,
pub use_compaction_semaphore: bool,
pub control_plane_api: Option<reqwest::Url>,
pub control_plane_api_token: Option<String>,
pub control_plane_emergency_mode: bool,
@@ -470,6 +471,7 @@ impl Default for ConfigToml {
DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
)
.unwrap()),
use_compaction_semaphore: false,
control_plane_api: (None),
control_plane_api_token: (None),

View File

@@ -140,6 +140,10 @@ pub struct PageServerConf {
/// not terrible.
pub background_task_maximum_delay: Duration,
/// If true, use a separate semaphore for compaction tasks instead of the common background task
/// semaphore. Defaults to false.
pub use_compaction_semaphore: bool,
pub control_plane_api: Option<Url>,
/// JWT token for use with the control plane API.
@@ -332,6 +336,7 @@ impl PageServerConf {
test_remote_failures,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
use_compaction_semaphore,
control_plane_api,
control_plane_api_token,
control_plane_emergency_mode,
@@ -385,6 +390,7 @@ impl PageServerConf {
test_remote_failures,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
use_compaction_semaphore,
control_plane_api,
control_plane_emergency_mode,
heatmap_upload_concurrency,

View File

@@ -8,7 +8,6 @@ use std::time::Duration;
use crate::controller_upcall_client::ControlPlaneGenerationsApi;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::remote_timeline_client::remote_timeline_path;
use crate::tenant::remote_timeline_client::LayerFileMetadata;
use crate::virtual_file::MaybeFatalIo;
@@ -463,45 +462,18 @@ impl DeletionQueueClient {
///
/// The `current_generation` is the generation of this pageserver's current attachment. The
/// generations in `layers` are the generations in which those layers were written.
pub(crate) async fn push_layers(
pub(crate) fn push_layers(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerName, LayerFileMetadata)>,
) -> Result<(), DeletionQueueError> {
if current_generation.is_none() {
debug!("Enqueuing deletions in legacy mode, skipping queue");
// None generations are not valid for attached tenants: they must always be attached in
// a known generation. None generations are still permitted for layers in the index because
// they may be historical.
assert!(!current_generation.is_none());
let mut layer_paths = Vec::new();
for (layer, meta) in layers {
layer_paths.push(remote_layer_path(
&tenant_shard_id.tenant_id,
&timeline_id,
meta.shard,
&layer,
meta.generation,
));
}
self.push_immediate(layer_paths).await?;
return self.flush_immediate().await;
}
self.push_layers_sync(tenant_shard_id, timeline_id, current_generation, layers)
}
/// When a Tenant has a generation, push_layers is always synchronous because
/// the ListValidator channel is an unbounded channel.
///
/// This can be merged into push_layers when we remove the Generation-less mode
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
pub(crate) fn push_layers_sync(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerName, LayerFileMetadata)>,
) -> Result<(), DeletionQueueError> {
metrics::DELETION_QUEUE
.keys_submitted
.inc_by(layers.len() as u64);
@@ -957,14 +929,12 @@ mod test {
// File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
info!("Pushing");
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation,
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
)
.await?;
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation,
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
)?;
assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
assert_local_files(&[], &deletion_prefix);
@@ -1017,14 +987,12 @@ mod test {
assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
tracing::debug!("Pushing...");
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
stale_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
stale_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)?;
// We enqueued the operation in a stale generation: it should have failed validation
tracing::debug!("Flushing...");
@@ -1032,14 +1000,12 @@ mod test {
assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
tracing::debug!("Pushing...");
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
latest_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
latest_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)?;
// We enqueued the operation in a fresh generation: it should have passed validation
tracing::debug!("Flushing...");
@@ -1074,28 +1040,24 @@ mod test {
// generation gets that treatment)
let remote_layer_file_name_historical =
ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation.previous(),
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation.previous(),
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)?;
// Inject a deletion in the generation before generation_now: after restart,
// this deletion should get executed, because we execute deletions in the
// immediately previous generation on the same node.
let remote_layer_file_name_previous =
ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation,
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation,
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
)?;
client.flush().await?;
assert_remote_files(
@@ -1139,6 +1101,7 @@ pub(crate) mod mock {
use tracing::info;
use super::*;
use crate::tenant::remote_timeline_client::remote_layer_path;
use std::sync::atomic::{AtomicUsize, Ordering};
pub struct ConsumerState {

View File

@@ -61,6 +61,7 @@ use crate::{
remote_timeline_client::LayerFileMetadata,
secondary::SecondaryTenant,
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerName, LayerVisibilityHint},
tasks::sleep_random,
},
CancellableTask, DiskUsageEvictionTask,
};
@@ -210,14 +211,8 @@ async fn disk_usage_eviction_task(
info!("disk usage based eviction task finishing");
};
use crate::tenant::tasks::random_init_delay;
{
if random_init_delay(task_config.period, &cancel)
.await
.is_err()
{
return;
}
if sleep_random(task_config.period, &cancel).await.is_err() {
return;
}
let mut iteration_no = 0;

View File

@@ -489,7 +489,6 @@ impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrappe
let timeline = tenant_shard
.get_timeline(timeline_id, true)
.map_err(GetActiveTimelineError::Timeline)?;
set_tracing_field_shard_id(&timeline);
Ok(timeline)
}
}
@@ -774,11 +773,11 @@ impl PageServerHandler {
let batched_msg = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetRelExists,
@@ -793,11 +792,10 @@ impl PageServerHandler {
}
}
PagestreamFeMessage::Nblocks(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetRelSize,
@@ -812,11 +810,10 @@ impl PageServerHandler {
}
}
PagestreamFeMessage::DbSize(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetDbSize,
@@ -831,11 +828,10 @@ impl PageServerHandler {
}
}
PagestreamFeMessage::GetSlruSegment(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetSlruSegment,
@@ -850,12 +846,20 @@ impl PageServerHandler {
}
}
PagestreamFeMessage::GetPage(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_page_at_lsn_request_batched", req_lsn = %req.hdr.request_lsn);
// avoid a somewhat costly Span::record() by constructing the entire span in one go.
macro_rules! mkspan {
(before shard routing) => {{
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn)
}};
($shard_id:expr) => {{
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, shard_id = %$shard_id)
}};
}
macro_rules! respond_error {
($error:expr) => {{
($span:expr, $error:expr) => {{
let error = BatchedFeMessage::RespondError {
span,
span: $span,
error: BatchedPageStreamError {
req: req.hdr,
err: $error,
@@ -868,27 +872,35 @@ impl PageServerHandler {
let key = rel_block_to_key(req.rel, req.blkno);
let shard = match timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Page(key))
.instrument(span.clone()) // sets `shard_id` field
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return respond_error!(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into()
));
}
Err(e) => {
return respond_error!(e.into());
let span = mkspan!(before shard routing);
match e {
GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return respond_error!(
span,
PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into()
)
);
}
e => {
return respond_error!(span, e.into());
}
}
}
};
let span = mkspan!(shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
@@ -910,7 +922,7 @@ impl PageServerHandler {
{
Ok(lsn) => lsn,
Err(e) => {
return respond_error!(e);
return respond_error!(span, e);
}
};
BatchedFeMessage::GetPage {
@@ -922,11 +934,10 @@ impl PageServerHandler {
}
#[cfg(feature = "testing")]
PagestreamFeMessage::Test(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_test_request");
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
let timer =
record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
.await?;
@@ -1190,6 +1201,29 @@ impl PageServerHandler {
}
};
// We purposefully don't count flush time into the smgr operaiton timer.
//
// The reason is that current compute client will not perform protocol processing
// if the postgres backend process is doing things other than `->smgr_read()`.
// This is especially the case for prefetch.
//
// If the compute doesn't read from the connection, eventually TCP will backpressure
// all the way into our flush call below.
//
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
//
// We put each response in the batch onto the wire in a separate pgb_writer.flush()
// call, which (all unmeasured) adds syscall overhead but reduces time to first byte
// and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
// TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
//
// Since we're flushing multiple times in the loop, but only have access to the per-op
// timers inside the loop, we capture the flush start time here and reuse it to finish
// each op timer.
let flushing_start_time = Instant::now();
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
@@ -1238,21 +1272,9 @@ impl PageServerHandler {
&response_msg.serialize(protocol_version),
))?;
// We purposefully don't count flush time into the timer.
//
// The reason is that current compute client will not perform protocol processing
// if the postgres backend process is doing things other than `->smgr_read()`.
// This is especially the case for prefetch.
//
// If the compute doesn't read from the connection, eventually TCP will backpressure
// all the way into our flush call below.
//
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
let flushing_timer = timer.map(|mut timer| {
timer
.observe_execution_end_flush_start(Instant::now())
.observe_execution_end_flush_start(flushing_start_time)
.expect("we are the first caller")
});
@@ -1340,7 +1362,7 @@ impl PageServerHandler {
.take()
.expect("implementation error: timeline_handles should not be locked");
let request_span = info_span!("request", shard_id = tracing::field::Empty);
let request_span = info_span!("request");
let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
PageServicePipeliningConfig::Pipelined(pipelining_config) => {
self.handle_pagerequests_pipelined(
@@ -2034,6 +2056,7 @@ impl PageServerHandler {
.unwrap()
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
set_tracing_field_shard_id(&timeline);
if timeline.is_archived() == Some(true) {
// TODO after a grace period, turn this log line into a hard error

View File

@@ -328,8 +328,8 @@ pub enum TaskKind {
// Eviction. One per timeline.
Eviction,
// Ingest housekeeping (flushing ephemeral layers on time threshold or disk pressure)
IngestHousekeeping,
// Tenant housekeeping (flush idle ephemeral layers, shut down idle walredo, etc.).
TenantHousekeeping,
/// See [`crate::disk_usage_eviction_task`].
DiskUsageEviction,

View File

@@ -20,6 +20,7 @@ use chrono::NaiveDateTime;
use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use itertools::Itertools as _;
use pageserver_api::models;
use pageserver_api::models::CompactInfoResponse;
use pageserver_api::models::LsnLease;
@@ -3088,32 +3089,28 @@ impl Tenant {
Ok(rx)
}
// Call through to all timelines to freeze ephemeral layers if needed. Usually
// this happens during ingest: this background housekeeping is for freezing layers
// that are open but haven't been written to for some time.
async fn ingest_housekeeping(&self) {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
// compaction runs.
let timelines = {
self.timelines
.lock()
.unwrap()
.values()
.filter_map(|timeline| {
if timeline.is_active() {
Some(timeline.clone())
} else {
None
}
})
.collect::<Vec<_>>()
};
/// Performs periodic housekeeping, via the tenant housekeeping background task.
async fn housekeeping(&self) {
// Call through to all timelines to freeze ephemeral layers as needed. This usually happens
// during ingest, but we don't want idle timelines to hold open layers for too long.
let timelines = self
.timelines
.lock()
.unwrap()
.values()
.filter(|tli| tli.is_active())
.cloned()
.collect_vec();
for timeline in &timelines {
for timeline in timelines {
timeline.maybe_freeze_ephemeral_layer().await;
}
// Shut down walredo if idle.
const WALREDO_IDLE_TIMEOUT: Duration = Duration::from_secs(180);
if let Some(ref walredo_mgr) = self.walredo_mgr {
walredo_mgr.maybe_quiesce(WALREDO_IDLE_TIMEOUT);
}
}
pub fn timeline_has_no_attached_children(&self, timeline_id: TimelineId) -> bool {

View File

@@ -517,7 +517,7 @@ impl RemoteTimelineClient {
if let Ok(queue) = queue_locked.initialized_mut() {
let blocked_deletions = std::mem::take(&mut queue.blocked_deletions);
for d in blocked_deletions {
if let Err(e) = self.deletion_queue_client.push_layers_sync(
if let Err(e) = self.deletion_queue_client.push_layers(
self.tenant_shard_id,
self.timeline_id,
self.generation,
@@ -2151,7 +2151,6 @@ impl RemoteTimelineClient {
self.generation,
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e))
}
}

View File

@@ -1,48 +1,64 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
//! This module contains per-tenant background processes, e.g. compaction and GC.
use std::ops::ControlFlow;
use std::str::FromStr;
use std::cmp::max;
use std::future::Future;
use std::ops::{ControlFlow, RangeInclusive};
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use once_cell::sync::Lazy;
use rand::Rng;
use scopeguard::defer;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio_util::sync::CancellationToken;
use tracing::*;
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS};
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::compaction::CompactionOutcome;
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use once_cell::sync::Lazy;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::*;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
use utils::completion::Barrier;
use utils::rate_limit::RateLimit;
use utils::{backoff, completion, pausable_failpoint};
use utils::{backoff, pausable_failpoint};
static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
once_cell::sync::Lazy::new(|| {
let total_threads = task_mgr::TOKIO_WORKER_THREADS.get();
let permits = usize::max(
1,
// while a lot of the work is done on spawn_blocking, we still do
// repartitioning in the async context. this should give leave us some workers
// unblocked to be blocked on other work, hopefully easing any outside visible
// effects of restarts.
//
// 6/8 is a guess; previously we ran with unlimited 8 and more from
// spawn_blocking.
(total_threads * 3).checked_div(4).unwrap_or(0),
);
assert_ne!(permits, 0, "we will not be adding in permits later");
assert!(
permits < total_threads,
"need threads avail for shorter work"
);
tokio::sync::Semaphore::new(permits)
});
/// Semaphore limiting concurrent background tasks (across all tenants).
///
/// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
let total_threads = TOKIO_WORKER_THREADS.get();
let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
assert_ne!(permits, 0, "we will not be adding in permits later");
assert!(permits < total_threads, "need threads for other work");
Semaphore::new(permits)
});
/// Semaphore limiting concurrent compaction tasks (across all tenants). This is disabled by
/// default, see `use_compaction_semaphore`.
///
/// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
///
/// This is a separate semaphore from background tasks, because L0 compaction needs to be responsive
/// to avoid high read amp during heavy write workloads.
///
/// TODO: split image compaction and L0 compaction, and move image compaction to background tasks.
/// Only L0 compaction needs to be responsive, and it shouldn't block on image compaction.
static CONCURRENT_COMPACTION_TASKS: Lazy<Semaphore> = Lazy::new(|| {
let total_threads = TOKIO_WORKER_THREADS.get();
let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
assert_ne!(permits, 0, "we will not be adding in permits later");
assert!(permits < total_threads, "need threads for other work");
Semaphore::new(permits)
});
/// Background jobs.
///
/// NB: not all of these acquire a CONCURRENT_BACKGROUND_TASKS semaphore permit, only the ones that
/// do any significant IO.
#[derive(
Debug,
PartialEq,
@@ -58,7 +74,7 @@ pub(crate) enum BackgroundLoopKind {
Compaction,
Gc,
Eviction,
IngestHouseKeeping,
TenantHouseKeeping,
ConsumptionMetricsCollectMetrics,
ConsumptionMetricsSyntheticSizeWorker,
InitialLogicalSizeCalculation,
@@ -67,13 +83,14 @@ pub(crate) enum BackgroundLoopKind {
}
pub struct BackgroundLoopSemaphorePermit<'a> {
_permit: tokio::sync::SemaphorePermit<'static>,
_permit: SemaphorePermit<'static>,
_recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
}
/// Cancellation safe.
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
/// Acquires a semaphore permit, to limit concurrent background jobs.
pub(crate) async fn acquire_concurrency_permit(
loop_kind: BackgroundLoopKind,
use_compaction_semaphore: bool,
_ctx: &RequestContext,
) -> BackgroundLoopSemaphorePermit<'static> {
// TODO: use a lower threshold and remove the pacer once we resolve some blockage.
@@ -88,10 +105,13 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
}
// TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
let permit = CONCURRENT_BACKGROUND_TASKS
.acquire()
.await
.expect("should never close");
let permit = if loop_kind == BackgroundLoopKind::Compaction && use_compaction_semaphore {
CONCURRENT_COMPACTION_TASKS.acquire().await
} else {
assert!(!use_compaction_semaphore);
CONCURRENT_BACKGROUND_TASKS.acquire().await
}
.expect("should never close");
let waited = recorder.acquired();
if waited >= WARN_THRESHOLD {
@@ -108,12 +128,10 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
}
}
/// Start per tenant background loops: compaction and gc.
pub fn start_background_loops(
tenant: &Arc<Tenant>,
background_jobs_can_start: Option<&completion::Barrier>,
) {
/// Start per tenant background loops: compaction, GC, and ingest housekeeping.
pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>) {
let tenant_shard_id = tenant.tenant_shard_id;
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Compaction,
@@ -122,13 +140,15 @@ pub fn start_background_loops(
&format!("compactor for tenant {tenant_shard_id}"),
{
let tenant = Arc::clone(tenant);
let background_jobs_can_start = background_jobs_can_start.cloned();
let can_start = can_start.cloned();
async move {
let cancel = task_mgr::shutdown_token();
let cancel = task_mgr::shutdown_token(); // NB: must be in async context
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
_ = cancel.cancelled() => return Ok(()),
_ = Barrier::maybe_wait(can_start) => {}
};
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
compaction_loop(tenant, cancel)
// If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
.instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
@@ -137,6 +157,7 @@ pub fn start_background_loops(
}
},
);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::GarbageCollector,
@@ -145,13 +166,15 @@ pub fn start_background_loops(
&format!("garbage collector for tenant {tenant_shard_id}"),
{
let tenant = Arc::clone(tenant);
let background_jobs_can_start = background_jobs_can_start.cloned();
let can_start = can_start.cloned();
async move {
let cancel = task_mgr::shutdown_token();
let cancel = task_mgr::shutdown_token(); // NB: must be in async context
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
_ = cancel.cancelled() => return Ok(()),
_ = Barrier::maybe_wait(can_start) => {}
};
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
gc_loop(tenant, cancel)
.instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
.await;
@@ -162,21 +185,23 @@ pub fn start_background_loops(
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::IngestHousekeeping,
TaskKind::TenantHousekeeping,
tenant_shard_id,
None,
&format!("ingest housekeeping for tenant {tenant_shard_id}"),
&format!("housekeeping for tenant {tenant_shard_id}"),
{
let tenant = Arc::clone(tenant);
let background_jobs_can_start = background_jobs_can_start.cloned();
let can_start = can_start.cloned();
async move {
let cancel = task_mgr::shutdown_token();
let cancel = task_mgr::shutdown_token(); // NB: must be in async context
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
_ = cancel.cancelled() => return Ok(()),
_ = Barrier::maybe_wait(can_start) => {}
};
ingest_housekeeping_loop(tenant, cancel)
.instrument(info_span!("ingest_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
tenant_housekeeping_loop(tenant, cancel)
.instrument(info_span!("tenant_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
.await;
Ok(())
}
@@ -184,372 +209,293 @@ pub fn start_background_loops(
);
}
///
/// Compaction task's main loop
///
/// Compaction task's main loop.
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
const MAX_BACKOFF_SECS: f64 = 300.0;
// How many errors we have seen consequtively
let mut error_run_count = 0;
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
let mut first = true;
loop {
tokio::select! {
_ = cancel.cancelled() => {
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
},
}
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
let mut first = true;
let mut error_run = 0; // consecutive errors
let period = tenant.get_compaction_period();
loop {
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
return;
}
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
// loop, #3501. There are also additional "allowed_errors" in tests.
if first {
first = false;
if random_init_delay(period, &cancel).await.is_err() {
break;
}
}
let period = tenant.get_compaction_period();
let sleep_duration;
if period == Duration::ZERO {
#[cfg(not(feature = "testing"))]
info!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10)
} else {
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::Compaction,
};
// Run compaction
let IterationResult { output, elapsed } = iteration
.run(tenant.compaction_iteration(&cancel, &ctx))
.await;
match output {
Ok(outcome) => {
error_run_count = 0;
// schedule the next compaction immediately in case there is a pending compaction task
sleep_duration = if let CompactionOutcome::Pending = outcome {
Duration::from_secs(1)
} else {
period
};
}
Err(e) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
log_compaction_error(
&e,
error_run_count,
&wait_duration,
cancel.is_cancelled(),
);
sleep_duration = wait_duration;
}
}
// the duration is recorded by performance tests by enabling debug in this function
tracing::debug!(
elapsed_ms = elapsed.as_millis(),
"compaction iteration complete"
);
};
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
// TODO: move this to a separate task (housekeeping loop) that isn't affected by the back-off,
// so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens.
if let Some(walredo_mgr) = &tenant.walredo_mgr {
walredo_mgr.maybe_quiesce(period * 10);
}
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
// loop, #3501. There are also additional "allowed_errors" in tests.
if first {
first = false;
if sleep_random(period, &cancel).await.is_err() {
break;
}
}
let sleep_duration;
if period == Duration::ZERO {
#[cfg(not(feature = "testing"))]
info!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10)
} else {
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::Compaction,
};
// Run compaction
let IterationResult { output, elapsed } = iteration
.run(tenant.compaction_iteration(&cancel, &ctx))
.await;
match output {
Ok(outcome) => {
error_run = 0;
// schedule the next compaction immediately in case there is a pending compaction task
sleep_duration = if let CompactionOutcome::Pending = outcome {
Duration::from_secs(1)
} else {
period
};
}
Err(err) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
log_compaction_error(&err, error_run, &wait_duration, cancel.is_cancelled());
sleep_duration = wait_duration;
}
}
// the duration is recorded by performance tests by enabling debug in this function
debug!(
elapsed_ms = elapsed.as_millis(),
"compaction iteration complete"
);
};
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
break;
}
}
.await;
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
fn log_compaction_error(
e: &CompactionError,
error_run_count: u32,
sleep_duration: &std::time::Duration,
err: &CompactionError,
error_count: u32,
sleep_duration: &Duration,
task_cancelled: bool,
) {
use crate::tenant::upload_queue::NotInitialized;
use crate::tenant::PageReconstructError;
use CompactionError::*;
enum LooksLike {
Info,
Error,
}
let level = match err {
ShuttingDown => return,
Offload(_) => Level::ERROR,
_ if task_cancelled => Level::INFO,
Other(err) => {
let root_cause = err.root_cause();
let decision = match e {
ShuttingDown => None,
Offload(_) => Some(LooksLike::Error),
_ if task_cancelled => Some(LooksLike::Info),
Other(e) => {
let root_cause = e.root_cause();
let is_stopping = {
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
upload_queue || timeline
};
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
let is_stopping = upload_queue || timeline;
if is_stopping {
Some(LooksLike::Info)
Level::INFO
} else {
Some(LooksLike::Error)
Level::ERROR
}
}
};
match decision {
Some(LooksLike::Info) => info!(
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
),
Some(LooksLike::Error) => error!(
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
),
None => {}
match level {
Level::ERROR => {
error!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
}
Level::INFO => {
info!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
}
level => unimplemented!("unexpected level {level:?}"),
}
}
///
/// GC task's main loop
///
/// GC task's main loop.
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
const MAX_BACKOFF_SECS: f64 = 300.0;
// How many errors we have seen consequtively
let mut error_run_count = 0;
let mut error_run = 0; // consecutive errors
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx =
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let mut first = true;
let mut first = true;
loop {
tokio::select! {
_ = cancel.cancelled() => {
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
},
}
loop {
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
return;
}
let period = tenant.get_gc_period();
let period = tenant.get_gc_period();
if first {
first = false;
let delays = async {
random_init_delay(period, &cancel).await?;
Ok::<_, Cancelled>(())
};
if delays.await.is_err() {
break;
}
}
let gc_horizon = tenant.get_gc_horizon();
let sleep_duration;
if period == Duration::ZERO || gc_horizon == 0 {
#[cfg(not(feature = "testing"))]
info!("automatic GC is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10);
} else {
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::Gc,
};
// Run gc
let IterationResult { output, elapsed: _ } =
iteration.run(tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx))
.await;
match output {
Ok(_) => {
error_run_count = 0;
sleep_duration = period;
}
Err(crate::tenant::GcError::TenantCancelled) => {
return;
}
Err(e) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
if matches!(e, crate::tenant::GcError::TimelineCancelled) {
// Timeline was cancelled during gc. We might either be in an event
// that affects the entire tenant (tenant deletion, pageserver shutdown),
// or in one that affects the timeline only (timeline deletion).
// Therefore, don't exit the loop.
info!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
} else {
error!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
}
sleep_duration = wait_duration;
}
}
};
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
if first {
first = false;
if sleep_random(period, &cancel).await.is_err() {
break;
}
}
}
.await;
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let mut last_throttle_flag_reset_at = Instant::now();
loop {
tokio::select! {
_ = cancel.cancelled() => {
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
},
}
// We run ingest housekeeping with the same frequency as compaction: it is not worth
// having a distinct setting. But we don't run it in the same task, because compaction
// blocks on acquiring the background job semaphore.
let period = tenant.get_compaction_period();
// If compaction period is set to zero (to disable it), then we will use a reasonable default
let period = if period == Duration::ZERO {
humantime::Duration::from_str(
pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD,
)
.unwrap()
.into()
} else {
period
};
// Jitter the period by +/- 5%
let period =
rand::thread_rng().gen_range((period * (95)) / 100..(period * (105)) / 100);
// Always sleep first: we do not need to do ingest housekeeping early in the lifetime of
// a tenant, since it won't have started writing any ephemeral files yet.
if tokio::time::timeout(period, cancel.cancelled())
.await
.is_ok()
{
break;
}
let gc_horizon = tenant.get_gc_horizon();
let sleep_duration;
if period == Duration::ZERO || gc_horizon == 0 {
#[cfg(not(feature = "testing"))]
info!("automatic GC is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10);
} else {
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::IngestHouseKeeping,
kind: BackgroundLoopKind::Gc,
};
iteration.run(tenant.ingest_housekeeping()).await;
// TODO: rename the background loop kind to something more generic, like, tenant housekeeping.
// Or just spawn another background loop for this throttle, it's not like it's super costly.
info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
let now = Instant::now();
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
if count_throttled == 0 {
// Run gc
let IterationResult { output, elapsed: _ } = iteration
.run(tenant.gc_iteration(
None,
gc_horizon,
tenant.get_pitr_interval(),
&cancel,
&ctx,
))
.await;
match output {
Ok(_) => {
error_run = 0;
sleep_duration = period;
}
Err(crate::tenant::GcError::TenantCancelled) => {
return;
}
let allowed_rps = tenant.pagestream_throttle.steady_rps();
let delta = now - prev;
info!(
n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
count_accounted = count_accounted_finish, // don't break existing log scraping
count_throttled,
sum_throttled_usecs,
count_accounted_start, // log after pre-existing fields to not break existing log scraping
allowed_rps=%format_args!("{allowed_rps:.0}"),
"shard was throttled in the last n_seconds"
);
});
}
}
.await;
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
Err(e) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
// if the tenant has a proper status already, no need to wait for anything
if tenant.current_state() == TenantState::Active {
ControlFlow::Continue(())
} else {
let mut tenant_state_updates = tenant.subscribe_for_state_updates();
loop {
match tenant_state_updates.changed().await {
Ok(()) => {
let new_state = &*tenant_state_updates.borrow();
match new_state {
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");
return ControlFlow::Continue(());
}
state => {
debug!("Not running the task loop, tenant is not active: {state:?}");
continue;
}
if matches!(e, crate::tenant::GcError::TimelineCancelled) {
// Timeline was cancelled during gc. We might either be in an event
// that affects the entire tenant (tenant deletion, pageserver shutdown),
// or in one that affects the timeline only (timeline deletion).
// Therefore, don't exit the loop.
info!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
} else {
error!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
}
}
Err(_sender_dropped_error) => {
return ControlFlow::Break(());
sleep_duration = wait_duration;
}
}
};
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
break;
}
}
}
/// Tenant housekeeping's main loop.
async fn tenant_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
let mut last_throttle_flag_reset_at = Instant::now();
loop {
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
return;
}
// Use the same period as compaction; it's not worth a separate setting. But if it's set to
// zero (to disable compaction), then use a reasonable default. Jitter it by 5%.
let period = match tenant.get_compaction_period() {
Duration::ZERO => humantime::parse_duration(DEFAULT_COMPACTION_PERIOD).unwrap(),
period => period,
};
let Ok(period) = sleep_jitter(period, period * 5 / 100, &cancel).await else {
break;
};
// Do tenant housekeeping.
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::TenantHouseKeeping,
};
iteration.run(tenant.housekeeping()).await;
// Log any getpage throttling.
info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
let now = Instant::now();
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
if count_throttled == 0 {
return;
}
let allowed_rps = tenant.pagestream_throttle.steady_rps();
let delta = now - prev;
info!(
n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
count_accounted = count_accounted_finish, // don't break existing log scraping
count_throttled,
sum_throttled_usecs,
count_accounted_start, // log after pre-existing fields to not break existing log scraping
allowed_rps=%format_args!("{allowed_rps:.0}"),
"shard was throttled in the last n_seconds"
);
});
}
}
/// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down.
async fn wait_for_active_tenant(
tenant: &Arc<Tenant>,
cancel: &CancellationToken,
) -> ControlFlow<()> {
if tenant.current_state() == TenantState::Active {
return ControlFlow::Continue(());
}
let mut update_rx = tenant.subscribe_for_state_updates();
loop {
tokio::select! {
_ = cancel.cancelled() => return ControlFlow::Break(()),
result = update_rx.changed() => if result.is_err() {
return ControlFlow::Break(());
}
}
match &*update_rx.borrow() {
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");
return ControlFlow::Continue(());
}
state => debug!("Not running the task loop, tenant is not active: {state:?}"),
}
}
}
@@ -558,26 +504,41 @@ async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
#[error("cancelled")]
pub(crate) struct Cancelled;
/// Provide a random delay for background task initialization.
/// Sleeps for a random interval up to the given max value.
///
/// This delay prevents a thundering herd of background tasks and will likely keep them running on
/// different periods for more stable load.
pub(crate) async fn random_init_delay(
period: Duration,
pub(crate) async fn sleep_random(
max: Duration,
cancel: &CancellationToken,
) -> Result<(), Cancelled> {
if period == Duration::ZERO {
return Ok(());
}
) -> Result<Duration, Cancelled> {
sleep_random_range(Duration::ZERO..=max, cancel).await
}
let d = {
let mut rng = rand::thread_rng();
rng.gen_range(Duration::ZERO..=period)
};
match tokio::time::timeout(d, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
/// Sleeps for a random interval in the given range. Returns the duration.
pub(crate) async fn sleep_random_range(
interval: RangeInclusive<Duration>,
cancel: &CancellationToken,
) -> Result<Duration, Cancelled> {
let delay = rand::thread_rng().gen_range(interval);
if delay == Duration::ZERO {
return Ok(delay);
}
tokio::select! {
_ = cancel.cancelled() => Err(Cancelled),
_ = tokio::time::sleep(delay) => Ok(delay),
}
}
/// Sleeps for an interval with a random jitter.
pub(crate) async fn sleep_jitter(
duration: Duration,
jitter: Duration,
cancel: &CancellationToken,
) -> Result<Duration, Cancelled> {
let from = duration.saturating_sub(jitter);
let to = duration.saturating_add(jitter);
sleep_random_range(from..=to, cancel).await
}
struct Iteration {
@@ -593,42 +554,25 @@ struct IterationResult<O> {
impl Iteration {
#[instrument(skip_all)]
pub(crate) async fn run<Fut, O>(self, fut: Fut) -> IterationResult<O>
where
Fut: std::future::Future<Output = O>,
{
let Self {
started_at,
period,
kind,
} = self;
let mut fut = std::pin::pin!(fut);
pub(crate) async fn run<F: Future<Output = O>, O>(self, fut: F) -> IterationResult<O> {
let mut fut = pin!(fut);
// Wrap `fut` into a future that logs a message every `period` so that we get a
// very obvious breadcrumb in the logs _while_ a slow iteration is happening.
let liveness_logger = async move {
loop {
match tokio::time::timeout(period, &mut fut).await {
Ok(x) => return x,
Err(_) => {
// info level as per the same rationale why warn_when_period_overrun is info
// => https://github.com/neondatabase/neon/pull/5724
info!("still running");
}
}
let output = loop {
match tokio::time::timeout(self.period, &mut fut).await {
Ok(r) => break r,
Err(_) => info!("still running"),
}
};
let output = liveness_logger.await;
let elapsed = started_at.elapsed();
warn_when_period_overrun(elapsed, period, kind);
let elapsed = self.started_at.elapsed();
warn_when_period_overrun(elapsed, self.period, self.kind);
IterationResult { output, elapsed }
}
}
/// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric.
// NB: the `task` and `period` are used for metrics labels.
pub(crate) fn warn_when_period_overrun(
elapsed: Duration,
period: Duration,

View File

@@ -1718,8 +1718,9 @@ impl Timeline {
let prepare = async move {
let guard = self.compaction_lock.lock().await;
let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
let permit = super::tasks::acquire_concurrency_permit(
BackgroundLoopKind::Compaction,
self.conf.use_compaction_semaphore,
ctx,
)
.await;
@@ -2632,7 +2633,7 @@ impl Timeline {
return;
}
FlushLoopState::Exited => {
warn!(
info!(
"ignoring attempt to restart exited flush_loop {}/{}",
self.tenant_shard_id, self.timeline_id
);
@@ -3056,8 +3057,9 @@ impl Timeline {
let self_ref = &self;
let skip_concurrency_limiter = &skip_concurrency_limiter;
async move {
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
let wait_for_permit = super::tasks::acquire_concurrency_permit(
BackgroundLoopKind::InitialLogicalSizeCalculation,
false,
background_ctx,
);

View File

@@ -341,6 +341,13 @@ impl DeleteTimelineFlow {
let tenant_shard_id = timeline.tenant_shard_id();
let timeline_id = timeline.timeline_id();
// Take a tenant gate guard, because timeline deletion needs access to the tenant to update its manifest.
let Ok(tenant_guard) = tenant.gate.enter() else {
// It is safe to simply skip here, because we only schedule background work once the timeline is durably marked for deletion.
info!("Tenant is shutting down, timeline deletion will be resumed when it next starts");
return;
};
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker,
@@ -348,6 +355,8 @@ impl DeleteTimelineFlow {
Some(timeline_id),
"timeline_delete",
async move {
let _guard = tenant_guard;
if let Err(err) = Self::background(guard, conf, &tenant, &timeline, remote_client).await {
// Only log as an error if it's not a cancellation.
if matches!(err, DeleteTimelineError::Cancelled) {

View File

@@ -32,7 +32,7 @@ use crate::{
tenant::{
size::CalculateSyntheticSizeError,
storage_layer::LayerVisibilityHint,
tasks::{BackgroundLoopKind, BackgroundLoopSemaphorePermit},
tasks::{sleep_random, BackgroundLoopKind, BackgroundLoopSemaphorePermit},
timeline::EvictionError,
LogicalSizeCalculationCause, Tenant,
},
@@ -83,8 +83,6 @@ impl Timeline {
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
async fn eviction_task(self: Arc<Self>, tenant: Arc<Tenant>) {
use crate::tenant::tasks::random_init_delay;
// acquire the gate guard only once within a useful span
let Ok(guard) = self.gate.enter() else {
return;
@@ -97,7 +95,7 @@ impl Timeline {
EvictionPolicy::OnlyImitiate(lat) => lat.period,
EvictionPolicy::NoEviction => Duration::from_secs(10),
};
if random_init_delay(period, &self.cancel).await.is_err() {
if sleep_random(period, &self.cancel).await.is_err() {
return;
}
}
@@ -334,8 +332,9 @@ impl Timeline {
cancel: &CancellationToken,
ctx: &RequestContext,
) -> ControlFlow<(), BackgroundLoopSemaphorePermit<'static>> {
let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
let acquire_permit = crate::tenant::tasks::acquire_concurrency_permit(
BackgroundLoopKind::Eviction,
false,
ctx,
);

View File

@@ -37,8 +37,8 @@ To play with it locally one may start proxy over a local postgres installation
If both postgres and proxy are running you may send a SQL query:
```console
curl -k -X POST 'https://proxy.localtest.me:4444/sql' \
-H 'Neon-Connection-String: postgres://stas:pass@proxy.localtest.me:4444/postgres' \
curl -k -X POST 'https://proxy.local.neon.build:4444/sql' \
-H 'Neon-Connection-String: postgres://stas:pass@proxy.local.neon.build:4444/postgres' \
-H 'Content-Type: application/json' \
--data '{
"query":"SELECT $1::int[] as arr, $2::jsonb as obj, 42 as num",
@@ -104,7 +104,7 @@ cases where it is hard to use rows represented as objects (e.g. when several fie
## Test proxy locally
Proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so we can use *.localtest.me` which resolves to `127.0.0.1`.
Proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so we can use *.local.neon.build` which resolves to `127.0.0.1`.
We will need to have a postgres instance. Assuming that we have set up docker we can set it up as follows:
```sh
@@ -125,7 +125,7 @@ docker exec -it proxy-postgres psql -U postgres -c "CREATE ROLE proxy WITH SUPER
Let's create self-signed certificate by running:
```sh
openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj "/CN=*.localtest.me"
openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj "/CN=*.local.neon.build"
```
Then we need to build proxy with 'testing' feature and run, e.g.:
@@ -136,5 +136,5 @@ RUST_LOG=proxy cargo run -p proxy --bin proxy --features testing -- --auth-backe
Now from client you can start a new session:
```sh
PGSSLROOTCERT=./server.crt psql "postgresql://proxy:password@endpoint.localtest.me:4432/postgres?sslmode=verify-full"
PGSSLROOTCERT=./server.crt psql "postgresql://proxy:password@endpoint.local.neon.build:4432/postgres?sslmode=verify-full"
```

View File

@@ -592,6 +592,8 @@ impl Timeline {
assert!(self.cancel.is_cancelled());
assert!(self.gate.close_complete());
info!("deleting timeline {} from disk", self.ttid);
// Close associated FDs. Nobody will be able to touch timeline data once
// it is cancelled, so WAL storage won't be opened again.
shared_state.sk.close_wal_store();

View File

@@ -475,6 +475,8 @@ impl GlobalTimelines {
info!("deleting timeline {}, only_local={}", ttid, only_local);
timeline.shutdown().await;
info!("timeline {ttid} shut down for deletion");
// Take a lock and finish the deletion holding this mutex.
let mut shared_state = timeline.write_shared_state().await;

View File

@@ -413,7 +413,6 @@ pub struct Service {
// Limit how many Reconcilers we will spawn concurrently
reconciler_concurrency: Arc<tokio::sync::Semaphore>,
prio_reconciler_concurrency: Arc<tokio::sync::Semaphore>,
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
/// Send into this queue to promptly attempt to reconcile this shard next time units are available.
@@ -1190,7 +1189,7 @@ impl Service {
let (nodes, tenants, _scheduler) = locked.parts_mut();
if let Some(shard) = tenants.get_mut(&tenant_shard_id) {
shard.delayed_reconcile = false;
self.maybe_reconcile_shard(shard, nodes, false);
self.maybe_reconcile_shard(shard, nodes);
}
if self.reconciler_concurrency.available_permits() == 0 {
@@ -1467,7 +1466,6 @@ impl Service {
reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
config.reconciler_concurrency,
)),
prio_reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(1024)),
delayed_reconcile_tx,
abort_tx,
startup_complete: startup_complete.clone(),
@@ -2246,7 +2244,7 @@ impl Service {
tenants
.range_mut(TenantShardId::tenant_range(tenant_id))
.filter_map(|(_shard_id, shard)| {
self.maybe_configured_reconcile_shard(shard, nodes, config, true)
self.maybe_configured_reconcile_shard(shard, nodes, config)
})
.collect::<Vec<_>>()
};
@@ -2715,7 +2713,7 @@ impl Service {
shard.schedule(scheduler, &mut schedule_context)?;
let maybe_waiter = self.maybe_reconcile_shard(shard, nodes, true);
let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
if let Some(waiter) = maybe_waiter {
waiters.push(waiter);
}
@@ -2836,7 +2834,7 @@ impl Service {
let (nodes, tenants, _scheduler) = locked.parts_mut();
for (_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
shard.config = config.clone();
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes, true) {
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
waiters.push(waiter);
}
}
@@ -3118,7 +3116,7 @@ impl Service {
debug_assert!(shard.intent.get_attached().is_none());
debug_assert!(shard.intent.get_secondary().is_empty());
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes, true) {
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
detach_waiters.push(waiter);
}
}
@@ -3270,7 +3268,7 @@ impl Service {
// In case scheduling is being switched back on, try it now.
shard.schedule(scheduler, &mut schedule_context).ok();
self.maybe_reconcile_shard(shard, nodes, true);
self.maybe_reconcile_shard(shard, nodes);
}
Ok(())
@@ -4319,7 +4317,7 @@ impl Service {
tracing::warn!("Failed to schedule {tenant_shard_id} during shard abort: {e}")
}
self.maybe_reconcile_shard(shard, nodes, true);
self.maybe_reconcile_shard(shard, nodes);
}
// We don't expect any new_shard_count shards to exist here, but drop them just in case
@@ -4485,8 +4483,7 @@ impl Service {
tracing::warn!("Failed to schedule child shard {child}: {e}");
}
// In the background, attach secondary locations for the new shards
if let Some(waiter) = self.maybe_reconcile_shard(&mut child_state, nodes, true)
{
if let Some(waiter) = self.maybe_reconcile_shard(&mut child_state, nodes) {
waiters.push(waiter);
}
@@ -4851,7 +4848,7 @@ impl Service {
shard.intent.clear_secondary(scheduler);
// Run Reconciler to execute detach fo secondary locations.
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes, true) {
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
waiters.push(waiter);
}
}
@@ -5117,7 +5114,7 @@ impl Service {
shard.sequence = shard.sequence.next();
}
self.maybe_reconcile_shard(shard, nodes, true)
self.maybe_reconcile_shard(shard, nodes)
};
if let Some(waiter) = waiter {
@@ -5180,7 +5177,7 @@ impl Service {
);
}
self.maybe_reconcile_shard(shard, nodes, true)
self.maybe_reconcile_shard(shard, nodes)
};
if let Some(waiter) = waiter {
@@ -5592,7 +5589,7 @@ impl Service {
)
}
self.maybe_reconcile_shard(shard, nodes, true);
self.maybe_reconcile_shard(shard, nodes);
}
// Here we remove an existing observed location for the node we're removing, and it will
@@ -5961,10 +5958,7 @@ impl Service {
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
}
Ok(()) => {
if self
.maybe_reconcile_shard(tenant_shard, nodes, false)
.is_some()
{
if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
tenants_affected += 1;
};
}
@@ -5995,7 +5989,7 @@ impl Service {
if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
if observed_loc.conf.is_none() {
self.maybe_reconcile_shard(tenant_shard, nodes, false);
self.maybe_reconcile_shard(tenant_shard, nodes);
}
}
}
@@ -6359,14 +6353,8 @@ impl Service {
&self,
shard: &mut TenantShard,
nodes: &Arc<HashMap<NodeId, Node>>,
high_priority: bool,
) -> Option<ReconcilerWaiter> {
self.maybe_configured_reconcile_shard(
shard,
nodes,
ReconcilerConfig::default(),
high_priority,
)
self.maybe_configured_reconcile_shard(shard, nodes, ReconcilerConfig::default())
}
/// Wrap [`TenantShard`] reconciliation methods with acquisition of [`Gate`] and [`ReconcileUnits`],
@@ -6375,7 +6363,6 @@ impl Service {
shard: &mut TenantShard,
nodes: &Arc<HashMap<NodeId, Node>>,
reconciler_config: ReconcilerConfig,
high_priority: bool,
) -> Option<ReconcilerWaiter> {
let reconcile_needed = shard.get_reconcile_needed(nodes);
@@ -6387,13 +6374,7 @@ impl Service {
}
};
let acq = if high_priority {
self.prio_reconciler_concurrency.clone().try_acquire_owned()
} else {
self.reconciler_concurrency.clone().try_acquire_owned()
};
let units = match acq {
let units = match self.reconciler_concurrency.clone().try_acquire_owned() {
Ok(u) => ReconcileUnits::new(u),
Err(_) => {
tracing::info!(tenant_id=%shard.tenant_shard_id.tenant_id, shard_id=%shard.tenant_shard_id.shard_slug(),
@@ -6487,10 +6468,7 @@ impl Service {
// Eventual consistency: if an earlier reconcile job failed, and the shard is still
// dirty, spawn another rone
if self
.maybe_reconcile_shard(shard, &pageservers, false)
.is_some()
{
if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
reconciles_spawned += 1;
} else if shard.delayed_reconcile {
// Shard wanted to reconcile but for some reason couldn't.
@@ -6576,7 +6554,7 @@ impl Service {
tracing::info!(tenant_shard_id=%tenant_shard_id, "Applying optimization: {optimization:?}");
if shard.apply_optimization(scheduler, optimization) {
optimizations_applied += 1;
if self.maybe_reconcile_shard(shard, nodes, false).is_some() {
if self.maybe_reconcile_shard(shard, nodes).is_some() {
reconciles_spawned += 1;
}
}
@@ -7243,7 +7221,6 @@ impl Service {
tenant_shard,
nodes,
reconciler_config,
false,
);
if let Some(some) = waiter {
waiters.push(some);
@@ -7536,7 +7513,6 @@ impl Service {
tenant_shard,
nodes,
reconciler_config,
false,
) {
waiters.push(waiter);
}

View File

@@ -88,7 +88,7 @@ impl ChaosInjector {
shard.intent.demote_attached(scheduler, old_location);
shard.intent.promote_attached(scheduler, new_location);
self.service.maybe_reconcile_shard(shard, nodes, false);
self.service.maybe_reconcile_shard(shard, nodes);
}
async fn inject_chaos(&mut self) {

View File

@@ -3345,7 +3345,7 @@ class NeonProxy(PgProtocol):
metric_collection_interval: str | None = None,
):
host = "127.0.0.1"
domain = "proxy.localtest.me" # resolves to 127.0.0.1
domain = "proxy.local.neon.build" # resolves to 127.0.0.1
super().__init__(dsn=auth_backend.default_conn_url, host=domain, port=proxy_port)
self.domain = domain
@@ -3368,7 +3368,7 @@ class NeonProxy(PgProtocol):
# generate key of it doesn't exist
crt_path = self.test_output_dir / "proxy.crt"
key_path = self.test_output_dir / "proxy.key"
generate_proxy_tls_certs("*.localtest.me", key_path, crt_path)
generate_proxy_tls_certs("*.local.neon.build", key_path, crt_path)
args = [
str(self.neon_binpath / "proxy"),
@@ -3569,7 +3569,7 @@ class NeonAuthBroker:
external_http_port: int,
auth_backend: NeonAuthBroker.ProxyV1,
):
self.domain = "apiauth.localtest.me" # resolves to 127.0.0.1
self.domain = "apiauth.local.neon.build" # resolves to 127.0.0.1
self.host = "127.0.0.1"
self.http_port = http_port
self.external_http_port = external_http_port
@@ -3586,7 +3586,7 @@ class NeonAuthBroker:
# generate key of it doesn't exist
crt_path = self.test_output_dir / "proxy.crt"
key_path = self.test_output_dir / "proxy.key"
generate_proxy_tls_certs("apiauth.localtest.me", key_path, crt_path)
generate_proxy_tls_certs("apiauth.local.neon.build", key_path, crt_path)
args = [
str(self.neon_binpath / "proxy"),
@@ -5122,12 +5122,14 @@ def wait_for_last_flush_lsn(
timeline: TimelineId,
pageserver_id: int | None = None,
auth_token: str | None = None,
last_flush_lsn: Lsn | None = None,
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
shards = tenant_get_shards(env, tenant, pageserver_id)
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
if last_flush_lsn is None:
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
results = []
for tenant_shard_id, pageserver in shards:

View File

@@ -282,18 +282,35 @@ class S3Storage:
def timeline_path(self, tenant_id: TenantShardId | TenantId, timeline_id: TimelineId) -> str:
return f"{self.tenant_path(tenant_id)}/timelines/{timeline_id}"
def get_latest_generation_key(self, prefix: str, suffix: str, keys: list[str]) -> str:
"""
Gets the latest generation key from a list of keys.
@param index_keys: A list of keys of different generations, which start with `prefix`
"""
def parse_gen(key: str) -> int:
shortname = key.split("/")[-1]
generation_str = shortname.removeprefix(prefix).removesuffix(suffix)
try:
return int(generation_str, base=16)
except ValueError:
log.info(f"Ignoring non-matching key: {key}")
return -1
if len(keys) == 0:
raise IndexError("No keys found")
return max(keys, key=parse_gen)
def get_latest_index_key(self, index_keys: list[str]) -> str:
"""
Gets the latest index file key.
@param index_keys: A list of index keys of different generations.
"""
def parse_gen(index_key: str) -> int:
parts = index_key.split("index_part.json-")
return int(parts[-1], base=16) if len(parts) == 2 else -1
return max(index_keys, key=parse_gen)
key = self.get_latest_generation_key(prefix="index_part.json-", suffix="", keys=index_keys)
return key
def download_index_part(self, index_key: str) -> IndexPartDump:
"""
@@ -306,6 +323,29 @@ class S3Storage:
log.info(f"index_part.json: {body}")
return IndexPartDump.from_json(json.loads(body))
def download_tenant_manifest(self, tenant_id: TenantId) -> dict[str, Any] | None:
tenant_prefix = self.tenant_path(tenant_id)
objects = self.client.list_objects_v2(Bucket=self.bucket_name, Prefix=f"{tenant_prefix}/")[
"Contents"
]
keys = [obj["Key"] for obj in objects if obj["Key"].find("tenant-manifest") != -1]
try:
manifest_key = self.get_latest_generation_key("tenant-manifest-", ".json", keys)
except IndexError:
log.info(
f"No manifest found for tenant {tenant_id}, this is normal if it didn't offload anything yet"
)
return None
response = self.client.get_object(Bucket=self.bucket_name, Key=manifest_key)
body = response["Body"].read().decode("utf-8")
log.info(f"Downloaded manifest {manifest_key}: {body}")
manifest = json.loads(body)
assert isinstance(manifest, dict)
return manifest
def heatmap_key(self, tenant_id: TenantId) -> str:
return f"{self.tenant_path(tenant_id)}/{TENANT_HEATMAP_FILE_NAME}"

View File

@@ -76,6 +76,9 @@ def test_ingest_logical_message(
log.info("Waiting for Pageserver to catch up")
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
recover_to_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
endpoint.stop()
# Now that all data is ingested, delete and recreate the tenant in the pageserver. This will
# reingest all the WAL from the safekeeper without any other constraints. This gives us a
# baseline of how fast the pageserver can ingest this WAL in isolation.
@@ -88,7 +91,13 @@ def test_ingest_logical_message(
with zenbenchmark.record_duration("pageserver_recover_ingest"):
log.info("Recovering WAL into pageserver")
client.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
wait_for_last_flush_lsn(
env, endpoint, env.initial_tenant, env.initial_timeline, last_flush_lsn=recover_to_lsn
)
# Check endpoint can start, i.e. we really recovered
endpoint.start()
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
# Emit metrics.
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))

View File

@@ -474,6 +474,14 @@ HISTORIC_DATA_SETS = [
PgVersion.V16,
"https://neon-github-public-dev.s3.eu-central-1.amazonaws.com/compatibility-data-snapshots/2024-07-18-pgv16.tar.zst",
),
# This dataset created on a pageserver running modern code at time of capture, but configured with no generation. This
# is our regression test that we can load data written without generations in layer file names & indices
HistoricDataSet(
"2025-02-07-nogenerations",
TenantId("e1411ca6562d6ff62419f693a5695d67"),
PgVersion.V17,
"https://neon-github-public-dev.s3.eu-central-1.amazonaws.com/compatibility-data-snapshots/2025-02-07-pgv17-nogenerations.tar.zst",
),
]

View File

@@ -12,7 +12,6 @@ of the pageserver are:
from __future__ import annotations
import os
import re
import time
from enum import StrEnum
@@ -29,7 +28,6 @@ from fixtures.pageserver.common_types import parse_layer_file_name
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
assert_tenant_state,
list_prefix,
wait_for_last_record_lsn,
wait_for_upload,
)
@@ -124,109 +122,6 @@ def assert_deletion_queue(ps_http, size_fn) -> None:
assert size_fn(v) is True
def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
"""
Validate behavior when a pageserver is run without generation support enabled,
then started again after activating it:
- Before upgrade, no objects should have generation suffixes
- After upgrade, the bucket should contain a mixture.
- In both cases, postgres I/O should work.
"""
neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_configs()
env.broker.start()
for sk in env.safekeepers:
sk.start()
env.storage_controller.start()
# We will start a pageserver with no control_plane_api set, so it won't be able to self-register
env.storage_controller.node_register(env.pageserver)
def remove_control_plane_api_field(config):
return config.pop("control_plane_api")
control_plane_api = env.pageserver.edit_config_toml(remove_control_plane_api_field)
env.pageserver.start()
env.storage_controller.node_configure(env.pageserver.id, {"availability": "Active"})
env.create_tenant(
tenant_id=env.initial_tenant, conf=TENANT_CONF, timeline_id=env.initial_timeline
)
generate_uploads_and_deletions(env, pageserver=env.pageserver)
def parse_generation_suffix(key):
m = re.match(".+-([0-9a-zA-Z]{8})$", key)
if m is None:
return None
else:
log.info(f"match: {m}")
log.info(f"group: {m.group(1)}")
return int(m.group(1), 16)
assert neon_env_builder.pageserver_remote_storage is not None
pre_upgrade_keys = list(
[
o["Key"]
for o in list_prefix(neon_env_builder.pageserver_remote_storage, delimiter="")[
"Contents"
]
]
)
for key in pre_upgrade_keys:
assert parse_generation_suffix(key) is None
env.pageserver.stop()
# Starting without the override that disabled control_plane_api
env.pageserver.patch_config_toml_nonrecursive(
{
"control_plane_api": control_plane_api,
}
)
env.pageserver.start()
generate_uploads_and_deletions(env, pageserver=env.pageserver, init=False)
legacy_objects: list[str] = []
suffixed_objects = []
post_upgrade_keys = list(
[
o["Key"]
for o in list_prefix(neon_env_builder.pageserver_remote_storage, delimiter="")[
"Contents"
]
]
)
for key in post_upgrade_keys:
log.info(f"post-upgrade key: {key}")
if parse_generation_suffix(key) is not None:
suffixed_objects.append(key)
else:
legacy_objects.append(key)
# Bucket now contains a mixture of suffixed and non-suffixed objects
assert len(suffixed_objects) > 0
assert len(legacy_objects) > 0
# Flush through deletions to get a clean state for scrub: we are implicitly validating
# that our generations-enabled pageserver was able to do deletions of layers
# from earlier which don't have a generation.
env.pageserver.http_client().deletion_queue_flush(execute=True)
assert get_deletion_queue_unexpected_errors(env.pageserver.http_client()) == 0
# Having written a mixture of generation-aware and legacy index_part.json,
# ensure the scrubber handles the situation as expected.
healthy, metadata_summary = env.storage_scrubber.scan_metadata()
assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline
assert metadata_summary["timeline_count"] == 1
assert metadata_summary["timeline_shard_count"] == 1
assert healthy
def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3,

View File

@@ -57,7 +57,7 @@ def test_proxy_select_1(static_proxy: NeonProxy):
assert out[0][0] == 1
# with SNI
out = static_proxy.safe_psql("select 42", host="generic-project-name.localtest.me")
out = static_proxy.safe_psql("select 42", host="generic-project-name.local.neon.build")
assert out[0][0] == 42
@@ -234,7 +234,7 @@ def test_sql_over_http_serverless_driver(static_proxy: NeonProxy):
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
response = requests.post(
f"https://api.localtest.me:{static_proxy.external_http_port}/sql",
f"https://api.local.neon.build:{static_proxy.external_http_port}/sql",
data=json.dumps({"query": "select 42 as answer", "params": []}),
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
verify=str(static_proxy.test_output_dir / "proxy.crt"),

View File

@@ -35,7 +35,7 @@ async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
check_cannot_connect(query="select 1", sslsni=0, options="endpoint=private-project")
# with SNI
check_cannot_connect(query="select 1", host="private-project.localtest.me")
check_cannot_connect(query="select 1", host="private-project.local.neon.build")
# no SNI, deprecated `options=project` syntax (before we had several endpoint in project)
out = static_proxy.safe_psql(query="select 1", sslsni=0, options="project=generic-project")
@@ -46,7 +46,7 @@ async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
assert out[0][0] == 1
# with SNI
out = static_proxy.safe_psql(query="select 1", host="generic-project.localtest.me")
out = static_proxy.safe_psql(query="select 1", host="generic-project.local.neon.build")
assert out[0][0] == 1

View File

@@ -116,7 +116,7 @@ def test_pg_sni_router(
test_output_dir: Path,
):
generate_tls_cert(
"endpoint.namespace.localtest.me",
"endpoint.namespace.local.neon.build",
test_output_dir / "router.crt",
test_output_dir / "router.key",
)
@@ -130,7 +130,7 @@ def test_pg_sni_router(
with PgSniRouter(
neon_binpath=neon_binpath,
port=router_port,
destination="localtest.me",
destination="local.neon.build",
tls_cert=test_output_dir / "router.crt",
tls_key=test_output_dir / "router.key",
test_output_dir=test_output_dir,
@@ -141,7 +141,7 @@ def test_pg_sni_router(
"select 1",
dbname="postgres",
sslmode="require",
host=f"endpoint--namespace--{pg_port}.localtest.me",
host=f"endpoint--namespace--{pg_port}.local.neon.build",
hostaddr="127.0.0.1",
)
assert out[0][0] == 1

View File

@@ -554,8 +554,33 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder):
log.info(f"Timeline {state.timeline_id} is still active")
shutdown.wait(0.5)
elif state.timeline_id in offloaded_ids:
log.info(f"Timeline {state.timeline_id} is now offloaded")
state.offloaded = True
log.info(f"Timeline {state.timeline_id} is now offloaded in memory")
# Hack: when we see something offloaded in the API, it doesn't guarantee that the offload
# is persistent (it is marked offloaded first, then that is persisted to the tenant manifest).
# So we wait until we see the manifest update before considering it offloaded, that way
# subsequent checks that it doesn't revert to active on a restart will pass reliably.
time.sleep(0.1)
assert isinstance(env.pageserver_remote_storage, S3Storage)
manifest = env.pageserver_remote_storage.download_tenant_manifest(
tenant_id
)
if manifest is None:
log.info(
f"Timeline {state.timeline_id} is not yet offloaded persistently (no manifest)"
)
elif str(state.timeline_id) in [
t["timeline_id"] for t in manifest["offloaded_timelines"]
]:
log.info(
f"Timeline {state.timeline_id} is now offloaded persistently"
)
state.offloaded = True
else:
log.info(
f"Timeline {state.timeline_id} is not yet offloaded persistently (manifest: {manifest})"
)
break
else:
# Timeline is neither offloaded nor active, this is unexpected: the pageserver

View File

@@ -13,12 +13,12 @@
# postgres -D data -p3000
#
# ## Launch proxy with WSS enabled:
# openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj '/CN=*.neon.localtest.me'
# openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj '/CN=*.local.neon.build'
# ./target/debug/proxy --wss 127.0.0.1:40433 --http 127.0.0.1:28080 --mgmt 127.0.0.1:9099 --proxy 127.0.0.1:4433 --tls-key server.key --tls-cert server.crt --auth-backend postgres
#
# ## Launch the tunnel:
#
# poetry run ./test_runner/websocket_tunnel.py --ws-port 40433 --ws-url "wss://ep-test.neon.localtest.me"
# poetry run ./test_runner/websocket_tunnel.py --ws-port 40433 --ws-url "wss://ep-test.local.neon.build"
#
# ## Now you can connect with psql:
# psql "postgresql://heikki@localhost:40433/postgres"