Compare commits

...

39 Commits

Author SHA1 Message Date
John Spray
eafa5d2db8 tests: full compaction at end of pg_regress tests 2024-07-05 11:06:17 +00:00
John Spray
efccf6cb79 tests: common post-test checks in pg_regress 2024-07-05 11:00:26 +00:00
John Spray
9fc9553e1f pageserver: fix dropping backtrace from CreateImageLayersError 2024-07-05 10:35:24 +00:00
John Spray
bc87e78f1e pageserver: respect has_relmap_file in collect_keyspace 2024-07-05 10:35:24 +00:00
John Spray
2bdb79e17a tests: use smaller layers in test_pg_regress 2024-07-05 10:35:24 +00:00
Alexander Bayandin
5b69b32dc5 CI(build-and-test): add conclusion job (#8246)
## Problem

Currently, if you need to rename a job and the job is listed in [branch
protection
rules](https://github.com/neondatabase/neon/settings/branch_protection_rules),
the PR won't be allowed to merge.

## Summary of changes
- Add `conclusion` job that fails if any of its dependencies don't
finish successfully
2024-07-04 09:20:01 +01:00
Conrad Ludgate
e03c3c9893 proxy: cache certain non-retriable console errors for a short time (#8201)
## Problem

If there's a quota error, it makes sense to cache it for a short window
of time. Many clients do not handle database connection errors
gracefully, so just spam retry 🤡

## Summary of changes

Updates the node_info cache to support storing console errors. Store
console errors if they cannot be retried (using our own heuristic.
should only trigger for quota exceeded errors).
2024-07-04 09:03:03 +01:00
Vlad Lazar
bbb2fa7cdd tests: perform graceful rolling restarts in storcon scale test (#8173)
## Problem
Scale test doesn't exercise drain & fill.

## Summary of changes
Make scale test exercise drain & fill
2024-07-04 06:04:19 +01:00
John Spray
778787d8e9 pageserver: add supplementary branch usage stats (#8131)
## Problem

The metrics we have today aren't convenient for planning around the
impact of timeline archival on costs.

Closes: https://github.com/neondatabase/neon/issues/8108

## Summary of changes

- Add metric `pageserver_archive_size`, which indicates the logical
bytes of data which we would expect to write into an archived branch.
- Add metric `pageserver_pitr_history_size`, which indicates the
distance between last_record_lsn and the PITR cutoff.

These metrics are somewhat temporary: when we implement #8088 and
associated consumption metric changes, these will reach a final form.
For now, an "archived" branch is just any branch outside of its parent's
PITR window: later, archival will become an explicit state (which will
_usually_ correspond to falling outside the parent's PITR window).

The overall volume of timeline metrics is something to watch, but we are
removing many more in https://github.com/neondatabase/neon/pull/8245
than this PR is adding.
2024-07-03 22:29:43 +01:00
Alex Chi Z
90b51dcf16 fix(pageserver): ensure test creates valid layer map (#8191)
I'd like to add some constraints to the layer map we generate in tests.

(1) is the layer map that the current compaction algorithm will produce.
There is a property that for all delta layer, all delta layer overlaps
with it on the LSN axis will have the same LSN range.
(2) is the layer map that cannot be produced with the legacy compaction
algorithm.
(3) is the layer map that will be produced by the future
tiered-compaction algorithm. The current validator does not allow that
but we can modify the algorithm to allow it in the future.

## Summary of changes

Add a validator to check if the layer map is valid and refactor the test
cases to include delta layer start/end LSN.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-07-03 18:46:58 +00:00
Christian Schwarz
a85aa03d18 page_service: stop exposing get_last_record_rlsn (#8244)
Compute doesn't use it, let's eliminate it.

Ref to Slack thread:
https://neondb.slack.com/archives/C033RQ5SPDH/p1719920261995529
2024-07-03 20:05:01 +02:00
Japin Li
cdaed4d79c Fix outdated comment (#8149)
Commit 97b48c23f changes the log wait timeout from 1 second to 100
milliseconds but forgets to update the comment.
2024-07-03 13:55:36 -04:00
John Spray
ea0b22a9b0 pageserver: reduce ops tracked at per-timeline detail (#8245)
## Problem

We record detailed histograms for all page_service op types, which
mostly aren't very interesting, but make our prometheus scrapes huge.

Closes: #8223 

## Summary of changes

- Only track GetPageAtLsn histograms on a per-timeline granularity. For
all other operation types, rely on existing node-wide histograms.
2024-07-03 17:27:34 +01:00
Peter Bendel
392a58bdce add pagebench test cases for periodic pagebench on dedicated hardware (#8233)
we want to run some specific pagebench test cases on dedicated hardware
to get reproducible results

run1: 1 client per tenant => characterize throughput with n tenants.
-  500 tenants
- scale 13 (200 MB database)
- 1 hour duration
- ca 380 GB layer snapshot files

run2.singleclient: 1 client per tenant => characterize latencies
run2.manyclient: N clients per tenant => characterize throughput
scalability within one tenant.
- 1 tenant with 1 client for latencies
- 1 tenant with 64 clients because typically for a high number of
connections we recommend the connection pooler
which by default uses 64 connections (for scalability)
- scale 136 (2048 MB database)
- 20 minutes each
2024-07-03 16:22:33 +00:00
Arpad Müller
e0891ec8c8 Only support compressed reads if the compression setting is present (#8238)
PR #8106 was created with the assumption that no blob is larger than
`256 MiB`. Due to #7852 we have checking for *writes* of blobs larger
than that limit, but we didn't have checking for *reads* of such large
blobs: in theory, we could be reading these blobs every day but we just
don't happen to write the blobs for some reason.

Therefore, we now add a warning for *reads* of such large blobs as well.

To make deploying compression less dangerous, we therefore only assume a
blob is compressed if the compression setting is present in the config.
This also means that we can't back out of compression once we enabled
it.

Part of https://github.com/neondatabase/neon/issues/5431
2024-07-03 18:02:10 +02:00
John Spray
97f7188a07 pageserver: don't try to flush if shutdown during attach (#8235)
## Problem

test_location_conf_churn fails on log errors when it tries to shutdown a
pageserver immediately after starting a tenant attach, like this:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8224/9761000525/index.html#/testresult/15fb6beca5c7327c

```
shutdown:shutdown{tenant_id=35f5c55eb34e7e5e12288c5d8ab8b909 shard_id=0000}:timeline_shutdown{timeline_id=30936747043353a98661735ad09cbbfe shutdown_mode=FreezeAndFlush}: failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited\n')
```

This is happening because Tenant::shutdown fires its cancellation token
early if the tenant is not fully attached by the time shutdown is
called, so the flush loop is shutdown by the time we try and flush.

## Summary of changes

- In the early-cancellation case, also set the shutdown mode to Hard to
skip trying to do a flush that will fail.
2024-07-03 13:13:06 +00:00
Alexander Bayandin
aae3876318 CI: update docker/* actions to latest versions (#7694)
## Problem

GitHub Actions complain that we use actions that depend on deprecated
Node 16:

```
Node.js 16 actions are deprecated. Please update the following actions to use Node.js 20: docker/setup-buildx-action@v2
```

But also, the latest `docker/setup-buildx-action` fails with the following
error:
```
/nvme/actions-runner/_work/_actions/docker/setup-buildx-action/v3/webpack:/docker-setup-buildx/node_modules/@actions/cache/lib/cache.js:175
            throw new Error(`Path Validation Error: Path(s) specified in the action for caching do(es) not exist, hence no cache is being saved.`);
^
Error: Path Validation Error: Path(s) specified in the action for caching do(es) not exist, hence no cache is being saved.
    at Object.rejected (/nvme/actions-runner/_work/_actions/docker/setup-buildx-action/v3/webpack:/docker-setup-buildx/node_modules/@actions/cache/lib/cache.js:175:1)
    at Generator.next (<anonymous>)
    at fulfilled (/nvme/actions-runner/_work/_actions/docker/setup-buildx-action/v3/webpack:/docker-setup-buildx/node_modules/@actions/cache/lib/cache.js:29:1)
```

We can work this around by setting `cache-binary: false` for `uses:
docker/setup-buildx-action@v3`

## Summary of changes
- Update `docker/setup-buildx-action` from `v2` to `v3`, set
`cache-binary: false`
- Update `docker/login-action` from `v2` to `v3`
- Update `docker/build-push-action` from `v4`/`v5` to `v6`
2024-07-03 12:19:13 +01:00
Heikki Linnakangas
dae55badf3 Simplify test_wal_page_boundary_start test (#8214)
All the code to ensure the WAL record lands at a page boundary was
unnecessary for reproducing the original problem. In fact, it's a pretty
basic test that checks that outbound replication (= neon as publisher)
still works after restarting the endpoint. It just used to be very
broken before commit 5ceccdc7de, which also added this test.

To verify that:

1. Check out commit f3af5f4660 (because the next commit, 7dd58e1449,
fixed the same bug in a different way, making it infeasible to revert
the bug fix in an easy way)
2. Revert the bug fix from commit 5ceccdc7de with this:

```
diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c
index 7debb6325..9f03bbd99 100644
--- a/pgxn/neon/walproposer_pg.c
+++ b/pgxn/neon/walproposer_pg.c
@@ -1437,8 +1437,10 @@ XLogWalPropWrite(WalProposer *wp, char *buf, Size nbytes, XLogRecPtr recptr)
 	 *
 	 * https://github.com/neondatabase/neon/issues/5749
 	 */
+#if 0
 	if (!wp->config->syncSafekeepers)
 		XLogUpdateWalBuffers(buf, recptr, nbytes);
+#endif

 	while (nbytes > 0)
 	{
```

3. Run the test_wal_page_boundary_start regression test. It fails, as
expected

4. Apply this commit to the test, and run it again. It still fails, with
the same error mentioned in issue #5749:

```
PG:2024-06-30 20:49:08.805 GMT [1248196] STATEMENT:  START_REPLICATION SLOT "sub1" LOGICAL 0/0 (proto_version '4', origin 'any', publication_names '"pub1"')
PG:2024-06-30 21:37:52.567 GMT [1467972] LOG:  starting logical decoding for slot "sub1"
PG:2024-06-30 21:37:52.567 GMT [1467972] DETAIL:  Streaming transactions committing after 0/1532330, reading WAL from 0/1531C78.
PG:2024-06-30 21:37:52.567 GMT [1467972] STATEMENT:  START_REPLICATION SLOT "sub1" LOGICAL 0/0 (proto_version '4', origin 'any', publication_names '"pub1"')
PG:2024-06-30 21:37:52.567 GMT [1467972] LOG:  logical decoding found consistent point at 0/1531C78
PG:2024-06-30 21:37:52.567 GMT [1467972] DETAIL:  There are no running transactions.
PG:2024-06-30 21:37:52.567 GMT [1467972] STATEMENT:  START_REPLICATION SLOT "sub1" LOGICAL 0/0 (proto_version '4', origin 'any', publication_names '"pub1"')
PG:2024-06-30 21:37:52.568 GMT [1467972] ERROR:  could not find record while sending logically-decoded data: invalid contrecord length 312 (expected 6) at 0/1533FD8
```
2024-07-03 13:22:53 +03:00
Alex Chi Z
4273309962 docker: add storage_scrubber into the docker image (#8239)
## Problem

We will run this tool in the k8s cluster. To make it accessible from
k8s, we need to package it into the docker image.

part of https://github.com/neondatabase/cloud/issues/14024
2024-07-03 09:48:56 +01:00
Konstantin Knizhnik
4a0c2aebe0 Add test for proper handling of connection failure to avoid 'cannot wait on socket event without a socket' error (#8231)
## Problem

See https://github.com/neondatabase/cloud/issues/14289
and PR #8210 

## Summary of changes

Add test for problems fixed in #8210

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-07-02 21:45:42 +03:00
Alex Chi Z
891cb5a9a8 fix(pageserver): comments about metadata key range (#8236)
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-07-02 16:54:32 +00:00
John Spray
f5832329ac tense of errors (#8234)
I forgot a commit when merging
https://github.com/neondatabase/neon/pull/8177
2024-07-02 17:17:22 +01:00
Alexander Bayandin
6216df7765 CI(benchmarking): move psql queries to actions/run-python-test-set (#8230)
## Problem

Some of the Nightly benchmarks fail with the error
```
+ /tmp/neon/pg_install/v14/bin/pgbench --version
/tmp/neon/pg_install/v14/bin/pgbench: error while loading shared libraries: libpq.so.5: cannot open shared object file: No such file or directory
```
Originally, we added the `pgbench --version` call to check that
`pgbench` is installed and to fail earlier if it's not.
The failure happens because we don't have `LD_LIBRARY_PATH` set for
every job, and it also affects `psql` command.
We can move it to `actions/run-python-test-set` so as not to duplicate
code (as it already have `LD_LIBRARY_PATH` set).

## Summary of changes
- Remove `pgbench --version` call
- Move `psql` commands to common `actions/run-python-test-set`
2024-07-02 15:21:23 +00:00
Christian Schwarz
5de896e7d8 L0 flush: opt-in mechanism to bypass PageCache reads and writes (#8190)
part of https://github.com/neondatabase/neon/issues/7418

# Motivation

(reproducing #7418)

When we do an `InMemoryLayer::write_to_disk`, there is a tremendous
amount of random read I/O, as deltas from the ephemeral file (written in
LSN order) are written out to the delta layer in key order.

In benchmarks (https://github.com/neondatabase/neon/pull/7409) we can
see that this delta layer writing phase is substantially more expensive
than the initial ingest of data, and that within the delta layer write a
significant amount of the CPU time is spent traversing the page cache.

# High-Level Changes

Add a new mode for L0 flush that works as follows:

* Read the full ephemeral file into memory -- layers are much smaller
than total memory, so this is afforable
* Do all the random reads directly from this in memory buffer instead of
using blob IO/page cache/disk reads.
* Add a semaphore to limit how many timelines may concurrently do this
(limit peak memory).
* Make the semaphore configurable via PS config.

# Implementation Details

The new `BlobReaderRef::Slice` is a temporary hack until we can ditch
`blob_io` for `InMemoryLayer` => Plan for this is laid out in
https://github.com/neondatabase/neon/issues/8183

# Correctness

The correctness of this change is quite obvious to me: we do what we did
before (`blob_io`) but read from memory instead of going to disk.

The highest bug potential is in doing owned-buffers IO. I refactored the
API a bit in preliminary PR
https://github.com/neondatabase/neon/pull/8186 to make it less
error-prone, but still, careful review is requested.

# Performance

I manually measured single-client ingest performance from `pgbench -i
...`.

Full report:
https://neondatabase.notion.site/2024-06-28-benchmarking-l0-flush-performance-e98cff3807f94cb38f2054d8c818fe84?pvs=4

tl;dr:

* no speed improvements during ingest,  but
* significantly lower pressure on PS PageCache (eviction rate drops to
1/3)
  * (that's why I'm working on this)
* noticable but modestly lower CPU time

This is good enough for merging this PR because the changes require
opt-in.

We'll do more testing in staging & pre-prod.

# Stability / Monitoring

**memory consumption**: there's no _hard_ limit on max `InMemoryLayer`
size (aka "checkpoint distance") , hence there's no hard limit on the
memory allocation we do for flushing. In practice, we a) [log a
warning](23827c6b0d/pageserver/src/tenant/timeline.rs (L5741-L5743))
when we flush oversized layers, so we'd know which tenant is to blame
and b) if we were to put a hard limit in place, we would have to decide
what to do if there is an InMemoryLayer that exceeds the limit.
It seems like a better option to guarantee a max size for frozen layer,
dependent on `checkpoint_distance`. Then limit concurrency based on
that.

**metrics**: we do have the
[flush_time_histo](23827c6b0d/pageserver/src/tenant/timeline.rs (L3725-L3726)),
but that includes the wait time for the semaphore. We could add a
separate metric for the time spent after acquiring the semaphore, so one
can infer the wait time. Seems unnecessary at this point, though.
2024-07-02 16:29:09 +02:00
Arpad Müller
25eefdeb1f Add support for reading and writing compressed blobs (#8106)
Add support for reading and writing zstd-compressed blobs for use in
image layer generation, but maybe one day useful also for delta layers.
The reading of them is unconditional while the writing is controlled by
the `image_compression` config variable allowing for experiments.

For the on-disk format, we re-use some of the bitpatterns we currently
keep reserved for blobs larger than 256 MiB. This assumes that we have
never ever written any such large blobs to image layers.

After the preparation in #7852, we now are unable to read blobs with a
size larger than 256 MiB (or write them).

A non-goal of this PR is to come up with good heuristics of when to
compress a bitpattern. This is left for future work.

Parts of the PR were inspired by #7091.

cc  #7879

Part of #5431
2024-07-02 14:14:12 +00:00
Vlad Lazar
28929d9cfa pageserver: rate limit log for loads of layers visited (#8228)
## Problem
At high percentiles we see more than 800 layers being visited by the
read path. We need the tenant/timeline to investigate.

## Summary of changes
Add a rate limited log line when the average number of layers visited
per key is in the last specified histogram bucket.
I plan to use this to identify tenants in us-east-2 staging that exhibit
this behaviour. Will revert before next week's release.
2024-07-02 14:14:10 +01:00
Christian Schwarz
9b4b4bbf6f fix: noisy logging when download gets cancelled during shutdown (#8224)
Before this PR, during timeline shutdown, we'd occasionally see
log lines like this one:

```
2024-06-26T18:28:11.063402Z  INFO initial_size_calculation{tenant_id=$TENANT,shard_id=0000 timeline_id=$TIMELINE}:logical_size_calculation_task:get_or_maybe_download{layer=000000000000000000000000000000000000-000000067F0001A3950001C1630100000000__0000000D88265898}: layer file download failed, and caller has been cancelled: Cancelled, shutting down
Stack backtrace:
   0: <core::result::Result<T,F> as core::ops::try_trait::FromResidual<core::result::Result<core::convert::Infallible,E>>>::from_residual
             at /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/result.rs:1964:27
      pageserver::tenant::remote_timeline_client::RemoteTimelineClient::download_layer_file::{{closure}}
             at /home/nonroot/pageserver/src/tenant/remote_timeline_client.rs:531:13
      pageserver::tenant::storage_layer::layer::LayerInner::download_and_init::{{closure}}
             at /home/nonroot/pageserver/src/tenant/storage_layer/layer.rs:1136:14
      pageserver::tenant::storage_layer::layer::LayerInner::download_init_and_wait::{{closure}}::{{closure}}
             at /home/nonroot/pageserver/src/tenant/storage_layer/layer.rs:1082:74
```

We can eliminate the anyhow backtrace with no loss of information
because the conversion to anyhow::Error happens in exactly one place.

refs #7427
2024-07-02 13:13:27 +00:00
John Spray
1a0f545c16 pageserver: simpler, stricter config error handling (#8177)
## Problem

Tenant attachment has error paths for failures to write local
configuration, but these types of local storage I/O errors should be
considered fatal for the process. Related thread on an earlier PR that
touched this code:
https://github.com/neondatabase/neon/pull/7947#discussion_r1655134114

## Summary of changes

- Make errors writing tenant config fatal (abort process)
- When reading tenant config, make all I/O errors except ENOENT fatal
- Replace use of bare anyhow errors with `LoadConfigError`
2024-07-02 12:45:04 +00:00
Christian Schwarz
7dcdbaa25e remote_storage config: move handling of empty inline table {} to callers (#8193)
Before this PR, `RemoteStorageConfig::from_toml` would support
deserializing an
empty `{}` TOML inline table to a `None`, otherwise try `Some()`.

We can instead let
* in proxy: let clap derive handle the Option
* in PS & SK: assume that if the field is specified, it must be a valid
  RemtoeStorageConfig

(This PR started with a much simpler goal of factoring out the
`deserialize_item` function because I need that in another PR).
2024-07-02 12:53:08 +02:00
Konstantin Knizhnik
0497b99f3a Check status of connection after PQconnectStartParams (#8210)
## Problem

See https://github.com/neondatabase/cloud/issues/14289

## Summary of changes

Check connection status after calling PQconnectStartParams

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-07-02 06:56:10 +03:00
Vlad Lazar
9882ac8e06 docs: Graceful storage controller cluster restarts RFC (#7704)
RFC for "Graceful Restarts of Storage Controller Managed Clusters". 
Related https://github.com/neondatabase/neon/issues/7387
2024-07-01 18:44:28 +01:00
Heikki Linnakangas
0789160ffa tests: Make neon_xlogflush() flush all WAL, if you omit the LSN arg (#8215)
This makes it much more convenient to use in the common case that you
want to flush all the WAL. (Passing pg_current_wal_insert_lsn() as the
argument doesn't work for the same reasons as explained in the comments:
we need to be back off to the beginning of a page if the previous record
ended at page boundary.)

I plan to use this to fix the issue that Arseny Sher called out at
https://github.com/neondatabase/neon/pull/7288#discussion_r1660063852
2024-07-01 10:55:18 -05:00
Alexander Bayandin
9c32604aa9 CI(gather-rust-build-stats): fix build with libpq (#8219)
## Problem
I've missed setting `PQ_LIB_DIR` in
https://github.com/neondatabase/neon/pull/8206 in
`gather-rust-build-stats` job and it fails now:
```
  = note: /usr/bin/ld: cannot find -lpq
          collect2: error: ld returned 1 exit status
          

error: could not compile `storage_controller` (bin "storage_controller") due to 1 previous error
```

https://github.com/neondatabase/neon/actions/runs/9743960062/job/26888597735

## Summary of changes
- Set `PQ_LIB_DIR` for `gather-rust-build-stats` job
2024-07-01 16:42:23 +01:00
Alex Chi Z
b02aafdfda fix(pageserver): include aux file in basebackup only once (#8207)
Extracted from https://github.com/neondatabase/neon/pull/6560, currently
we include multiple copies of aux files in the basebackup.

## Summary of changes

Fix the loop.

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-07-01 14:36:49 +00:00
Alexander Bayandin
e823b92947 CI(build-tools): Remove libpq from build image (#8206)
## Problem
We use `build-tools` image as a base image to build other images, and it
has a pretty old `libpq-dev` installed (v13; it wasn't that old until I
removed system Postgres 14 from `build-tools` image in
https://github.com/neondatabase/neon/pull/6540)

## Summary of changes
- Remove `libpq-dev` from `build-tools` image
- Set `LD_LIBRARY_PATH` for tests (for different Postgres binaries that
we use, like psql and pgbench)
- Set `PQ_LIB_DIR` to build Storage Controller
- Set `LD_LIBRARY_PATH`/`DYLD_LIBRARY_PATH` in the Storage Controller
where it calls Postgres binaries
2024-07-01 13:11:55 +01:00
John Spray
aea5cfe21e pageserver: add metric pageserver_secondary_resident_physical_size (#8204)
## Problem

We lack visibility of how much local disk space is used by secondary
tenant locations

Close: https://github.com/neondatabase/neon/issues/8181

## Summary of changes

- Add `pageserver_secondary_resident_physical_size`, tagged by tenant
- Register & de-register label sets from SecondaryTenant
- Add+use wrappers in SecondaryDetail that update metrics when
adding+removing layers/timelines
2024-07-01 12:48:20 +01:00
Heikki Linnakangas
9ce193082a Restore running xacts from CLOG on replica startup (#7288)
We have one pretty serious MVCC visibility bug with hot standby
replicas. We incorrectly treat any transactions that are in progress
in the primary, when the standby is started, as aborted. That can
break MVCC for queries running concurrently in the standby. It can
also lead to hint bits being set incorrectly, and that damage can last
until the replica is restarted.

The fundamental bug was that we treated any replica start as starting
from a shut down server. The fix for that is straightforward: we need
to set 'wasShutdown = false' in InitWalRecovery() (see changes in the
postgres repo).

However, that introduces a new problem: with wasShutdown = false, the
standby will not open up for queries until it receives a running-xacts
WAL record from the primary. That's correct, and that's how Postgres
hot standby always works. But it's a problem for Neon, because:

* It changes the historical behavior for existing users. Currently,
  the standby immediately opens up for queries, so if they now need to
  wait, we can breka existing use cases that were working fine
  (assuming you don't hit the MVCC issues).

* The problem is much worse for Neon than it is for standalone
  PostgreSQL, because in Neon, we can start a replica from an
  arbitrary LSN. In standalone PostgreSQL, the replica always starts
  WAL replay from a checkpoint record, and the primary arranges things
  so that there is always a running-xacts record soon after each
  checkpoint record. You can still hit this issue with PostgreSQL if
  you have a transaction with lots of subtransactions running in the
  primary, but it's pretty rare in practice.

To mitigate that, we introduce another way to collect the
running-xacts information at startup, without waiting for the
running-xacts WAL record: We can the CLOG for XIDs that haven't been
marked as committed or aborted. It has limitations with
subtransactions too, but should mitigate the problem for most users.

See https://github.com/neondatabase/neon/issues/7236.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-07-01 12:58:12 +03:00
Heikki Linnakangas
75c84c846a tests: Make neon_xlogflush() flush all WAL, if you omit the LSN arg
This makes it much more convenient to use in the common case that you
want to flush all the WAL. (Passing pg_current_wal_insert_lsn() as the
argument doesn't work for the same reasons as explained in the
comments: we need to be back off to the beginning of a page if the
previous record ended at page boundary.)

I plan to use this to fix the issue that Arseny Sher called out at
https://github.com/neondatabase/neon/pull/7288#discussion_r1660063852
2024-07-01 12:58:08 +03:00
Heikki Linnakangas
57535c039c tests: remove a leftover 'running' flag (#8216)
The 'running' boolean was replaced with a semaphore in commit
f0e2bb79b2, but this initialization was missed. Remove it so that if a
test tries to access it, you get an error rather than always claiming
that the endpoint is not running.

Spotted by Arseny at
https://github.com/neondatabase/neon/pull/7288#discussion_r1660068657
2024-07-01 11:23:31 +03:00
82 changed files with 3515 additions and 1010 deletions

View File

@@ -114,6 +114,7 @@ runs:
export PLATFORM=${PLATFORM:-github-actions-selfhosted}
export POSTGRES_DISTRIB_DIR=${POSTGRES_DISTRIB_DIR:-/tmp/neon/pg_install}
export DEFAULT_PG_VERSION=${PG_VERSION#v}
export LD_LIBRARY_PATH=${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/lib
if [ "${BUILD_TYPE}" = "remote" ]; then
export REMOTE_ENV=1
@@ -178,7 +179,15 @@ runs:
# Wake up the cluster if we use remote neon instance
if [ "${{ inputs.build_type }}" = "remote" ] && [ -n "${BENCHMARK_CONNSTR}" ]; then
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/psql ${BENCHMARK_CONNSTR} -c "SELECT version();"
QUERIES=("SELECT version()")
if [[ "${PLATFORM}" = "neon"* ]]; then
QUERIES+=("SHOW neon.tenant_id")
QUERIES+=("SHOW neon.timeline_id")
fi
for q in "${QUERIES[@]}"; do
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/psql ${BENCHMARK_CONNSTR} -c "${q}"
done
fi
# Run the tests.

View File

@@ -239,11 +239,6 @@ jobs:
path: /tmp/neon/
prefix: latest
- name: Add Postgres binaries to PATH
run: |
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Create Neon Project
if: contains(fromJson('["neon-captest-new", "neon-captest-freetier", "neonvm-captest-new", "neonvm-captest-freetier"]'), matrix.platform)
id: create-neon-project
@@ -282,16 +277,6 @@ jobs:
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
QUERIES=("SELECT version()")
if [[ "${PLATFORM}" = "neon"* ]]; then
QUERIES+=("SHOW neon.tenant_id")
QUERIES+=("SHOW neon.timeline_id")
fi
for q in "${QUERIES[@]}"; do
psql ${CONNSTR} -c "${q}"
done
- name: Benchmark init
uses: ./.github/actions/run-python-test-set
with:
@@ -377,25 +362,12 @@ jobs:
path: /tmp/neon/
prefix: latest
- name: Add Postgres binaries to PATH
run: |
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Set up Connection String
id: set-up-connstr
run: |
CONNSTR=${{ secrets.BENCHMARK_PGVECTOR_CONNSTR }}
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
QUERIES=("SELECT version()")
QUERIES+=("SHOW neon.tenant_id")
QUERIES+=("SHOW neon.timeline_id")
for q in "${QUERIES[@]}"; do
psql ${CONNSTR} -c "${q}"
done
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
- name: Benchmark pgvector hnsw indexing
uses: ./.github/actions/run-python-test-set
@@ -417,12 +389,12 @@ jobs:
test_selection: performance/test_perf_pgvector_queries.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600
extra_params: -m remote_cluster --timeout 21600
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
@@ -477,11 +449,6 @@ jobs:
path: /tmp/neon/
prefix: latest
- name: Add Postgres binaries to PATH
run: |
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Set up Connection String
id: set-up-connstr
run: |
@@ -503,16 +470,6 @@ jobs:
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
QUERIES=("SELECT version()")
if [[ "${PLATFORM}" = "neon"* ]]; then
QUERIES+=("SHOW neon.tenant_id")
QUERIES+=("SHOW neon.timeline_id")
fi
for q in "${QUERIES[@]}"; do
psql ${CONNSTR} -c "${q}"
done
- name: ClickBench benchmark
uses: ./.github/actions/run-python-test-set
with:
@@ -580,11 +537,6 @@ jobs:
path: /tmp/neon/
prefix: latest
- name: Add Postgres binaries to PATH
run: |
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Get Connstring Secret Name
run: |
case "${PLATFORM}" in
@@ -613,16 +565,6 @@ jobs:
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
QUERIES=("SELECT version()")
if [[ "${PLATFORM}" = "neon"* ]]; then
QUERIES+=("SHOW neon.tenant_id")
QUERIES+=("SHOW neon.timeline_id")
fi
for q in "${QUERIES[@]}"; do
psql ${CONNSTR} -c "${q}"
done
- name: Run TPC-H benchmark
uses: ./.github/actions/run-python-test-set
with:
@@ -681,11 +623,6 @@ jobs:
path: /tmp/neon/
prefix: latest
- name: Add Postgres binaries to PATH
run: |
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Set up Connection String
id: set-up-connstr
run: |
@@ -707,16 +644,6 @@ jobs:
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
QUERIES=("SELECT version()")
if [[ "${PLATFORM}" = "neon"* ]]; then
QUERIES+=("SHOW neon.tenant_id")
QUERIES+=("SHOW neon.timeline_id")
fi
for q in "${QUERIES[@]}"; do
psql ${CONNSTR} -c "${q}"
done
- name: Run user examples
uses: ./.github/actions/run-python-test-set
with:

View File

@@ -63,14 +63,16 @@ jobs:
mkdir -p /tmp/.docker-custom
echo DOCKER_CONFIG=/tmp/.docker-custom >> $GITHUB_ENV
- uses: docker/setup-buildx-action@v2
- uses: docker/setup-buildx-action@v3
with:
cache-binary: false
- uses: docker/login-action@v2
- uses: docker/login-action@v3
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- uses: docker/build-push-action@v4
- uses: docker/build-push-action@v6
with:
context: .
provenance: false
@@ -82,6 +84,7 @@ jobs:
tags: neondatabase/build-tools:${{ inputs.image-tag }}-${{ matrix.arch }}
- name: Remove custom docker config directory
if: always()
run: |
rm -rf /tmp/.docker-custom

View File

@@ -335,6 +335,8 @@ jobs:
- name: Run cargo build
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
# Do install *before* running rust tests because they might recompile the
@@ -383,6 +385,11 @@ jobs:
env:
NEXTEST_RETRIES: 3
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
LD_LIBRARY_PATH=$(pwd)/pg_install/v16/lib
export LD_LIBRARY_PATH
#nextest does not yet support running doctests
cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
@@ -744,14 +751,16 @@ jobs:
run: |
mkdir -p .docker-custom
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
- uses: docker/setup-buildx-action@v2
- uses: docker/setup-buildx-action@v3
with:
cache-binary: false
- uses: docker/login-action@v3
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- uses: docker/build-push-action@v5
- uses: docker/build-push-action@v6
with:
context: .
build-args: |
@@ -822,11 +831,12 @@ jobs:
run: |
mkdir -p .docker-custom
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
- uses: docker/setup-buildx-action@v2
- uses: docker/setup-buildx-action@v3
with:
cache-binary: false
# Disable parallelism for docker buildkit.
# As we already build everything with `make -j$(nproc)`, running it in additional level of parallelisam blows up the Runner.
config-inline: |
buildkitd-config-inline: |
[worker.oci]
max-parallelism = 1
@@ -842,7 +852,7 @@ jobs:
password: ${{ secrets.AWS_SECRET_KEY_DEV }}
- name: Build compute-node image
uses: docker/build-push-action@v5
uses: docker/build-push-action@v6
with:
context: .
build-args: |
@@ -861,7 +871,7 @@ jobs:
- name: Build neon extensions test image
if: matrix.version == 'v16'
uses: docker/build-push-action@v5
uses: docker/build-push-action@v6
with:
context: .
build-args: |
@@ -882,7 +892,7 @@ jobs:
- name: Build compute-tools image
# compute-tools are Postgres independent, so build it only once
if: matrix.version == 'v16'
uses: docker/build-push-action@v5
uses: docker/build-push-action@v6
with:
target: compute-tools-image
context: .
@@ -1358,3 +1368,31 @@ jobs:
with:
from-tag: ${{ needs.build-build-tools-image.outputs.image-tag }}
secrets: inherit
# This job simplifies setting branch protection rules (in GitHub UI)
# by allowing to set only this job instead of listing many others.
# It also makes it easier to rename or parametrise jobs (using matrix)
# which requires changes in branch protection rules
#
# Note, that we can't add external check (like `neon-cloud-e2e`) we still need to use GitHub UI for that.
#
# https://github.com/neondatabase/neon/settings/branch_protection_rules
conclusion:
if: always()
# Format `needs` differently to make the list more readable.
# Usually we do `needs: [...]`
needs:
- check-codestyle-python
- check-codestyle-rust
- regress-tests
- test-images
runs-on: ubuntu-22.04
steps:
# The list of possible results:
# https://docs.github.com/en/actions/learn-github-actions/contexts#needs-context
- name: Fail the job if any of the dependencies do not succeed
run: exit 1
if: |
contains(needs.*.result, 'failure')
|| contains(needs.*.result, 'cancelled')
|| contains(needs.*.result, 'skipped')

View File

@@ -232,12 +232,19 @@ jobs:
- name: Run cargo build
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
mold -run cargo build --locked $CARGO_FLAGS $CARGO_FEATURES --bins --tests -j$(nproc)
- name: Run cargo test
env:
NEXTEST_RETRIES: 3
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
LD_LIBRARY_PATH=$(pwd)/pg_install/v16/lib
export LD_LIBRARY_PATH
cargo nextest run $CARGO_FEATURES -j$(nproc)
# Run separate tests for real S3
@@ -378,7 +385,7 @@ jobs:
run: make walproposer-lib -j$(nproc)
- name: Produce the build stats
run: cargo build --all --release --timings -j$(nproc)
run: PQ_LIB_DIR=$(pwd)/pg_install/v16/lib cargo build --all --release --timings -j$(nproc)
- name: Upload the build stats
id: upload-stats

144
.github/workflows/periodic_pagebench.yml vendored Normal file
View File

@@ -0,0 +1,144 @@
name: Periodic pagebench performance test on dedicated EC2 machine in eu-central-1 region
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 18 * * *' # Runs at 6 PM UTC every day
workflow_dispatch: # Allows manual triggering of the workflow
inputs:
commit_hash:
type: string
description: 'The long neon repo commit hash for the system under test (pageserver) to be tested.'
required: false
default: ''
defaults:
run:
shell: bash -euo pipefail {0}
concurrency:
group: ${{ github.workflow }}
cancel-in-progress: false
jobs:
trigger_bench_on_ec2_machine_in_eu_central_1:
runs-on: [ self-hosted, gen3, small ]
container:
image: neondatabase/build-tools:pinned
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init
timeout-minutes: 360 # Set the timeout to 6 hours
env:
API_KEY: ${{ secrets.PERIODIC_PAGEBENCH_EC2_RUNNER_API_KEY }}
RUN_ID: ${{ github.run_id }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_EC2_US_TEST_RUNNER_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY : ${{ secrets.AWS_EC2_US_TEST_RUNNER_ACCESS_KEY_SECRET }}
AWS_DEFAULT_REGION : "eu-central-1"
AWS_INSTANCE_ID : "i-02a59a3bf86bc7e74"
steps:
- name: Show my own (github runner) external IP address - usefull for IP allowlisting
run: curl https://ifconfig.me
- name: Start EC2 instance and wait for the instance to boot up
run: |
aws ec2 start-instances --instance-ids $AWS_INSTANCE_ID
aws ec2 wait instance-running --instance-ids $AWS_INSTANCE_ID
sleep 60 # sleep some time to allow cloudinit and our API server to start up
- name: Determine public IP of the EC2 instance and set env variable EC2_MACHINE_URL_US
run: |
public_ip=$(aws ec2 describe-instances --instance-ids $AWS_INSTANCE_ID --query 'Reservations[*].Instances[*].PublicIpAddress' --output text)
echo "Public IP of the EC2 instance: $public_ip"
echo "EC2_MACHINE_URL_US=https://${public_ip}:8443" >> $GITHUB_ENV
- name: Determine commit hash
env:
INPUT_COMMIT_HASH: ${{ github.event.inputs.commit_hash }}
run: |
if [ -z "$INPUT_COMMIT_HASH" ]; then
echo "COMMIT_HASH=$(curl -s https://api.github.com/repos/neondatabase/neon/commits/main | jq -r '.sha')" >> $GITHUB_ENV
else
echo "COMMIT_HASH=$INPUT_COMMIT_HASH" >> $GITHUB_ENV
fi
- name: Start Bench with run_id
run: |
curl -k -X 'POST' \
"${EC2_MACHINE_URL_US}/start_test/${GITHUB_RUN_ID}" \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-H "Authorization: Bearer $API_KEY" \
-d "{\"neonRepoCommitHash\": \"${COMMIT_HASH}\"}"
- name: Poll Test Status
id: poll_step
run: |
status=""
while [[ "$status" != "failure" && "$status" != "success" ]]; do
response=$(curl -k -X 'GET' \
"${EC2_MACHINE_URL_US}/test_status/${GITHUB_RUN_ID}" \
-H 'accept: application/json' \
-H "Authorization: Bearer $API_KEY")
echo "Response: $response"
set +x
status=$(echo $response | jq -r '.status')
echo "Test status: $status"
if [[ "$status" == "failure" || "$status" == "success" || "$status" == "null" ]]; then
break
fi
if [[ "$status" == "too_many_runs" ]]; then
echo "Too many runs already running"
echo "too_many_runs=true" >> "$GITHUB_OUTPUT"
exit 1
fi
sleep 60 # Poll every 60 seconds
done
- name: Retrieve Test Logs
run: |
curl -k -X 'GET' \
"${EC2_MACHINE_URL_US}/test_log/${GITHUB_RUN_ID}" \
-H 'accept: application/gzip' \
-H "Authorization: Bearer $API_KEY" \
--output "test_log_${GITHUB_RUN_ID}.gz"
- name: Unzip Test Log and Print it into this job's log
run: |
gzip -d "test_log_${GITHUB_RUN_ID}.gz"
cat "test_log_${GITHUB_RUN_ID}"
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic pagebench testing on dedicated hardware: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
- name: Cleanup Test Resources
if: always()
run: |
curl -k -X 'POST' \
"${EC2_MACHINE_URL_US}/cleanup_test/${GITHUB_RUN_ID}" \
-H 'accept: application/json' \
-H "Authorization: Bearer $API_KEY" \
-d ''
- name: Stop EC2 instance and wait for the instance to be stopped
if: always() && steps.poll_step.outputs.too_many_runs != 'true'
run: |
aws ec2 stop-instances --instance-ids $AWS_INSTANCE_ID
aws ec2 wait instance-stopped --instance-ids $AWS_INSTANCE_ID

1
Cargo.lock generated
View File

@@ -6811,6 +6811,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit 0.19.10",
"tracing",
"tracing-error",
"tracing-subscriber",

View File

@@ -42,12 +42,13 @@ ARG CACHEPOT_BUCKET=neon-github-dev
COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib
COPY --chown=nonroot . .
# Show build caching stats to check if it was used in the end.
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment" cargo build \
&& PQ_LIB_DIR=$(pwd)/pg_install/v16/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \
@@ -56,6 +57,7 @@ RUN set -e \
--bin storage_controller \
--bin proxy \
--bin neon_local \
--bin storage_scrubber \
--locked --release \
&& cachepot -s
@@ -82,6 +84,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_controller /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/
COPY --from=pg-build /home/nonroot/pg_install/v15 /usr/local/v15/

View File

@@ -26,7 +26,6 @@ RUN set -e \
liblzma-dev \
libncurses5-dev \
libncursesw5-dev \
libpq-dev \
libreadline-dev \
libseccomp-dev \
libsqlite3-dev \

View File

@@ -873,9 +873,8 @@ impl ComputeNode {
Ok(())
}
// We could've wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
// Wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop.
#[instrument(skip_all)]
fn pg_reload_conf(&self) -> Result<()> {
let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");

View File

@@ -489,7 +489,7 @@ pub fn handle_postgres_logs(stderr: std::process::ChildStderr) -> JoinHandle<()>
/// Read Postgres logs from `stderr` until EOF. Buffer is flushed on one of the following conditions:
/// - next line starts with timestamp
/// - EOF
/// - no new lines were written for the last second
/// - no new lines were written for the last 100 milliseconds
async fn handle_postgres_logs_async(stderr: tokio::process::ChildStderr) -> Result<()> {
let mut lines = tokio::io::BufReader::new(stderr).lines();
let timeout_duration = Duration::from_millis(100);

View File

@@ -325,11 +325,16 @@ impl LocalEnv {
}
}
pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
Ok(self.pg_distrib_dir(pg_version)?.join("bin"))
pub fn pg_dir(&self, pg_version: u32, dir_name: &str) -> anyhow::Result<PathBuf> {
Ok(self.pg_distrib_dir(pg_version)?.join(dir_name))
}
pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
self.pg_dir(pg_version, "bin")
}
pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
Ok(self.pg_distrib_dir(pg_version)?.join("lib"))
self.pg_dir(pg_version, "lib")
}
pub fn pageserver_bin(&self) -> PathBuf {

View File

@@ -155,16 +155,16 @@ impl StorageController {
.expect("non-Unicode path")
}
/// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl`
/// Find the directory containing postgres subdirectories, such `bin` and `lib`
///
/// This usually uses STORAGE_CONTROLLER_POSTGRES_VERSION of postgres, but will fall back
/// to other versions if that one isn't found. Some automated tests create circumstances
/// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
async fn get_pg_dir(&self, dir_name: &str) -> anyhow::Result<Utf8PathBuf> {
let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 15, 14];
for v in prefer_versions {
let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap();
let path = Utf8PathBuf::from_path_buf(self.env.pg_dir(v, dir_name)?).unwrap();
if tokio::fs::try_exists(&path).await? {
return Ok(path);
}
@@ -172,11 +172,20 @@ impl StorageController {
// Fall through
anyhow::bail!(
"Postgres binaries not found in {}",
self.env.pg_distrib_dir.display()
"Postgres directory '{}' not found in {}",
dir_name,
self.env.pg_distrib_dir.display(),
);
}
pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
self.get_pg_dir("bin").await
}
pub async fn get_pg_lib_dir(&self) -> anyhow::Result<Utf8PathBuf> {
self.get_pg_dir("lib").await
}
/// Readiness check for our postgres process
async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result<bool> {
let bin_path = pg_bin_dir.join("pg_isready");
@@ -229,12 +238,17 @@ impl StorageController {
.unwrap()
.join("storage_controller_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
let pg_lib_dir = self.get_pg_lib_dir().await?;
let pg_log_path = pg_data_path.join("postgres.log");
if !tokio::fs::try_exists(&pg_data_path).await? {
// Initialize empty database
let initdb_path = pg_bin_dir.join("initdb");
let mut child = Command::new(&initdb_path)
.envs(vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
])
.args(["-D", pg_data_path.as_ref()])
.spawn()
.expect("Failed to spawn initdb");
@@ -269,7 +283,10 @@ impl StorageController {
&self.env.base_data_dir,
pg_bin_dir.join("pg_ctl").as_std_path(),
db_start_args,
[],
vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
],
background_process::InitialPidFile::Create(self.postgres_pid_file()),
retry_timeout,
|| self.pg_isready(&pg_bin_dir),
@@ -324,7 +341,10 @@ impl StorageController {
&self.env.base_data_dir,
&self.env.storage_controller_bin(),
args,
[],
vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
],
background_process::InitialPidFile::Create(self.pid_file()),
retry_timeout,
|| async {

View File

@@ -0,0 +1,345 @@
# Graceful Restarts of Storage Controller Managed Clusters
## Summary
This RFC describes new storage controller APIs for draining and filling tenant shards from/on pageserver nodes.
It also covers how these new APIs should be used by an orchestrator (e.g. Ansible) in order to implement
graceful cluster restarts.
## Motivation
Pageserver restarts cause read availablity downtime for tenants.
For example pageserver-3 @ us-east-1 was unavailable for a randomly
picked tenant (which requested on-demand activation) for around 30 seconds
during the restart at 2024-04-03 16:37 UTC.
Note that lots of shutdowns on loaded pageservers do not finish within the
[10 second systemd enforced timeout](https://github.com/neondatabase/aws/blob/0a5280b383e43c063d43cbf87fa026543f6d6ad4/.github/ansible/systemd/pageserver.service#L16). This means we are shutting down without flushing ephemeral layers
and have to reingest data in order to serve requests after restarting, potentially making first request latencies worse.
This problem is not yet very acutely felt in storage controller managed pageservers since
tenant density is much lower there. However, we are planning on eventually migrating all
pageservers to storage controller management, so it makes sense to solve the issue proactively.
## Requirements
- Pageserver re-deployments cause minimal downtime for tenants
- The storage controller exposes HTTP API hooks for draining and filling tenant shards
from a given pageserver. Said hooks can be used by an orchestrator proces or a human operator.
- The storage controller exposes some HTTP API to cancel draining and filling background operations.
- Failures to drain or fill the node should not be fatal. In such cases, cluster restarts should proceed
as usual (with downtime).
- Progress of draining/filling is visible through metrics
## Non Goals
- Integration with the control plane
- Graceful restarts for large non-HA tenants.
## Impacted Components
- storage controller
- deployment orchestrator (i.e. Ansible)
- pageserver (indirectly)
## Terminology
** Draining ** is the process through which all tenant shards that can be migrated from a given pageserver
are distributed across the rest of the cluster.
** Filling ** is the symmetric opposite of draining. In this process tenant shards are migrated onto a given
pageserver until the cluster reaches a resonable, quiescent distribution of tenant shards across pageservers.
** Node scheduling policies ** act as constraints to the scheduler. For instance, when a
node is set in the `Paused` policy, no further shards will be scheduled on it.
** Node ** is a pageserver. Term is used interchangeably in this RFC.
** Deployment orchestrator ** is a generic term for whatever drives our deployments.
Currently, it's an Ansible playbook.
## Background
### Storage Controller Basics (skip if already familiar)
Fundamentally, the storage controller is a reconciler which aims to move from the observed mapping between pageservers and tenant shards to an intended mapping. Pageserver nodes and tenant shards metadata is durably persisted in a database, but note that the mapping between the two entities is not durably persisted. Instead, this mapping (*observed state*) is constructed at startup by sending `GET location_config` requests to registered pageservers.
An internal scheduler maps tenant shards to pageservers while respecting certain constraints. The result of scheduling is the *intent state*. When the intent state changes, a *reconciliation* will inform pageservers about the new assigment via `PUT location_config` requests and will notify the compute via the configured hook.
### Background Optimizations
The storage controller performs scheduling optimizations in the background. It will
migrate attachments to warm secondaries and replace secondaries in order to balance
the cluster out.
### Reconciliations Concurrency Limiting
There's a hard limit on the number of reconciles that the storage controller
can have in flight at any given time. To get an idea of scales, the limit is
128 at the time of writing.
## Implementation
Note: this section focuses on the core functionality of the graceful restart process.
It doesn't neccesarily describe the most efficient approach. Optimizations are described
separately in a later section.
### Overall Flow
This section describes how to implement graceful restarts from the perspective
of Ansible, the deployment orchestrator. Pageservers are already restarted sequentially.
The orchestrator shall implement the following epilogue and prologue steps for each
pageserver restart:
#### Prologue
The orchestrator shall first fetch the pageserver node id from the control plane or
the pageserver it aims to restart directly. Next, it issues an HTTP request
to the storage controller in order to start the drain of said pageserver node.
All error responses are retried with a short back-off. When a 202 (Accepted)
HTTP code is returned, the drain has started. Now the orchestrator polls the
node status endpoint exposed by the storage controller in order to await the
end of the drain process. When the `policy` field of the node status response
becomes `PauseForRestart`, the drain has completed and the orchestrator can
proceed with restarting the pageserver.
The prologue is subject to an overall timeout. It will have a value in the ballpark
of minutes. As storage controller managed pageservers become more loaded this timeout
will likely have to increase.
#### Epilogue
After restarting the pageserver, the orchestrator issues an HTTP request
to the storage controller to kick off the filling process. This API call
may be retried for all error codes with a short backoff. This also serves
as a synchronization primitive as the fill will be refused if the pageserver
has not yet re-attached to the storage controller. When a 202(Accepted) HTTP
code is returned, the fill has started. Now the orchestrator polls the node
status endpoint exposed by the storage controller in order to await the end of
the filling process. When the `policy` field of the node status response becomes
`Active`, the fill has completed and the orchestrator may proceed to the next pageserver.
Again, the epilogue is subject to an overall timeout. We can start off with
using the same timeout as for the prologue, but can also consider relying on
the storage controller's background optimizations with a shorter timeout.
In the case that the deployment orchestrator times out, it attempts to cancel
the fill. This operation shall be retried with a short back-off. If it ultimately
fails it will require manual intervention to set the nodes scheduling policy to
`NodeSchedulingPolicy::Active`. Not doing that is not immediately problematic,
but it constrains the scheduler as mentioned previously.
### Node Scheduling Policy State Machine
The state machine below encodes the behaviours discussed above and
the various failover situations described in a later section.
Assuming no failures and/or timeouts the flow should be:
`Active -> Draining -> PauseForRestart -> Active -> Filling -> Active`
```
Operator requested drain
+-----------------------------------------+
| |
+-------+-------+ +-------v-------+
| | | |
| Pause | +-----------> Draining +----------+
| | | | | |
+---------------+ | +-------+-------+ |
| | |
| | |
Drain requested| | |
| |Drain complete | Drain failed
| | | Cancelled/PS reattach/Storcon restart
| | |
+-------+-------+ | |
| | | |
+-------------+ Active <-----------+------------------+
| | | |
Fill requested | +---^---^-------+ |
| | | |
| | | |
| | | |
| Fill completed| | |
| | |PS reattach |
| | |after restart |
+-------v-------+ | | +-------v-------+
| | | | | |
| Filling +---------+ +-----------+PauseForRestart|
| | | |
+---------------+ +---------------+
```
### Draining/Filling APIs
The storage controller API to trigger the draining of a given node is:
`PUT /v1/control/node/:node_id/{drain,fill}`.
The following HTTP non-success return codes are used.
All of them are safely retriable from the perspective of the storage controller.
- 404: Requested node was not found
- 503: Requested node is known to the storage controller, but unavailable
- 412: Drain precondition failed: there is no other node to drain to or the node's schedulling policy forbids draining
- 409: A {drain, fill} is already in progress. Only one such background operation
is allowed per node.
When the drain is accepted and commenced a 202 HTTP code is returned.
Drains and fills shall be cancellable by the deployment orchestrator or a
human operator via: `DELETE /v1/control/node/:node_id/{drain,fill}`. A 200
response is returned when the cancelation is successful. Errors are retriable.
### Drain Process
Before accpeting a drain request the following validations is applied:
* Ensure that the node is known the storage controller
* Ensure that the schedulling policy is `NodeSchedulingPolicy::Active` or `NodeSchedulingPolicy::Pause`
* Ensure that another drain or fill is not already running on the node
* Ensure that a drain is possible (i.e. check that there is at least one
schedulable node to drain to)
After accepting the drain, the scheduling policy of the node is set to
`NodeSchedulingPolicy::Draining` and persisted in both memory and the database.
This disallows the optimizer from adding or removing shards from the node which
is desirable to avoid them racing.
Next, a separate Tokio task is spawned to manage the draining. For each tenant
shard attached to the node being drained, demote the node to a secondary and
attempt to schedule the node away. Scheduling might fail due to unsatisfiable
constraints, but that is fine. Draining is a best effort process since it might
not always be possible to cut over all shards.
Importantly, this task manages the concurrency of issued reconciles in order to
avoid drowning out the target pageservers and to allow other important reconciles
to proceed.
Once the triggered reconciles have finished or timed out, set the node's scheduling
policy to `NodeSchedulingPolicy::PauseForRestart` to signal the end of the drain.
A note on non HA tenants: These tenants do not have secondaries, so by the description
above, they would not be migrated. It makes sense to skip them (especially the large ones)
since, depending on tenant size, this might be more disruptive than the restart since the
pageserver we've moved to do will need to on-demand download the entire working set for the tenant.
We can consider expanding to small non-HA tenants in the future.
### Fill Process
Before accpeting a fill request the following validations is applied:
* Ensure that the node is known the storage controller
* Ensure that the schedulling policy is `NodeSchedulingPolicy::Active`.
This is the only acceptable policy for the fill starting state. When a node re-attaches,
it set the scheduling policy to `NodeSchedulingPolicy::Active` if it was equal to
`NodeSchedulingPolicy::PauseForRestart` or `NodeSchedulingPolicy::Draining` (possible end states for a node drain).
* Ensure that another drain or fill is not already running on the node
After accepting the drain, the scheduling policy of the node is set to
`NodeSchedulingPolicy::Filling` and persisted in both memory and the database.
This disallows the optimizer from adding or removing shards from the node which
is desirable to avoid them racing.
Next, a separate Tokio task is spawned to manage the draining. For each tenant
shard where the filled node is a secondary, promote the secondary. This is done
until we run out of shards or the counts of attached shards become balanced across
the cluster.
Like for draining, the concurrency of spawned reconciles is limited.
### Failure Modes & Handling
Failures are generally handled by transition back into the `Active`
(neutral) state. This simplifies the implementation greatly at the
cost of adding transitions to the state machine. For example, we
could detect the `Draining` state upon restart and proceed with a drain,
but how should the storage controller know that's what the orchestrator
needs still?
#### Storage Controller Crash
When the storage controller starts up reset the node scheduling policy
of all nodes in states `Draining`, `Filling` or `PauseForRestart` to
`Active`. The rationale is that when the storage controller restarts,
we have lost context of what the deployment orchestrator wants. It also
has the benefit of making things easier to reason about.
#### Pageserver Crash During Drain
The pageserver will attempt to re-attach during restart at which
point the node scheduling policy will be set back to `Active`, thus
reenabling the scheduler to use the node.
#### Non-drained Pageserver Crash During Drain
What should happen when a pageserver we are draining to crashes during the
process. Two reasonable options are: cancel the drain and focus on the failover
*or* do both, but prioritise failover. Since the number of concurrent reconciles
produced by drains/fills are limited, we get the later behaviour for free.
My suggestion is we take this approach, but the cancellation option is trivial
to implement as well.
#### Pageserver Crash During Fill
The pageserver will attempt to re-attach during restart at which
point the node scheduling policy will be set back to `Active`, thus
reenabling the scheduler to use the node.
#### Pageserver Goes unavailable During Drain/Fill
The drain and fill jobs handle this by stopping early. When the pageserver
is detected as online by storage controller heartbeats, reset its scheduling
policy to `Active`. If a restart happens instead, see the pageserver crash
failure mode.
#### Orchestrator Drain Times Out
Orchestrator will still proceed with the restart.
When the pageserver re-attaches, the scheduling policy is set back to
`Active`.
#### Orchestrator Fill Times Out
Orchestrator will attempt to cancel the fill operation. If that fails,
the fill will continue until it quiesces and the node will be left
in the `Filling` scheduling policy. This hinders the scheduler, but is
otherwise harmless. A human operator can handle this by setting the scheduling
policy to `Active`, or we can bake in a fill timeout into the storage controller.
## Optimizations
### Location Warmth
When cutting over to a secondary, the storage controller will wait for it to
become "warm" (i.e. download enough of the tenants data). This means that some
reconciliations can take significantly longer than others and hold up precious
reconciliations units. As an optimization, the drain stage can only cut over
tenants that are already "warm". Similarly, the fill stage can prioritise the
"warmest" tenants in the fill.
Given that the number of tenants by the storage controller will be fairly low
for the foreseable future, the first implementation could simply query the tenants
for secondary status. This doesn't scale well with increasing tenant counts, so
eventually we will need new pageserver API endpoints to report the sets of
"warm" and "cold" nodes.
## Alternatives Considered
### Draining and Filling Purely as Scheduling Constraints
At its core, the storage controller is a big background loop that detects changes
in the environment and reacts on them. One could express draining and filling
of nodes purely in terms of constraining the scheduler (as opposed to having
such background tasks).
While theoretically nice, I think that's harder to implement and more importantly operate and reason about.
Consider cancellation of a drain/fill operation. We would have to update the scheduler state, create
an entirely new schedule (intent state) and start work on applying that. It gets trickier if we wish
to cancel the reconciliation tasks spawned by drain/fill nodes. How would we know which ones belong
to the conceptual drain/fill? One could add labels to reconciliations, but it gets messy in my opinion.
It would also mean that reconciliations themselves have side effects that persist in the database
(persist something to the databse when the drain is done), which I'm not conceptually fond of.
## Proof of Concept
This RFC is accompanied by a POC which implements nearly everything mentioned here
apart from the optimizations and some of the failure handling:
https://github.com/neondatabase/neon/pull/7682

View File

@@ -29,7 +29,7 @@ pub const KEY_SIZE: usize = 18;
/// See [`Key::to_i128`] for more information on the encoding.
pub const METADATA_KEY_SIZE: usize = 16;
/// The key prefix start range for the metadata keys. All keys with the first byte >= 0x40 is a metadata key.
/// The key prefix start range for the metadata keys. All keys with the first byte >= 0x60 is a metadata key.
pub const METADATA_KEY_BEGIN_PREFIX: u8 = 0x60;
pub const METADATA_KEY_END_PREFIX: u8 = 0x7F;

View File

@@ -17,6 +17,16 @@ pub struct KeySpace {
pub ranges: Vec<Range<Key>>,
}
impl std::fmt::Display for KeySpace {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[")?;
for range in &self.ranges {
write!(f, "{}..{},", range.start, range.end)?;
}
write!(f, "]")
}
}
/// A wrapper type for sparse keyspaces.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SparseKeySpace(pub KeySpace);

View File

@@ -432,6 +432,24 @@ pub enum CompactionAlgorithm {
Tiered,
}
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
Serialize,
Deserialize,
strum_macros::FromRepr,
strum_macros::EnumString,
)]
#[strum(serialize_all = "kebab-case")]
pub enum ImageCompressionAlgorithm {
/// Zstandard compression. Level 0 means and None mean the same (default level). Levels can be negative as well.
/// For details, see the [manual](http://facebook.github.io/zstd/zstd_manual.html).
Zstd { level: Option<i8> },
}
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct CompactionAlgorithmSettings {
pub kind: CompactionAlgorithm,
@@ -643,6 +661,16 @@ pub struct TimelineInfo {
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
pub current_logical_size_non_incremental: Option<u64>,
/// How many bytes of WAL are within this branch's pitr_interval. If the pitr_interval goes
/// beyond the branch's branch point, we only count up to the branch point.
pub pitr_history_size: u64,
/// Whether this branch's branch point is within its ancestor's PITR interval (i.e. any
/// ancestor data used by this branch would have been retained anyway). If this is false, then
/// this branch may be imposing a cost on the ancestor by causing it to retain layers that it would
/// otherwise be able to GC.
pub within_ancestor_pitr: bool,
pub timeline_dir_layer_file_size_sum: Option<u64>,
pub wal_source_connstr: Option<String>,

View File

@@ -1,6 +1,5 @@
use std::{fmt::Debug, num::NonZeroUsize, str::FromStr, time::Duration};
use anyhow::bail;
use aws_sdk_s3::types::StorageClass;
use camino::Utf8PathBuf;
@@ -176,20 +175,8 @@ fn serialize_storage_class<S: serde::Serializer>(
impl RemoteStorageConfig {
pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
let document: toml_edit::Document = match toml {
toml_edit::Item::Table(toml) => toml.clone().into(),
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
toml.clone().into_table().into()
}
_ => bail!("toml not a table or inline table"),
};
if document.is_empty() {
return Ok(None);
}
Ok(Some(toml_edit::de::from_document(document)?))
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<RemoteStorageConfig> {
Ok(utils::toml_edit_ext::deserialize_item(toml)?)
}
}
@@ -197,7 +184,7 @@ impl RemoteStorageConfig {
mod tests {
use super::*;
fn parse(input: &str) -> anyhow::Result<Option<RemoteStorageConfig>> {
fn parse(input: &str) -> anyhow::Result<RemoteStorageConfig> {
let toml = input.parse::<toml_edit::Document>().unwrap();
RemoteStorageConfig::from_toml(toml.as_item())
}
@@ -207,7 +194,7 @@ mod tests {
let input = "local_path = '.'
timeout = '5s'";
let config = parse(input).unwrap().expect("it exists");
let config = parse(input).unwrap();
assert_eq!(
config,
@@ -229,7 +216,7 @@ timeout = '5s'";
timeout = '7s'
";
let config = parse(toml).unwrap().expect("it exists");
let config = parse(toml).unwrap();
assert_eq!(
config,
@@ -257,7 +244,7 @@ timeout = '5s'";
timeout = '7s'
";
let config = parse(toml).unwrap().expect("it exists");
let config = parse(toml).unwrap();
assert_eq!(
config,

View File

@@ -40,6 +40,7 @@ thiserror.workspace = true
tokio.workspace = true
tokio-tar.workspace = true
tokio-util.workspace = true
toml_edit.workspace = true
tracing.workspace = true
tracing-error.workspace = true
tracing-subscriber = { workspace = true, features = ["json", "registry"] }

View File

@@ -94,6 +94,8 @@ pub mod env;
pub mod poison;
pub mod toml_edit_ext;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

View File

@@ -0,0 +1,22 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("item is not a document")]
ItemIsNotADocument,
#[error(transparent)]
Serde(toml_edit::de::Error),
}
pub fn deserialize_item<T>(item: &toml_edit::Item) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let document: toml_edit::Document = match item {
toml_edit::Item::Table(toml) => toml.clone().into(),
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
toml.clone().into_table().into()
}
_ => return Err(Error::ItemIsNotADocument),
};
toml_edit::de::from_document(document).map_err(Error::Serde)
}

View File

@@ -178,7 +178,7 @@ async fn main() -> anyhow::Result<()> {
let toml_item = toml_document
.get("remote_storage")
.expect("need remote_storage");
let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config");
let config = RemoteStorageConfig::from_toml(toml_item)?;
let storage = remote_storage::GenericRemoteStorage::from_config(&config);
let cancel = CancellationToken::new();
storage

View File

@@ -348,35 +348,36 @@ where
self.add_rel(rel, rel).await?;
}
}
for (path, content) in self
.timeline
.list_aux_files(self.lsn, self.ctx)
.await
.map_err(|e| BasebackupError::Server(e.into()))?
{
if path.starts_with("pg_replslot") {
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
let restart_lsn = Lsn(u64::from_le_bytes(
content[offs..offs + 8].try_into().unwrap(),
));
info!("Replication slot {} restart LSN={}", path, restart_lsn);
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
} else if path == "pg_logical/replorigin_checkpoint" {
// replorigin_checkoint is written only on compute shutdown, so it contains
// deteriorated values. So we generate our own version of this file for the particular LSN
// based on information about replorigins extracted from transaction commit records.
// In future we will not generate AUX record for "pg_logical/replorigin_checkpoint" at all,
// but now we should handle (skip) it for backward compatibility.
continue;
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
.append(&header, &*content)
.await
.context("could not add aux file to basebackup tarball")?;
}
}
for (path, content) in self
.timeline
.list_aux_files(self.lsn, self.ctx)
.await
.map_err(|e| BasebackupError::Server(e.into()))?
{
if path.starts_with("pg_replslot") {
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
let restart_lsn = Lsn(u64::from_le_bytes(
content[offs..offs + 8].try_into().unwrap(),
));
info!("Replication slot {} restart LSN={}", path, restart_lsn);
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
} else if path == "pg_logical/replorigin_checkpoint" {
// replorigin_checkoint is written only on compute shutdown, so it contains
// deteriorated values. So we generate our own version of this file for the particular LSN
// based on information about replorigins extracted from transaction commit records.
// In future we will not generate AUX record for "pg_logical/replorigin_checkpoint" at all,
// but now we should handle (skip) it for backward compatibility.
continue;
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
.append(&header, &*content)
.await
.context("could not add aux file to basebackup tarball")?;
}
if min_restart_lsn != Lsn::MAX {
info!(
"Min restart LSN for logical replication is {}",

View File

@@ -421,6 +421,10 @@ fn start_pageserver(
background_jobs_can_start: background_jobs_barrier.clone(),
};
info!(config=?conf.l0_flush, "using l0_flush config");
let l0_flush_global_state =
pageserver::l0_flush::L0FlushGlobalState::new(conf.l0_flush.clone());
// Scan the local 'tenants/' directory and start loading the tenants
let deletion_queue_client = deletion_queue.new_client();
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
@@ -429,6 +433,7 @@ fn start_pageserver(
broker_client: broker_client.clone(),
remote_storage: remote_storage.clone(),
deletion_queue_client,
l0_flush_global_state,
},
order,
shutdown_pageserver.clone(),

View File

@@ -5,7 +5,7 @@
//! See also `settings.md` for better description on every parameter.
use anyhow::{anyhow, bail, ensure, Context, Result};
use pageserver_api::shard::TenantShardId;
use pageserver_api::{models::ImageCompressionAlgorithm, shard::TenantShardId};
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde;
use serde::de::IntoDeserializer;
@@ -30,11 +30,11 @@ use utils::{
logging::LogFormat,
};
use crate::tenant::timeline::GetVectoredImpl;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use crate::{disk_usage_eviction_task::DiskUsageEvictionTaskConfig, virtual_file::io_engine};
use crate::{l0_flush::L0FlushConfig, tenant::timeline::GetVectoredImpl};
use crate::{tenant::config::TenantConf, virtual_file};
use crate::{TENANT_HEATMAP_BASENAME, TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX};
@@ -50,6 +50,7 @@ pub mod defaults {
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
use pageserver_api::models::ImageCompressionAlgorithm;
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
@@ -90,6 +91,8 @@ pub mod defaults {
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
pub const DEFAULT_IMAGE_COMPRESSION: Option<ImageCompressionAlgorithm> = None;
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
@@ -159,7 +162,7 @@ pub mod defaults {
#ephemeral_bytes_per_memory_kb = {DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB}
[remote_storage]
#[remote_storage]
"#
);
@@ -285,12 +288,16 @@ pub struct PageServerConf {
pub validate_vectored_get: bool,
pub image_compression: Option<ImageCompressionAlgorithm>,
/// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
/// is exceeded, we start proactively closing ephemeral layers to limit the total amount
/// of ephemeral data.
///
/// Setting this to zero disables limits on total ephemeral layer size.
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: L0FlushConfig,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -395,7 +402,11 @@ struct PageServerConfigBuilder {
validate_vectored_get: BuilderValue<bool>,
image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
l0_flush: BuilderValue<L0FlushConfig>,
}
impl PageServerConfigBuilder {
@@ -482,8 +493,10 @@ impl PageServerConfigBuilder {
max_vectored_read_bytes: Set(MaxVectoredReadBytes(
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
)),
image_compression: Set(DEFAULT_IMAGE_COMPRESSION),
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: Set(L0FlushConfig::default()),
}
}
}
@@ -667,10 +680,18 @@ impl PageServerConfigBuilder {
self.validate_vectored_get = BuilderValue::Set(value);
}
pub fn get_image_compression(&mut self, value: Option<ImageCompressionAlgorithm>) {
self.image_compression = BuilderValue::Set(value);
}
pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) {
self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
}
pub fn l0_flush(&mut self, value: L0FlushConfig) {
self.l0_flush = BuilderValue::Set(value);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let default = Self::default_values();
@@ -727,7 +748,9 @@ impl PageServerConfigBuilder {
get_impl,
max_vectored_read_bytes,
validate_vectored_get,
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
}
CUSTOM LOGIC
{
@@ -918,7 +941,7 @@ impl PageServerConf {
"http_auth_type" => builder.http_auth_type(parse_toml_from_str(key, item)?),
"pg_auth_type" => builder.pg_auth_type(parse_toml_from_str(key, item)?),
"remote_storage" => {
builder.remote_storage_config(RemoteStorageConfig::from_toml(item)?)
builder.remote_storage_config(Some(RemoteStorageConfig::from_toml(item).context("remote_storage")?))
}
"tenant_config" => {
t_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("failed to parse: '{key}'"))?;
@@ -946,7 +969,7 @@ impl PageServerConf {
builder.metric_collection_endpoint(Some(endpoint));
},
"metric_collection_bucket" => {
builder.metric_collection_bucket(RemoteStorageConfig::from_toml(item)?)
builder.metric_collection_bucket(Some(RemoteStorageConfig::from_toml(item)?))
}
"synthetic_size_calculation_interval" =>
builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
@@ -1004,9 +1027,15 @@ impl PageServerConf {
"validate_vectored_get" => {
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
}
"image_compression" => {
builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?))
}
"ephemeral_bytes_per_memory_kb" => {
builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
}
"l0_flush" => {
builder.l0_flush(utils::toml_edit_ext::deserialize_item(item).context("l0_flush")?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -1088,8 +1117,10 @@ impl PageServerConf {
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant"),
),
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
}
}
}
@@ -1328,7 +1359,9 @@ background_task_maximum_delay = '334 s'
.expect("Invalid default constant")
),
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -1401,7 +1434,9 @@ background_task_maximum_delay = '334 s'
.expect("Invalid default constant")
),
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
},
"Should be able to parse all basic config values correctly"
);
@@ -1681,6 +1716,19 @@ threshold = "20m"
}
}
#[test]
fn empty_remote_storage_is_error() {
let tempdir = tempdir().unwrap();
let (workdir, _) = prepare_fs(&tempdir).unwrap();
let input = r#"
remote_storage = {}
"#;
let doc = toml_edit::Document::from_str(input).unwrap();
let err = PageServerConf::parse_and_validate(&doc, &workdir)
.expect_err("empty remote_storage field should fail, don't specify it if you want no remote_storage");
assert!(format!("{err}").contains("remote_storage"), "{err}");
}
fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
let tempdir_path = tempdir.path();

View File

@@ -227,7 +227,7 @@ impl From<UpsertLocationError> for ApiError {
BadRequest(e) => ApiError::BadRequest(e),
Unavailable(_) => ApiError::ShuttingDown,
e @ InProgress => ApiError::Conflict(format!("{e}")),
Flush(e) | Other(e) => ApiError::InternalServerError(e),
Flush(e) | InternalError(e) => ApiError::InternalServerError(e),
}
}
}
@@ -406,6 +406,8 @@ async fn build_timeline_info_common(
let walreceiver_status = timeline.walreceiver_status();
let (pitr_history_size, within_ancestor_pitr) = timeline.get_pitr_history_stats();
let info = TimelineInfo {
tenant_id: timeline.tenant_shard_id,
timeline_id: timeline.timeline_id,
@@ -426,6 +428,8 @@ async fn build_timeline_info_common(
directory_entries_counts: timeline.get_directory_metrics().to_vec(),
current_physical_size,
current_logical_size_non_incremental: None,
pitr_history_size,
within_ancestor_pitr,
timeline_dir_layer_file_size_sum: None,
wal_source_connstr,
last_received_msg_lsn,
@@ -1296,7 +1300,7 @@ async fn update_tenant_config_handler(
crate::tenant::Tenant::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
tenant.set_new_tenant_config(new_tenant_conf);
json_response(StatusCode::OK, ())

View File

@@ -0,0 +1,46 @@
use std::{num::NonZeroUsize, sync::Arc};
use crate::tenant::ephemeral_file;
#[derive(Default, Debug, PartialEq, Eq, Clone, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum L0FlushConfig {
#[default]
PageCached,
#[serde(rename_all = "snake_case")]
Direct { max_concurrency: NonZeroUsize },
}
#[derive(Clone)]
pub struct L0FlushGlobalState(Arc<Inner>);
pub(crate) enum Inner {
PageCached,
Direct { semaphore: tokio::sync::Semaphore },
}
impl L0FlushGlobalState {
pub fn new(config: L0FlushConfig) -> Self {
match config {
L0FlushConfig::PageCached => Self(Arc::new(Inner::PageCached)),
L0FlushConfig::Direct { max_concurrency } => {
let semaphore = tokio::sync::Semaphore::new(max_concurrency.get());
Self(Arc::new(Inner::Direct { semaphore }))
}
}
}
pub(crate) fn inner(&self) -> &Arc<Inner> {
&self.0
}
}
impl L0FlushConfig {
pub(crate) fn prewarm_on_write(&self) -> ephemeral_file::PrewarmPageCacheOnWrite {
use L0FlushConfig::*;
match self {
PageCached => ephemeral_file::PrewarmPageCacheOnWrite::Yes,
Direct { .. } => ephemeral_file::PrewarmPageCacheOnWrite::No,
}
}
}

View File

@@ -11,6 +11,7 @@ pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;
pub use pageserver_api::keyspace;
pub mod aux_file;
pub mod metrics;

View File

@@ -8,7 +8,7 @@ use metrics::{
};
use once_cell::sync::Lazy;
use pageserver_api::shard::TenantShardId;
use strum::{EnumCount, IntoEnumIterator, VariantNames};
use strum::{EnumCount, VariantNames};
use strum_macros::{EnumVariantNames, IntoStaticStr};
use tracing::warn;
use utils::id::TimelineId;
@@ -464,6 +464,24 @@ static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static PITR_HISTORY_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_pitr_history_size",
"Data written since PITR cutoff on this timeline",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static TIMELINE_ARCHIVE_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_archive_size",
"Timeline's logical size if it is considered eligible for archival (outside PITR window), else zero",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static STANDBY_HORIZON: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_standby_horizon",
@@ -476,7 +494,7 @@ static STANDBY_HORIZON: Lazy<IntGaugeVec> = Lazy::new(|| {
static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_resident_physical_size",
"The size of the layer files present in the pageserver's filesystem.",
"The size of the layer files present in the pageserver's filesystem, for attached locations.",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
@@ -1076,21 +1094,12 @@ pub(crate) mod virtual_file_io_engine {
});
}
#[derive(Debug)]
struct GlobalAndPerTimelineHistogram {
global: Histogram,
per_tenant_timeline: Histogram,
}
impl GlobalAndPerTimelineHistogram {
fn observe(&self, value: f64) {
self.global.observe(value);
self.per_tenant_timeline.observe(value);
}
}
struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
h: &'a GlobalAndPerTimelineHistogram,
global_metric: &'a Histogram,
// Optional because not all op types are tracked per-timeline
timeline_metric: Option<&'a Histogram>,
ctx: &'c RequestContext,
start: std::time::Instant,
op: SmgrQueryType,
@@ -1121,7 +1130,10 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
elapsed
}
};
self.h.observe(ex_throttled.as_secs_f64());
self.global_metric.observe(ex_throttled.as_secs_f64());
if let Some(timeline_metric) = self.timeline_metric {
timeline_metric.observe(ex_throttled.as_secs_f64());
}
}
}
@@ -1146,7 +1158,8 @@ pub enum SmgrQueryType {
#[derive(Debug)]
pub(crate) struct SmgrQueryTimePerTimeline {
metrics: [GlobalAndPerTimelineHistogram; SmgrQueryType::COUNT],
global_metrics: [Histogram; SmgrQueryType::COUNT],
per_timeline_getpage: Histogram,
}
static SMGR_QUERY_TIME_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::new(|| {
@@ -1224,27 +1237,32 @@ impl SmgrQueryTimePerTimeline {
let tenant_id = tenant_shard_id.tenant_id.to_string();
let shard_slug = format!("{}", tenant_shard_id.shard_slug());
let timeline_id = timeline_id.to_string();
let metrics = std::array::from_fn(|i| {
let global_metrics = std::array::from_fn(|i| {
let op = SmgrQueryType::from_repr(i).unwrap();
let global = SMGR_QUERY_TIME_GLOBAL
SMGR_QUERY_TIME_GLOBAL
.get_metric_with_label_values(&[op.into()])
.unwrap();
let per_tenant_timeline = SMGR_QUERY_TIME_PER_TENANT_TIMELINE
.get_metric_with_label_values(&[op.into(), &tenant_id, &shard_slug, &timeline_id])
.unwrap();
GlobalAndPerTimelineHistogram {
global,
per_tenant_timeline,
}
.unwrap()
});
Self { metrics }
let per_timeline_getpage = SMGR_QUERY_TIME_PER_TENANT_TIMELINE
.get_metric_with_label_values(&[
SmgrQueryType::GetPageAtLsn.into(),
&tenant_id,
&shard_slug,
&timeline_id,
])
.unwrap();
Self {
global_metrics,
per_timeline_getpage,
}
}
pub(crate) fn start_timer<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
ctx: &'c RequestContext,
) -> impl Drop + '_ {
let metric = &self.metrics[op as usize];
) -> Option<impl Drop + '_> {
let global_metric = &self.global_metrics[op as usize];
let start = Instant::now();
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
@@ -1263,12 +1281,20 @@ impl SmgrQueryTimePerTimeline {
});
}
}
GlobalAndPerTimelineHistogramTimer {
h: metric,
let timeline_metric = if matches!(op, SmgrQueryType::GetPageAtLsn) {
Some(&self.per_timeline_getpage)
} else {
None
};
Some(GlobalAndPerTimelineHistogramTimer {
global_metric,
timeline_metric,
ctx,
start,
op,
}
})
}
}
@@ -1315,17 +1341,9 @@ mod smgr_query_time_tests {
let get_counts = || {
let global: u64 = ops
.iter()
.map(|op| metrics.metrics[*op as usize].global.get_sample_count())
.map(|op| metrics.global_metrics[*op as usize].get_sample_count())
.sum();
let per_tenant_timeline: u64 = ops
.iter()
.map(|op| {
metrics.metrics[*op as usize]
.per_tenant_timeline
.get_sample_count()
})
.sum();
(global, per_tenant_timeline)
(global, metrics.per_timeline_getpage.get_sample_count())
};
let (pre_global, pre_per_tenant_timeline) = get_counts();
@@ -1336,7 +1354,12 @@ mod smgr_query_time_tests {
drop(timer);
let (post_global, post_per_tenant_timeline) = get_counts();
assert_eq!(post_per_tenant_timeline, 1);
if matches!(op, super::SmgrQueryType::GetPageAtLsn) {
// getpage ops are tracked per-timeline, others aren't
assert_eq!(post_per_tenant_timeline, 1);
} else {
assert_eq!(post_per_tenant_timeline, 0);
}
assert!(post_global > pre_global);
}
}
@@ -1447,7 +1470,6 @@ pub(crate) enum ComputeCommandKind {
PageStreamV2,
PageStream,
Basebackup,
GetLastRecordRlsn,
Fullbackup,
ImportBasebackup,
ImportWal,
@@ -1691,6 +1713,15 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| {
}
});
pub(crate) static SECONDARY_RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_secondary_resident_physical_size",
"The size of the layer files present in the pageserver's filesystem, for secondary locations.",
&["tenant_id", "shard_id"]
)
.expect("failed to define a metric")
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteOpKind {
Upload,
@@ -2093,6 +2124,8 @@ pub(crate) struct TimelineMetrics {
pub garbage_collect_histo: StorageTimeMetrics,
pub find_gc_cutoffs_histo: StorageTimeMetrics,
pub last_record_gauge: IntGauge,
pub pitr_history_size: UIntGauge,
pub archival_size: UIntGauge,
pub standby_horizon_gauge: IntGauge,
pub resident_physical_size_gauge: UIntGauge,
/// copy of LayeredTimeline.current_logical_size
@@ -2166,6 +2199,15 @@ impl TimelineMetrics {
let last_record_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let pitr_history_size = PITR_HISTORY_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let archival_size = TIMELINE_ARCHIVE_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let standby_horizon_gauge = STANDBY_HORIZON
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
@@ -2218,6 +2260,8 @@ impl TimelineMetrics {
find_gc_cutoffs_histo,
load_layer_map_histo,
last_record_gauge,
pitr_history_size,
archival_size,
standby_horizon_gauge,
resident_physical_size_gauge,
current_logical_size_gauge,
@@ -2275,6 +2319,10 @@ impl TimelineMetrics {
if let Some(metric) = Lazy::get(&DIRECTORY_ENTRIES_COUNT) {
let _ = metric.remove_label_values(&[tenant_id, shard_id, timeline_id]);
}
let _ = TIMELINE_ARCHIVE_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = PITR_HISTORY_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = EVICTIONS.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = AUX_FILE_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = VALID_LSN_LEASE_COUNT.remove_label_values(&[tenant_id, shard_id, timeline_id]);
@@ -2308,14 +2356,12 @@ impl TimelineMetrics {
let _ = STORAGE_IO_SIZE.remove_label_values(&[op, tenant_id, shard_id, timeline_id]);
}
for op in SmgrQueryType::iter() {
let _ = SMGR_QUERY_TIME_PER_TENANT_TIMELINE.remove_label_values(&[
op.into(),
tenant_id,
shard_id,
timeline_id,
]);
}
let _ = SMGR_QUERY_TIME_PER_TENANT_TIMELINE.remove_label_values(&[
SmgrQueryType::GetPageAtLsn.into(),
tenant_id,
shard_id,
timeline_id,
]);
}
}

View File

@@ -1656,53 +1656,6 @@ where
metric_recording.observe(&res);
res?;
}
// return pair of prev_lsn and last_lsn
else if let Some(params) = parts.strip_prefix(&["get_last_record_rlsn"]) {
if params.len() != 2 {
return Err(QueryError::Other(anyhow::anyhow!(
"invalid param number for get_last_record_rlsn command"
)));
}
let tenant_id = TenantId::from_str(params[0])
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
let timeline_id = TimelineId::from_str(params[1])
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_id))?;
COMPUTE_COMMANDS_COUNTERS
.for_command(ComputeCommandKind::GetLastRecordRlsn)
.inc();
async {
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let end_of_timeline = timeline.get_last_record_rlsn();
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::text_col(b"prev_lsn"),
RowDescriptor::text_col(b"last_lsn"),
]))?
.write_message_noflush(&BeMessage::DataRow(&[
Some(end_of_timeline.prev.to_string().as_bytes()),
Some(end_of_timeline.last.to_string().as_bytes()),
]))?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
anyhow::Ok(())
}
.instrument(info_span!(
"handle_get_last_record_lsn",
shard_id = tracing::field::Empty
))
.await?;
}
// same as basebackup, but result includes relational data as well
else if let Some(params) = parts.strip_prefix(&["fullbackup"]) {
if params.len() < 2 {

View File

@@ -857,10 +857,12 @@ impl Timeline {
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
let dbdir = DbDirectory::des(&buf)?;
let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
dbs.sort_unstable();
for (spcnode, dbnode) in dbs {
result.add_key(relmap_file_key(spcnode, dbnode));
let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.dbdirs.iter().map(|(k, v)| (*k, *v)).collect();
dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
for ((spcnode, dbnode), has_relmap_file) in dbs {
if has_relmap_file {
result.add_key(relmap_file_key(spcnode, dbnode));
}
result.add_key(rel_dir_to_key(spcnode, dbnode));
let mut rels: Vec<RelTag> = self

View File

@@ -73,6 +73,7 @@ use crate::deletion_queue::DeletionQueueClient;
use crate::deletion_queue::DeletionQueueError;
use crate::import_datadir;
use crate::is_uninit_mark;
use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::TENANT;
use crate::metrics::{
remove_tenant_metrics, BROKEN_TENANTS_SET, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
@@ -166,6 +167,7 @@ pub struct TenantSharedResources {
pub broker_client: storage_broker::BrokerClientChannel,
pub remote_storage: GenericRemoteStorage,
pub deletion_queue_client: DeletionQueueClient,
pub l0_flush_global_state: L0FlushGlobalState,
}
/// A [`Tenant`] is really an _attached_ tenant. The configuration
@@ -294,6 +296,8 @@ pub struct Tenant {
/// An ongoing timeline detach must be checked during attempts to GC or compact a timeline.
ongoing_timeline_detach: std::sync::Mutex<Option<(TimelineId, utils::completion::Barrier)>>,
l0_flush_global_state: L0FlushGlobalState,
}
impl std::fmt::Debug for Tenant {
@@ -529,6 +533,15 @@ impl From<PageReconstructError> for GcError {
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum LoadConfigError {
#[error("TOML deserialization error: '{0}'")]
DeserializeToml(#[from] toml_edit::de::Error),
#[error("Config not found at {0}")]
NotFound(Utf8PathBuf),
}
impl Tenant {
/// Yet another helper for timeline initialization.
///
@@ -667,6 +680,7 @@ impl Tenant {
broker_client,
remote_storage,
deletion_queue_client,
l0_flush_global_state,
} = resources;
let attach_mode = attached_conf.location.attach_mode;
@@ -681,6 +695,7 @@ impl Tenant {
tenant_shard_id,
remote_storage.clone(),
deletion_queue_client,
l0_flush_global_state,
));
// The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
@@ -980,6 +995,7 @@ impl Tenant {
TimelineResources {
remote_client,
timeline_get_throttle: self.timeline_get_throttle.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
},
ctx,
)
@@ -1349,7 +1365,7 @@ impl Tenant {
initdb_lsn: Lsn,
pg_version: u32,
ctx: &RequestContext,
delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>,
delta_layer_desc: Vec<timeline::DeltaLayerTestDesc>,
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
@@ -1800,9 +1816,15 @@ impl Tenant {
// If we're still attaching, fire the cancellation token early to drop out: this
// will prevent us flushing, but ensures timely shutdown if some I/O during attach
// is very slow.
if matches!(self.current_state(), TenantState::Attaching) {
let shutdown_mode = if matches!(self.current_state(), TenantState::Attaching) {
self.cancel.cancel();
}
// Having fired our cancellation token, do not try and flush timelines: their cancellation tokens
// are children of ours, so their flush loops will have shut down already
timeline::ShutdownMode::Hard
} else {
shutdown_mode
};
match self.set_stopping(shutdown_progress, false, false).await {
Ok(()) => {}
@@ -2469,6 +2491,7 @@ impl Tenant {
tenant_shard_id: TenantShardId,
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
l0_flush_global_state: L0FlushGlobalState,
) -> Tenant {
debug_assert!(
!attached_conf.location.generation.is_none() || conf.control_plane_api.is_none()
@@ -2556,6 +2579,7 @@ impl Tenant {
)),
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
ongoing_timeline_detach: std::sync::Mutex::default(),
l0_flush_global_state,
}
}
@@ -2563,36 +2587,35 @@ impl Tenant {
pub(super) fn load_tenant_config(
conf: &'static PageServerConf,
tenant_shard_id: &TenantShardId,
) -> anyhow::Result<LocationConf> {
) -> Result<LocationConf, LoadConfigError> {
let config_path = conf.tenant_location_config_path(tenant_shard_id);
if config_path.exists() {
// New-style config takes precedence
let deserialized = Self::read_config(&config_path)?;
Ok(toml_edit::de::from_document::<LocationConf>(deserialized)?)
} else {
// The config should almost always exist for a tenant directory:
// - When attaching a tenant, the config is the first thing we write
// - When detaching a tenant, we atomically move the directory to a tmp location
// before deleting contents.
//
// The very rare edge case that can result in a missing config is if we crash during attach
// between creating directory and writing config. Callers should handle that as if the
// directory didn't exist.
anyhow::bail!("tenant config not found in {}", config_path);
}
}
fn read_config(path: &Utf8Path) -> anyhow::Result<toml_edit::Document> {
info!("loading tenant configuration from {path}");
info!("loading tenant configuration from {config_path}");
// load and parse file
let config = fs::read_to_string(path)
.with_context(|| format!("Failed to load config from path '{path}'"))?;
let config = fs::read_to_string(&config_path).map_err(|e| {
match e.kind() {
std::io::ErrorKind::NotFound => {
// The config should almost always exist for a tenant directory:
// - When attaching a tenant, the config is the first thing we write
// - When detaching a tenant, we atomically move the directory to a tmp location
// before deleting contents.
//
// The very rare edge case that can result in a missing config is if we crash during attach
// between creating directory and writing config. Callers should handle that as if the
// directory didn't exist.
config
.parse::<toml_edit::Document>()
.with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
LoadConfigError::NotFound(config_path)
}
_ => {
// No IO errors except NotFound are acceptable here: other kinds of error indicate local storage or permissions issues
// that we cannot cleanly recover
crate::virtual_file::on_fatal_io_error(&e, "Reading tenant config file")
}
}
})?;
Ok(toml_edit::de::from_str::<LocationConf>(&config)?)
}
#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
@@ -2600,7 +2623,7 @@ impl Tenant {
conf: &'static PageServerConf,
tenant_shard_id: &TenantShardId,
location_conf: &LocationConf,
) -> anyhow::Result<()> {
) -> std::io::Result<()> {
let config_path = conf.tenant_location_config_path(tenant_shard_id);
Self::persist_tenant_config_at(tenant_shard_id, &config_path, location_conf).await
@@ -2611,7 +2634,7 @@ impl Tenant {
tenant_shard_id: &TenantShardId,
config_path: &Utf8Path,
location_conf: &LocationConf,
) -> anyhow::Result<()> {
) -> std::io::Result<()> {
debug!("persisting tenantconf to {config_path}");
let mut conf_content = r#"# This file contains a specific per-tenant's config.
@@ -2620,22 +2643,20 @@ impl Tenant {
.to_string();
fail::fail_point!("tenant-config-before-write", |_| {
anyhow::bail!("tenant-config-before-write");
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"tenant-config-before-write",
))
});
// Convert the config to a toml file.
conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
conf_content +=
&toml_edit::ser::to_string_pretty(&location_conf).expect("Config serialization failed");
let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
let tenant_shard_id = *tenant_shard_id;
let config_path = config_path.to_owned();
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content)
.await
.with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?;
Ok(())
VirtualFile::crashsafe_overwrite(config_path.to_owned(), temp_path, conf_content).await
}
//
@@ -2853,6 +2874,7 @@ impl Tenant {
{
let mut target = timeline.gc_info.write().unwrap();
// Cull any expired leases
let now = SystemTime::now();
target.leases.retain(|_, lease| !lease.is_expired(&now));
@@ -2861,6 +2883,31 @@ impl Tenant {
.valid_lsn_lease_count_gauge
.set(target.leases.len() as u64);
// Look up parent's PITR cutoff to update the child's knowledge of whether it is within parent's PITR
if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
if let Some(ancestor_gc_cutoffs) = gc_cutoffs.get(&ancestor_id) {
target.within_ancestor_pitr =
timeline.get_ancestor_lsn() >= ancestor_gc_cutoffs.pitr;
}
}
// Update metrics that depend on GC state
timeline
.metrics
.archival_size
.set(if target.within_ancestor_pitr {
timeline.metrics.current_logical_size_gauge.get()
} else {
0
});
timeline.metrics.pitr_history_size.set(
timeline
.get_last_record_lsn()
.checked_sub(target.cutoffs.pitr)
.unwrap_or(Lsn(0))
.0,
);
match gc_cutoffs.remove(&timeline.timeline_id) {
Some(cutoffs) => {
target.retain_lsns = branchpoints;
@@ -2912,7 +2959,7 @@ impl Tenant {
dst_id: TimelineId,
ancestor_lsn: Option<Lsn>,
ctx: &RequestContext,
delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>,
delta_layer_desc: Vec<timeline::DeltaLayerTestDesc>,
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
@@ -3296,6 +3343,7 @@ impl Tenant {
TimelineResources {
remote_client,
timeline_get_throttle: self.timeline_get_throttle.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
}
}
@@ -3632,6 +3680,7 @@ pub(crate) mod harness {
use utils::logging;
use crate::deletion_queue::mock::MockDeletionQueue;
use crate::l0_flush::L0FlushConfig;
use crate::walredo::apply_neon;
use crate::{repository::Key, walrecord::NeonWalRecord};
@@ -3821,6 +3870,8 @@ pub(crate) mod harness {
self.tenant_shard_id,
self.remote_storage.clone(),
self.deletion_queue.new_client(),
// TODO: ideally we should run all unit tests with both configs
L0FlushGlobalState::new(L0FlushConfig::default()),
));
let preload = tenant
@@ -3908,7 +3959,7 @@ mod tests {
use storage_layer::PersistentLayerKey;
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
use timeline::GcInfo;
use timeline::{DeltaLayerTestDesc, GcInfo};
use utils::bin_ser::BeSer;
use utils::id::TenantId;
@@ -6204,27 +6255,6 @@ mod tests {
.await
.unwrap();
async fn get_vectored_impl_wrapper(
tline: &Arc<Timeline>,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline
.get_vectored_impl(
KeySpace::single(key..key.next()),
lsn,
&mut reconstruct_state,
ctx,
)
.await?;
Ok(res.pop_last().map(|(k, v)| {
assert_eq!(k, key);
v.unwrap()
}))
}
let lsn = Lsn(0x30);
// test vectored get on parent timeline
@@ -6300,27 +6330,6 @@ mod tests {
.await
.unwrap();
async fn get_vectored_impl_wrapper(
tline: &Arc<Timeline>,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline
.get_vectored_impl(
KeySpace::single(key..key.next()),
lsn,
&mut reconstruct_state,
ctx,
)
.await?;
Ok(res.pop_last().map(|(k, v)| {
assert_eq!(k, key);
v.unwrap()
}))
}
let lsn = Lsn(0x30);
// test vectored get on parent timeline
@@ -6396,9 +6405,18 @@ mod tests {
&ctx,
// delta layers
vec![
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x10)..Lsn(0x20),
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x20)..Lsn(0x30),
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x20)..Lsn(0x30),
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
),
],
// image layers
vec![
@@ -6464,17 +6482,29 @@ mod tests {
&ctx,
// delta layers
vec![
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
vec![
(key0, Lsn(0x30), Value::Image(test_img("metadata key 0"))),
(key3, Lsn(0x30), Value::Image(test_img("metadata key 3"))),
],
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x10)..Lsn(0x20),
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x20)..Lsn(0x30),
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x20)..Lsn(0x30),
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x30)..Lsn(0x40),
vec![
(key0, Lsn(0x30), Value::Image(test_img("metadata key 0"))),
(key3, Lsn(0x30), Value::Image(test_img("metadata key 3"))),
],
),
],
// image layers
vec![(Lsn(0x10), vec![(key1, test_img("metadata key 1"))])],
Lsn(0x30),
Lsn(0x40),
)
.await
.unwrap();
@@ -6497,7 +6527,7 @@ mod tests {
// Image layers are created at last_record_lsn
let images = tline
.inspect_image_layers(Lsn(0x30), &ctx)
.inspect_image_layers(Lsn(0x40), &ctx)
.await
.unwrap()
.into_iter()
@@ -6523,9 +6553,18 @@ mod tests {
&ctx,
// delta layers
vec![
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x10)..Lsn(0x20),
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x20)..Lsn(0x30),
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x20)..Lsn(0x30),
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
),
],
// image layers
vec![(Lsn(0x10), vec![(key1, test_img("metadata key 1"))])],
@@ -6573,15 +6612,21 @@ mod tests {
key
}
// We create one bottom-most image layer, a delta layer D1 crossing the GC horizon, D2 below the horizon, and D3 above the horizon.
// We create
// - one bottom-most image layer,
// - a delta layer D1 crossing the GC horizon with data below and above the horizon,
// - a delta layer D2 crossing the GC horizon with data only below the horizon,
// - a delta layer D3 above the horizon.
//
// | D1 | | D3 |
// | D3 |
// | D1 |
// -| |-- gc horizon -----------------
// | | | D2 |
// --------- img layer ------------------
//
// What we should expact from this compaction is:
// | Part of D1 | | D3 |
// | D3 |
// | Part of D1 |
// --------- img layer with D1+D2 at GC horizon------------------
// img layer at 0x10
@@ -6621,13 +6666,13 @@ mod tests {
let delta3 = vec![
(
get_key(8),
Lsn(0x40),
Value::Image(Bytes::from("value 8@0x40")),
Lsn(0x48),
Value::Image(Bytes::from("value 8@0x48")),
),
(
get_key(9),
Lsn(0x40),
Value::Image(Bytes::from("value 9@0x40")),
Lsn(0x48),
Value::Image(Bytes::from("value 9@0x48")),
),
];
@@ -6637,7 +6682,11 @@ mod tests {
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![delta1, delta2, delta3], // delta layers
vec![
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta1),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta2),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x48)..Lsn(0x50), delta3),
], // delta layers
vec![(Lsn(0x10), img_layer)], // image layers
Lsn(0x50),
)
@@ -6658,8 +6707,8 @@ mod tests {
Bytes::from_static(b"value 5@0x20"),
Bytes::from_static(b"value 6@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x40"),
Bytes::from_static(b"value 9@0x40"),
Bytes::from_static(b"value 8@0x48"),
Bytes::from_static(b"value 9@0x48"),
];
for (idx, expected) in expected_result.iter().enumerate() {
@@ -6747,10 +6796,10 @@ mod tests {
lsn_range: Lsn(0x30)..Lsn(0x41),
is_delta: true
},
// The delta layer we created and should not be picked for the compaction
// The delta3 layer that should not be picked for the compaction
PersistentLayerKey {
key_range: get_key(8)..get_key(10),
lsn_range: Lsn(0x40)..Lsn(0x41),
lsn_range: Lsn(0x48)..Lsn(0x50),
is_delta: true
}
]
@@ -6814,7 +6863,10 @@ mod tests {
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![delta1], // delta layers
vec![DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x10)..Lsn(0x40),
delta1,
)], // delta layers
vec![(Lsn(0x10), image1)], // image layers
Lsn(0x50),
)
@@ -6938,15 +6990,21 @@ mod tests {
key
}
// We create one bottom-most image layer, a delta layer D1 crossing the GC horizon, D2 below the horizon, and D3 above the horizon.
// We create
// - one bottom-most image layer,
// - a delta layer D1 crossing the GC horizon with data below and above the horizon,
// - a delta layer D2 crossing the GC horizon with data only below the horizon,
// - a delta layer D3 above the horizon.
//
// | D1 | | D3 |
// | D3 |
// | D1 |
// -| |-- gc horizon -----------------
// | | | D2 |
// --------- img layer ------------------
//
// What we should expact from this compaction is:
// | Part of D1 | | D3 |
// | D3 |
// | Part of D1 |
// --------- img layer with D1+D2 at GC horizon------------------
// img layer at 0x10
@@ -6996,13 +7054,13 @@ mod tests {
let delta3 = vec![
(
get_key(8),
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append("@0x40")),
Lsn(0x48),
Value::WalRecord(NeonWalRecord::wal_append("@0x48")),
),
(
get_key(9),
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append("@0x40")),
Lsn(0x48),
Value::WalRecord(NeonWalRecord::wal_append("@0x48")),
),
];
@@ -7012,7 +7070,11 @@ mod tests {
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![delta1, delta2, delta3], // delta layers
vec![
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta1),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta2),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x48)..Lsn(0x50), delta3),
], // delta layers
vec![(Lsn(0x10), img_layer)], // image layers
Lsn(0x50),
)
@@ -7027,6 +7089,7 @@ mod tests {
horizon: Lsn(0x30),
},
leases: Default::default(),
within_ancestor_pitr: false,
};
}
@@ -7039,8 +7102,8 @@ mod tests {
Bytes::from_static(b"value 5@0x10@0x20"),
Bytes::from_static(b"value 6@0x10@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x10@0x40"),
Bytes::from_static(b"value 9@0x10@0x40"),
Bytes::from_static(b"value 8@0x10@0x48"),
Bytes::from_static(b"value 9@0x10@0x48"),
];
let expected_result_at_gc_horizon = [

View File

@@ -6,13 +6,20 @@
//! is written as a one byte. If it's larger than that, the length
//! is written as a four-byte integer, in big-endian, with the high
//! bit set. This way, we can detect whether it's 1- or 4-byte header
//! by peeking at the first byte.
//! by peeking at the first byte. For blobs larger than 128 bits,
//! we also specify three reserved bits, only one of the three bit
//! patterns is currently in use (0b011) and signifies compression
//! with zstd.
//!
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use async_compression::Level;
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tracing::warn;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
@@ -66,12 +73,37 @@ impl<'a> BlockCursor<'a> {
len_buf.copy_from_slice(&buf[off..off + 4]);
off += 4;
}
len_buf[0] &= 0x7f;
let bit_mask = if self.read_compressed {
!LEN_COMPRESSION_BIT_MASK
} else {
0x7f
};
len_buf[0] &= bit_mask;
u32::from_be_bytes(len_buf) as usize
};
let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
dstbuf.clear();
dstbuf.reserve(len);
let mut tmp_buf = Vec::new();
let buf_to_write;
let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
if compression_bits > BYTE_UNCOMPRESSED {
warn!("reading key above future limit ({len} bytes)");
}
buf_to_write = dstbuf;
None
} else if compression_bits == BYTE_ZSTD {
buf_to_write = &mut tmp_buf;
Some(dstbuf)
} else {
let error = std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid compression byte {compression_bits:x}"),
);
return Err(error);
};
buf_to_write.clear();
buf_to_write.reserve(len);
// Read the payload
let mut remain = len;
@@ -85,14 +117,35 @@ impl<'a> BlockCursor<'a> {
page_remain = PAGE_SZ;
}
let this_blk_len = min(remain, page_remain);
dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
remain -= this_blk_len;
off += this_blk_len;
}
if let Some(dstbuf) = compression {
if compression_bits == BYTE_ZSTD {
let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
decoder.write_all(buf_to_write).await?;
decoder.flush().await?;
} else {
unreachable!("already checked above")
}
}
Ok(())
}
}
/// Reserved bits for length and compression
const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
/// The maximum size of blobs we support. The highest few bits
/// are reserved for compression and other further uses.
const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;
const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
/// A wrapper of `VirtualFile` that allows users to write blobs.
///
/// If a `BlobWriter` is dropped, the internal buffer will be
@@ -219,6 +272,17 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
srcbuf: B,
ctx: &RequestContext,
) -> (B::Buf, Result<u64, Error>) {
self.write_blob_maybe_compressed(srcbuf, ctx, None).await
}
/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob_maybe_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
srcbuf: B,
ctx: &RequestContext,
algorithm: Option<ImageCompressionAlgorithm>,
) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;
@@ -226,29 +290,58 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
let mut io_buf = self.io_buf.take().expect("we always put it back below");
io_buf.clear();
let (io_buf, hdr_res) = async {
let mut compressed_buf = None;
let ((io_buf, hdr_res), srcbuf) = async {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
self.write_all(io_buf, ctx).await
(
self.write_all(io_buf, ctx).await,
srcbuf.slice_full().into_inner(),
)
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
if len > MAX_SUPPORTED_LEN {
return (
io_buf,
Err(Error::new(
ErrorKind::Other,
format!("blob too large ({len} bytes)"),
)),
(
io_buf,
Err(Error::new(
ErrorKind::Other,
format!("blob too large ({len} bytes)"),
)),
),
srcbuf.slice_full().into_inner(),
);
}
if len > 0x0fff_ffff {
tracing::warn!("writing blob above future limit ({len} bytes)");
}
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
let (high_bit_mask, len_written, srcbuf) = match algorithm {
Some(ImageCompressionAlgorithm::Zstd { level }) => {
let mut encoder = if let Some(level) = level {
async_compression::tokio::write::ZstdEncoder::with_quality(
Vec::new(),
Level::Precise(level.into()),
)
} else {
async_compression::tokio::write::ZstdEncoder::new(Vec::new())
};
let slice = srcbuf.slice_full();
encoder.write_all(&slice[..]).await.unwrap();
encoder.shutdown().await.unwrap();
let compressed = encoder.into_inner();
if compressed.len() < len {
let compressed_len = compressed.len();
compressed_buf = Some(compressed);
(BYTE_ZSTD, compressed_len, slice.into_inner())
} else {
(BYTE_UNCOMPRESSED, len, slice.into_inner())
}
}
None => (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner()),
};
let mut len_buf = (len_written as u32).to_be_bytes();
assert_eq!(len_buf[0] & 0xf0, 0);
len_buf[0] |= high_bit_mask;
io_buf.extend_from_slice(&len_buf[..]);
self.write_all(io_buf, ctx).await
(self.write_all(io_buf, ctx).await, srcbuf)
}
}
.await;
@@ -257,7 +350,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
Ok(_) => (),
Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
}
let (srcbuf, res) = self.write_all(srcbuf, ctx).await;
let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
let (_buf, res) = self.write_all(compressed_buf, ctx).await;
(Slice::into_inner(srcbuf.slice(..)), res)
} else {
self.write_all(srcbuf, ctx).await
};
(srcbuf, res.map(|_| offset))
}
}
@@ -295,6 +393,12 @@ mod tests {
use rand::{Rng, SeedableRng};
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED, false>(blobs).await
}
async fn round_trip_test_compressed<const BUFFERED: bool, const COMPRESSION: bool>(
blobs: &[Vec<u8>],
) -> Result<(), Error> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
@@ -305,7 +409,16 @@ mod tests {
let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = wtr.write_blob(blob.clone(), &ctx).await;
let (_, res) = if COMPRESSION {
wtr.write_blob_maybe_compressed(
blob.clone(),
&ctx,
Some(ImageCompressionAlgorithm::Zstd { level: Some(1) }),
)
.await
} else {
wtr.write_blob(blob.clone(), &ctx).await
};
let offs = res?;
offsets.push(offs);
}
@@ -319,7 +432,7 @@ mod tests {
let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new(rdr);
let rdr = BlockCursor::new_with_compression(rdr, COMPRESSION);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
let blob_read = rdr.read_blob(*offset, &ctx).await?;
assert_eq!(
@@ -353,6 +466,8 @@ mod tests {
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false, true>(blobs).await?;
round_trip_test_compressed::<true, true>(blobs).await?;
Ok(())
}
@@ -361,10 +476,15 @@ mod tests {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
b"hello".to_vec(),
random_array(66 * PAGE_SZ),
vec![0xf3; 24 * PAGE_SZ],
b"foobar".to_vec(),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false, true>(blobs).await?;
round_trip_test_compressed::<true, true>(blobs).await?;
Ok(())
}

View File

@@ -37,6 +37,7 @@ where
pub enum BlockLease<'a> {
PageReadGuard(PageReadGuard<'static>),
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
Slice(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
#[cfg(test)]
@@ -63,6 +64,7 @@ impl<'a> Deref for BlockLease<'a> {
match self {
BlockLease::PageReadGuard(v) => v.deref(),
BlockLease::EphemeralFileMutableTail(v) => v,
BlockLease::Slice(v) => v,
#[cfg(test)]
BlockLease::Arc(v) => v.deref(),
#[cfg(test)]
@@ -81,6 +83,7 @@ pub(crate) enum BlockReaderRef<'a> {
FileBlockReader(&'a FileBlockReader<'a>),
EphemeralFile(&'a EphemeralFile),
Adapter(Adapter<&'a DeltaLayerInner>),
Slice(&'a [u8]),
#[cfg(test)]
TestDisk(&'a super::disk_btree::tests::TestDisk),
#[cfg(test)]
@@ -99,6 +102,7 @@ impl<'a> BlockReaderRef<'a> {
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
EphemeralFile(r) => r.read_blk(blknum, ctx).await,
Adapter(r) => r.read_blk(blknum, ctx).await,
Slice(s) => Self::read_blk_slice(s, blknum),
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
@@ -107,6 +111,24 @@ impl<'a> BlockReaderRef<'a> {
}
}
impl<'a> BlockReaderRef<'a> {
fn read_blk_slice(slice: &[u8], blknum: u32) -> std::io::Result<BlockLease> {
let start = (blknum as usize).checked_mul(PAGE_SZ).unwrap();
let end = start.checked_add(PAGE_SZ).unwrap();
if end > slice.len() {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("slice too short, len={} end={}", slice.len(), end),
));
}
let slice = &slice[start..end];
let page_sized: &[u8; PAGE_SZ] = slice
.try_into()
.expect("we add PAGE_SZ to start, so the slice must have PAGE_SZ");
Ok(BlockLease::Slice(page_sized))
}
}
///
/// A "cursor" for efficiently reading multiple pages from a BlockReader
///
@@ -127,16 +149,24 @@ impl<'a> BlockReaderRef<'a> {
/// ```
///
pub struct BlockCursor<'a> {
pub(super) read_compressed: bool,
reader: BlockReaderRef<'a>,
}
impl<'a> BlockCursor<'a> {
pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
BlockCursor { reader }
Self::new_with_compression(reader, false)
}
pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
BlockCursor {
read_compressed,
reader,
}
}
// Needed by cli
pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
BlockCursor {
read_compressed: false,
reader: BlockReaderRef::FileBlockReader(reader),
}
}
@@ -166,11 +196,25 @@ pub struct FileBlockReader<'a> {
/// Unique ID of this file, used as key in the page cache.
file_id: page_cache::FileId,
compressed_reads: bool,
}
impl<'a> FileBlockReader<'a> {
pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
FileBlockReader { file_id, file }
Self::new_with_compression(file, file_id, false)
}
pub fn new_with_compression(
file: &'a VirtualFile,
file_id: FileId,
compressed_reads: bool,
) -> Self {
FileBlockReader {
file_id,
file,
compressed_reads,
}
}
/// Read a page from the underlying file into given buffer.
@@ -217,7 +261,10 @@ impl<'a> FileBlockReader<'a> {
impl BlockReader for FileBlockReader<'_> {
fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new(BlockReaderRef::FileBlockReader(self))
BlockCursor::new_with_compression(
BlockReaderRef::FileBlockReader(self),
self.compressed_reads,
)
}
}

View File

@@ -21,6 +21,7 @@ pub struct EphemeralFile {
}
mod page_caching;
pub(crate) use page_caching::PrewarmOnWrite as PrewarmPageCacheOnWrite;
mod zero_padded_read_write;
impl EphemeralFile {
@@ -53,7 +54,7 @@ impl EphemeralFile {
Ok(EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
rw: page_caching::RW::new(file),
rw: page_caching::RW::new(file, conf.l0_flush.prewarm_on_write()),
})
}
@@ -65,6 +66,11 @@ impl EphemeralFile {
self.rw.page_cache_file_id()
}
/// See [`self::page_caching::RW::load_to_vec`].
pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
self.rw.load_to_vec(ctx).await
}
pub(crate) async fn read_blk(
&self,
blknum: u32,

View File

@@ -8,6 +8,7 @@ use crate::virtual_file::VirtualFile;
use once_cell::sync::Lazy;
use std::io::{self, ErrorKind};
use std::ops::{Deref, Range};
use tokio_epoll_uring::BoundedBuf;
use tracing::*;
@@ -19,14 +20,23 @@ pub struct RW {
rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
}
/// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`],
/// should we pre-warm the [`crate::page_cache`] with the contents?
#[derive(Clone, Copy)]
pub enum PrewarmOnWrite {
Yes,
No,
}
impl RW {
pub fn new(file: VirtualFile) -> Self {
pub fn new(file: VirtualFile, prewarm_on_write: PrewarmOnWrite) -> Self {
let page_cache_file_id = page_cache::next_file_id();
Self {
page_cache_file_id,
rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(
page_cache_file_id,
file,
prewarm_on_write,
)),
}
}
@@ -49,6 +59,43 @@ impl RW {
self.rw.bytes_written()
}
/// Load all blocks that can be read via [`Self::read_blk`] into a contiguous memory buffer.
///
/// This includes the blocks that aren't yet flushed to disk by the internal buffered writer.
/// The last block is zero-padded to [`PAGE_SZ`], so, the returned buffer is always a multiple of [`PAGE_SZ`].
pub(super) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
// round up to the next PAGE_SZ multiple, required by blob_io
let size = {
let s = usize::try_from(self.bytes_written()).unwrap();
if s % PAGE_SZ == 0 {
s
} else {
s.checked_add(PAGE_SZ - (s % PAGE_SZ)).unwrap()
}
};
let vec = Vec::with_capacity(size);
// read from disk what we've already flushed
let writer = self.rw.as_writer();
let flushed_range = writer.written_range();
let mut vec = writer
.file
.read_exact_at(
vec.slice(0..(flushed_range.end - flushed_range.start)),
u64::try_from(flushed_range.start).unwrap(),
ctx,
)
.await?
.into_inner();
// copy from in-memory buffer what we haven't flushed yet but would return when accessed via read_blk
let buffered = self.rw.get_tail_zero_padded();
vec.extend_from_slice(buffered);
assert_eq!(vec.len(), size);
assert_eq!(vec.len() % PAGE_SZ, 0);
Ok(vec)
}
pub(crate) async fn read_blk(
&self,
blknum: u32,
@@ -116,19 +163,40 @@ impl Drop for RW {
}
struct PreWarmingWriter {
prewarm_on_write: PrewarmOnWrite,
nwritten_blocks: u32,
page_cache_file_id: page_cache::FileId,
file: VirtualFile,
}
impl PreWarmingWriter {
fn new(page_cache_file_id: page_cache::FileId, file: VirtualFile) -> Self {
fn new(
page_cache_file_id: page_cache::FileId,
file: VirtualFile,
prewarm_on_write: PrewarmOnWrite,
) -> Self {
Self {
prewarm_on_write,
nwritten_blocks: 0,
page_cache_file_id,
file,
}
}
/// Return the byte range within `file` that has been written though `write_all`.
///
/// The returned range would be invalidated by another `write_all`. To prevent that, we capture `&_`.
fn written_range(&self) -> (impl Deref<Target = Range<usize>> + '_) {
let nwritten_blocks = usize::try_from(self.nwritten_blocks).unwrap();
struct Wrapper(Range<usize>);
impl Deref for Wrapper {
type Target = Range<usize>;
fn deref(&self) -> &Range<usize> {
&self.0
}
}
Wrapper(0..nwritten_blocks * PAGE_SZ)
}
}
impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
@@ -178,45 +246,51 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi
assert_eq!(&check_bounds_stuff_works, &*buf);
}
// Pre-warm page cache with the contents.
// At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
// benefits the code that writes InMemoryLayer=>L0 layers.
let nblocks = buflen / PAGE_SZ;
let nblocks32 = u32::try_from(nblocks).unwrap();
let cache = page_cache::get();
static CTX: Lazy<RequestContext> = Lazy::new(|| {
RequestContext::new(
crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
crate::context::DownloadBehavior::Error,
)
});
for blknum_in_buffer in 0..nblocks {
let blk_in_buffer = &buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
let blknum = self
.nwritten_blocks
.checked_add(blknum_in_buffer as u32)
.unwrap();
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
.await
{
Err(e) => {
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
}
Ok(v) => match v {
page_cache::ReadBufResult::Found(_guard) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
if matches!(self.prewarm_on_write, PrewarmOnWrite::Yes) {
// Pre-warm page cache with the contents.
// At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
// benefits the code that writes InMemoryLayer=>L0 layers.
let cache = page_cache::get();
static CTX: Lazy<RequestContext> = Lazy::new(|| {
RequestContext::new(
crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
crate::context::DownloadBehavior::Error,
)
});
for blknum_in_buffer in 0..nblocks {
let blk_in_buffer =
&buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
let blknum = self
.nwritten_blocks
.checked_add(blknum_in_buffer as u32)
.unwrap();
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
.await
{
Err(e) => {
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
}
Ok(v) => match v {
page_cache::ReadBufResult::Found(_guard) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
and this function takes &mut self, so, no concurrent read_blk is possible");
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(blk_in_buffer);
let _ = write_guard.mark_valid();
}
},
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(blk_in_buffer);
let _ = write_guard.mark_valid();
}
},
}
}
}
self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
Ok((buflen, buf.into_inner()))
}

View File

@@ -75,6 +75,21 @@ where
flushed_offset + u64::try_from(buffer.pending()).unwrap()
}
/// Get a slice of all blocks that [`Self::read_blk`] would return as [`ReadResult::ServedFromZeroPaddedMutableTail`].
pub fn get_tail_zero_padded(&self) -> &[u8] {
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
let buffer_written_up_to = buffer.pending();
// pad to next page boundary
let read_up_to = if buffer_written_up_to % PAGE_SZ == 0 {
buffer_written_up_to
} else {
buffer_written_up_to
.checked_add(PAGE_SZ - (buffer_written_up_to % PAGE_SZ))
.unwrap()
};
&buffer.as_zero_padded_slice()[0..read_up_to]
}
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<ReadResult<'_, W>, std::io::Error> {
let flushed_offset = self.buffered_writer.as_inner().bytes_written();
let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();

View File

@@ -43,7 +43,8 @@ use crate::tenant::config::{
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::storage_layer::inmemory_layer;
use crate::tenant::timeline::ShutdownMode;
use crate::tenant::{AttachedTenantConf, GcError, SpawnMode, Tenant, TenantState};
use crate::tenant::{AttachedTenantConf, GcError, LoadConfigError, SpawnMode, Tenant, TenantState};
use crate::virtual_file::MaybeFatalIo;
use crate::{InitializationOrder, TEMP_FILE_SUFFIX};
use utils::crashsafe::path_with_suffix_extension;
@@ -272,7 +273,7 @@ pub struct TenantManager {
}
fn emergency_generations(
tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
tenant_confs: &HashMap<TenantShardId, Result<LocationConf, LoadConfigError>>,
) -> HashMap<TenantShardId, TenantStartupMode> {
tenant_confs
.iter()
@@ -296,7 +297,7 @@ fn emergency_generations(
async fn init_load_generations(
conf: &'static PageServerConf,
tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
tenant_confs: &HashMap<TenantShardId, Result<LocationConf, LoadConfigError>>,
resources: &TenantSharedResources,
cancel: &CancellationToken,
) -> anyhow::Result<Option<HashMap<TenantShardId, TenantStartupMode>>> {
@@ -346,56 +347,32 @@ async fn init_load_generations(
/// Given a directory discovered in the pageserver's tenants/ directory, attempt
/// to load a tenant config from it.
///
/// If file is missing, return Ok(None)
/// If we cleaned up something expected (like an empty dir or a temp dir), return None.
fn load_tenant_config(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
dentry: Utf8DirEntry,
) -> anyhow::Result<Option<(TenantShardId, anyhow::Result<LocationConf>)>> {
) -> Option<Result<LocationConf, LoadConfigError>> {
let tenant_dir_path = dentry.path().to_path_buf();
if crate::is_temporary(&tenant_dir_path) {
info!("Found temporary tenant directory, removing: {tenant_dir_path}");
// No need to use safe_remove_tenant_dir_all because this is already
// a temporary path
if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
error!(
"Failed to remove temporary directory '{}': {:?}",
tenant_dir_path, e
);
}
return Ok(None);
std::fs::remove_dir_all(&tenant_dir_path).fatal_err("delete temporary tenant dir");
return None;
}
// This case happens if we crash during attachment before writing a config into the dir
let is_empty = tenant_dir_path
.is_empty_dir()
.with_context(|| format!("Failed to check whether {tenant_dir_path:?} is an empty dir"))?;
.fatal_err("Checking for empty tenant dir");
if is_empty {
info!("removing empty tenant directory {tenant_dir_path:?}");
if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
error!(
"Failed to remove empty tenant directory '{}': {e:#}",
tenant_dir_path
)
}
return Ok(None);
std::fs::remove_dir(&tenant_dir_path).fatal_err("delete empty tenant dir");
return None;
}
let tenant_shard_id = match tenant_dir_path
.file_name()
.unwrap_or_default()
.parse::<TenantShardId>()
{
Ok(id) => id,
Err(_) => {
warn!("Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",);
return Ok(None);
}
};
Ok(Some((
tenant_shard_id,
Tenant::load_tenant_config(conf, &tenant_shard_id),
)))
Some(Tenant::load_tenant_config(conf, &tenant_shard_id))
}
/// Initial stage of load: walk the local tenants directory, clean up any temp files,
@@ -405,32 +382,51 @@ fn load_tenant_config(
/// seconds even on reasonably fast drives.
async fn init_load_tenant_configs(
conf: &'static PageServerConf,
) -> anyhow::Result<HashMap<TenantShardId, anyhow::Result<LocationConf>>> {
) -> HashMap<TenantShardId, Result<LocationConf, LoadConfigError>> {
let tenants_dir = conf.tenants_path();
let dentries = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<Utf8DirEntry>> {
let dir_entries = tenants_dir
.read_dir_utf8()
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
let dentries = tokio::task::spawn_blocking(move || -> Vec<Utf8DirEntry> {
let context = format!("read tenants dir {tenants_dir}");
let dir_entries = tenants_dir.read_dir_utf8().fatal_err(&context);
Ok(dir_entries.collect::<Result<Vec<_>, std::io::Error>>()?)
dir_entries
.collect::<Result<Vec<_>, std::io::Error>>()
.fatal_err(&context)
})
.await??;
.await
.expect("Config load task panicked");
let mut configs = HashMap::new();
let mut join_set = JoinSet::new();
for dentry in dentries {
join_set.spawn_blocking(move || load_tenant_config(conf, dentry));
let tenant_shard_id = match dentry.file_name().parse::<TenantShardId>() {
Ok(id) => id,
Err(_) => {
warn!(
"Invalid tenant path (garbage in our repo directory?): '{}'",
dentry.file_name()
);
continue;
}
};
join_set.spawn_blocking(move || {
(
tenant_shard_id,
load_tenant_config(conf, tenant_shard_id, dentry),
)
});
}
while let Some(r) = join_set.join_next().await {
if let Some((tenant_id, tenant_config)) = r?? {
configs.insert(tenant_id, tenant_config);
let (tenant_shard_id, tenant_config) = r.expect("Panic in config load task");
if let Some(tenant_config) = tenant_config {
configs.insert(tenant_shard_id, tenant_config);
}
}
Ok(configs)
configs
}
#[derive(Debug, thiserror::Error)]
@@ -472,7 +468,7 @@ pub async fn init_tenant_mgr(
);
// Scan local filesystem for attached tenants
let tenant_configs = init_load_tenant_configs(conf).await?;
let tenant_configs = init_load_tenant_configs(conf).await;
// Determine which tenants are to be secondary or attached, and in which generation
let tenant_modes = init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
@@ -590,31 +586,23 @@ pub async fn init_tenant_mgr(
);
// For those shards that have live configurations, construct `Tenant` or `SecondaryTenant` objects and start them running
for (tenant_shard_id, location_conf, config_write_result) in config_write_results {
// Errors writing configs are fatal
config_write_result?;
// Writing a config to local disk is foundational to startup up tenants: panic if we can't.
config_write_result.fatal_err("write tenant shard config file");
let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
let shard_identity = location_conf.shard;
let slot = match location_conf.mode {
LocationMode::Attached(attached_conf) => {
match tenant_spawn(
conf,
tenant_shard_id,
&tenant_dir_path,
resources.clone(),
AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
shard_identity,
Some(init_order.clone()),
SpawnMode::Lazy,
&ctx,
) {
Ok(tenant) => TenantSlot::Attached(tenant),
Err(e) => {
error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
continue;
}
}
}
LocationMode::Attached(attached_conf) => TenantSlot::Attached(tenant_spawn(
conf,
tenant_shard_id,
&tenant_dir_path,
resources.clone(),
AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
shard_identity,
Some(init_order.clone()),
SpawnMode::Lazy,
&ctx,
)),
LocationMode::Secondary(secondary_conf) => {
info!(
tenant_id = %tenant_shard_id.tenant_id,
@@ -649,8 +637,7 @@ pub async fn init_tenant_mgr(
})
}
/// Wrapper for Tenant::spawn that checks invariants before running, and inserts
/// a broken tenant in the map if Tenant::spawn fails.
/// Wrapper for Tenant::spawn that checks invariants before running
#[allow(clippy::too_many_arguments)]
fn tenant_spawn(
conf: &'static PageServerConf,
@@ -662,23 +649,18 @@ fn tenant_spawn(
init_order: Option<InitializationOrder>,
mode: SpawnMode,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!(
tenant_path.is_dir(),
"Cannot load tenant from path {tenant_path:?}, it either does not exist or not a directory"
);
anyhow::ensure!(
!crate::is_temporary(tenant_path),
"Cannot load tenant from temporary path {tenant_path:?}"
);
anyhow::ensure!(
!tenant_path.is_empty_dir().with_context(|| {
format!("Failed to check whether {tenant_path:?} is an empty dir")
})?,
"Cannot load tenant from empty directory {tenant_path:?}"
);
) -> Arc<Tenant> {
// All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
// path, and contains a configuration file. Assertions that do synchronous I/O are limited to debug mode
// to avoid impacting prod runtime performance.
assert!(!crate::is_temporary(tenant_path));
debug_assert!(tenant_path.is_dir());
debug_assert!(conf
.tenant_location_config_path(&tenant_shard_id)
.try_exists()
.unwrap());
let tenant = Tenant::spawn(
Tenant::spawn(
conf,
tenant_shard_id,
resources,
@@ -687,9 +669,7 @@ fn tenant_spawn(
init_order,
mode,
ctx,
);
Ok(tenant)
)
}
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
@@ -840,8 +820,9 @@ pub(crate) enum UpsertLocationError {
#[error("Failed to flush: {0}")]
Flush(anyhow::Error),
/// This error variant is for unexpected situations (soft assertions) where the system is in an unexpected state.
#[error("Internal error: {0}")]
Other(#[from] anyhow::Error),
InternalError(anyhow::Error),
}
impl TenantManager {
@@ -971,7 +952,8 @@ impl TenantManager {
match fast_path_taken {
Some(FastPathModified::Attached(tenant)) => {
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await?;
.await
.fatal_err("write tenant shard config");
// Transition to AttachedStale means we may well hold a valid generation
// still, and have been requested to go stale as part of a migration. If
@@ -1001,7 +983,8 @@ impl TenantManager {
}
Some(FastPathModified::Secondary(_secondary_tenant)) => {
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await?;
.await
.fatal_err("write tenant shard config");
return Ok(None);
}
@@ -1067,7 +1050,7 @@ impl TenantManager {
Some(TenantSlot::InProgress(_)) => {
// This should never happen: acquire_slot should error out
// if the contents of a slot were InProgress.
return Err(UpsertLocationError::Other(anyhow::anyhow!(
return Err(UpsertLocationError::InternalError(anyhow::anyhow!(
"Acquired an InProgress slot, this is a bug."
)));
}
@@ -1086,12 +1069,14 @@ impl TenantManager {
// Does not need to be fsync'd because local storage is just a cache.
tokio::fs::create_dir_all(&timelines_path)
.await
.with_context(|| format!("Creating {timelines_path}"))?;
.fatal_err("create timelines/ dir");
// Before activating either secondary or attached mode, persist the
// configuration, so that on restart we will re-attach (or re-start
// secondary) on the tenant.
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config).await?;
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await
.fatal_err("write tenant shard config");
let new_slot = match &new_location_config.mode {
LocationMode::Secondary(secondary_config) => {
@@ -1110,13 +1095,15 @@ impl TenantManager {
// from upserts. This enables creating generation-less tenants even though neon_local
// always uses generations when calling the location conf API.
let attached_conf = if cfg!(feature = "testing") {
let mut conf = AttachedTenantConf::try_from(new_location_config)?;
let mut conf = AttachedTenantConf::try_from(new_location_config)
.map_err(UpsertLocationError::BadRequest)?;
if self.conf.control_plane_api.is_none() {
conf.location.generation = Generation::none();
}
conf
} else {
AttachedTenantConf::try_from(new_location_config)?
AttachedTenantConf::try_from(new_location_config)
.map_err(UpsertLocationError::BadRequest)?
};
let tenant = tenant_spawn(
@@ -1129,7 +1116,7 @@ impl TenantManager {
None,
spawn_mode,
ctx,
)?;
);
TenantSlot::Attached(tenant)
}
@@ -1143,7 +1130,7 @@ impl TenantManager {
match slot_guard.upsert(new_slot) {
Err(TenantSlotUpsertError::InternalError(e)) => {
Err(UpsertLocationError::Other(anyhow::anyhow!(e)))
Err(UpsertLocationError::InternalError(anyhow::anyhow!(e)))
}
Err(TenantSlotUpsertError::MapState(e)) => Err(UpsertLocationError::Unavailable(e)),
Err(TenantSlotUpsertError::ShuttingDown((new_slot, _completion))) => {
@@ -1250,7 +1237,7 @@ impl TenantManager {
None,
SpawnMode::Eager,
ctx,
)?;
);
slot_guard.upsert(TenantSlot::Attached(tenant))?;
@@ -1984,7 +1971,7 @@ impl TenantManager {
None,
SpawnMode::Eager,
ctx,
)?;
);
slot_guard.upsert(TenantSlot::Attached(tenant))?;

View File

@@ -519,7 +519,7 @@ impl RemoteTimelineClient {
local_path: &Utf8Path,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<u64> {
) -> Result<u64, DownloadError> {
let downloaded_size = {
let _unfinished_gauge_guard = self.metrics.call_begin(
&RemoteOpFileKind::Layer,

View File

@@ -23,6 +23,8 @@ use super::{
storage_layer::LayerName,
};
use crate::metrics::SECONDARY_RESIDENT_PHYSICAL_SIZE;
use metrics::UIntGauge;
use pageserver_api::{
models,
shard::{ShardIdentity, TenantShardId},
@@ -99,6 +101,17 @@ pub(crate) struct SecondaryTenant {
// Public state indicating overall progress of downloads relative to the last heatmap seen
pub(crate) progress: std::sync::Mutex<models::SecondaryProgress>,
// Sum of layer sizes on local disk
pub(super) resident_size_metric: UIntGauge,
}
impl Drop for SecondaryTenant {
fn drop(&mut self) {
let tenant_id = self.tenant_shard_id.tenant_id.to_string();
let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
}
}
impl SecondaryTenant {
@@ -108,6 +121,12 @@ impl SecondaryTenant {
tenant_conf: TenantConfOpt,
config: &SecondaryLocationConfig,
) -> Arc<Self> {
let tenant_id = tenant_shard_id.tenant_id.to_string();
let shard_id = format!("{}", tenant_shard_id.shard_slug());
let resident_size_metric = SECONDARY_RESIDENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id])
.unwrap();
Arc::new(Self {
tenant_shard_id,
// todo: shall we make this a descendent of the
@@ -123,6 +142,8 @@ impl SecondaryTenant {
detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),
progress: std::sync::Mutex::default(),
resident_size_metric,
})
}
@@ -211,16 +232,12 @@ impl SecondaryTenant {
// have to 100% match what is on disk, because it's a best-effort warming
// of the cache.
let mut detail = this.detail.lock().unwrap();
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
let removed = timeline_detail.on_disk_layers.remove(&name);
// We might race with removal of the same layer during downloads, if it was removed
// from the heatmap. If we see that the OnDiskState is gone, then no need to
// do a physical deletion or store in evicted_at.
if let Some(removed) = removed {
removed.remove_blocking();
timeline_detail.evicted_at.insert(name, now);
}
if let Some(removed) =
detail.evict_layer(name, &timeline_id, now, &this.resident_size_metric)
{
// We might race with removal of the same layer during downloads, so finding the layer we
// were trying to remove is optional. Only issue the disk I/O to remove it if we found it.
removed.remove_blocking();
}
})
.await

View File

@@ -46,6 +46,7 @@ use crate::tenant::{
use camino::Utf8PathBuf;
use chrono::format::{DelayedFormat, StrftimeItems};
use futures::Future;
use metrics::UIntGauge;
use pageserver_api::models::SecondaryProgress;
use pageserver_api::shard::TenantShardId;
use remote_storage::{DownloadError, Etag, GenericRemoteStorage};
@@ -131,16 +132,66 @@ impl OnDiskState {
.or_else(fs_ext::ignore_not_found)
.fatal_err("Deleting secondary layer")
}
pub(crate) fn file_size(&self) -> u64 {
self.metadata.file_size
}
}
#[derive(Debug, Clone, Default)]
pub(super) struct SecondaryDetailTimeline {
pub(super) on_disk_layers: HashMap<LayerName, OnDiskState>,
on_disk_layers: HashMap<LayerName, OnDiskState>,
/// We remember when layers were evicted, to prevent re-downloading them.
pub(super) evicted_at: HashMap<LayerName, SystemTime>,
}
impl SecondaryDetailTimeline {
pub(super) fn remove_layer(
&mut self,
name: &LayerName,
resident_metric: &UIntGauge,
) -> Option<OnDiskState> {
let removed = self.on_disk_layers.remove(name);
if let Some(removed) = &removed {
resident_metric.sub(removed.file_size());
}
removed
}
/// `local_path`
fn touch_layer<F>(
&mut self,
conf: &'static PageServerConf,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
touched: &HeatMapLayer,
resident_metric: &UIntGauge,
local_path: F,
) where
F: FnOnce() -> Utf8PathBuf,
{
use std::collections::hash_map::Entry;
match self.on_disk_layers.entry(touched.name.clone()) {
Entry::Occupied(mut v) => {
v.get_mut().access_time = touched.access_time;
}
Entry::Vacant(e) => {
e.insert(OnDiskState::new(
conf,
tenant_shard_id,
timeline_id,
touched.name.clone(),
touched.metadata.clone(),
touched.access_time,
local_path(),
));
resident_metric.add(touched.metadata.file_size);
}
}
}
}
// Aspects of a heatmap that we remember after downloading it
#[derive(Clone, Debug)]
struct DownloadSummary {
@@ -158,7 +209,7 @@ pub(super) struct SecondaryDetail {
last_download: Option<DownloadSummary>,
next_download: Option<Instant>,
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
}
/// Helper for logging SystemTime
@@ -191,6 +242,38 @@ impl SecondaryDetail {
}
}
pub(super) fn evict_layer(
&mut self,
name: LayerName,
timeline_id: &TimelineId,
now: SystemTime,
resident_metric: &UIntGauge,
) -> Option<OnDiskState> {
let timeline = self.timelines.get_mut(timeline_id)?;
let removed = timeline.remove_layer(&name, resident_metric);
if removed.is_some() {
timeline.evicted_at.insert(name, now);
}
removed
}
pub(super) fn remove_timeline(
&mut self,
timeline_id: &TimelineId,
resident_metric: &UIntGauge,
) {
let removed = self.timelines.remove(timeline_id);
if let Some(removed) = removed {
resident_metric.sub(
removed
.on_disk_layers
.values()
.map(|l| l.metadata.file_size)
.sum(),
);
}
}
/// Additionally returns the total number of layers, used for more stable relative access time
/// based eviction.
pub(super) fn get_layers_for_eviction(
@@ -601,8 +684,13 @@ impl<'a> TenantDownloader<'a> {
Some(t) => t,
None => {
// We have no existing state: need to scan local disk for layers first.
let timeline_state =
init_timeline_state(self.conf, tenant_shard_id, timeline).await;
let timeline_state = init_timeline_state(
self.conf,
tenant_shard_id,
timeline,
&self.secondary_state.resident_size_metric,
)
.await;
// Re-acquire detail lock now that we're done with async load from local FS
self.secondary_state
@@ -671,6 +759,25 @@ impl<'a> TenantDownloader<'a> {
.await?;
}
// Metrics consistency check in testing builds
if cfg!(feature = "testing") {
let detail = self.secondary_state.detail.lock().unwrap();
let resident_size = detail
.timelines
.values()
.map(|tl| {
tl.on_disk_layers
.values()
.map(|v| v.metadata.file_size)
.sum::<u64>()
})
.sum::<u64>();
assert_eq!(
resident_size,
self.secondary_state.resident_size_metric.get()
);
}
// Only update last_etag after a full successful download: this way will not skip
// the next download, even if the heatmap's actual etag is unchanged.
self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary {
@@ -783,7 +890,7 @@ impl<'a> TenantDownloader<'a> {
for delete_timeline in &delete_timelines {
// We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
// from disk fails that will be a fatal error.
detail.timelines.remove(delete_timeline);
detail.remove_timeline(delete_timeline, &self.secondary_state.resident_size_metric);
}
}
@@ -801,7 +908,7 @@ impl<'a> TenantDownloader<'a> {
let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else {
continue;
};
timeline_state.on_disk_layers.remove(&layer_name);
timeline_state.remove_layer(&layer_name, &self.secondary_state.resident_size_metric);
}
for timeline_id in delete_timelines {
@@ -1000,33 +1107,24 @@ impl<'a> TenantDownloader<'a> {
let timeline_detail = detail.timelines.entry(timeline_id).or_default();
tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
for t in touched {
use std::collections::hash_map::Entry;
match timeline_detail.on_disk_layers.entry(t.name.clone()) {
Entry::Occupied(mut v) => {
v.get_mut().access_time = t.access_time;
}
Entry::Vacant(e) => {
let local_path = local_layer_path(
touched.into_iter().for_each(|t| {
timeline_detail.touch_layer(
self.conf,
tenant_shard_id,
&timeline_id,
&t,
&self.secondary_state.resident_size_metric,
|| {
local_layer_path(
self.conf,
tenant_shard_id,
&timeline_id,
&t.name,
&t.metadata.generation,
);
e.insert(OnDiskState::new(
self.conf,
tenant_shard_id,
&timeline_id,
t.name,
t.metadata.clone(),
t.access_time,
local_path,
));
}
}
}
)
},
)
});
}
result
@@ -1135,6 +1233,7 @@ async fn init_timeline_state(
conf: &'static PageServerConf,
tenant_shard_id: &TenantShardId,
heatmap: &HeatMapTimeline,
resident_metric: &UIntGauge,
) -> SecondaryDetailTimeline {
let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
let mut detail = SecondaryDetailTimeline::default();
@@ -1210,17 +1309,13 @@ async fn init_timeline_state(
} else {
// We expect the access time to be initialized immediately afterwards, when
// the latest heatmap is applied to the state.
detail.on_disk_layers.insert(
name.clone(),
OnDiskState::new(
conf,
tenant_shard_id,
&heatmap.timeline_id,
name,
remote_meta.metadata.clone(),
remote_meta.access_time,
file_path,
),
detail.touch_layer(
conf,
tenant_shard_id,
&heatmap.timeline_id,
remote_meta,
resident_metric,
|| file_path,
);
}
}

View File

@@ -452,7 +452,12 @@ impl DeltaLayerWriterInner {
ctx: &RequestContext,
) -> (Vec<u8>, anyhow::Result<()>) {
assert!(self.lsn_range.start <= lsn);
let (val, res) = self.blob_writer.write_blob(val, ctx).await;
// We don't want to use compression in delta layer creation
let compression = None;
let (val, res) = self
.blob_writer
.write_blob_maybe_compressed(val, ctx, compression)
.await;
let off = match res {
Ok(off) => off,
Err(e) => return (val, Err(anyhow::anyhow!(e))),

View File

@@ -165,6 +165,7 @@ pub struct ImageLayerInner {
file_id: FileId,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
compressed_reads: bool,
}
impl std::fmt::Debug for ImageLayerInner {
@@ -178,7 +179,8 @@ impl std::fmt::Debug for ImageLayerInner {
impl ImageLayerInner {
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
@@ -266,9 +268,10 @@ impl ImageLayer {
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
let path = self.path();
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
.await
.and_then(|res| res)?;
let loaded =
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, false, ctx)
.await
.and_then(|res| res)?;
// not production code
let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
@@ -377,6 +380,7 @@ impl ImageLayerInner {
lsn: Lsn,
summary: Option<Summary>,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
support_compressed_reads: bool,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path, ctx).await {
@@ -420,6 +424,7 @@ impl ImageLayerInner {
file,
file_id,
max_vectored_read_bytes,
compressed_reads: support_compressed_reads,
key_range: actual_summary.key_range,
}))
}
@@ -430,7 +435,8 @@ impl ImageLayerInner {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
@@ -490,12 +496,14 @@ impl ImageLayerInner {
&self,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
let mut result = Vec::new();
let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx));
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let cursor = block_reader.block_cursor();
while let Some(item) = stream.next().await {
// TODO: dedup code with get_reconstruct_value
@@ -530,7 +538,8 @@ impl ImageLayerInner {
.into(),
);
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
@@ -691,7 +700,8 @@ impl ImageLayerInner {
#[cfg(test)]
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
ImageLayerIterator {

View File

@@ -6,13 +6,14 @@
//!
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value};
use crate::tenant::block_io::BlockReader;
use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::ValueReconstructResult;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::{page_cache, walrecord};
use crate::{l0_flush, page_cache, walrecord};
use anyhow::{anyhow, ensure, Result};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
@@ -410,6 +411,7 @@ impl InMemoryLayer {
continue;
}
// TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183
let buf = reader.read_blob(block_read.block_offset, &ctx).await;
if let Err(e) = buf {
reconstruct_state
@@ -620,6 +622,13 @@ impl InMemoryLayer {
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().await;
let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone();
use l0_flush::Inner;
let _concurrency_permit = match &*l0_flush_global_state {
Inner::PageCached => None,
Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
};
let end_lsn = *self.end_lsn.get().unwrap();
let key_count = if let Some(key_range) = key_range {
@@ -645,28 +654,77 @@ impl InMemoryLayer {
)
.await?;
let mut buf = Vec::new();
match &*l0_flush_global_state {
l0_flush::Inner::PageCached => {
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
let cursor = inner.file.block_cursor();
let mut buf = Vec::new();
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(*key, *lsn, buf, will_init, &ctx)
.await;
res?;
let cursor = inner.file.block_cursor();
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(*key, *lsn, buf, will_init, &ctx)
.await;
res?;
}
}
}
l0_flush::Inner::Direct { .. } => {
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
assert_eq!(
file_contents.len() % PAGE_SZ,
0,
"needed by BlockReaderRef::Slice"
);
assert_eq!(file_contents.len(), {
let written = usize::try_from(inner.file.len()).unwrap();
if written % PAGE_SZ == 0 {
written
} else {
written.checked_add(PAGE_SZ - (written % PAGE_SZ)).unwrap()
}
});
let cursor = BlockCursor::new(BlockReaderRef::Slice(&file_contents));
let mut buf = Vec::new();
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
// TODO: once we have blob lengths in the in-memory index, we can
// 1. get rid of the blob_io / BlockReaderRef::Slice business and
// 2. load the file contents into a Bytes and
// 3. the use `Bytes::slice` to get the `buf` that is our blob
// 4. pass that `buf` into `put_value_bytes`
// => https://github.com/neondatabase/neon/issues/8183
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(*key, *lsn, buf, will_init, ctx)
.await;
res?;
}
}
// Hold the permit until the IO is done; if we didn't, one could drop this future,
// thereby releasing the permit, but the Vec<u8> remains allocated until the IO completes.
// => we'd have more concurrenct Vec<u8> than allowed as per the semaphore.
drop(_concurrency_permit);
}
}
// MAX is used here because we identify L0 layers by full key range
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, &ctx).await?;
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?;
Ok(Some(delta_layer))
}
}

View File

@@ -1096,19 +1096,10 @@ impl LayerInner {
match rx.await {
Ok(Ok(res)) => Ok(res),
Ok(Err(e)) => {
// sleep already happened in the spawned task, if it was not cancelled
match e.downcast_ref::<remote_storage::DownloadError>() {
// If the download failed due to its cancellation token,
// propagate the cancellation error upstream.
Some(remote_storage::DownloadError::Cancelled) => {
Err(DownloadError::DownloadCancelled)
}
// FIXME: this is not embedding the error because historically it would had
// been output to compute, however that is no longer the case.
_ => Err(DownloadError::DownloadFailed),
}
Ok(Err(remote_storage::DownloadError::Cancelled)) => {
Err(DownloadError::DownloadCancelled)
}
Ok(Err(_)) => Err(DownloadError::DownloadFailed),
Err(_gone) => Err(DownloadError::DownloadCancelled),
}
}
@@ -1118,7 +1109,7 @@ impl LayerInner {
timeline: Arc<Timeline>,
permit: heavier_once_cell::InitPermit,
ctx: &RequestContext,
) -> anyhow::Result<Arc<DownloadedLayer>> {
) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
let result = timeline
.remote_client
.download_layer_file(
@@ -1694,6 +1685,7 @@ impl DownloadedLayer {
lsn,
summary,
Some(owner.conf.max_vectored_read_bytes),
owner.conf.image_compression.is_some(),
ctx,
)
.await

View File

@@ -65,7 +65,6 @@ use std::{
ops::{Deref, Range},
};
use crate::metrics::GetKind;
use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
use crate::{
aux_file::AuxFileSizeEstimator,
@@ -90,6 +89,10 @@ use crate::{
use crate::{
disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
};
use crate::{
l0_flush::{self, L0FlushGlobalState},
metrics::GetKind,
};
use crate::{
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
};
@@ -208,6 +211,7 @@ pub struct TimelineResources {
pub timeline_get_throttle: Arc<
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
pub(crate) struct AuxFilesState {
@@ -433,6 +437,8 @@ pub struct Timeline {
/// in the future, add `extra_test_sparse_keyspace` if necessary.
#[cfg(test)]
pub(crate) extra_test_dense_keyspace: ArcSwap<KeySpace>,
pub(crate) l0_flush_global_state: L0FlushGlobalState,
}
pub struct WalReceiverInfo {
@@ -457,6 +463,9 @@ pub(crate) struct GcInfo {
/// Leases granted to particular LSNs.
pub(crate) leases: BTreeMap<Lsn, LsnLease>,
/// Whether our branch point is within our ancestor's PITR interval (for cost estimation)
pub(crate) within_ancestor_pitr: bool,
}
impl GcInfo {
@@ -717,6 +726,9 @@ impl From<CreateImageLayersError> for CompactionError {
fn from(e: CreateImageLayersError) -> Self {
match e {
CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
CreateImageLayersError::Other(e) => {
CompactionError::Other(e.context("create image layers"))
}
_ => CompactionError::Other(e.into()),
}
}
@@ -845,6 +857,18 @@ impl Timeline {
.map(|ancestor| ancestor.timeline_id)
}
/// Get the bytes written since the PITR cutoff on this branch, and
/// whether this branch's ancestor_lsn is within its parent's PITR.
pub(crate) fn get_pitr_history_stats(&self) -> (u64, bool) {
let gc_info = self.gc_info.read().unwrap();
let history = self
.get_last_record_lsn()
.checked_sub(gc_info.cutoffs.pitr)
.unwrap_or(Lsn(0))
.0;
(history, gc_info.within_ancestor_pitr)
}
/// Lock and get timeline's GC cutoff
pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
self.latest_gc_cutoff_lsn.read()
@@ -996,6 +1020,7 @@ impl Timeline {
}
pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
pub(crate) const VEC_GET_LAYERS_VISITED_WARN_THRESH: f64 = 512.0;
/// Look up multiple page versions at a given LSN
///
@@ -1228,7 +1253,7 @@ impl Timeline {
let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
.for_get_kind(get_kind)
.start_timer();
self.get_vectored_reconstruct_data(keyspace, lsn, reconstruct_state, ctx)
self.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
.await?;
get_data_timer.stop_and_record();
@@ -1258,11 +1283,26 @@ impl Timeline {
// (this is a requirement, not a bug). Skip updating the metric in these cases
// to avoid infinite results.
if !results.is_empty() {
let avg = layers_visited as f64 / results.len() as f64;
if avg >= Self::VEC_GET_LAYERS_VISITED_WARN_THRESH {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
tracing::info!(
tenant_id = %self.tenant_shard_id.tenant_id,
shard_id = %self.tenant_shard_id.shard_slug(),
timeline_id = %self.timeline_id,
"Vectored read for {} visited {} layers on average per key and {} in total. {}/{} pages were returned",
keyspace, avg, layers_visited, results.len(), keyspace.total_raw_size());
});
}
// Note that this is an approximation. Tracking the exact number of layers visited
// per key requires virtually unbounded memory usage and is inefficient
// (i.e. segment tree tracking each range queried from a layer)
crate::metrics::VEC_READ_NUM_LAYERS_VISITED
.observe(layers_visited as f64 / results.len() as f64);
crate::metrics::VEC_READ_NUM_LAYERS_VISITED.observe(avg);
}
Ok(results)
@@ -2376,6 +2416,8 @@ impl Timeline {
#[cfg(test)]
extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())),
l0_flush_global_state: resources.l0_flush_global_state,
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -4711,6 +4753,42 @@ impl DurationRecorder {
}
}
/// Descriptor for a delta layer used in testing infra. The start/end key/lsn range of the
/// delta layer might be different from the min/max key/lsn in the delta layer. Therefore,
/// the layer descriptor requires the user to provide the ranges, which should cover all
/// keys specified in the `data` field.
#[cfg(test)]
pub struct DeltaLayerTestDesc {
pub lsn_range: Range<Lsn>,
pub key_range: Range<Key>,
pub data: Vec<(Key, Lsn, Value)>,
}
#[cfg(test)]
impl DeltaLayerTestDesc {
#[allow(dead_code)]
pub fn new(lsn_range: Range<Lsn>, key_range: Range<Key>, data: Vec<(Key, Lsn, Value)>) -> Self {
Self {
lsn_range,
key_range,
data,
}
}
pub fn new_with_inferred_key_range(
lsn_range: Range<Lsn>,
data: Vec<(Key, Lsn, Value)>,
) -> Self {
let key_min = data.iter().map(|(key, _, _)| key).min().unwrap();
let key_max = data.iter().map(|(key, _, _)| key).max().unwrap();
Self {
key_range: (*key_min)..(key_max.next()),
lsn_range,
data,
}
}
}
impl Timeline {
async fn finish_compact_batch(
self: &Arc<Self>,
@@ -5511,37 +5589,65 @@ impl Timeline {
#[cfg(test)]
pub(super) async fn force_create_delta_layer(
self: &Arc<Timeline>,
mut deltas: Vec<(Key, Lsn, Value)>,
mut deltas: DeltaLayerTestDesc,
check_start_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn();
deltas.sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb)));
let min_key = *deltas.first().map(|(k, _, _)| k).unwrap();
let end_key = deltas.last().map(|(k, _, _)| k).unwrap().next();
let min_lsn = *deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
let max_lsn = *deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
deltas
.data
.sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb)));
assert!(deltas.data.first().unwrap().0 >= deltas.key_range.start);
assert!(deltas.data.last().unwrap().0 < deltas.key_range.end);
for (_, lsn, _) in &deltas.data {
assert!(deltas.lsn_range.start <= *lsn && *lsn < deltas.lsn_range.end);
}
assert!(
max_lsn <= last_record_lsn,
"advance last record lsn before inserting a layer, max_lsn={max_lsn}, last_record_lsn={last_record_lsn}"
deltas.lsn_range.end <= last_record_lsn,
"advance last record lsn before inserting a layer, end_lsn={}, last_record_lsn={}",
deltas.lsn_range.end,
last_record_lsn
);
let end_lsn = Lsn(max_lsn.0 + 1);
if let Some(check_start_lsn) = check_start_lsn {
assert!(min_lsn >= check_start_lsn);
assert!(deltas.lsn_range.start >= check_start_lsn);
}
// check if the delta layer does not violate the LSN invariant, the legacy compaction should always produce a batch of
// layers of the same start/end LSN, and so should the force inserted layer
{
/// Checks if a overlaps with b, assume a/b = [start, end).
pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
!(a.end <= b.start || b.end <= a.start)
}
let guard = self.layers.read().await;
for layer in guard.layer_map().iter_historic_layers() {
if layer.is_delta()
&& overlaps_with(&layer.lsn_range, &deltas.lsn_range)
&& layer.lsn_range != deltas.lsn_range
{
// If a delta layer overlaps with another delta layer AND their LSN range is not the same, panic
panic!(
"inserted layer violates delta layer LSN invariant: current_lsn_range={}..{}, conflict_lsn_range={}..{}",
deltas.lsn_range.start, deltas.lsn_range.end, layer.lsn_range.start, layer.lsn_range.end
);
}
}
}
let mut delta_layer_writer = DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
min_key,
min_lsn..end_lsn,
deltas.key_range.start,
deltas.lsn_range,
ctx,
)
.await?;
for (key, lsn, val) in deltas {
for (key, lsn, val) in deltas.data {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
let delta_layer = delta_layer_writer.finish(end_key, self, ctx).await?;
let delta_layer = delta_layer_writer
.finish(deltas.key_range.end, self, ctx)
.await?;
{
let mut guard = self.layers.write().await;

View File

@@ -272,6 +272,7 @@ impl DeleteTimelineFlow {
TimelineResources {
remote_client,
timeline_get_throttle: tenant.timeline_get_throttle.clone(),
l0_flush_global_state: tenant.l0_flush_global_state.clone(),
},
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.

View File

@@ -343,7 +343,33 @@ impl WalIngest {
xlog_checkpoint.oldestActiveXid,
self.checkpoint.oldestActiveXid
);
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
// because at shutdown, all in-progress transactions will implicitly
// end. Postgres startup code knows that, and allows hot standby to start
// immediately from a shutdown checkpoint.
//
// In Neon, Postgres hot standby startup always behaves as if starting from
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
// instead of overwriting self.checkpoint.oldestActiveXid with
// InvalidTransactionid from the checkpoint WAL record, update it to a
// proper value, knowing that there are no in-progress transactions at this
// point, except for prepared transactions.
//
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
}
self.checkpoint.oldestActiveXid = oldest_active_xid;
} else {
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
@@ -375,6 +401,7 @@ impl WalIngest {
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
self.checkpoint_modified = true;
}
}
pg_constants::RM_REPLORIGIN_ID => {
@@ -1277,13 +1304,10 @@ impl WalIngest {
xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
);
// Here we treat oldestXid and oldestXidDB
// differently from postgres redo routines.
// In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
// until checkpoint happens and updates the value.
// Here we can use the most recent value.
// It's just an optimization, though and can be deleted.
// TODO Figure out if there will be any issues with replica.
// In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
// truncated, but a checkpoint record with the updated values isn't written until
// later. In Neon, a server can start at any LSN, not just on a checkpoint record,
// so we keep the oldestXid and oldestXidDB up-to-date.
self.checkpoint.oldestXid = xlrec.oldest_xid;
self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
self.checkpoint_modified = true;

View File

@@ -427,12 +427,17 @@ pageserver_connect(shardno_t shard_no, int elevel)
values[n_pgsql_params] = NULL;
shard->conn = PQconnectStartParams(keywords, values, 1);
if (!shard->conn)
if (PQstatus(shard->conn) == CONNECTION_BAD)
{
neon_shard_log(shard_no, elevel, "Failed to connect to pageserver: out of memory");
char *msg = pchomp(PQerrorMessage(shard->conn));
CLEANUP_AND_DISCONNECT(shard);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", shard_no),
errdetail_internal("%s", msg)));
pfree(msg);
return false;
}
shard->state = PS_Connecting_Startup;
/* fallthrough */
}

View File

@@ -12,6 +12,8 @@
#include "fmgr.h"
#include "miscadmin.h"
#include "access/subtrans.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "storage/buf_internals.h"
@@ -22,10 +24,12 @@
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/guc.h"
#include "utils/wait_event.h"
@@ -266,6 +270,293 @@ LogicalSlotsMonitorMain(Datum main_arg)
}
}
/*
* XXX: These private to procarray.c, but we need them here.
*/
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
#define TOTAL_MAX_CACHED_SUBXIDS \
((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
/*
* Restore running-xact information by scanning the CLOG at startup.
*
* In PostgreSQL, a standby always has to wait for a running-xacts WAL record
* to arrive before it can start accepting queries. Furthermore, if there are
* transactions with too many subxids (> 64) open to fit in the in-memory
* subxids cache, the running-xacts record will be marked as "suboverflowed",
* and the standby will need to also wait for the currently in-progress
* transactions to finish.
*
* That's not great in PostgreSQL, because a hot standby does not necessary
* open up for queries immediately as you might expect. But it's worse in
* Neon: A standby in Neon doesn't need to start WAL replay from a checkpoint
* record; it can start at any LSN. Postgres arranges things so that there is
* a running-xacts record soon after every checkpoint record, but when you
* start from an arbitrary LSN, that doesn't help. If the primary is idle, or
* not running at all, it might never write a new running-xacts record,
* leaving the replica in a limbo where it can never start accepting queries.
*
* To mitigate that, we have an additional mechanism to find the running-xacts
* information: we scan the CLOG, making note of any XIDs not marked as
* committed or aborted. They are added to the Postgres known-assigned XIDs
* array by calling ProcArrayApplyRecoveryInfo() in the caller of this
* function.
*
* There is one big limitation with that mechanism: The size of the
* known-assigned XIDs is limited, so if there are a lot of in-progress XIDs,
* we have to give up. Furthermore, we don't know how many of the in-progress
* XIDs are subtransactions, and if we use up all the space in the
* known-assigned XIDs array for subtransactions, we might run out of space in
* the array later during WAL replay, causing the replica to shut down with
* "ERROR: too many KnownAssignedXids". The safe # of XIDs that we can add to
* the known-assigned array without risking that error later is very low,
* merely PGPROC_MAX_CACHED_SUBXIDS == 64, so we take our chances and use up
* to half of the known-assigned XIDs array for the subtransactions, even
* though that risks getting the error later.
*
* Note: It's OK if the recovered list of XIDs includes some transactions that
* have crashed in the primary, and hence will never commit. They will be seen
* as in-progress, until we see a new next running-acts record with an
* oldestActiveXid that invalidates them. That's how the known-assigned XIDs
* array always works.
*
* If scraping the CLOG doesn't succeed for some reason, like the subxid
* overflow, Postgres will fall back to waiting for a running-xacts record
* like usual.
*
* Returns true if a complete list of in-progress XIDs was scraped.
*/
static bool
RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *nxids)
{
TransactionId from;
TransactionId till;
int max_xcnt;
TransactionId *prepared_xids = NULL;
int n_prepared_xids;
TransactionId *restored_xids = NULL;
int n_restored_xids;
int next_prepared_idx;
Assert(*xids == NULL);
/*
* If the checkpoint doesn't have a valid oldestActiveXid, bail out. We
* don't know where to start the scan.
*
* This shouldn't happen, because the pageserver always maintains a valid
* oldestActiveXid nowadays. Except when starting at an old point in time
* that was ingested before the pageserver was taught to do that.
*/
if (!TransactionIdIsValid(checkpoint->oldestActiveXid))
{
elog(LOG, "cannot restore running-xacts from CLOG because oldestActiveXid is not set");
goto fail;
}
/*
* We will scan the CLOG starting from the oldest active XID.
*
* In some corner cases, the oldestActiveXid from the last checkpoint
* might already have been truncated from the CLOG. That is,
* oldestActiveXid might be older than oldestXid. That's possible because
* oldestActiveXid is only updated at checkpoints. After the last
* checkpoint, the oldest transaction might have committed, and the CLOG
* might also have been already truncated. So if oldestActiveXid is older
* than oldestXid, start at oldestXid instead. (Otherwise we'd try to
* access CLOG segments that have already been truncated away.)
*/
from = TransactionIdPrecedes(checkpoint->oldestXid, checkpoint->oldestActiveXid)
? checkpoint->oldestActiveXid : checkpoint->oldestXid;
till = XidFromFullTransactionId(checkpoint->nextXid);
/*
* To avoid "too many KnownAssignedXids" error later during replay, we
* limit number of collected transactions. This is a tradeoff: if we are
* willing to consume more of the KnownAssignedXids space for the XIDs
* now, that allows us to start up, but we might run out of space later.
*
* The size of the KnownAssignedXids array is TOTAL_MAX_CACHED_SUBXIDS,
* which is (PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS). In
* PostgreSQL, that's always enough because the primary will always write
* an XLOG_XACT_ASSIGNMENT record if a transaction has more than
* PGPROC_MAX_CACHED_SUBXIDS subtransactions. Seeing that record allows
* the standby to mark the XIDs in pg_subtrans and removing them from the
* KnowingAssignedXids array.
*
* Here, we don't know which XIDs belong to subtransactions that have
* already been WAL-logged with an XLOG_XACT_ASSIGNMENT record. If we
* wanted to be totally safe and avoid the possibility of getting a "too
* many KnownAssignedXids" error later, we would have to limit ourselves
* to PGPROC_MAX_CACHED_SUBXIDS, which is not much. And that includes top
* transaction IDs too, because we cannot distinguish between top
* transaction IDs and subtransactions here.
*
* Somewhat arbitrarily, we use up to half of KnownAssignedXids. That
* strikes a sensible balance between being useful, and risking a "too
* many KnownAssignedXids" error later.
*/
max_xcnt = TOTAL_MAX_CACHED_SUBXIDS / 2;
/*
* Collect XIDs of prepared transactions in an array. This includes only
* their top-level XIDs. We assume that StandbyRecoverPreparedTransactions
* has already been called, so we can find all the sub-transactions in
* pg_subtrans.
*/
PrescanPreparedTransactions(&prepared_xids, &n_prepared_xids);
qsort(prepared_xids, n_prepared_xids, sizeof(TransactionId), xidLogicalComparator);
/*
* Scan the CLOG, collecting in-progress XIDs into 'restored_xids'.
*/
elog(DEBUG1, "scanning CLOG between %u and %u for in-progress XIDs", from, till);
restored_xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId));
n_restored_xids = 0;
next_prepared_idx = 0;
for (TransactionId xid = from; xid != till;)
{
XLogRecPtr xidlsn;
XidStatus xidstatus;
xidstatus = TransactionIdGetStatus(xid, &xidlsn);
/*
* "Merge" the prepared transactions into the restored_xids array as
* we go. The prepared transactions array is sorted. This is mostly
* a sanity check to ensure that all the prpeared transactions are
* seen as in-progress. (There is a check after the loop that we didn't
* miss any.)
*/
if (next_prepared_idx < n_prepared_xids && xid == prepared_xids[next_prepared_idx])
{
/*
* This is a top-level transaction ID of a prepared transaction.
* Include it in the array.
*/
/* sanity check */
if (xidstatus != TRANSACTION_STATUS_IN_PROGRESS)
{
elog(LOG, "prepared transaction %u has unexpected status %X, cannot restore running-xacts from CLOG",
xid, xidstatus);
Assert(false);
goto fail;
}
elog(DEBUG1, "XID %u: was next prepared xact (%d / %d)", xid, next_prepared_idx, n_prepared_xids);
next_prepared_idx++;
}
else if (xidstatus == TRANSACTION_STATUS_COMMITTED)
{
elog(DEBUG1, "XID %u: was committed", xid);
goto skip;
}
else if (xidstatus == TRANSACTION_STATUS_ABORTED)
{
elog(DEBUG1, "XID %u: was aborted", xid);
goto skip;
}
else if (xidstatus == TRANSACTION_STATUS_IN_PROGRESS)
{
/*
* In-progress transactions are included in the array.
*
* Except subtransactions of the prepared transactions. They are
* already set in pg_subtrans, and hence don't need to be tracked
* in the known-assigned XIDs array.
*/
if (n_prepared_xids > 0)
{
TransactionId parent = SubTransGetParent(xid);
if (TransactionIdIsValid(parent))
{
/*
* This is a subtransaction belonging to a prepared
* transaction.
*
* Sanity check that it is in the prepared XIDs array. It
* should be, because StandbyRecoverPreparedTransactions
* populated pg_subtrans, and no other XID should be set
* in it yet. (This also relies on the fact that
* StandbyRecoverPreparedTransactions sets the parent of
* each subxid to point directly to the top-level XID,
* rather than restoring the original subtransaction
* hierarchy.)
*/
if (bsearch(&parent, prepared_xids, next_prepared_idx,
sizeof(TransactionId), xidLogicalComparator) == NULL)
{
elog(LOG, "sub-XID %u has unexpected parent %u, cannot restore running-xacts from CLOG",
xid, parent);
Assert(false);
goto fail;
}
elog(DEBUG1, "XID %u: was a subtransaction of prepared xid %u", xid, parent);
goto skip;
}
}
/* include it in the array */
elog(DEBUG1, "XID %u: is in progress", xid);
}
else
{
/*
* SUB_COMMITTED is a transient state used at commit. We don't
* expect to see that here.
*/
elog(LOG, "XID %u has unexpected status %X in pg_xact, cannot restore running-xacts from CLOG",
xid, xidstatus);
Assert(false);
goto fail;
}
if (n_restored_xids >= max_xcnt)
{
/*
* Overflowed. We won't be able to install the RunningTransactions
* snapshot.
*/
elog(LOG, "too many running xacts to restore from the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
checkpoint->oldestXid, checkpoint->oldestActiveXid,
XidFromFullTransactionId(checkpoint->nextXid));
goto fail;
}
restored_xids[n_restored_xids++] = xid;
skip:
TransactionIdAdvance(xid);
continue;
}
/* sanity check */
if (next_prepared_idx != n_prepared_xids)
{
elog(LOG, "prepared transaction ID %u was not visited in the CLOG scan, cannot restore running-xacts from CLOG",
prepared_xids[next_prepared_idx]);
Assert(false);
goto fail;
}
elog(LOG, "restored %d running xacts by scanning the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
n_restored_xids, checkpoint->oldestXid, checkpoint->oldestActiveXid, XidFromFullTransactionId(checkpoint->nextXid));
*nxids = n_restored_xids;
*xids = restored_xids;
return true;
fail:
*nxids = 0;
*xids = NULL;
if (restored_xids)
pfree(restored_xids);
if (prepared_xids)
pfree(prepared_xids);
return false;
}
void
_PG_init(void)
{
@@ -288,6 +579,8 @@ _PG_init(void)
pg_init_extension_server();
restore_running_xacts_callback = RestoreRunningXactsFromClog;
/*
* Important: This must happen after other parts of the extension are
* loaded, otherwise any settings to GUCs that were set before the

View File

@@ -7,7 +7,7 @@ OBJS = \
neontest.o
EXTENSION = neon_test_utils
DATA = neon_test_utils--1.1.sql
DATA = neon_test_utils--1.2.sql
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
PG_CONFIG = pg_config

View File

@@ -41,7 +41,7 @@ RETURNS bytea
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION neon_xlogflush(lsn pg_lsn)
CREATE FUNCTION neon_xlogflush(lsn pg_lsn DEFAULT NULL)
RETURNS VOID
AS 'MODULE_PATHNAME', 'neon_xlogflush'
LANGUAGE C PARALLEL UNSAFE;

View File

@@ -1,6 +1,6 @@
# neon_test_utils extension
comment = 'helpers for neon testing and debugging'
default_version = '1.1'
default_version = '1.2'
module_pathname = '$libdir/neon_test_utils'
relocatable = true
trusted = true

View File

@@ -15,6 +15,7 @@
#include "access/relation.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "catalog/namespace.h"
#include "fmgr.h"
#include "funcapi.h"
@@ -444,11 +445,46 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
/*
* Directly calls XLogFlush(lsn) to flush WAL buffers.
*
* If 'lsn' is not specified (is NULL), flush all generated WAL.
*/
Datum
neon_xlogflush(PG_FUNCTION_ARGS)
{
XLogRecPtr lsn = PG_GETARG_LSN(0);
XLogRecPtr lsn;
if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is in progress"),
errhint("cannot flush WAL during recovery.")));
if (!PG_ARGISNULL(0))
lsn = PG_GETARG_LSN(0);
else
{
lsn = GetXLogInsertRecPtr();
/*---
* The LSN returned by GetXLogInsertRecPtr() is the position where the
* next inserted record would begin. If the last record ended just at
* the page boundary, the next record will begin after the page header
* on the next page, but the next page's page header has not been
* written yet. If we tried to flush it, XLogFlush() would throw an
* error:
*
* ERROR : xlog flush request %X/%X is not satisfied --- flushed only to %X/%X
*
* To avoid that, if the insert position points to just after the page
* header, back off to page boundary.
*/
if (lsn % XLOG_BLCKSZ == SizeOfXLogShortPHD &&
XLogSegmentOffset(lsn, wal_segment_size) > XLOG_BLCKSZ)
lsn -= SizeOfXLogShortPHD;
else if (lsn % XLOG_BLCKSZ == SizeOfXLogLongPHD &&
XLogSegmentOffset(lsn, wal_segment_size) < XLOG_BLCKSZ)
lsn -= SizeOfXLogLongPHD;
}
XLogFlush(lsn);
PG_RETURN_VOID();

View File

@@ -35,6 +35,7 @@ use proxy::usage_metrics;
use anyhow::bail;
use proxy::config::{self, ProxyConfig};
use proxy::serverless;
use remote_storage::RemoteStorageConfig;
use std::net::SocketAddr;
use std::pin::pin;
use std::sync::Arc;
@@ -205,8 +206,8 @@ struct ProxyCliArgs {
/// remote storage configuration for backup metric collection
/// Encoded as toml (same format as pageservers), eg
/// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
#[clap(long, default_value = "{}")]
metric_backup_collection_remote_storage: String,
#[clap(long, value_parser = remote_storage_from_toml)]
metric_backup_collection_remote_storage: Option<RemoteStorageConfig>,
/// chunk size for backup metric collection
/// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression.
#[clap(long, default_value = "4194304")]
@@ -511,9 +512,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
}
let backup_metric_collection_config = config::MetricBackupCollectionConfig {
interval: args.metric_backup_collection_interval,
remote_storage_config: remote_storage_from_toml(
&args.metric_backup_collection_remote_storage,
)?,
remote_storage_config: args.metric_backup_collection_remote_storage.clone(),
chunk_size: args.metric_backup_collection_chunk_size,
};

View File

@@ -53,6 +53,13 @@ impl<C: Cache, V> Cached<C, V> {
)
}
pub fn map<U>(self, f: impl FnOnce(V) -> U) -> Cached<C, U> {
Cached {
token: self.token,
value: f(self.value),
}
}
/// Drop this entry from a cache if it's still there.
pub fn invalidate(self) -> V {
if let Some((cache, info)) = &self.token {

View File

@@ -65,6 +65,8 @@ impl<K: Hash + Eq, V> Cache for TimedLru<K, V> {
struct Entry<T> {
created_at: Instant,
expires_at: Instant,
ttl: Duration,
update_ttl_on_retrieval: bool,
value: T,
}
@@ -122,7 +124,6 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
Q: Hash + Eq + ?Sized,
{
let now = Instant::now();
let deadline = now.checked_add(self.ttl).expect("time overflow");
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
@@ -142,7 +143,8 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
let (created_at, expires_at) = (entry.created_at, entry.expires_at);
// Update the deadline and the entry's position in the LRU list.
if self.update_ttl_on_retrieval {
let deadline = now.checked_add(raw_entry.get().ttl).expect("time overflow");
if raw_entry.get().update_ttl_on_retrieval {
raw_entry.get_mut().expires_at = deadline;
}
raw_entry.to_back();
@@ -162,12 +164,27 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
/// existed, return the previous value and its creation timestamp.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn insert_raw(&self, key: K, value: V) -> (Instant, Option<V>) {
self.insert_raw_ttl(key, value, self.ttl, self.update_ttl_on_retrieval)
}
/// Insert an entry to the cache. If an entry with the same key already
/// existed, return the previous value and its creation timestamp.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn insert_raw_ttl(
&self,
key: K,
value: V,
ttl: Duration,
update: bool,
) -> (Instant, Option<V>) {
let created_at = Instant::now();
let expires_at = created_at.checked_add(self.ttl).expect("time overflow");
let expires_at = created_at.checked_add(ttl).expect("time overflow");
let entry = Entry {
created_at,
expires_at,
ttl,
update_ttl_on_retrieval: update,
value,
};
@@ -190,6 +207,21 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
}
impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
pub fn insert_ttl(&self, key: K, value: V, ttl: Duration) {
self.insert_raw_ttl(key, value, ttl, false);
}
pub fn insert_unit(&self, key: K, value: V) -> (Option<V>, Cached<&Self, ()>) {
let (created_at, old) = self.insert_raw(key.clone(), value);
let cached = Cached {
token: Some((self, LookupInfo { created_at, key })),
value: (),
};
(old, cached)
}
pub fn insert(&self, key: K, value: V) -> (Option<V>, Cached<&Self>) {
let (created_at, old) = self.insert_raw(key.clone(), value.clone());

View File

@@ -399,15 +399,11 @@ impl FromStr for EndpointCacheConfig {
#[derive(Debug)]
pub struct MetricBackupCollectionConfig {
pub interval: Duration,
pub remote_storage_config: OptRemoteStorageConfig,
pub remote_storage_config: Option<RemoteStorageConfig>,
pub chunk_size: usize,
}
/// Hack to avoid clap being smarter. If you don't use this type alias, clap assumes more about the optional state and you get
/// runtime type errors from the value parser we use.
pub type OptRemoteStorageConfig = Option<RemoteStorageConfig>;
pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<RemoteStorageConfig> {
RemoteStorageConfig::from_toml(&s.parse()?)
}

View File

@@ -9,7 +9,7 @@ use crate::proxy::retry::CouldRetry;
/// Generic error response with human-readable description.
/// Note that we can't always present it to user as is.
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct ConsoleError {
pub error: Box<str>,
#[serde(skip)]
@@ -82,41 +82,19 @@ impl CouldRetry for ConsoleError {
.details
.error_info
.map_or(Reason::Unknown, |e| e.reason);
match reason {
// not a transitive error
Reason::RoleProtected => false,
// on retry, it will still not be found
Reason::ResourceNotFound
| Reason::ProjectNotFound
| Reason::EndpointNotFound
| Reason::BranchNotFound => false,
// we were asked to go away
Reason::RateLimitExceeded
| Reason::NonDefaultBranchComputeTimeExceeded
| Reason::ActiveTimeQuotaExceeded
| Reason::ComputeTimeQuotaExceeded
| Reason::WrittenDataQuotaExceeded
| Reason::DataTransferQuotaExceeded
| Reason::LogicalSizeQuotaExceeded => false,
// transitive error. control plane is currently busy
// but might be ready soon
Reason::RunningOperations => true,
Reason::ConcurrencyLimitReached => true,
Reason::LockAlreadyTaken => true,
// unknown error. better not retry it.
Reason::Unknown => false,
}
reason.can_retry()
}
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct Status {
pub code: Box<str>,
pub message: Box<str>,
pub details: Details,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct Details {
pub error_info: Option<ErrorInfo>,
pub retry_info: Option<RetryInfo>,
@@ -199,6 +177,34 @@ impl Reason {
| Reason::BranchNotFound
)
}
pub fn can_retry(&self) -> bool {
match self {
// do not retry role protected errors
// not a transitive error
Reason::RoleProtected => false,
// on retry, it will still not be found
Reason::ResourceNotFound
| Reason::ProjectNotFound
| Reason::EndpointNotFound
| Reason::BranchNotFound => false,
// we were asked to go away
Reason::RateLimitExceeded
| Reason::NonDefaultBranchComputeTimeExceeded
| Reason::ActiveTimeQuotaExceeded
| Reason::ComputeTimeQuotaExceeded
| Reason::WrittenDataQuotaExceeded
| Reason::DataTransferQuotaExceeded
| Reason::LogicalSizeQuotaExceeded => false,
// transitive error. control plane is currently busy
// but might be ready soon
Reason::RunningOperations
| Reason::ConcurrencyLimitReached
| Reason::LockAlreadyTaken => true,
// unknown error. better not retry it.
Reason::Unknown => false,
}
}
}
#[derive(Copy, Clone, Debug, Deserialize)]
@@ -206,7 +212,7 @@ pub struct RetryInfo {
pub retry_delay_ms: u64,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct UserFacingMessage {
pub message: Box<str>,
}

View File

@@ -2,7 +2,7 @@
pub mod mock;
pub mod neon;
use super::messages::MetricsAuxInfo;
use super::messages::{ConsoleError, MetricsAuxInfo};
use crate::{
auth::{
backend::{ComputeCredentialKeys, ComputeUserInfo},
@@ -317,8 +317,8 @@ impl NodeInfo {
}
}
pub type NodeInfoCache = TimedLru<EndpointCacheKey, NodeInfo>;
pub type CachedNodeInfo = Cached<&'static NodeInfoCache>;
pub type NodeInfoCache = TimedLru<EndpointCacheKey, Result<NodeInfo, Box<ConsoleError>>>;
pub type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>;
pub type CachedRoleSecret = Cached<&'static ProjectInfoCacheImpl, Option<AuthSecret>>;
pub type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc<Vec<IpPattern>>>;

View File

@@ -9,7 +9,7 @@ use super::{
use crate::{
auth::backend::ComputeUserInfo,
compute,
console::messages::ColdStartInfo,
console::messages::{ColdStartInfo, Reason},
http,
metrics::{CacheOutcome, Metrics},
rate_limiter::EndpointRateLimiter,
@@ -17,10 +17,10 @@ use crate::{
};
use crate::{cache::Cached, context::RequestMonitoring};
use futures::TryFutureExt;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use tokio::time::Instant;
use tokio_postgres::config::SslMode;
use tracing::{error, info, info_span, warn, Instrument};
use tracing::{debug, error, info, info_span, warn, Instrument};
pub struct Api {
endpoint: http::Endpoint,
@@ -273,26 +273,34 @@ impl super::Api for Api {
) -> Result<CachedNodeInfo, WakeComputeError> {
let key = user_info.endpoint_cache_key();
macro_rules! check_cache {
() => {
if let Some(cached) = self.caches.node_info.get(&key) {
let (cached, info) = cached.take_value();
let info = info.map_err(|c| {
info!(key = &*key, "found cached wake_compute error");
WakeComputeError::ApiError(ApiError::Console(*c))
})?;
debug!(key = &*key, "found cached compute node info");
ctx.set_project(info.aux.clone());
return Ok(cached.map(|()| info));
}
};
}
// Every time we do a wakeup http request, the compute node will stay up
// for some time (highly depends on the console's scale-to-zero policy);
// The connection info remains the same during that period of time,
// which means that we might cache it to reduce the load and latency.
if let Some(cached) = self.caches.node_info.get(&key) {
info!(key = &*key, "found cached compute node info");
ctx.set_project(cached.aux.clone());
return Ok(cached);
}
check_cache!();
let permit = self.locks.get_permit(&key).await?;
// after getting back a permit - it's possible the cache was filled
// double check
if permit.should_check_cache() {
if let Some(cached) = self.caches.node_info.get(&key) {
info!(key = &*key, "found cached compute node info");
ctx.set_project(cached.aux.clone());
return Ok(cached);
}
check_cache!();
}
// check rate limit
@@ -300,23 +308,56 @@ impl super::Api for Api {
.wake_compute_endpoint_rate_limiter
.check(user_info.endpoint.normalize_intern(), 1)
{
info!(key = &*key, "found cached compute node info");
return Err(WakeComputeError::TooManyConnections);
}
let mut node = permit.release_result(self.do_wake_compute(ctx, user_info).await)?;
ctx.set_project(node.aux.clone());
let cold_start_info = node.aux.cold_start_info;
info!("woken up a compute node");
let node = permit.release_result(self.do_wake_compute(ctx, user_info).await);
match node {
Ok(node) => {
ctx.set_project(node.aux.clone());
debug!(key = &*key, "created a cache entry for woken compute node");
// store the cached node as 'warm'
node.aux.cold_start_info = ColdStartInfo::WarmCached;
let (_, mut cached) = self.caches.node_info.insert(key.clone(), node);
cached.aux.cold_start_info = cold_start_info;
let mut stored_node = node.clone();
// store the cached node as 'warm_cached'
stored_node.aux.cold_start_info = ColdStartInfo::WarmCached;
info!(key = &*key, "created a cache entry for compute node info");
let (_, cached) = self.caches.node_info.insert_unit(key, Ok(stored_node));
Ok(cached)
Ok(cached.map(|()| node))
}
Err(err) => match err {
WakeComputeError::ApiError(ApiError::Console(err)) => {
let Some(status) = &err.status else {
return Err(WakeComputeError::ApiError(ApiError::Console(err)));
};
let reason = status
.details
.error_info
.map_or(Reason::Unknown, |x| x.reason);
// if we can retry this error, do not cache it.
if reason.can_retry() {
return Err(WakeComputeError::ApiError(ApiError::Console(err)));
}
// at this point, we should only have quota errors.
debug!(
key = &*key,
"created a cache entry for the wake compute error"
);
self.caches.node_info.insert_ttl(
key,
Err(Box::new(err.clone())),
Duration::from_secs(30),
);
Err(WakeComputeError::ApiError(ApiError::Console(err)))
}
err => return Err(err),
},
}
}
}

View File

@@ -14,17 +14,14 @@ use parquet::{
record::RecordWriter,
};
use pq_proto::StartupMessageParams;
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel};
use serde::ser::SerializeMap;
use tokio::{sync::mpsc, time};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, Span};
use utils::backoff;
use crate::{
config::{remote_storage_from_toml, OptRemoteStorageConfig},
context::LOG_CHAN_DISCONNECT,
};
use crate::{config::remote_storage_from_toml, context::LOG_CHAN_DISCONNECT};
use super::{RequestMonitoring, LOG_CHAN};
@@ -33,11 +30,11 @@ pub struct ParquetUploadArgs {
/// Storage location to upload the parquet files to.
/// Encoded as toml (same format as pageservers), eg
/// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
#[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
parquet_upload_remote_storage: OptRemoteStorageConfig,
#[clap(long, value_parser = remote_storage_from_toml)]
parquet_upload_remote_storage: Option<RemoteStorageConfig>,
#[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
parquet_upload_disconnect_events_remote_storage: OptRemoteStorageConfig,
#[clap(long, value_parser = remote_storage_from_toml)]
parquet_upload_disconnect_events_remote_storage: Option<RemoteStorageConfig>,
/// How many rows to include in a row group
#[clap(long, default_value_t = 8192)]

View File

@@ -540,8 +540,8 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
},
allow_self_signed_compute: false,
};
let (_, node) = cache.insert("key".into(), node);
node
let (_, node2) = cache.insert_unit("key".into(), Ok(node.clone()));
node2.map(|()| node)
}
fn helper_create_connect_info(

View File

@@ -12,7 +12,6 @@ use sd_notify::NotifyState;
use tokio::runtime::Handle;
use tokio::signal::unix::{signal, SignalKind};
use tokio::task::JoinError;
use toml_edit::Document;
use utils::logging::SecretString;
use std::env::{var, VarError};
@@ -126,7 +125,7 @@ struct Args {
peer_recovery: bool,
/// Remote storage configuration for WAL backup (offloading to s3) as TOML
/// inline table, e.g.
/// {"max_concurrent_syncs" = 17, "max_sync_errors": 13, "bucket_name": "<BUCKETNAME>", "bucket_region":"<REGION>", "concurrency_limit": 119}
/// {max_concurrent_syncs = 17, max_sync_errors = 13, bucket_name = "<BUCKETNAME>", bucket_region = "<REGION>", concurrency_limit = 119}
/// Safekeeper offloads WAL to
/// [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring
/// structure on the file system.
@@ -553,16 +552,8 @@ fn set_id(workdir: &Utf8Path, given_id: Option<NodeId>) -> Result<NodeId> {
Ok(my_id)
}
// Parse RemoteStorage from TOML table.
fn parse_remote_storage(storage_conf: &str) -> anyhow::Result<RemoteStorageConfig> {
// funny toml doesn't consider plain inline table as valid document, so wrap in a key to parse
let storage_conf_toml = format!("remote_storage = {storage_conf}");
let parsed_toml = storage_conf_toml.parse::<Document>()?; // parse
let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
RemoteStorageConfig::from_toml(storage_conf_parsed_toml).and_then(|parsed_config| {
// XXX: Don't print the original toml here, there might be some sensitive data
parsed_config.context("Incorrectly parsed remote storage toml as no remote storage config")
})
RemoteStorageConfig::from_toml(&storage_conf.parse()?)
}
#[test]

View File

@@ -144,6 +144,8 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_smgr_query_seconds_bucket",
"pageserver_smgr_query_seconds_count",
"pageserver_smgr_query_seconds_sum",
"pageserver_archive_size",
"pageserver_pitr_history_size",
"pageserver_storage_operations_seconds_count_total",
"pageserver_storage_operations_seconds_sum_total",
"pageserver_evictions_total",

View File

@@ -1167,7 +1167,9 @@ class NeonEnv:
if config.auth_enabled:
sk_cfg["auth_enabled"] = True
if self.safekeepers_remote_storage is not None:
sk_cfg["remote_storage"] = self.safekeepers_remote_storage.to_toml_inline_table()
sk_cfg[
"remote_storage"
] = self.safekeepers_remote_storage.to_toml_inline_table().strip()
self.safekeepers.append(Safekeeper(env=self, id=id, port=port))
cfg["safekeepers"].append(sk_cfg)
@@ -2111,6 +2113,21 @@ class NeonStorageController(MetricsGetter, LogUtils):
self.running = False
return self
@staticmethod
def retryable_node_operation(op, ps_id, max_attempts, backoff):
while max_attempts > 0:
try:
op(ps_id)
return
except StorageControllerApiException as e:
max_attempts -= 1
log.info(f"Operation failed ({max_attempts} attempts left): {e}")
if max_attempts == 0:
raise e
time.sleep(backoff)
@staticmethod
def raise_api_exception(res: requests.Response):
try:
@@ -2451,6 +2468,38 @@ class NeonStorageController(MetricsGetter, LogUtils):
)
log.info("storage controller passed consistency check")
def poll_node_status(
self, node_id: int, desired_scheduling_policy: str, max_attempts: int, backoff: int
):
"""
Poll the node status until it reaches 'desired_scheduling_policy' or 'max_attempts' have been exhausted
"""
log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
while max_attempts > 0:
try:
status = self.node_status(node_id)
policy = status["scheduling"]
if policy == desired_scheduling_policy:
return
else:
max_attempts -= 1
log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
if max_attempts == 0:
raise AssertionError(
f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
)
time.sleep(backoff)
except StorageControllerApiException as e:
max_attempts -= 1
log.info(f"Status call failed ({max_attempts} retries left): {e}")
if max_attempts == 0:
raise e
time.sleep(backoff)
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
if isinstance(config_strings, tuple):
pairs = [config_strings]
@@ -3491,7 +3540,6 @@ class Endpoint(PgProtocol, LogUtils):
):
super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres")
self.env = env
self.running = False
self.branch_name: Optional[str] = None # dubious
self.endpoint_id: Optional[str] = None # dubious, see asserts below
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
@@ -3857,7 +3905,9 @@ class EndpointFactory:
return self
def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]):
def new_replica(
self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
):
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None

View File

@@ -198,7 +198,7 @@ def wait_for_last_record_lsn(
lsn: Lsn,
) -> Lsn:
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
for i in range(100):
for i in range(1000):
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
return current_lsn

View File

@@ -1,4 +1,5 @@
import json
import os
from pathlib import Path
from typing import Any, Dict, Tuple
@@ -17,30 +18,74 @@ from performance.pageserver.util import (
setup_pageserver_with_tenants,
)
# The following tests use pagebench "getpage at latest LSN" to characterize the throughput of the pageserver.
# originally there was a single test named `test_pageserver_max_throughput_getpage_at_latest_lsn``
# so you still see some references to this name in the code.
# To avoid recreating the snapshots for each test, we continue to use the name `max_throughput_latest_lsn`
# for some files and metrics.
# For reference, the space usage of the snapshots:
# admin@ip-172-31-13-23:[~/neon-main]: sudo du -hs /instance_store/test_output/shared-snapshots
# 137G /instance_store/test_output/shared-snapshots
# admin@ip-172-31-13-23:[~/neon-main]: sudo du -hs /instance_store/test_output/shared-snapshots/*
# 1.8G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1-13
# 1.1G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1-6
# 8.5G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-10-13
# 5.1G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-10-6
# 76G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-13
# 46G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-6
@pytest.mark.parametrize("duration", [30])
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100, 200]])
@pytest.mark.parametrize("n_tenants", [1, 10])
@pytest.mark.timeout(
10000
) # TODO: this value is just "a really high number"; have this per instance type
def test_pageserver_max_throughput_getpage_at_latest_lsn(
# sudo du -hs /instance_store/neon/test_output/shared-snapshots/*
# 416G /instance_store/neon/test_output/shared-snapshots/max_throughput_latest_lsn-500-13
@pytest.mark.parametrize("duration", [60 * 60])
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(200)])
@pytest.mark.parametrize("n_tenants", [500])
@pytest.mark.timeout(10000)
@pytest.mark.skipif(
os.getenv("CI", "false") == "true",
reason="This test needs lot of resources and should run on dedicated HW, not in github action runners as part of CI",
)
def test_pageserver_characterize_throughput_with_n_tenants(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
n_tenants: int,
pgbench_scale: int,
duration: int,
):
setup_and_run_pagebench_benchmark(
neon_env_builder, zenbenchmark, pg_bin, n_tenants, pgbench_scale, duration, 1
)
# For reference, the space usage of the snapshots:
# sudo du -hs /instance_store/neon/test_output/shared-snapshots/*
# 19G /instance_store/neon/test_output/shared-snapshots/max_throughput_latest_lsn-1-136
@pytest.mark.parametrize("duration", [20 * 60])
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(2048)])
# we use 1 client to characterize latencies, and 64 clients to characterize throughput/scalability
# we use 64 clients because typically for a high number of connections we recommend the connection pooler
# which by default uses 64 connections
@pytest.mark.parametrize("n_clients", [1, 64])
@pytest.mark.parametrize("n_tenants", [1])
@pytest.mark.timeout(2400)
@pytest.mark.skipif(
os.getenv("CI", "false") == "true",
reason="This test needs lot of resources and should run on dedicated HW, not in github action runners as part of CI",
)
def test_pageserver_characterize_latencies_with_1_client_and_throughput_with_many_clients_one_tenant(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
n_tenants: int,
pgbench_scale: int,
duration: int,
n_clients: int,
):
setup_and_run_pagebench_benchmark(
neon_env_builder, zenbenchmark, pg_bin, n_tenants, pgbench_scale, duration, n_clients
)
def setup_and_run_pagebench_benchmark(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
n_tenants: int,
pgbench_scale: int,
duration: int,
n_clients: int,
):
def record(metric, **kwargs):
zenbenchmark.record(
@@ -55,6 +100,7 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
"n_tenants": (n_tenants, {"unit": ""}),
"pgbench_scale": (pgbench_scale, {"unit": ""}),
"duration": (duration, {"unit": "s"}),
"n_clients": (n_clients, {"unit": ""}),
}
)
@@ -96,7 +142,7 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
r".*query handler for.*pagestream.*failed: unexpected message: CopyFail during COPY.*"
)
run_benchmark_max_throughput_latest_lsn(env, pg_bin, record, duration)
run_pagebench_benchmark(env, pg_bin, record, duration, n_clients)
def setup_tenant_template(env: NeonEnv, pg_bin: PgBin, scale: int):
@@ -157,8 +203,8 @@ def setup_tenant_template(env: NeonEnv, pg_bin: PgBin, scale: int):
return (template_tenant, template_timeline, config)
def run_benchmark_max_throughput_latest_lsn(
env: NeonEnv, pg_bin: PgBin, record, duration_secs: int
def run_pagebench_benchmark(
env: NeonEnv, pg_bin: PgBin, record, duration_secs: int, n_clients: int
):
"""
Benchmark `env.pageserver` for max throughput @ latest LSN and record results in `zenbenchmark`.
@@ -172,6 +218,8 @@ def run_benchmark_max_throughput_latest_lsn(
ps_http.base_url,
"--page-service-connstring",
env.pageserver.connstr(password=None),
"--num-clients",
str(n_clients),
"--runtime",
f"{duration_secs}s",
# don't specify the targets explicitly, let pagebench auto-discover them

View File

@@ -22,7 +22,7 @@ def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
log.info("wait for all tenants to become active")
wait_until_all_tenants_state(
ps_http, "Active", iterations=n_tenants, period=1, http_error_ok=False
ps_http, "Active", iterations=10 + n_tenants, period=1, http_error_ok=False
)
# ensure all layers are resident for predictiable performance

View File

@@ -1,18 +1,89 @@
import concurrent.futures
import random
import time
from collections import defaultdict
from typing import Any, Dict
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pg_version import PgVersion
def get_consistent_node_shard_counts(env: NeonEnv, total_shards) -> defaultdict[str, int]:
"""
Get the number of shards attached to each node.
This function takes into account the intersection of the intent and the observed state.
If they do not match, it asserts out.
"""
tenants = env.storage_controller.tenant_list()
intent = dict()
observed = dict()
tenant_placement: defaultdict[str, Dict[str, Any]] = defaultdict(
lambda: {
"observed": {"attached": None, "secondary": []},
"intent": {"attached": None, "secondary": []},
}
)
for t in tenants:
for node_id, loc_state in t["observed"]["locations"].items():
if (
loc_state is not None
and "conf" in loc_state
and loc_state["conf"] is not None
and loc_state["conf"]["mode"]
in set(["AttachedSingle", "AttachedMulti", "AttachedStale"])
):
observed[t["tenant_shard_id"]] = int(node_id)
tenant_placement[t["tenant_shard_id"]]["observed"]["attached"] = int(node_id)
if (
loc_state is not None
and "conf" in loc_state
and loc_state["conf"] is not None
and loc_state["conf"]["mode"] == "Secondary"
):
tenant_placement[t["tenant_shard_id"]]["observed"]["secondary"].append(int(node_id))
if "attached" in t["intent"]:
intent[t["tenant_shard_id"]] = t["intent"]["attached"]
tenant_placement[t["tenant_shard_id"]]["intent"]["attached"] = t["intent"]["attached"]
if "secondary" in t["intent"]:
tenant_placement[t["tenant_shard_id"]]["intent"]["secondary"] += t["intent"][
"secondary"
]
log.info(f"{tenant_placement=}")
matching = {
tid: intent[tid] for tid in observed if tid in intent and intent[tid] == observed[tid]
}
assert len(matching) == total_shards
attached_per_node: defaultdict[str, int] = defaultdict(int)
for node_id in matching.values():
attached_per_node[node_id] += 1
return attached_per_node
def assert_consistent_balanced_attachments(env: NeonEnv, total_shards):
attached_per_node = get_consistent_node_shard_counts(env, total_shards)
min_shard_count = min(attached_per_node.values())
max_shard_count = max(attached_per_node.values())
flake_factor = 5 / 100
assert max_shard_count - min_shard_count <= int(total_shards * flake_factor)
@pytest.mark.timeout(3600) # super long running test: should go down as we optimize
def test_storage_controller_many_tenants(
neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure
@@ -44,7 +115,8 @@ def test_storage_controller_many_tenants(
# A small sleep on each call into the notify hook, to simulate the latency of doing a database write
compute_reconfigure_listener.register_on_notify(lambda body: time.sleep(0.01))
env = neon_env_builder.init_start()
env = neon_env_builder.init_configs()
neon_env_builder.start()
# We will intentionally stress reconciler concurrrency, which triggers a warning when lots
# of shards are hitting the delayed path.
@@ -79,6 +151,8 @@ def test_storage_controller_many_tenants(
shard_count = 2
stripe_size = 1024
total_shards = tenant_count * shard_count
tenants = set(TenantId.generate() for _i in range(0, tenant_count))
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
@@ -195,10 +269,44 @@ def test_storage_controller_many_tenants(
env.storage_controller.consistency_check()
check_memory()
# Restart pageservers: this exercises the /re-attach API
for pageserver in env.pageservers:
pageserver.stop()
pageserver.start()
shard_counts = get_consistent_node_shard_counts(env, total_shards)
log.info(f"Shard counts before rolling restart: {shard_counts}")
assert_consistent_balanced_attachments(env, total_shards)
# Restart pageservers gracefully: this exercises the /re-attach pageserver API
# and the storage controller drain and fill API
for ps in env.pageservers:
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
)
env.storage_controller.poll_node_status(
ps.id, "PauseForRestart", max_attempts=24, backoff=5
)
shard_counts = get_consistent_node_shard_counts(env, total_shards)
log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
# Assert that we've drained the node
assert shard_counts[str(ps.id)] == 0
# Assert that those shards actually went somewhere
assert sum(shard_counts.values()) == total_shards
ps.restart()
env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=24, backoff=1)
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
)
env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=24, backoff=5)
shard_counts = get_consistent_node_shard_counts(env, total_shards)
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
assert_consistent_balanced_attachments(env, total_shards)
env.storage_controller.reconcile_until_idle()
env.storage_controller.consistency_check()
# Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn,
# as they were not offline long enough to trigger any scheduling changes.

View File

@@ -211,7 +211,7 @@ def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
def check_pageserver(expect_success: bool, **conn_kwargs):
check_connection(
env.pageserver,
f"get_last_record_rlsn {env.initial_tenant} {timeline_id}",
f"show {env.initial_tenant}",
expect_success,
**conn_kwargs,
)

View File

@@ -4,7 +4,6 @@ from random import choice
from string import ascii_lowercase
import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
AuxFileStore,
@@ -13,7 +12,7 @@ from fixtures.neon_fixtures import (
logical_replication_sync,
wait_for_last_flush_lsn,
)
from fixtures.utils import query_scalar, wait_until
from fixtures.utils import wait_until
def random_string(n: int):
@@ -326,12 +325,17 @@ FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of
assert "could not receive data from WAL stream" not in logs
# Test compute start at LSN page of which starts with contrecord
# https://github.com/neondatabase/neon/issues/5749
# Test replication of WAL record spanning page boundary (with contrecord) after
# compute restart and WAL write of the page.
#
# See https://github.com/neondatabase/neon/issues/5749
#
# Most pages start with a contrecord, so we don't do anything special
# to ensure that.
@pytest.mark.parametrize(
"pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
def test_restart_endpoint(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
env.neon_cli.create_branch("init")
@@ -356,52 +360,6 @@ def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
logical_replication_sync(vanilla_pg, endpoint)
vanilla_pg.stop()
with endpoint.cursor() as cur:
# measure how much space logical message takes. Sometimes first attempt
# creates huge message and then it stabilizes, have no idea why.
for _ in range(3):
lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
log.info(f"current_lsn={lsn_before}")
# Non-transactional logical message doesn't write WAL, only XLogInsert's
# it, so use transactional. Which is a bit problematic as transactional
# necessitates commit record. Alternatively we can do smth like
# select neon_xlogflush(pg_current_wal_insert_lsn());
# but isn't much better + that particular call complains on 'xlog flush
# request 0/282C018 is not satisfied' as pg_current_wal_insert_lsn skips
# page headers.
payload = "blahblah"
cur.execute(f"select pg_logical_emit_message(true, 'pref', '{payload}')")
lsn_after_by_curr_wal_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
lsn_diff = lsn_after_by_curr_wal_lsn - lsn_before
logical_message_base = lsn_after_by_curr_wal_lsn - lsn_before - len(payload)
log.info(
f"before {lsn_before}, after {lsn_after_by_curr_wal_lsn}, lsn diff is {lsn_diff}, base {logical_message_base}"
)
# and write logical message spanning exactly as we want
lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
log.info(f"current_lsn={lsn_before}")
curr_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
offs = int(curr_lsn) % 8192
till_page = 8192 - offs
payload_len = (
till_page - logical_message_base - 8
) # not sure why 8 is here, it is deduced from experiments
log.info(f"current_lsn={curr_lsn}, offs {offs}, till_page {till_page}")
# payload_len above would go exactly till the page boundary; but we want contrecord, so make it slightly longer
payload_len += 8
cur.execute(f"select pg_logical_emit_message(true, 'pref', 'f{'a' * payload_len}')")
supposedly_contrecord_end = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
log.info(f"supposedly_page_boundary={supposedly_contrecord_end}")
# The calculations to hit the page boundary are very fuzzy, so just
# ignore test if we fail to reach it.
if not (int(supposedly_contrecord_end) % 8192 == 32):
pytest.skip("missed page boundary, bad luck")
cur.execute("insert into replication_example values (2, 3)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
endpoint.stop().start()

View File

@@ -2,6 +2,7 @@ import threading
import time
from contextlib import closing
import psycopg2.errors
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin
@@ -40,3 +41,26 @@ def test_pageserver_reconnect(neon_simple_env: NeonEnv, pg_bin: PgBin):
c.execute("select pg_reload_conf()")
thread.join()
# Test handling errors during page server reconnect
def test_pageserver_reconnect_failure(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_pageserver_reconnect")
endpoint = env.endpoints.create_start("test_pageserver_reconnect")
con = endpoint.connect()
cur = con.cursor()
cur.execute("set statement_timeout='2s'")
cur.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = cur.fetchall()[0][0]
cur.execute(
f"alter system set neon.pageserver_connstring='{connstring}?some_invalid_param=xyz'"
)
cur.execute("select pg_reload_conf()")
try:
cur.execute("select count(*) from pg_class")
except psycopg2.errors.QueryCanceled:
log.info("Connection to PS failed")
assert not endpoint.log_contains("ERROR: cannot wait on socket event without a socket.*")

View File

@@ -8,8 +8,11 @@ from typing import TYPE_CHECKING, cast
import pytest
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
check_restored_datadir_content,
tenant_get_shards,
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import s3_storage
@@ -21,6 +24,97 @@ if TYPE_CHECKING:
from pytest import CaptureFixture
TENANT_CONF = {
# Scaled down thresholds so that we are exercising the pageserver beyond just writing
# ephemeral/L0 layers, and because debug-mode code is slow to read from full sized ephemeral layer files.
"pitr_interval": "60s",
"checkpoint_distance": f"{8 * 1024 * 1024}",
"compaction_target_size": f"{8 * 1024 * 1024}",
}
# # Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create.
# # There should have been compactions mid-test as well, this final check is in addition those.
# for (shard, pageserver) in tenant_get_shards(env, env.initial_tenant):
# pageserver.http_client().timeline_checkpoint(env.initial_tenant, env.initial_timeline, force_repartition=True, force_image_layer_creation=True)
def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: Endpoint):
"""
After running some opaque tests that create interesting content in a timeline, run
some generic integrity checks that the storage stack is able to reproduce the written
data properly.
"""
ignored_files: Optional[list[str]] = None
# Neon handles unlogged relations in a special manner. During a
# basebackup, we ship the init fork as the main fork. This presents a
# problem in that the endpoint's data directory and the basebackup will
# have differences and will fail the eventual file comparison.
#
# Unlogged tables were introduced in version 9.1. ALTER TABLE grew
# support for setting the persistence of a table in 9.5. The reason that
# this doesn't affect versions < 15 (but probably would between 9.1 and
# 9.5) is that all the regression tests that deal with unlogged tables
# up until that point dropped the unlogged tables or set them to logged
# at some point during the test.
#
# In version 15, Postgres grew support for unlogged sequences, and with
# that came a few more regression tests. These tests did not all drop
# the unlogged tables/sequences prior to finishing.
#
# But unlogged sequences came with a bug in that, sequences didn't
# inherit the persistence of their "parent" tables if they had one. This
# was fixed and backported to 15, thus exacerbating our problem a bit.
#
# So what we can do is just ignore file differences between the data
# directory and basebackup for unlogged relations.
results = cast(
"list[tuple[str, str]]",
endpoint.safe_psql(
"""
SELECT
relkind,
pg_relation_filepath(
pg_filenode_relation(reltablespace, relfilenode)
) AS unlogged_relation_paths
FROM pg_class
WHERE relpersistence = 'u'
""",
dbname=db_name,
),
)
unlogged_relation_files: list[str] = []
for r in results:
unlogged_relation_files.append(r[1])
# This is related to the following Postgres commit:
#
# commit ccadf73163ca88bdaa74b8223d4dde05d17f550b
# Author: Heikki Linnakangas <heikki.linnakangas@iki.fi>
# Date: 2023-08-23 09:21:31 -0500
#
# Use the buffer cache when initializing an unlogged index.
#
# This patch was backpatched to 16. Without it, the LSN in the
# page header would be 0/0 in the data directory, which wouldn't
# match the LSN generated during the basebackup, thus creating
# a difference.
if env.pg_version <= PgVersion.V15 and r[0] == "i":
unlogged_relation_files.append(f"{r[1]}_init")
ignored_files = unlogged_relation_files
check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files)
# Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create.
# There should have been compactions mid-test as well, this final check is in addition those.
for shard, pageserver in tenant_get_shards(env, env.initial_tenant):
pageserver.http_client().timeline_checkpoint(
shard, env.initial_timeline, force_repartition=True, force_image_layer_creation=True
)
# Run the main PostgreSQL regression tests, in src/test/regress.
#
@pytest.mark.timeout(600)
@@ -45,7 +139,10 @@ def test_pg_regress(
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF,
initial_tenant_shard_count=shard_count,
)
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start("main")
@@ -84,67 +181,7 @@ def test_pg_regress(
with capsys.disabled():
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
ignored_files: Optional[list[str]] = None
# Neon handles unlogged relations in a special manner. During a
# basebackup, we ship the init fork as the main fork. This presents a
# problem in that the endpoint's data directory and the basebackup will
# have differences and will fail the eventual file comparison.
#
# Unlogged tables were introduced in version 9.1. ALTER TABLE grew
# support for setting the persistence of a table in 9.5. The reason that
# this doesn't affect versions < 15 (but probably would between 9.1 and
# 9.5) is that all the regression tests that deal with unlogged tables
# up until that point dropped the unlogged tables or set them to logged
# at some point during the test.
#
# In version 15, Postgres grew support for unlogged sequences, and with
# that came a few more regression tests. These tests did not all drop
# the unlogged tables/sequences prior to finishing.
#
# But unlogged sequences came with a bug in that, sequences didn't
# inherit the persistence of their "parent" tables if they had one. This
# was fixed and backported to 15, thus exacerbating our problem a bit.
#
# So what we can do is just ignore file differences between the data
# directory and basebackup for unlogged relations.
results = cast(
"list[tuple[str, str]]",
endpoint.safe_psql(
"""
SELECT
relkind,
pg_relation_filepath(
pg_filenode_relation(reltablespace, relfilenode)
) AS unlogged_relation_paths
FROM pg_class
WHERE relpersistence = 'u'
""",
dbname=DBNAME,
),
)
unlogged_relation_files: list[str] = []
for r in results:
unlogged_relation_files.append(r[1])
# This is related to the following Postgres commit:
#
# commit ccadf73163ca88bdaa74b8223d4dde05d17f550b
# Author: Heikki Linnakangas <heikki.linnakangas@iki.fi>
# Date: 2023-08-23 09:21:31 -0500
#
# Use the buffer cache when initializing an unlogged index.
#
# This patch was backpatched to 16. Without it, the LSN in the
# page header would be 0/0 in the data directory, which wouldn't
# match the LSN generated during the basebackup, thus creating
# a difference.
if env.pg_version <= PgVersion.V15 and r[0] == "i":
unlogged_relation_files.append(f"{r[1]}_init")
ignored_files = unlogged_relation_files
check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files)
post_checks(env, test_output_dir, DBNAME, endpoint)
# Run the PostgreSQL "isolation" tests, in src/test/isolation.
@@ -159,16 +196,20 @@ def test_isolation(
pg_distrib_dir: Path,
shard_count: Optional[int],
):
DBNAME = "isolation_regression"
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF, initial_tenant_shard_count=shard_count
)
# Connect to postgres and create a database called "regression".
# isolation tests use prepared transactions, so enable them
endpoint = env.endpoints.create_start("main", config_lines=["max_prepared_transactions=100"])
endpoint.safe_psql("CREATE DATABASE isolation_regression")
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
# Create some local directories for pg_isolation_regress to run in.
runpath = test_output_dir / "regress"
@@ -202,6 +243,9 @@ def test_isolation(
with capsys.disabled():
pg_bin.run(pg_isolation_regress_command, env=env_vars, cwd=runpath)
# This fails with a mismatch on `pg_multixact/offsets/0000`
# post_checks(env, test_output_dir, DBNAME, endpoint)
# Run extra Neon-specific pg_regress-based tests. The tests and their
# schedule file are in the sql_regress/ directory.
@@ -215,15 +259,19 @@ def test_sql_regress(
pg_distrib_dir: Path,
shard_count: Optional[int],
):
DBNAME = "regression"
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF, initial_tenant_shard_count=shard_count
)
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("CREATE DATABASE regression")
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
# Create some local directories for pg_regress to run in.
runpath = test_output_dir / "regress"
@@ -258,4 +306,4 @@ def test_sql_regress(
with capsys.disabled():
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
check_restored_datadir_content(test_output_dir, env, endpoint)
post_checks(env, test_output_dir, DBNAME, endpoint)

View File

@@ -0,0 +1,646 @@
"""
In PostgreSQL, a standby always has to wait for a running-xacts WAL record to
arrive before it can start accepting queries. Furthermore, if there are
transactions with too many subxids (> 64) open to fit in the in-memory subxids
cache, the running-xacts record will be marked as "suboverflowed", and the
standby will need to also wait for the currently in-progress transactions to
finish.
In Neon, we have an additional mechanism that scans the CLOG at server startup
to determine the list of running transactions, so that the standby can start up
immediately without waiting for the running-xacts record, but that mechanism
only works if the # of active (sub-)transactions is reasonably small. Otherwise
it falls back to waiting. Furthermore, it's somewhat optimistic in using up the
known-assigned XIDs array: if too many transactions with subxids are started in
the primary later, the replay in the replica will crash with "too many
KnownAssignedXids" error.
This module contains tests for those various cases at standby startup: starting
from shutdown checkpoint, using the CLOG scanning mechanism, waiting for
running-xacts record and for in-progress transactions to finish etc.
"""
import threading
from contextlib import closing
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar, wait_until
CREATE_SUBXACTS_FUNC = """
create or replace function create_subxacts(n integer) returns void as $$
declare
i integer;
begin
for i in 1..n loop
begin
insert into t (payload) values (0);
exception
when others then
raise exception 'caught something: %', sqlerrm;
end;
end loop;
end; $$ language plpgsql
"""
def test_replica_start_scan_clog(neon_simple_env: NeonEnv):
"""
Test the CLOG-scanning mechanism at hot standby startup. There is one
transaction active in the primary when the standby is started. The primary
is killed before it has a chance to write a running-xacts record. The
CLOG-scanning at neon startup allows the standby to start up anyway.
See the module docstring for background.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
primary_cur.execute("select pg_switch_wal()")
# Start a transaction in the primary. Leave the transaction open.
#
# The transaction has some subtransactions, but not too many to cause the
# CLOG-scanning mechanism to give up.
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(50)")
# Wait for the WAL to be flushed, but then immediately kill the primary,
# before it has a chance to generate a running-xacts record.
primary_cur.execute("select neon_xlogflush()")
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
primary.stop(mode="immediate")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism.
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
# The transaction did not commit, so it should not be visible in the secondary
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (0,)
def test_replica_start_scan_clog_crashed_xids(neon_simple_env: NeonEnv):
"""
Test the CLOG-scanning mechanism at hot standby startup, after
leaving behind crashed transactions.
See the module docstring for background.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
primary_cur.execute("select pg_switch_wal()")
# Consume a lot of XIDs, then kill Postgres without giving it a
# chance to write abort records for them.
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(100000)")
primary.stop(mode="immediate")
# Restart the primary. Do some light work, and shut it down cleanly
primary.start()
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("insert into t (payload) values (0)")
primary.stop(mode="fast")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism. (Restarting the primary writes a checkpoint and/or running-xacts
# record, which allows the standby to know that the crashed XIDs are aborted)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (1,)
def test_replica_start_at_running_xacts(neon_simple_env: NeonEnv, pg_version):
"""
Test that starting a replica works right after the primary has
created a running-xacts record. This may seem like a trivial case,
but during development, we had a bug that was triggered by having
oldestActiveXid == nextXid. Starting right after a running-xacts
record is one way to test that case.
See the module docstring for background.
"""
env = neon_simple_env
if env.pg_version == PgVersion.V14 or env.pg_version == PgVersion.V15:
pytest.skip("pg_log_standby_snapshot() function is available only in PG16")
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("select pg_log_standby_snapshot()")
primary_cur.execute("select neon_xlogflush()")
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select 123")
assert secondary_cur.fetchone() == (123,)
def test_replica_start_wait_subxids_finish(neon_simple_env: NeonEnv):
"""
Test replica startup when there are a lot of (sub)transactions active in the
primary. That's too many for the CLOG-scanning mechanism to handle, so the
replica has to wait for the large transaction to finish before it starts to
accept queries.
After replica startup, test MVCC with transactions that were in-progress
when the replica was started.
See the module docstring for background.
"""
# Initialize the primary, a test table, and a helper function to create
# lots of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Start a transaction with 100000 subtransactions, and leave it open. That's
# too many to fit in the "known-assigned XIDs array" in the replica, and
# also too many to fit in the subxid caches so the running-xacts record will
# also overflow.
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(100000)")
# Start another, smaller transaction in the primary. We'll come back to this
# later.
primary_conn2 = primary.connect()
primary_cur2 = primary_conn2.cursor()
primary_cur2.execute("begin")
primary_cur2.execute("insert into t (payload) values (0)")
# Create a replica. but before that, wait for the wal to be flushed to
# safekeepers, so that the replica is started at a point where the large
# transaction is already active. (The whole transaction might not be flushed
# yet, but that's OK.)
#
# Start it in a separate thread, so that we can do other stuff while it's
# blocked waiting for the startup to finish.
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
secondary = env.endpoints.new_replica(origin=primary, endpoint_id="secondary")
start_secondary_thread = threading.Thread(target=secondary.start)
start_secondary_thread.start()
# Verify that the replica has otherwise started up, but cannot start
# accepting queries yet.
log.info("Waiting 5 s to verify that the secondary does not start")
start_secondary_thread.join(5)
assert secondary.log_contains("consistent recovery state reached")
assert secondary.log_contains("started streaming WAL from primary")
# The "redo starts" message is printed when the first WAL record is
# received. It might or might not be present in the log depending on how
# far exactly the WAL was flushed when the replica was started, and whether
# background activity caused any more WAL records to be flushed on the
# primary afterwards.
#
# assert secondary.log_contains("redo # starts")
# should not be open for connections yet
assert start_secondary_thread.is_alive()
assert not secondary.is_running()
assert not secondary.log_contains("database system is ready to accept read-only connections")
# Commit the large transaction in the primary.
#
# Within the next 15 s, the primary should write a new running-xacts record
# to the WAL which shows the transaction as completed. Once the replica
# replays that record, it will start accepting queries.
primary_cur.execute("commit")
start_secondary_thread.join()
# Verify that the large transaction is correctly visible in the secondary
# (but not the second, small transaction, which is still in-progress!)
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100000,)
# Perform some more MVCC testing using the second transaction that was
# started in the primary before the replica was created
primary_cur2.execute("select create_subxacts(10000)")
# The second transaction still hasn't committed
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("BEGIN ISOLATION LEVEL REPEATABLE READ")
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100000,)
# Commit the second transaction in the primary
primary_cur2.execute("commit")
# Should still be invisible to the old snapshot
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100000,)
# Commit the REPEATABLE READ transaction in the replica. Both
# primary transactions should now be visible to a new snapshot.
secondary_cur.execute("commit")
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (110001,)
def test_replica_too_many_known_assigned_xids(neon_simple_env: NeonEnv):
"""
The CLOG-scanning mechanism fills the known-assigned XIDs array
optimistically at standby startup, betting that it can still fit
upcoming transactions replayed later from the WAL in the
array. This test tests what happens when that bet fails and the
known-assigned XID array fills up after the standby has already
been started. The WAL redo will fail with an error:
FATAL: too many KnownAssignedXids
CONTEXT: WAL redo at 0/1895CB0 for neon/INSERT: off: 25, flags: 0x08; blkref #0: rel 1663/5/16385, blk 64
which causes the standby to shut down.
See the module docstring for background.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Determine how many connections we can use
primary_cur.execute("show max_connections")
max_connections = int(primary_cur.fetchall()[0][0])
primary_cur.execute("show superuser_reserved_connections")
superuser_reserved_connections = int(primary_cur.fetchall()[0][0])
n_connections = max_connections - superuser_reserved_connections
n_subxids = 200
# Start one top transaction in primary, with lots of subtransactions. This
# uses up much of the known-assigned XIDs space in the standby, but doesn't
# cause it to overflow.
large_p_conn = primary.connect()
large_p_cur = large_p_conn.cursor()
large_p_cur.execute("begin")
large_p_cur.execute(f"select create_subxacts({max_connections} * 30)")
with closing(primary.connect()) as small_p_conn:
with small_p_conn.cursor() as small_p_cur:
small_p_cur.execute("select create_subxacts(1)")
# Create a replica at this LSN
primary_cur.execute("select neon_xlogflush()")
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
# The transaction in primary has not committed yet.
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (1,)
# Start max number of top transactions in primary, with a lot of
# subtransactions each. We add the subtransactions to each top transaction
# in a round-robin fashion, instead of adding a lot of subtransactions to
# one top transaction at a time. This way, we will have the max number of
# subtransactions in the in-memory subxid cache of each top transaction,
# until they all overflow.
#
# Currently, PGPROC_MAX_CACHED_SUBXIDS == 64, so this will overflow the all
# the subxid caches after creating 64 subxids in each top transaction. The
# point just before the caches have overflowed is the most interesting point
# in time, but we'll keep going beyond that, to ensure that this test is
# robust even if PGPROC_MAX_CACHED_SUBXIDS changes.
p_curs = []
for _ in range(0, n_connections):
p_cur = primary.connect().cursor()
p_cur.execute("begin")
p_curs.append(p_cur)
for _subxid in range(0, n_subxids):
for i in range(0, n_connections):
p_curs[i].execute("select create_subxacts(1)")
# Commit all the transactions in the primary
for i in range(0, n_connections):
p_curs[i].execute("commit")
large_p_cur.execute("commit")
# Wait until the replica crashes with "too many KnownAssignedXids" error.
def check_replica_crashed():
try:
secondary.connect()
except psycopg2.Error:
# Once the connection fails, return success
return None
raise RuntimeError("connection succeeded")
wait_until(20, 0.5, check_replica_crashed)
assert secondary.log_contains("too many KnownAssignedXids")
# Replica is crashed, so ignore stop result
secondary.check_stop_result = False
def test_replica_start_repro_visibility_bug(neon_simple_env: NeonEnv):
"""
Before PR #7288, a hot standby in neon incorrectly started up
immediately, before it had received a running-xacts record. That
led to visibility bugs if there were active transactions in the
primary. This test reproduces the incorrect query results and
incorrectly set hint bits, before that was fixed.
"""
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
p_cur = primary.connect().cursor()
p_cur.execute("begin")
p_cur.execute("create table t(pk integer primary key, payload integer)")
p_cur.execute("insert into t values (generate_series(1,100000), 0)")
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
wait_replica_caughtup(primary, secondary)
s_cur = secondary.connect().cursor()
# Set hint bits for pg_class tuples. If primary's transaction is
# not marked as in-progress in MVCC snapshot, then XMIN_INVALID
# hint bit will be set for table's 't' tuple, making it invisible
# even after the commit record is replayed later.
s_cur.execute("select * from pg_class")
p_cur.execute("commit")
wait_replica_caughtup(primary, secondary)
s_cur.execute("select * from t where pk = 1")
assert s_cur.fetchone() == (1, 0)
@pytest.mark.parametrize("shutdown", [True, False])
def test_replica_start_with_prepared_xacts(neon_simple_env: NeonEnv, shutdown: bool):
"""
Test the CLOG-scanning mechanism at hot standby startup in the presence of
prepared transactions.
This test is run in two variants: one where the primary server is shut down
before starting the secondary, or not.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
)
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute("create table t1(pk integer primary key)")
primary_cur.execute("create table t2(pk integer primary key)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Prepare a transaction for two-phase commit
primary_cur.execute("begin")
primary_cur.execute("insert into t1 values (1)")
primary_cur.execute("prepare transaction 't1'")
# Prepare another transaction for two-phase commit, with a subtransaction
primary_cur.execute("begin")
primary_cur.execute("insert into t2 values (2)")
primary_cur.execute("savepoint sp")
primary_cur.execute("insert into t2 values (3)")
primary_cur.execute("prepare transaction 't2'")
# Start a transaction in the primary. Leave the transaction open.
#
# The transaction has some subtransactions, but not too many to cause the
# CLOG-scanning mechanism to give up.
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(50)")
# Wait for the WAL to be flushed
primary_cur.execute("select neon_xlogflush()")
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
if shutdown:
primary.stop(mode="fast")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism.
secondary = env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
)
# The transaction did not commit, so it should not be visible in the secondary
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (0,)
secondary_cur.execute("select count(*) from t1")
assert secondary_cur.fetchone() == (0,)
secondary_cur.execute("select count(*) from t2")
assert secondary_cur.fetchone() == (0,)
if shutdown:
primary.start()
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
else:
primary_cur.execute("commit")
primary_cur.execute("commit prepared 't1'")
primary_cur.execute("commit prepared 't2'")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
if shutdown:
assert secondary_cur.fetchone() == (0,)
else:
assert secondary_cur.fetchone() == (50,)
secondary_cur.execute("select * from t1")
assert secondary_cur.fetchall() == [(1,)]
secondary_cur.execute("select * from t2")
assert secondary_cur.fetchall() == [(2,), (3,)]
def test_replica_start_with_prepared_xacts_with_subxacts(neon_simple_env: NeonEnv):
"""
Test the CLOG-scanning mechanism at hot standby startup in the presence of
prepared transactions, with subtransactions.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
)
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
# Install extension containing function needed for test
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Advance nextXid close to the beginning of the next pg_subtrans segment (2^16 XIDs)
#
# This is interesting, because it tests that pg_subtrans is initialized correctly
# at standby startup. (We had a bug where it didn't at one point during development.)
while True:
xid = int(query_scalar(primary_cur, "SELECT txid_current()"))
log.info(f"xid now {xid}")
# Consume 500 transactions at a time until we get close
if xid < 65535 - 600:
primary_cur.execute("select test_consume_xids(500);")
else:
break
primary_cur.execute("checkpoint")
# Prepare a transaction for two-phase commit
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(1000)")
primary_cur.execute("prepare transaction 't1'")
# Wait for the WAL to be flushed, and stop the primary
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
primary.stop(mode="fast")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism.
secondary = env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
)
# The transaction did not commit, so it should not be visible in the secondary
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (0,)
primary.start()
# Open a lot of subtransactions in the primary, causing the subxids cache to overflow
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("select create_subxacts(100000)")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100000,)
primary_cur.execute("commit prepared 't1'")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (101000,)
def test_replica_start_with_prepared_xacts_with_many_subxacts(neon_simple_env: NeonEnv):
"""
Test the CLOG-scanning mechanism at hot standby startup in the presence of
prepared transactions, with lots of subtransactions.
Like test_replica_start_with_prepared_xacts_with_subxacts, but with more
subxacts, to test that the prepared transaction's subxids don't consume
space in the known-assigned XIDs array. (They are set in pg_subtrans
instead)
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
)
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
# Install extension containing function needed for test
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Prepare a transaction for two-phase commit, with lots of subxids
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(50000)")
# to make things a bit more varied, intersperse a few other XIDs in between
# the prepared transaction's sub-XIDs
with primary.connect().cursor() as primary_cur2:
primary_cur2.execute("insert into t (payload) values (123)")
primary_cur2.execute("begin; insert into t (payload) values (-1); rollback")
primary_cur.execute("select create_subxacts(50000)")
primary_cur.execute("prepare transaction 't1'")
# Wait for the WAL to be flushed
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
primary.stop(mode="fast")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism.
secondary = env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
)
# The transaction did not commit, so it should not be visible in the secondary
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (1,)
primary.start()
# Open a lot of subtransactions in the primary, causing the subxids cache to overflow
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("select create_subxacts(100000)")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100001,)
primary_cur.execute("commit prepared 't1'")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (200001,)

View File

@@ -1,32 +0,0 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
@pytest.mark.xfail
def test_replication_start(neon_simple_env: NeonEnv):
env = neon_simple_env
with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary:
with primary.connect() as p_con:
with p_con.cursor() as p_cur:
p_cur.execute("begin")
p_cur.execute("create table t(pk integer primary key, payload integer)")
p_cur.execute("insert into t values (generate_series(1,100000), 0)")
p_cur.execute("select txid_current()")
xid = p_cur.fetchall()[0][0]
log.info(f"Master transaction {xid}")
with env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary"
) as secondary:
wait_replica_caughtup(primary, secondary)
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
# Enforce setting hint bits for pg_class tuples.
# If master's transaction is not marked as in-progress in MVCC snapshot,
# then XMIN_INVALID hint bit will be set for table's 't' tuple makeing it invisible.
s_cur.execute("select * from pg_class")
p_cur.execute("commit")
wait_replica_caughtup(primary, secondary)
s_cur.execute("select * from t where pk = 1")
assert s_cur.fetchone() == (1, 0)

View File

@@ -1518,49 +1518,6 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
workload.validate()
def retryable_node_operation(op, ps_id, max_attempts, backoff):
while max_attempts > 0:
try:
op(ps_id)
return
except StorageControllerApiException as e:
max_attempts -= 1
log.info(f"Operation failed ({max_attempts} attempts left): {e}")
if max_attempts == 0:
raise e
time.sleep(backoff)
def poll_node_status(env, node_id, desired_scheduling_policy, max_attempts, backoff):
log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
while max_attempts > 0:
try:
status = env.storage_controller.node_status(node_id)
policy = status["scheduling"]
if policy == desired_scheduling_policy:
return
else:
max_attempts -= 1
log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
if max_attempts == 0:
raise AssertionError(
f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
)
time.sleep(backoff)
except StorageControllerApiException as e:
max_attempts -= 1
log.info(f"Status call failed ({max_attempts} retries left): {e}")
if max_attempts == 0:
raise e
time.sleep(backoff)
def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
"""
Graceful reststart of storage controller clusters use the drain and
@@ -1601,10 +1558,10 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
# Perform a graceful rolling restart
for ps in env.pageservers:
retryable_node_operation(
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
)
poll_node_status(env, ps.id, "PauseForRestart", max_attempts=6, backoff=5)
env.storage_controller.poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5)
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
@@ -1614,12 +1571,12 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
assert sum(shard_counts.values()) == total_shards
ps.restart()
poll_node_status(env, ps.id, "Active", max_attempts=10, backoff=1)
env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=10, backoff=1)
retryable_node_operation(
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
)
poll_node_status(env, ps.id, "Active", max_attempts=6, backoff=5)
env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=6, backoff=5)
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
@@ -1657,15 +1614,15 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
ps_id_to_drain = env.pageservers[0].id
retryable_node_operation(
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_drain(ps_id),
ps_id_to_drain,
max_attempts=3,
backoff=2,
)
poll_node_status(env, ps_id_to_drain, "Draining", max_attempts=6, backoff=2)
env.storage_controller.poll_node_status(ps_id_to_drain, "Draining", max_attempts=6, backoff=2)
env.storage_controller.cancel_node_drain(ps_id_to_drain)
poll_node_status(env, ps_id_to_drain, "Active", max_attempts=6, backoff=2)
env.storage_controller.poll_node_status(ps_id_to_drain, "Active", max_attempts=6, backoff=2)

View File

@@ -41,18 +41,35 @@ def test_tenant_creation_fails(neon_simple_env: NeonEnv):
neon_simple_env.storage_controller.allowed_errors.extend(error_regexes)
pageserver_http = neon_simple_env.pageserver.http_client()
# Failure to write a config to local disk makes the pageserver assume that local disk is bad and abort the process
pageserver_http.configure_failpoints(("tenant-config-before-write", "return"))
with pytest.raises(Exception, match="tenant-config-before-write"):
# Storage controller will see a torn TCP connection when the crash point is reached, and follow an unclean 500 error path
neon_simple_env.storage_controller.allowed_errors.extend(
[
".*Reconcile not done yet while creating tenant.*",
".*Reconcile error: receive body: error sending request.*",
".*Error processing HTTP request: InternalServerError.*",
]
)
with pytest.raises(Exception, match="error sending request"):
_ = neon_simple_env.neon_cli.create_tenant()
# Any files left behind on disk during failed creation do not prevent
# a retry from succeeding. Restart pageserver with no failpoints.
neon_simple_env.pageserver.running = False
neon_simple_env.pageserver.start()
# The failed creation should not be present in list of tenants, as when we start up we'll see
# an empty tenant dir with no config in it.
neon_simple_env.pageserver.allowed_errors.append(".*Failed to load tenant config.*")
new_tenants = sorted(
map(lambda t: t.split()[0], neon_simple_env.neon_cli.list_tenants().stdout.splitlines())
)
assert initial_tenants == new_tenants, "should not create new tenants"
# Any files left behind on disk during failed creation do not prevent
# a retry from succeeding.
pageserver_http.configure_failpoints(("tenant-config-before-write", "off"))
neon_simple_env.neon_cli.create_tenant()

View File

@@ -1,5 +1,5 @@
{
"v16": ["16.3", "e06bebc75306b583e758b52c95946d41109239b2"],
"v15": ["15.7", "f54d7373eb0de5a54bce2becdb1c801026c7edff"],
"v14": ["14.12", "223dd925959f8124711dd3d867dc8ba6629d52c0"]
"v16": ["16.3", "b810fdfcbb59afea7ea7bbe0cf94eaccb55a2ea2"],
"v15": ["15.7", "4874c8e52ed349a9f8290bbdcd91eb92677a5d24"],
"v14": ["14.12", "ad73770c446ea361f43e4f0404798b7e5e7a62d8"]
}