Compare commits

..

65 Commits

Author SHA1 Message Date
Christian Schwarz
3388eb1205 Merge branch 'problame/async-cleanup-on-drop-for-writers' into yuchen/direct-io-delta-image-layer-write 2025-04-11 17:46:46 +02:00
Christian Schwarz
2f0677be26 refactor delta&image writers to perform cleanup on Drop in the background
In #10063 we will switch BlobWriter, which underlies delta and image
layer writers, to use the owned buffers IO buffered writer.

That buffered writer implements double-buffering by virtue of a background task
that performs the flushing -- it owns the VirtualFile and both
DeltaLayerWriter and ImageLayerWriter are mere clients to it.

The implication is that it's no longer true that dropping these client
objects guarantees that all IO activity is complete. We must wait for the
flush task to exit.

In preparation for that new world, this PR moves the cleanup to a short-lived
task that is spawned from the Drop impl, and adds appropriate gate guard
holdings to hook it into the Timeline lifecycle.

We must (theoretically) worry that there will be a retry inbetween Drop
completing and the spawned task completing. It could collide on the
randomly generated temporary file name. We avoid this by switching to a
global monotonic counter.

Refs
- extracted from https://github.com/neondatabase/neon/pull/10063
- epic https://github.com/neondatabase/neon/issues/9868
2025-04-11 17:40:42 +02:00
Christian Schwarz
062c7b9a76 refactor: plumb gate and cancellation down to to blob_io::BlobWriter
In #10063 we will switch BlobWriter to use the owned buffers IO buffered
writer, which implements double-buffering by virtue of a background task
that performs the flushing.

That task's lifecylce must be contained within the Timeline lifecycle,
so, it must hold the timeline gate open and respect Timeline::cancel.

This PR does the noisy plumbing to reduce the #10063 diff.

Refs
- extracted from https://github.com/neondatabase/neon/pull/10063
- epic https://github.com/neondatabase/neon/issues/9868
2025-04-11 17:00:00 +02:00
Christian Schwarz
c6209b4a39 Revert "undo all changes except gate,cancel,context propagation"
This reverts commit f25f71bc98.
2025-04-11 16:57:36 +02:00
Christian Schwarz
f25f71bc98 undo all changes except gate,cancel,context propagation 2025-04-11 16:57:19 +02:00
Christian Schwarz
2ee316b454 minimize diff a bit 2025-04-11 16:46:41 +02:00
Christian Schwarz
a929e7a844 gate & cancel propagation: make it less invasive
- store reference to gate
- store CancellationToken clone
2025-04-11 16:39:54 +02:00
Christian Schwarz
3f417d4ac8 Merge 2025-04-11 main commit 'c66444ea1538349d13ab5e87bca880394434004b' into yuchen/direct-io-delta-image-layer-write 2025-04-11 16:26:41 +02:00
Christian Schwarz
af6c433947 Merge 2025-04-09 main commit 'a04e33ceb638a3ee5fef8d642b57ffc3a4543c98' into yuchen/direct-io-delta-image-layer-write 2025-04-11 16:26:26 +02:00
Christian Schwarz
fd7e3fd82f Merge WITH CONFLICTS commit '72832b32140a78db7612af626d7c69079d73f445' into yuchen/direct-io-delta-image-layer-write
Conflicts:
	pageserver/src/tenant/blob_io.rs
		- minor stuff

Also I noticed some earlier merge went through cleanly
but the `generate_tombstone_image_layer` layer writer didn't have the right
arugments, so, failed to compile. Fixed in this merge commit.
2025-04-11 16:24:35 +02:00
Christian Schwarz
ddf6ba75c2 Merge 2025-04-09 main commit 'd11f23a3419a5b8eef62bc5736a4dd9d413bdab8' into yuchen/direct-io-delta-image-layer-write 2025-04-11 16:18:59 +02:00
Christian Schwarz
f017382b2b Merge 2025-04-09 main commit 'e7502a3d637932a59ee502ababb1df3d0e3bca26' into yuchen/direct-io-delta-image-layer-write 2025-04-11 16:18:48 +02:00
Christian Schwarz
d0cb1a93dc Merge 2025-04-09 main commit 'ef8101a9be3ce80d104943238a7d608561432189' into yuchen/direct-io-delta-image-layer-write 2025-04-11 16:18:34 +02:00
Christian Schwarz
140b47dc5a Merge 2025-04-09 main commit 'a6ff8ec3d47963616d9cef07421d9319db958e8a' into yuchen/direct-io-delta-image-layer-write 2025-04-11 16:17:54 +02:00
Christian Schwarz
de1c392082 Merge 2025-04-07 main commit '486872dd28d538817599f29b045be025d1e3f43a' into yuchen/direct-io-delta-image-layer-write 2025-04-11 16:17:32 +02:00
Christian Schwarz
c5c60e156e Merge WITH CONFLICTS 2025-03-18 main commit '9fb77d6cdd0894ec4e93b4fe3a576655cfad3b2e' into yuchen/direct-io-delta-image-layer-write
The previous merge commit was the commit before, so, all these conflicts
are the conflicts that arise from this PR and 97fb77 which is the commit
that added cancellation sensitivity to flush task infinite retries.

Conflicts:
	pageserver/src/tenant/remote_timeline_client/download.rs
		- different return type
	pageserver/src/virtual_file/owned_buffers_io/write.rs
		- added TODO that needs to be fixed before merge about
                  retrying final write. I want a different API than
                  this shutdown() thing we have rn
	pageserver/src/virtual_file/owned_buffers_io/write/flush.rs

Most of the churn came from the need to propagate cancellation token.

And churn in tests from having to propagate upwards the FlushTaskError
instead of the std::io::Error we were propagating upwards before.
2025-04-11 16:13:44 +02:00
Arpad Müller
c66444ea15 Add timeline_import http endpoint (#11484)
The added `timleine_import` endpoint allows us to migrate safekeeper
timelines from control plane managed to storcon managed.
 
Part of #9011
2025-04-11 14:10:27 +00:00
Arpad Müller
88f01c1ca1 Introduce WalIngestError (#11506)
Introduces a `WalIngestError` struct together with a
`WalIngestErrorKind` enum, to be used for walingest related failures and
errors.

* the enum captures backtraces, so we don't regress in comparison to
`anyhow::Error`s (backtraces might be a bit shorter if we use one of the
`anyhow::Error` wrappers)
* it explicitly lists most/all of the potential cases that can occur.

I've originally been inspired to do this in #11496, but it's a
longer-term TODO.
2025-04-11 14:08:46 +00:00
Erik Grinaker
a6937a3281 pageserver: improve shard ancestor compaction logging (#11535)
## Problem

Shard ancestor compaction always logs "starting shard ancestor
compaction", even if there is no work to do. This is very spammy (every
20 seconds for every shard). It also has limited progress logging.

## Summary of changes

* Only log "starting shard ancestor compaction" when there's work to do.
* Include details about the amount of work.
* Log progress messages for each layer, and when waiting for uploads.
* Log when compaction is completed, with elapsed duration and whether
there is more work for a later iteration.
2025-04-11 12:14:08 +00:00
Christian Schwarz
9256935e1b fix download usage of buffered writer (using pad + set_len strategy)
this fixes tenant::timeline::tests::test_heatmap_generation
2025-04-11 13:51:56 +02:00
Christian Schwarz
647c881878 fix for vectored_blob_io::tests::test_really_big_array 2025-04-11 13:31:51 +02:00
Erik Grinaker
3c8565a194 test_runner: propagate config via attach_hook for test fix (#11529)
## Problem

The `pagebench` benchmarks set up an initial dataset by creating a
template tenant, copying the remote storage to a bunch of new tenants,
and attaching them to Pageservers.

In #11420, we found that
`test_pageserver_characterize_throughput_with_n_tenants` had degraded
performance because it set a custom tenant config in Pageservers that
was then replaced with the default tenant config by the storage
controller.

The initial fix was to register the tenants directly in the storage
controller, but this created the tenants with generation 1. This broke
`test_basebackup_with_high_slru_count`, where the template tenant was at
generation 2, leading to all layer files at generation 2 being ignored.

Resolves #11485.
Touches #11381.

## Summary of changes

This patch addresses both test issues by modifying `attach_hook` to also
take a custom tenant config. This allows attaching tenants to
Pageservers from pre-existing remote storage, specifying both the
generation and tenant config when registering them in the storage
controller.
2025-04-11 11:31:12 +00:00
Christian Schwarz
d1277b8259 I have a hypothesis for what the issue is with the vectored_blob_io::tests::test_really_big_array 2025-04-11 13:26:39 +02:00
Christian Schwarz
53b837d507 put in a note on blob_io writer not needed to do owned buffers io anymore 2025-04-11 13:25:25 +02:00
Christian Schwarz
f5d69e97c4 remark: vectored_blob_io::tests::test_really_big_array is failing since before I started merging from main 2025-04-11 12:56:07 +02:00
Christian Schwarz
e79beb0720 turns out we can delete all the seek-related APIs as well 2025-04-11 12:29:22 +02:00
Christian Schwarz
dfc364e4f4 remove non-absolute-position write APIs from VirtualFile 2025-04-11 11:57:09 +02:00
Christian Schwarz
979fa0682b tests: update batching perf test workload to include scattered LSNs (#11391)
The batching perf test workload is currently read-only sequential scans.
However, realistic workloads have concurrent writes (to other pages)
going on.

This PR simulates concurrent writes to other pages by emitting logical
replication messages.

These degrade the achieved batching factor, for the reason see
- https://github.com/neondatabase/neon/issues/10765

PR 
- https://github.com/neondatabase/neon/pull/11494

will fix this problem and get batching factor back up.

---------

Co-authored-by: Vlad Lazar <vlad@neon.tech>
2025-04-11 09:55:49 +00:00
Christian Schwarz
8884865bca tests: make test_pageserver_getpage_throttle less flaky (#11482)
# Refs

- fixes https://github.com/neondatabase/neon/issues/11395

# Problem

Since 2025-03-10, we have observed increased flakiness of
`test_pageserver_getpage_throttle`.

The test is timing-dependent by nature, and was hitting the

```
 assert duration_secs >= 10 * actual_smgr_query_seconds, (
        "smgr metrics should not include throttle wait time"
    )
```

quite frequently.

# Analysis

These failures are not reproducible.

In this PR's history is a commit that reran the test 100 times without
requiring a single retry.

In https://github.com/neondatabase/neon/issues/11395 there is a link to
a query to the test results database.
It shows that the flakiness was not constant, but rather episodic:
2025-03-{10,11,12,13} 2025-03-{19,20,21} 2025-03-31 and 2025-04-01.

To me, this suggests variability in available CPU.

# Solution

The point of the offending assertion is to ensure that most of the
request latency is spent on throttling, because testing of the
throttling mechanism is the point of the test.
The `10` magic number means at most 10% of mean latency may be spent on
request processing.

Ideally we would control the passage of time (virtual clock source) to
make this test deterministic.

But I don't see that happening in our regression test setup.

So, this PR de-flakes the test as follows:
- allot up to 66% of mean latency for request processing
- increase duration from 10s to 20s, hoping to get better protection
from momentary CPU spikes in noisy neighbor tests or VMs on the runner
host

As a drive-by, switch to `pytest.approx` and remove one self-test
assertion I can't make sense of anymore.
2025-04-11 09:38:05 +00:00
Dmitrii Kovalkov
4c4e33bc2e storage: add http/https server and cert resover metrics (#11450)
## Problem
We need to export some metrics about certs/connections to configure
alerts and make sure that all HTTP requests are gone before turning
https-only mode on.
- Closes: https://github.com/neondatabase/cloud/issues/25526

## Summary of changes
- Add started connection and connection error metrics to http/https
Server.
- Add certificate expiration time and reload metrics to
ReloadingCertificateResolver.
2025-04-11 06:11:35 +00:00
Tristan Partin
342607473a Make Endpoint::respec_deep() infinitely deep (#11527)
Because it wasn't recursive, there was a limit to the depth of updates.
This work is necessary because as we teach neon_local and compute_ctl
that the content in --spec-path should match a similar structure we get
from the control plane, the spec object itself will no longer be
toplevel. It will be under the "spec" key.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-10 19:55:51 +00:00
John Spray
9c37bfc90a pageserver/tests: make image_layer_rewrite write less data (#11525)
## Problem

This test is slow to execute, particularly if you're on a slow
environment like vscode in a browser. Might have got much slower when we
switched to direct IO?

## Summary of changes

- Reduce the scale of the test by 10x, since there was nothing special
about the original size.
2025-04-10 17:03:22 +00:00
John Spray
52dee408dc storage controller: improve safety of shard splits coinciding with controller restarts (#11412)
## Problem

The graceful leadership transfer process involves calling step_down on
the old controller, but this was not waiting for shard splits to
complete, and the new controller could therefore end up trying to abort
a shard split while it was still going on.

We mitigated this already in #11256 by avoiding the case where shard
split completion would update the database incorrectly, but this was a
fragile fix because it assumes that is the only problematic part of the
split running concurrently.

Precursors:
- #11290 
- #11256

Closes: #11254 

## Summary of changes

- Hold the reconciler gate from shard splits, so that step_down will
wait for them. Splits should always be fairly prompt, so it is okay to
wait here.
- Defense in depth: if step_down times out (hardcoded 10 second limit),
then fully terminate the controller process rather than letting it
continue running, potentially doing split-brainy things. This makes
sense because the new controller will always declare itself leader
unilaterally if step_down fails, so leaving an old controller running is
not beneficial.
- Tests: extend
`test_storage_controller_leadership_transfer_during_split` to separately
exercise the case of a split holding up step_down, and the case where
the overall timeout on step_down is hit and the controller terminates.
2025-04-10 16:55:37 +00:00
Anastasia Lubennikova
5487a20b72 compute: Set log_parameter=off for audit logging. (#11500)
Log -> Base,
pgaudit.log = 'ddl', pgaudit.log_parameter='off'

Hipaa -> Extended.
pgaudit.log = 'all, -misc', pgaudit.log_parameter='off'
    
add new level Full:
pgaudit.log='all', pgaudit.log_parameter='on'

Keep old parameter names for compatibility,
until cplane side changes are implemented and released.

closes https://github.com/neondatabase/cloud/issues/27202
2025-04-10 15:28:28 +00:00
Alex Chi Z.
f06d721a98 test(pageserver): ensure gc-compaction does not fire critical errors (#11513)
## Problem

Part of https://github.com/neondatabase/neon/issues/10395

## Summary of changes

Add a test case to ensure gc-compaction doesn't fire any critical errors
if the key history is invalid due to partial GC.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-10 14:53:37 +00:00
Christian Schwarz
2e35f23085 tests: remove ignored fair field (#11521)
Pageserver has been ignoring field
`tenant_config.timeline_get_throttle.fair`
for many monhts, since we removed it from the config struct in
neondatabase/neon#8539.

Refs
- epic https://github.com/neondatabase/cloud/issues/27320
2025-04-10 14:24:30 +00:00
Anastasia Lubennikova
5063151271 compute: Add more neon ids to compute (#11366)
Pass more neon ids to compute_ctl.
Expose them to postgres as neon extension GUCs:
neon.project_id, neon.branch_id, neon.endpoint_id.


This is the compute side PR, not yet supported by cplane.
2025-04-10 13:04:18 +00:00
Erik Grinaker
0122d97f95 test_runner: only use last gen in test_location_conf_churn (#11511)
## Problem

`test_location_conf_churn` performs random location updates on
Pageservers. While doing this, it could instruct the compute to connect
to a stale generation and execute queries. This is invalid, and will
fail if a newer generation has removed layer files used by the stale
generation.

Resolves #11348.

## Summary of changes

Only connect to the latest generation when executing queries.
2025-04-10 10:07:16 +00:00
Arseny Sher
fae7528adb walproposer: make it aware of membership (#11407)
## Problem

Walproposer should get elected and commit WAL on safekeepers specified
by the membership configuration.

## Summary of changes

- Add to wp `members_safekeepers` and `new_members_safekeepers` arrays
mapping configuration members to connection slots. Establish this
mapping (by node id) when safekeeper sends greeting, giving its id and
when mconf becomes known / changes.
- Add to TermsCollected, VotesCollected,
GetAcknowledgedByQuorumWALPosition membership aware logic. Currently it
partially duplicates existing one, but we'll drop the latter eventually.
- In python, rename Configuration to MembershipConfiguration for
clarity.
- Add test_quorum_sanity testing new logic.

ref https://github.com/neondatabase/neon/issues/10851
2025-04-10 09:55:37 +00:00
Christian Schwarz
9222995c4f REVIEW more the shutdown API 2025-04-10 11:16:35 +02:00
Dmitrii Kovalkov
8a72e6f888 pageserver: add enable_tls_page_service_api (#11508)
## Problem
Page service doesn't use TLS for incoming requests.
- Closes: https://github.com/neondatabase/cloud/issues/27236

## Summary of changes
- Add option `enable_tls_page_service_api` to pageserver config
- Propagate `tls_server_config` to `page_service` if the option is
enabled

No integration tests for now because I didn't find out how to call page
service API from python and AFAIK computes don't support TLS yet
2025-04-10 08:45:17 +00:00
Christian Schwarz
6f25c976f6 REVIEW: undo the mutable->tail rename to minimize conflicts with next commit
Changes to be committed:
	modified:   pageserver/src/tenant/ephemeral_file.rs
	modified:   pageserver/src/virtual_file/owned_buffers_io/write.rs
2025-04-10 09:02:45 +02:00
Christian Schwarz
dd3178836d REVIEW: minor nits 2025-04-10 08:58:06 +02:00
Christian Schwarz
2a29b3de89 Merge 2025-03-18 main commit '99639c26b49a0d6d546fd' into yuchen/direct-io-delta-image-layer-write 2025-04-09 19:40:14 +02:00
Christian Schwarz
91aff7b842 Merge WITH CONFLICTS 2025-03-11 main commit '158db414bf881fb358494e3215d192c8fa420a53' into yuchen/dire
ct-io-delta-image-layer-write

Conflicts:
	pageserver/src/virtual_file.rs
	pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
2025-04-09 19:39:56 +02:00
Christian Schwarz
f078d7e1a9 Merge WITH CONFLICTS 2025-03-11 main commit '7c462b3417ecd3ae3907f3480f3b8a8c99fc6d7b' into yuchen/dire
ct-io-delta-image-layer-write

Conflicts:
	pageserver/src/tenant/blob_io.rs
2025-04-09 19:39:12 +02:00
Christian Schwarz
537eb334f2 Merge WITH CONFLICTS 2025-02-25 main commit '920040e40240774219b6607f1f8ef74478dc4b29' into yuchen/dire
ct-io-delta-image-layer-write

Conflicts:
	pageserver/src/tenant/blob_io.rs
	pageserver/src/tenant/block_io.rs
	pageserver/src/tenant/disk_btree.rs
	pageserver/src/tenant/storage_layer/delta_layer.rs
	pageserver/src/tenant/storage_layer/image_layer.rs
	pageserver/src/virtual_file/owned_buffers_io/write.rs
2025-04-09 19:38:20 +02:00
Christian Schwarz
e37cbc1a50 make clippy pass 2025-04-09 19:33:35 +02:00
Yuchen Liang
38f1150ae2 review: shutdown in background task and let caller decide padding behavior
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-13 16:05:35 +00:00
Yuchen Liang
b1edbf667c review: use monotonically incresing suffix number for image/delta temp path
https://github.com/neondatabase/neon/pull/10063#discussion_r1880433368

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-12 19:25:38 +00:00
Yuchen Liang
daf214e2e9 review: create follow-up issue for changing BlockBuf::blocks
https://github.com/neondatabase/neon/pull/10063/files#r1880401785

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-11 19:13:08 +00:00
Yuchen Liang
3d4052f253 review: remove debug prints
https://github.com/neondatabase/neon/pull/10063#discussion_r1880438898

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-11 18:37:03 +00:00
Yuchen Liang
4b358f6a3c review: proper error handling for OwnedAsyncWriter::write_all_at
https://github.com/neondatabase/neon/pull/10063#discussion_r1880439513

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-11 18:34:01 +00:00
Yuchen Liang
0066f23c65 change it preferred back to IoMode::Buffered
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-10 22:42:23 +00:00
Yuchen Liang
6fa9a13ad4 rename end mutable buffer tail
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-10 22:40:39 +00:00
Yuchen Liang
e05b5d6ae4 extract summary serialization logic into Summary::ser_into_page
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-10 22:29:47 +00:00
Yuchen Liang
c6795532a0 use Cargo.lock from main
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-10 20:34:06 +00:00
Yuchen Liang
37ce1c1ee3 Merge branch 'main' into yuchen/direct-io-delta-image-layer-write 2024-12-10 13:19:44 -05:00
Yuchen Liang
cf5b0c9e43 use v2 file API and pad summary blk
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-10 18:18:22 +00:00
Yuchen Liang
7aa8111af9 fix doc
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-09 19:12:04 +00:00
Yuchen Liang
18cf3d6609 switch IoMode::preferred() back to Buffered
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-09 19:09:25 +00:00
Yuchen Liang
7261e426cc Merge branch 'main' into yuchen/direct-io-delta-image-layer-write 2024-12-09 11:55:35 -05:00
Yuchen Liang
9f977ba9bb fix clippy
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-09 16:48:56 +00:00
Yuchen Liang
011a578f1b use timeline gate to guard flush task
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-09 16:34:53 +00:00
Yuchen Liang
d079bf1d48 implement blob-writer io functionalities
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-12-06 21:50:53 +00:00
69 changed files with 2307 additions and 1105 deletions

1
Cargo.lock generated
View File

@@ -2837,6 +2837,7 @@ dependencies = [
"utils",
"uuid",
"workspace_hack",
"x509-cert",
]
[[package]]

View File

@@ -523,11 +523,14 @@ impl ComputeNode {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}, features {:?}, spec.remote_extensions {:?}",
"starting compute for project {}, operation {}, tenant {}, timeline {}, project {}, branch {}, endpoint {}, features {:?}, spec.remote_extensions {:?}",
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
pspec.tenant_id,
pspec.timeline_id,
pspec.spec.project_id.as_deref().unwrap_or("None"),
pspec.spec.branch_id.as_deref().unwrap_or("None"),
pspec.spec.endpoint_id.as_deref().unwrap_or("None"),
pspec.spec.features,
pspec.spec.remote_extensions,
);
@@ -631,19 +634,23 @@ impl ComputeNode {
});
}
// Configure and start rsyslog for HIPAA if necessary
if let ComputeAudit::Hipaa = pspec.spec.audit_log_level {
let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
if remote_endpoint.is_empty() {
anyhow::bail!("AUDIT_LOGGING_ENDPOINT is empty");
// Configure and start rsyslog for compliance audit logging
match pspec.spec.audit_log_level {
ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => {
let remote_endpoint =
std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
if remote_endpoint.is_empty() {
anyhow::bail!("AUDIT_LOGGING_ENDPOINT is empty");
}
let log_directory_path = Path::new(&self.params.pgdata).join("log");
let log_directory_path = log_directory_path.to_string_lossy().to_string();
configure_audit_rsyslog(log_directory_path.clone(), "hipaa", &remote_endpoint)?;
// Launch a background task to clean up the audit logs
launch_pgaudit_gc(log_directory_path);
}
let log_directory_path = Path::new(&self.params.pgdata).join("log");
let log_directory_path = log_directory_path.to_string_lossy().to_string();
configure_audit_rsyslog(log_directory_path.clone(), "hipaa", &remote_endpoint)?;
// Launch a background task to clean up the audit logs
launch_pgaudit_gc(log_directory_path);
_ => {}
}
// Configure and start rsyslog for Postgres logs export

View File

@@ -89,6 +89,15 @@ pub fn write_postgres_conf(
escape_conf_value(&s.to_string())
)?;
}
if let Some(s) = &spec.project_id {
writeln!(file, "neon.project_id={}", escape_conf_value(s))?;
}
if let Some(s) = &spec.branch_id {
writeln!(file, "neon.branch_id={}", escape_conf_value(s))?;
}
if let Some(s) = &spec.endpoint_id {
writeln!(file, "neon.endpoint_id={}", escape_conf_value(s))?;
}
// tls
if let Some(tls_config) = tls_config {
@@ -169,7 +178,7 @@ pub fn write_postgres_conf(
// and don't allow the user or the control plane admin to change them.
match spec.audit_log_level {
ComputeAudit::Disabled => {}
ComputeAudit::Log => {
ComputeAudit::Log | ComputeAudit::Base => {
writeln!(file, "# Managed by compute_ctl base audit settings: start")?;
writeln!(file, "pgaudit.log='ddl,role'")?;
// Disable logging of catalog queries to reduce the noise
@@ -193,16 +202,20 @@ pub fn write_postgres_conf(
}
writeln!(file, "# Managed by compute_ctl base audit settings: end")?;
}
ComputeAudit::Hipaa => {
ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => {
writeln!(
file,
"# Managed by compute_ctl compliance audit settings: begin"
)?;
// This log level is very verbose
// but this is necessary for HIPAA compliance.
// Exclude 'misc' category, because it doesn't contain anythig relevant.
writeln!(file, "pgaudit.log='all, -misc'")?;
writeln!(file, "pgaudit.log_parameter=on")?;
// Enable logging of parameters.
// This is very verbose and may contain sensitive data.
if spec.audit_log_level == ComputeAudit::Full {
writeln!(file, "pgaudit.log_parameter=on")?;
writeln!(file, "pgaudit.log='all'")?;
} else {
writeln!(file, "pgaudit.log_parameter=off")?;
writeln!(file, "pgaudit.log='all, -misc'")?;
}
// Disable logging of catalog queries
// The catalog doesn't contain sensitive data, so we don't need to audit it.
writeln!(file, "pgaudit.log_catalog=off")?;

View File

@@ -278,12 +278,12 @@ impl ComputeNode {
// so that all config operations are audit logged.
match spec.audit_log_level
{
ComputeAudit::Hipaa => {
ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => {
phases.push(CreatePgauditExtension);
phases.push(CreatePgauditlogtofileExtension);
phases.push(DisablePostgresDBPgAudit);
}
ComputeAudit::Log => {
ComputeAudit::Log | ComputeAudit::Base => {
phases.push(CreatePgauditExtension);
phases.push(DisablePostgresDBPgAudit);
}

View File

@@ -658,6 +658,9 @@ impl Endpoint {
delta_operations: None,
tenant_id: Some(self.tenant_id),
timeline_id: Some(self.timeline_id),
project_id: None,
branch_id: None,
endpoint_id: Some(self.endpoint_id.clone()),
mode: self.mode,
pageserver_connstring: Some(pageserver_connstring),
safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),

View File

@@ -318,7 +318,7 @@ impl PageServerNode {
self.conf.id, datadir,
)
})?;
let args = vec!["-D", datadir_path_str, "--dev"];
let args = vec!["-D", datadir_path_str];
background_process::start_process(
"pageserver",

View File

@@ -162,7 +162,6 @@ impl SafekeeperNode {
listen_http,
"--availability-zone".to_owned(),
availability_zone,
"--dev".to_owned(),
];
if let Some(pg_tenant_only_port) = self.conf.pg_tenant_only_port {
let listen_pg_tenant_only = format!("{}:{}", self.listen_addr, pg_tenant_only_port);

View File

@@ -13,7 +13,9 @@ use pageserver_api::controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse,
};
use pageserver_api::models::{TenantConfigRequest, TimelineCreateRequest, TimelineInfo};
use pageserver_api::models::{
TenantConfig, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
@@ -82,7 +84,8 @@ impl NeonStorageControllerStopArgs {
pub struct AttachHookRequest {
pub tenant_shard_id: TenantShardId,
pub node_id: Option<NodeId>,
pub generation_override: Option<i32>,
pub generation_override: Option<i32>, // only new tenants
pub config: Option<TenantConfig>, // only new tenants
}
#[derive(Serialize, Deserialize)]
@@ -805,6 +808,7 @@ impl StorageController {
tenant_shard_id,
node_id: Some(pageserver_id),
generation_override: None,
config: None,
};
let response = self

View File

@@ -104,6 +104,12 @@ pub struct ComputeSpec {
pub timeline_id: Option<TimelineId>,
pub pageserver_connstring: Option<String>,
// More neon ids that we expose to the compute_ctl
// and to postgres as neon extension GUCs.
pub project_id: Option<String>,
pub branch_id: Option<String>,
pub endpoint_id: Option<String>,
/// Safekeeper membership config generation. It is put in
/// neon.safekeepers GUC and serves two purposes:
/// 1) Non zero value forces walproposer to use membership configurations.
@@ -159,13 +165,7 @@ pub struct ComputeSpec {
#[serde(default)] // Default false
pub drop_subscriptions_before_start: bool,
/// Log level for audit logging:
///
/// Disabled - no audit logging. This is the default.
/// log - log masked statements to the postgres log using pgaudit extension
/// hipaa - log unmasked statements to the file using pgaudit and pgauditlogtofile extension
///
/// Extensions should be present in shared_preload_libraries
/// Log level for compute audit logging
#[serde(default)]
pub audit_log_level: ComputeAudit,
@@ -289,14 +289,25 @@ impl ComputeMode {
}
/// Log level for audit logging
/// Disabled, log, hipaa
/// Default is Disabled
#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeAudit {
#[default]
Disabled,
// Deprecated, use Base instead
Log,
// (pgaudit.log = 'ddl', pgaudit.log_parameter='off')
// logged to the standard postgresql log stream
Base,
// Deprecated, use Full or Extended instead
Hipaa,
// (pgaudit.log = 'all, -misc', pgaudit.log_parameter='off')
// logged to separate files collected by rsyslog
// into dedicated log storage with strict access
Extended,
// (pgaudit.log='all', pgaudit.log_parameter='on'),
// logged to separate files collected by rsyslog
// into dedicated log storage with strict access.
Full,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]

View File

@@ -30,6 +30,7 @@ tokio.workspace = true
tracing.workspace = true
url.workspace = true
uuid.workspace = true
x509-cert.workspace = true
# to use tokio channels as streams, this is faster to compile than async_stream
# why is it only here? no other crate should use it, streams are rarely needed.

View File

@@ -4,6 +4,8 @@ use futures::StreamExt;
use futures::stream::FuturesUnordered;
use hyper0::Body;
use hyper0::server::conn::Http;
use metrics::{IntCounterVec, register_int_counter_vec};
use once_cell::sync::Lazy;
use routerify::{RequestService, RequestServiceBuilder};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsAcceptor;
@@ -26,6 +28,24 @@ pub struct Server {
tls_acceptor: Option<TlsAcceptor>,
}
static CONNECTION_STARTED_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"http_server_connection_started_total",
"Number of established http/https connections",
&["scheme"]
)
.expect("failed to define a metric")
});
static CONNECTION_ERROR_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"http_server_connection_errors_total",
"Number of occured connection errors by type",
&["type"]
)
.expect("failed to define a metric")
});
impl Server {
pub fn new(
request_service: Arc<RequestServiceBuilder<Body, ApiError>>,
@@ -60,6 +80,15 @@ impl Server {
false
}
let tcp_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["tcp"]);
let tls_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["tls"]);
let http_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["http"]);
let https_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["https"]);
let panic_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["panic"]);
let http_connection_cnt = CONNECTION_STARTED_COUNT.with_label_values(&["http"]);
let https_connection_cnt = CONNECTION_STARTED_COUNT.with_label_values(&["https"]);
let mut connections = FuturesUnordered::new();
loop {
tokio::select! {
@@ -67,6 +96,7 @@ impl Server {
let (tcp_stream, remote_addr) = match stream {
Ok(stream) => stream,
Err(err) => {
tcp_error_cnt.inc();
if !suppress_io_error(&err) {
info!("Failed to accept TCP connection: {err:#}");
}
@@ -78,11 +108,18 @@ impl Server {
let tls_acceptor = self.tls_acceptor.clone();
let cancel = cancel.clone();
let tls_error_cnt = tls_error_cnt.clone();
let http_error_cnt = http_error_cnt.clone();
let https_error_cnt = https_error_cnt.clone();
let http_connection_cnt = http_connection_cnt.clone();
let https_connection_cnt = https_connection_cnt.clone();
connections.push(tokio::spawn(
async move {
match tls_acceptor {
Some(tls_acceptor) => {
// Handle HTTPS connection.
https_connection_cnt.inc();
let tls_stream = tokio::select! {
tls_stream = tls_acceptor.accept(tcp_stream) => tls_stream,
_ = cancel.cancelled() => return,
@@ -90,6 +127,7 @@ impl Server {
let tls_stream = match tls_stream {
Ok(tls_stream) => tls_stream,
Err(err) => {
tls_error_cnt.inc();
if !suppress_io_error(&err) {
info!(%remote_addr, "Failed to accept TLS connection: {err:#}");
}
@@ -97,6 +135,7 @@ impl Server {
}
};
if let Err(err) = Self::serve_connection(tls_stream, service, cancel).await {
https_error_cnt.inc();
if !suppress_hyper_error(&err) {
info!(%remote_addr, "Failed to serve HTTPS connection: {err:#}");
}
@@ -104,7 +143,9 @@ impl Server {
}
None => {
// Handle HTTP connection.
http_connection_cnt.inc();
if let Err(err) = Self::serve_connection(tcp_stream, service, cancel).await {
http_error_cnt.inc();
if !suppress_hyper_error(&err) {
info!(%remote_addr, "Failed to serve HTTP connection: {err:#}");
}
@@ -115,6 +156,7 @@ impl Server {
}
Some(conn) = connections.next() => {
if let Err(err) = conn {
panic_error_cnt.inc();
error!("Connection panicked: {err:#}");
}
}
@@ -122,6 +164,7 @@ impl Server {
// Wait for graceful shutdown of all connections.
while let Some(conn) = connections.next().await {
if let Err(err) = conn {
panic_error_cnt.inc();
error!("Connection panicked: {err:#}");
}
}

View File

@@ -3,11 +3,14 @@ use std::{sync::Arc, time::Duration};
use anyhow::Context;
use arc_swap::ArcSwap;
use camino::Utf8Path;
use metrics::{IntCounterVec, UIntGaugeVec, register_int_counter_vec, register_uint_gauge_vec};
use once_cell::sync::Lazy;
use rustls::{
pki_types::{CertificateDer, PrivateKeyDer},
pki_types::{CertificateDer, PrivateKeyDer, UnixTime},
server::{ClientHello, ResolvesServerCert},
sign::CertifiedKey,
};
use x509_cert::der::Reader;
pub async fn load_cert_chain(filename: &Utf8Path) -> anyhow::Result<Vec<CertificateDer<'static>>> {
let cert_data = tokio::fs::read(filename)
@@ -53,6 +56,76 @@ pub async fn load_certified_key(
Ok(certified_key)
}
/// rustls's CertifiedKey with extra parsed fields used for metrics.
struct ParsedCertifiedKey {
certified_key: CertifiedKey,
expiration_time: UnixTime,
}
/// Parse expiration time from an X509 certificate.
fn parse_expiration_time(cert: &CertificateDer<'_>) -> anyhow::Result<UnixTime> {
let parsed_cert = x509_cert::der::SliceReader::new(cert)
.context("Failed to parse cerficiate")?
.decode::<x509_cert::Certificate>()
.context("Failed to parse cerficiate")?;
Ok(UnixTime::since_unix_epoch(
parsed_cert
.tbs_certificate
.validity
.not_after
.to_unix_duration(),
))
}
async fn load_and_parse_certified_key(
key_filename: &Utf8Path,
cert_filename: &Utf8Path,
) -> anyhow::Result<ParsedCertifiedKey> {
let certified_key = load_certified_key(key_filename, cert_filename).await?;
let expiration_time = parse_expiration_time(certified_key.end_entity_cert()?)?;
Ok(ParsedCertifiedKey {
certified_key,
expiration_time,
})
}
static CERT_EXPIRATION_TIME: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"tls_certs_expiration_time_seconds",
"Expiration time of the loaded certificate since unix epoch in seconds",
&["resolver_name"]
)
.expect("failed to define a metric")
});
static CERT_RELOAD_STARTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"tls_certs_reload_started_total",
"Number of certificate reload loop iterations started",
&["resolver_name"]
)
.expect("failed to define a metric")
});
static CERT_RELOAD_UPDATED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"tls_certs_reload_updated_total",
"Number of times the certificate was updated to the new one",
&["resolver_name"]
)
.expect("failed to define a metric")
});
static CERT_RELOAD_FAILED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"tls_certs_reload_failed_total",
"Number of times the certificate reload failed",
&["resolver_name"]
)
.expect("failed to define a metric")
});
/// Implementation of [`rustls::server::ResolvesServerCert`] which reloads certificates from
/// the disk periodically.
#[derive(Debug)]
@@ -63,16 +136,28 @@ pub struct ReloadingCertificateResolver {
impl ReloadingCertificateResolver {
/// Creates a new Resolver by loading certificate and private key from FS and
/// creating tokio::task to reload them with provided reload_period.
/// resolver_name is used as metric's label.
pub async fn new(
resolver_name: &str,
key_filename: &Utf8Path,
cert_filename: &Utf8Path,
reload_period: Duration,
) -> anyhow::Result<Arc<Self>> {
// Create metrics for current resolver.
let cert_expiration_time = CERT_EXPIRATION_TIME.with_label_values(&[resolver_name]);
let cert_reload_started_counter =
CERT_RELOAD_STARTED_COUNTER.with_label_values(&[resolver_name]);
let cert_reload_updated_counter =
CERT_RELOAD_UPDATED_COUNTER.with_label_values(&[resolver_name]);
let cert_reload_failed_counter =
CERT_RELOAD_FAILED_COUNTER.with_label_values(&[resolver_name]);
let parsed_key = load_and_parse_certified_key(key_filename, cert_filename).await?;
let this = Arc::new(Self {
certified_key: ArcSwap::from_pointee(
load_certified_key(key_filename, cert_filename).await?,
),
certified_key: ArcSwap::from_pointee(parsed_key.certified_key),
});
cert_expiration_time.set(parsed_key.expiration_time.as_secs());
tokio::spawn({
let weak_this = Arc::downgrade(&this);
@@ -88,17 +173,22 @@ impl ReloadingCertificateResolver {
Some(this) => this,
None => break, // Resolver has been destroyed, exit.
};
match load_certified_key(&key_filename, &cert_filename).await {
Ok(new_certified_key) => {
if new_certified_key.cert == this.certified_key.load().cert {
cert_reload_started_counter.inc();
match load_and_parse_certified_key(&key_filename, &cert_filename).await {
Ok(parsed_key) => {
if parsed_key.certified_key.cert == this.certified_key.load().cert {
tracing::debug!("Certificate has not changed since last reloading");
} else {
tracing::info!("Certificate has been reloaded");
this.certified_key.store(Arc::new(new_certified_key));
this.certified_key.store(Arc::new(parsed_key.certified_key));
cert_expiration_time.set(parsed_key.expiration_time.as_secs());
cert_reload_updated_counter.inc();
}
last_reload_failed = false;
}
Err(err) => {
cert_reload_failed_counter.inc();
// Note: Reloading certs may fail if it conflicts with the script updating
// the files at the same time. Warn only if the error is persistent.
if last_reload_failed {

View File

@@ -180,8 +180,7 @@ pub struct ConfigToml {
#[serde(skip_serializing_if = "Option::is_none")]
pub generate_unarchival_heatmap: Option<bool>,
pub tracing: Option<Tracing>,
pub dev_mode: bool,
pub enable_tls_page_service_api: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -633,7 +632,7 @@ impl Default for ConfigToml {
load_previous_heatmap: None,
generate_unarchival_heatmap: None,
tracing: None,
dev_mode: false,
enable_tls_page_service_api: false,
}
}
}

View File

@@ -7,7 +7,8 @@ use std::time::{Duration, Instant};
/// API (`/control/v1` prefix). Implemented by the server
/// in [`storage_controller::http`]
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig};
use crate::shard::{ShardStripeSize, TenantShardId};
@@ -499,6 +500,15 @@ pub struct SafekeeperSchedulingPolicyRequest {
pub scheduling_policy: SkSchedulingPolicy,
}
/// Import request for safekeeper timelines.
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineImportRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub start_lsn: Lsn,
pub sk_set: Vec<NodeId>,
}
#[cfg(test)]
mod test {
use serde_json;

View File

@@ -927,7 +927,7 @@ impl Key {
/// Guaranteed to return `Ok()` if [`Self::is_rel_block_key`] returns `true` for `key`.
#[inline(always)]
pub fn to_rel_block(self) -> anyhow::Result<(RelTag, BlockNumber)> {
pub fn to_rel_block(self) -> Result<(RelTag, BlockNumber), ToRelBlockError> {
Ok(match self.field1 {
0x00 => (
RelTag {
@@ -938,7 +938,7 @@ impl Key {
},
self.field6,
),
_ => anyhow::bail!("unexpected value kind 0x{:02x}", self.field1),
_ => return Err(ToRelBlockError(self.field1)),
})
}
}
@@ -951,6 +951,17 @@ impl std::str::FromStr for Key {
}
}
#[derive(Debug)]
pub struct ToRelBlockError(u8);
impl fmt::Display for ToRelBlockError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "unexpected value kind 0x{:02x}", self.0)
}
}
impl std::error::Error for ToRelBlockError {}
#[cfg(test)]
mod tests {
use std::str::FromStr;

View File

@@ -126,7 +126,7 @@ async fn ingest(
max_concurrency: NonZeroUsize::new(1).unwrap(),
});
let (_desc, path) = layer
.write_to_disk(&ctx, None, l0_flush_state.inner())
.write_to_disk(&ctx, None, l0_flush_state.inner(), &gate, cancel.clone())
.await?
.unwrap();
tokio::fs::remove_file(path).await?;

View File

@@ -9,7 +9,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, anyhow, bail};
use anyhow::{Context, anyhow};
use camino::Utf8Path;
use clap::{Arg, ArgAction, Command};
use http_utils::tls_certs::ReloadingCertificateResolver;
@@ -99,20 +99,6 @@ fn main() -> anyhow::Result<()> {
let (conf, ignored) = initialize_config(&identity_file_path, &cfg_file_path, &workdir)?;
if !conf.dev_mode {
if matches!(conf.http_auth_type, AuthType::Trust)
|| matches!(conf.pg_auth_type, AuthType::Trust)
{
bail!(
"Pageserver refuses to start with HTTP or PostgreSQL API authentication disabled.\n\
Run with --dev to allow running without authentication.\n\
This is insecure and should only be used in development environments."
);
}
} else {
warn!("Starting in dev mode: this may be an insecure configuration.");
}
// Initialize logging.
//
// It must be initialized before the custom panic hook is installed below.
@@ -466,6 +452,24 @@ fn start_pageserver(
info!("Using auth for http API: {:#?}", conf.http_auth_type);
info!("Using auth for pg connections: {:#?}", conf.pg_auth_type);
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_page_service_api
{
let resolver = BACKGROUND_RUNTIME.block_on(ReloadingCertificateResolver::new(
"main",
&conf.ssl_key_file,
&conf.ssl_cert_file,
conf.ssl_cert_reload_period,
))?;
let server_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(resolver);
Some(Arc::new(server_config))
} else {
None
};
match var("NEON_AUTH_TOKEN") {
Ok(v) => {
info!("Loaded JWT token for authentication with Safekeeper");
@@ -684,17 +688,11 @@ fn start_pageserver(
let https_task = match https_listener {
Some(https_listener) => {
let resolver = MGMT_REQUEST_RUNTIME.block_on(ReloadingCertificateResolver::new(
&conf.ssl_key_file,
&conf.ssl_cert_file,
conf.ssl_cert_reload_period,
))?;
let tls_server_config = tls_server_config
.clone()
.expect("tls_server_config is set earlier if https is enabled");
let server_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(resolver);
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));
let tls_acceptor = tokio_rustls::TlsAcceptor::from(tls_server_config);
let server =
http_utils::server::Server::new(service, https_listener, Some(tls_acceptor))?;
@@ -750,6 +748,11 @@ fn start_pageserver(
tokio::net::TcpListener::from_std(pageserver_listener)
.context("create tokio listener")?
},
if conf.enable_tls_page_service_api {
tls_server_config
} else {
None
},
);
// All started up! Now just sit and wait for shutdown signal.

View File

@@ -220,7 +220,10 @@ pub struct PageServerConf {
pub tracing: Option<pageserver_api::config::Tracing>,
pub dev_mode: bool,
/// Enable TLS in page service API.
/// Does not force TLS: the client negotiates TLS usage during the handshake.
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
pub enable_tls_page_service_api: bool,
}
/// Token for authentication to safekeepers
@@ -393,6 +396,7 @@ impl PageServerConf {
load_previous_heatmap,
generate_unarchival_heatmap,
tracing,
enable_tls_page_service_api,
} = config_toml;
let mut conf = PageServerConf {
@@ -443,7 +447,7 @@ impl PageServerConf {
page_service_pipelining,
get_vectored_concurrent_io,
tracing,
dev_mode,
enable_tls_page_service_api,
// ------------------------------------------------------------
// fields that require additional validation or custom handling

View File

@@ -3253,7 +3253,7 @@ async fn ingest_aux_files(
modification
.put_file(&fname, content.as_bytes(), &ctx)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(|e| ApiError::InternalServerError(e.into()))?;
}
modification
.commit(&ctx)

View File

@@ -27,7 +27,7 @@ use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::*;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walingest::{WalIngest, WalIngestErrorKind};
// Returns checkpoint LSN from controlfile
pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
@@ -157,9 +157,9 @@ async fn import_rel(
.put_rel_creation(rel, nblocks as u32, ctx)
.await
{
match e {
RelationError::AlreadyExists => {
debug!("Relation {} already exist. We must be extending it.", rel)
match e.kind {
WalIngestErrorKind::RelationAlreadyExists(rel) => {
debug!("Relation {rel} already exists. We must be extending it.")
}
_ => return Err(e.into()),
}

View File

@@ -1289,6 +1289,7 @@ pub(crate) enum StorageIoOperation {
Seek,
Fsync,
Metadata,
SetLen,
}
impl StorageIoOperation {
@@ -1303,6 +1304,7 @@ impl StorageIoOperation {
StorageIoOperation::Seek => "seek",
StorageIoOperation::Fsync => "fsync",
StorageIoOperation::Metadata => "metadata",
StorageIoOperation::SetLen => "set_len",
}
}
}

View File

@@ -105,6 +105,7 @@ pub fn spawn(
pg_auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
tcp_listener: tokio::net::TcpListener,
tls_config: Option<Arc<rustls::ServerConfig>>,
) -> Listener {
let cancel = CancellationToken::new();
let libpq_ctx = RequestContext::todo_child(
@@ -124,6 +125,7 @@ pub fn spawn(
perf_trace_dispatch,
tcp_listener,
conf.pg_auth_type,
tls_config,
conf.page_service_pipelining.clone(),
libpq_ctx,
cancel.clone(),
@@ -181,6 +183,7 @@ pub async fn libpq_listener_main(
perf_trace_dispatch: Option<Dispatch>,
listener: tokio::net::TcpListener,
auth_type: AuthType,
tls_config: Option<Arc<rustls::ServerConfig>>,
pipelining_config: PageServicePipeliningConfig,
listener_ctx: RequestContext,
listener_cancel: CancellationToken,
@@ -223,6 +226,7 @@ pub async fn libpq_listener_main(
local_auth,
socket,
auth_type,
tls_config.clone(),
pipelining_config.clone(),
connection_ctx,
connections_cancel.child_token(),
@@ -264,6 +268,7 @@ async fn page_service_conn_main(
auth: Option<Arc<SwappableJwtAuth>>,
socket: tokio::net::TcpStream,
auth_type: AuthType,
tls_config: Option<Arc<rustls::ServerConfig>>,
pipelining_config: PageServicePipeliningConfig,
connection_ctx: RequestContext,
cancel: CancellationToken,
@@ -334,7 +339,8 @@ async fn page_service_conn_main(
cancel.clone(),
gate_guard,
);
let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
let pgbackend =
PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
match pgbackend.run(&mut conn_handler, &cancel).await {
Ok(()) => {

View File

@@ -9,8 +9,9 @@
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
use std::ops::{ControlFlow, Range};
use crate::PERF_TRACE_TARGET;
use anyhow::{Context, ensure};
use crate::walingest::{WalIngestError, WalIngestErrorKind};
use crate::{PERF_TRACE_TARGET, ensure_walingest};
use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
@@ -136,12 +137,8 @@ impl From<PageReconstructError> for CalculateLogicalSizeError {
#[derive(Debug, thiserror::Error)]
pub enum RelationError {
#[error("Relation Already Exists")]
AlreadyExists,
#[error("invalid relnode")]
InvalidRelnode,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
///
@@ -1478,8 +1475,8 @@ impl DatadirModification<'_> {
}
/// Set the current lsn
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
ensure!(
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
ensure_walingest!(
lsn >= self.lsn,
"setting an older lsn {} than {} is not allowed",
lsn,
@@ -1578,7 +1575,7 @@ impl DatadirModification<'_> {
&mut self,
rel: RelTag,
ctx: &RequestContext,
) -> Result<u32, PageReconstructError> {
) -> Result<u32, WalIngestError> {
// Get current size and put rel creation if rel doesn't exist
//
// NOTE: we check the cache first even though get_rel_exists and get_rel_size would
@@ -1593,14 +1590,13 @@ impl DatadirModification<'_> {
.await?
{
// create it with 0 size initially, the logic below will extend it
self.put_rel_creation(rel, 0, ctx)
.await
.context("Relation Error")?;
self.put_rel_creation(rel, 0, ctx).await?;
Ok(0)
} else {
self.tline
Ok(self
.tline
.get_rel_size(rel, Version::Modified(self), ctx)
.await
.await?)
}
}
@@ -1637,11 +1633,14 @@ impl DatadirModification<'_> {
// TODO(vlad): remove this argument and replace the shard check with is_key_local
shard: &ShardIdentity,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let mut gaps_at_lsns = Vec::default();
for meta in batch.metadata.iter() {
let (rel, blkno) = Key::from_compact(meta.key()).to_rel_block()?;
let key = Key::from_compact(meta.key());
let (rel, blkno) = key
.to_rel_block()
.map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
let new_nblocks = blkno + 1;
let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
@@ -1683,8 +1682,8 @@ impl DatadirModification<'_> {
rel: RelTag,
blknum: BlockNumber,
rec: NeonWalRecord,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
Ok(())
}
@@ -1696,7 +1695,7 @@ impl DatadirModification<'_> {
segno: u32,
blknum: BlockNumber,
rec: NeonWalRecord,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
if !self.tline.tenant_shard_id.is_shard_zero() {
return Ok(());
}
@@ -1714,14 +1713,11 @@ impl DatadirModification<'_> {
rel: RelTag,
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
}
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
Ok(())
@@ -1733,15 +1729,12 @@ impl DatadirModification<'_> {
segno: u32,
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
}
self.put(key, Value::Image(img));
Ok(())
@@ -1751,15 +1744,11 @@ impl DatadirModification<'_> {
&mut self,
rel: RelTag,
blknum: BlockNumber,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
}
let batch = self
@@ -1776,15 +1765,11 @@ impl DatadirModification<'_> {
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
}
let batch = self
@@ -1832,8 +1817,10 @@ impl DatadirModification<'_> {
dbnode: Oid,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let v2_enabled = self.maybe_enable_rel_size_v2()?;
) -> Result<(), WalIngestError> {
let v2_enabled = self
.maybe_enable_rel_size_v2()
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
// Add it to the directory (if it doesn't exist already)
let buf = self.get(DBDIR_KEY, ctx).await?;
@@ -1874,13 +1861,13 @@ impl DatadirModification<'_> {
xid: u64,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
// Add it to the directory entry
let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let newdirbuf = if self.tline.pg_version >= 17 {
let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
@@ -1891,7 +1878,7 @@ impl DatadirModification<'_> {
let xid = xid as u32;
let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
@@ -1909,22 +1896,22 @@ impl DatadirModification<'_> {
&mut self,
origin_id: RepOriginId,
origin_lsn: Lsn,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let key = repl_origin_key(origin_id);
self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
Ok(())
}
pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
self.set_replorigin(origin_id, Lsn::INVALID).await
}
pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
self.put(CONTROLFILE_KEY, Value::Image(img));
Ok(())
}
pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
self.put(CHECKPOINT_KEY, Value::Image(img));
Ok(())
}
@@ -1934,7 +1921,7 @@ impl DatadirModification<'_> {
spcnode: Oid,
dbnode: Oid,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let total_blocks = self
.tline
.get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
@@ -1973,20 +1960,21 @@ impl DatadirModification<'_> {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> Result<(), RelationError> {
) -> Result<(), WalIngestError> {
if rel.relnode == 0 {
return Err(RelationError::InvalidRelnode);
Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
"invalid relnode"
)))?;
}
// It's possible that this is the first rel for this db in this
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
.context("deserialize db")?;
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
let dbdir_exists =
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
// Didn't exist. Update dbdir
e.insert(false);
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
let buf = DbDirectory::ser(&dbdir)?;
self.pending_directory_entries.push((
DirectoryKind::Db,
MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
@@ -2003,27 +1991,25 @@ impl DatadirModification<'_> {
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
};
let v2_enabled = self.maybe_enable_rel_size_v2()?;
let v2_enabled = self
.maybe_enable_rel_size_v2()
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
if v2_enabled {
if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
}
let sparse_rel_dir_key =
rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
// check if the rel_dir_key exists in v2
let val = self
.sparse_get(sparse_rel_dir_key, ctx)
.await
.map_err(|e| RelationError::Other(e.into()))?;
let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
let val = RelDirExists::decode_option(val)
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
.map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
if val == RelDirExists::Exists {
return Err(RelationError::AlreadyExists);
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
}
self.put(
sparse_rel_dir_key,
@@ -2039,9 +2025,7 @@ impl DatadirModification<'_> {
// will be key not found errors if we don't create an empty one for rel_size_v2.
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&RelDirectory::default()).context("serialize")?,
)),
Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
);
}
self.pending_directory_entries
@@ -2049,7 +2033,7 @@ impl DatadirModification<'_> {
} else {
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
}
if !dbdir_exists {
self.pending_directory_entries
@@ -2059,9 +2043,7 @@ impl DatadirModification<'_> {
.push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
);
}
@@ -2086,8 +2068,8 @@ impl DatadirModification<'_> {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
if self
.tline
.get_rel_exists(rel, Version::Modified(self), ctx)
@@ -2117,8 +2099,8 @@ impl DatadirModification<'_> {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
// Put size
let size_key = rel_size_to_key(rel);
@@ -2142,8 +2124,10 @@ impl DatadirModification<'_> {
&mut self,
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let v2_enabled = self.maybe_enable_rel_size_v2()?;
) -> Result<(), WalIngestError> {
let v2_enabled = self
.maybe_enable_rel_size_v2()
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
for ((spc_node, db_node), rel_tags) in drop_relations {
let dir_key = rel_dir_to_key(spc_node, db_node);
let buf = self.get(dir_key, ctx).await?;
@@ -2163,7 +2147,7 @@ impl DatadirModification<'_> {
let key =
rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
.map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
if val == RelDirExists::Exists {
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
@@ -2206,7 +2190,7 @@ impl DatadirModification<'_> {
segno: u32,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
// Add it to the directory entry
@@ -2215,7 +2199,7 @@ impl DatadirModification<'_> {
let mut dir = SlruSegmentDirectory::des(&buf)?;
if !dir.segments.insert(segno) {
anyhow::bail!("slru segment {kind:?}/{segno} already exists");
Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
}
self.pending_directory_entries.push((
DirectoryKind::SlruSegment(kind),
@@ -2242,7 +2226,7 @@ impl DatadirModification<'_> {
kind: SlruKind,
segno: u32,
nblocks: BlockNumber,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
// Put size
@@ -2258,7 +2242,7 @@ impl DatadirModification<'_> {
kind: SlruKind,
segno: u32,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
// Remove it from the directory entry
let dir_key = slru_dir_to_key(kind);
let buf = self.get(dir_key, ctx).await?;
@@ -2283,7 +2267,7 @@ impl DatadirModification<'_> {
}
/// Drop a relmapper file (pg_filenode.map)
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
// TODO
Ok(())
}
@@ -2293,7 +2277,7 @@ impl DatadirModification<'_> {
&mut self,
xid: u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
// Remove it from the directory entry
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let newdirbuf = if self.tline.pg_version >= 17 {
@@ -2308,7 +2292,8 @@ impl DatadirModification<'_> {
));
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
} else {
let xid: u32 = u32::try_from(xid)?;
let xid: u32 = u32::try_from(xid)
.map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
let mut dir = TwoPhaseDirectory::des(&buf)?;
if !dir.xids.remove(&xid) {
@@ -2333,7 +2318,7 @@ impl DatadirModification<'_> {
path: &str,
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
let old_val = match self.get(key, ctx).await {
@@ -2342,7 +2327,7 @@ impl DatadirModification<'_> {
Err(e) => return Err(e.into()),
};
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
aux_file::decode_file_value(old_val)?
aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
} else {
Vec::new()
};
@@ -2387,7 +2372,8 @@ impl DatadirModification<'_> {
}
(None, true) => warn!("removing non-existing aux file: {}", path),
}
let new_val = aux_file::encode_file_value(&new_files)?;
let new_val = aux_file::encode_file_value(&new_files)
.map_err(WalIngestErrorKind::EncodeAuxFileError)?;
self.put(key, Value::Image(new_val.into()));
Ok(())

View File

@@ -11571,6 +11571,99 @@ mod tests {
Ok(())
}
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_bottom_most_compation_redo_failure() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_bottom_most_compation_redo_failure").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
let img_layer = (0..10)
.map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10"))))
.collect_vec();
let delta1 = vec![
(
get_key(1),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
),
(
get_key(1),
Lsn(0x24),
Value::WalRecord(NeonWalRecord::wal_append("@0x24")),
),
(
get_key(1),
Lsn(0x28),
// This record will fail to redo
Value::WalRecord(NeonWalRecord::wal_append_conditional("@0x28", "???")),
),
];
let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![], // in-memory layers
vec![DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x20)..Lsn(0x30),
delta1,
)], // delta layers
vec![(Lsn(0x10), img_layer)], // image layers
Lsn(0x50),
)
.await?;
{
tline
.applied_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![],
cutoffs: GcCutoffs {
time: Lsn(0x30),
space: Lsn(0x30),
},
leases: Default::default(),
within_ancestor_pitr: false,
};
}
let cancel = CancellationToken::new();
// Compaction will fail, but should not fire any critical error.
// Gc-compaction currently cannot figure out what keys are not in the keyspace during the compaction
// process. It will always try to redo the logs it reads and if it doesn't work, fail the entire
// compaction job. Tracked in <https://github.com/neondatabase/neon/issues/10395>.
let res = tline
.compact_with_gc(
&cancel,
CompactOptions {
compact_key_range: None,
compact_lsn_range: None,
..Default::default()
},
&ctx,
)
.await;
assert!(res.is_err());
Ok(())
}
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_synthetic_size_calculation_with_invisible_branches() -> anyhow::Result<()> {

View File

@@ -15,20 +15,23 @@
//! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use std::cmp::min;
use std::io::Error;
use std::sync::Arc;
use async_compression::Level;
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::VirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::owned_buffers_io::write::{BufferedWriter, FlushTaskError};
#[derive(Copy, Clone, Debug)]
pub struct CompressionInfo {
@@ -36,6 +39,14 @@ pub struct CompressionInfo {
pub compressed_size: Option<usize>,
}
#[derive(Debug, thiserror::Error)]
pub enum WriteBlobError {
#[error(transparent)]
Flush(FlushTaskError),
#[error("blob too large ({len} bytes)")]
BlobTooLarge { len: usize },
}
impl BlockCursor<'_> {
/// Read a blob into a new buffer.
pub async fn read_blob(
@@ -157,135 +168,62 @@ pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
/// A wrapper of `VirtualFile` that allows users to write blobs.
///
/// If a `BlobWriter` is dropped, the internal buffer will be
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
/// discarded. You need to call [`Self::into_inner`]
/// manually before dropping.
pub struct BlobWriter<const BUFFERED: bool> {
inner: VirtualFile,
offset: u64,
/// A buffer to save on write calls, only used if BUFFERED=true
buf: Vec<u8>,
pub struct BlobWriter {
/// We do tiny writes for the length headers; they need to be in an owned buffer;
io_buf: Option<BytesMut>,
writer: BufferedWriter<IoBufferMut, VirtualFile>,
offset: u64,
}
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
Self {
inner,
offset: start_offset,
buf: Vec::with_capacity(Self::CAPACITY),
impl BlobWriter {
pub fn new(
file: Arc<VirtualFile>,
start_offset: u64,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> anyhow::Result<Self> {
Ok(Self {
io_buf: Some(BytesMut::new()),
}
writer: BufferedWriter::new(
file,
start_offset,
|| IoBufferMut::with_capacity(Self::CAPACITY),
gate.enter()?,
cancel,
ctx,
flush_task_span,
),
offset: start_offset,
})
}
pub fn size(&self) -> u64 {
self.offset
}
const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
const CAPACITY: usize = 64 * 1024;
/// Writes the given buffer directly to the underlying `VirtualFile`.
/// You need to make sure that the internal buffer is empty, otherwise
/// data will be written in wrong order.
#[inline(always)]
async fn write_all_unbuffered<Buf: IoBuf + Send>(
&mut self,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), Error>) {
let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
let nbytes = match res {
Ok(nbytes) => nbytes,
Err(e) => return (src_buf, Err(e)),
};
self.offset += nbytes as u64;
(src_buf, Ok(()))
}
#[inline(always)]
/// Flushes the internal buffer to the underlying `VirtualFile`.
pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
let buf = std::mem::take(&mut self.buf);
let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await;
res?;
let mut buf = slice.into_raw_slice().into_inner();
buf.clear();
self.buf = buf;
Ok(())
}
#[inline(always)]
/// Writes as much of `src_buf` into the internal buffer as it fits
fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
let remaining = Self::CAPACITY - self.buf.len();
let to_copy = src_buf.len().min(remaining);
self.buf.extend_from_slice(&src_buf[..to_copy]);
self.offset += to_copy as u64;
to_copy
}
/// Internal, possibly buffered, write function
/// Writes `src_buf` to the file at the current offset.
async fn write_all<Buf: IoBuf + Send>(
&mut self,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), Error>) {
let src_buf = src_buf.into_raw_slice();
let src_buf_bounds = src_buf.bounds();
let restore = move |src_buf_slice: Slice<_>| {
FullSlice::must_new(Slice::from_buf_bounds(
src_buf_slice.into_inner(),
src_buf_bounds,
))
};
) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
let res = self
.writer
// TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
// Can remove all the complexity around owned buffers upstack
.write_buffered_borrowed(&src_buf, ctx)
.await
.map(|len| {
self.offset += len as u64;
});
if !BUFFERED {
assert!(self.buf.is_empty());
return self
.write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
.await;
}
let remaining = Self::CAPACITY - self.buf.len();
let src_buf_len = src_buf.bytes_init();
if src_buf_len == 0 {
return (restore(src_buf), Ok(()));
}
let mut src_buf = src_buf.slice(0..src_buf_len);
// First try to copy as much as we can into the buffer
if remaining > 0 {
let copied = self.write_into_buffer(&src_buf);
src_buf = src_buf.slice(copied..);
}
// Then, if the buffer is full, flush it out
if self.buf.len() == Self::CAPACITY {
if let Err(e) = self.flush_buffer(ctx).await {
return (restore(src_buf), Err(e));
}
}
// Finally, write the tail of src_buf:
// If it wholly fits into the buffer without
// completely filling it, then put it there.
// If not, write it out directly.
let src_buf = if !src_buf.is_empty() {
assert_eq!(self.buf.len(), 0);
if src_buf.len() < Self::CAPACITY {
let copied = self.write_into_buffer(&src_buf);
// We just verified above that src_buf fits into our internal buffer.
assert_eq!(copied, src_buf.len());
restore(src_buf)
} else {
let (src_buf, res) = self
.write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
.await;
if let Err(e) = res {
return (src_buf, Err(e));
}
src_buf
}
} else {
restore(src_buf)
};
(src_buf, Ok(()))
(src_buf, res)
}
/// Write a blob of data. Returns the offset that it was written to,
@@ -294,7 +232,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
srcbuf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<u64, Error>) {
) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
let (buf, res) = self
.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
.await;
@@ -308,7 +246,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
srcbuf: FullSlice<Buf>,
ctx: &RequestContext,
algorithm: ImageCompressionAlgorithm,
) -> (FullSlice<Buf>, Result<(u64, CompressionInfo), Error>) {
) -> (
FullSlice<Buf>,
Result<(u64, CompressionInfo), WriteBlobError>,
) {
let offset = self.offset;
let mut compression_info = CompressionInfo {
written_compressed: false,
@@ -324,14 +265,16 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
let res = res.map_err(WriteBlobError::Flush);
((slice, res), srcbuf)
} else {
// Write a 4-byte length header
if len > MAX_SUPPORTED_BLOB_LEN {
return (
(
io_buf.slice_len(),
Err(Error::other(format!("blob too large ({len} bytes)"))),
Err(WriteBlobError::BlobTooLarge { len }),
),
srcbuf,
);
@@ -365,7 +308,9 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
assert_eq!(len_buf[0] & 0xf0, 0);
len_buf[0] |= high_bit_mask;
io_buf.extend_from_slice(&len_buf[..]);
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
let res = res.map_err(WriteBlobError::Flush);
((slice, res), srcbuf)
}
}
.await;
@@ -380,33 +325,23 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
} else {
self.write_all(srcbuf, ctx).await
};
let res = res.map_err(WriteBlobError::Flush);
(srcbuf, res.map(|_| (offset, compression_info)))
}
}
impl BlobWriter<true> {
/// Access the underlying `VirtualFile`.
///
/// This function flushes the internal buffer before giving access
/// to the underlying `VirtualFile`.
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
self.flush_buffer(ctx).await?;
Ok(self.inner)
}
/// Access the underlying `VirtualFile`.
///
/// Unlike [`into_inner`](Self::into_inner), this doesn't flush
/// the internal buffer before giving access.
pub fn into_inner_no_flush(self) -> VirtualFile {
self.inner
}
}
impl BlobWriter<false> {
/// Access the underlying `VirtualFile`.
pub fn into_inner(self) -> VirtualFile {
self.inner
/// The caller can use the `handle_tail` function to change the tail of the buffer before flushing it to disk.
/// The buffer will not be flushed to disk if handle_tail returns `None`.
pub async fn into_inner(
self,
handle_tail: impl FnMut(IoBufferMut) -> Option<IoBufferMut>,
) -> Result<VirtualFile, FlushTaskError> {
let (_, file) = self.writer.shutdown(handle_tail).await?;
Ok(file)
}
}
@@ -415,29 +350,33 @@ pub(crate) mod tests {
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use rand::{Rng, SeedableRng};
use tracing::info_span;
use super::*;
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
use crate::tenant::block_io::BlockReaderRef;
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED>(blobs, false).await
async fn round_trip_test(blobs: &[Vec<u8>]) -> anyhow::Result<()> {
round_trip_test_compressed(blobs, false).await
}
pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
pub(crate) async fn write_maybe_compressed(
blobs: &[Vec<u8>],
compression: bool,
ctx: &RequestContext,
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
) -> anyhow::Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>)> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
let file = Arc::new(VirtualFile::create_v2(pathbuf.as_path(), ctx).await?);
let mut wtr =
BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
for blob in blobs.iter() {
let (_, res) = if compression {
let res = wtr
@@ -454,26 +393,37 @@ pub(crate) mod tests {
let offs = res?;
offsets.push(offs);
}
// Write out one page worth of zeros so that we can
// read again with read_blk
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
let offs = res?;
println!("Writing final blob at offs={offs}");
wtr.flush_buffer(ctx).await?;
wtr.into_inner(|mut buf| {
use crate::virtual_file::owned_buffers_io::write::Buffer;
let len = buf.pending();
let cap = buf.cap();
// pad zeros to the next io alignment requirement.
// TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that.
// We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here.
// Need to find a better API that allows writing the format we intend to.
let count = len.next_multiple_of(PAGE_SZ).min(cap) - len;
buf.extend_with(0, count);
Some(buf)
})
.await?; // TODO: this here is the problem with the tests: we're dropping the tail end
}
Ok((temp_dir, pathbuf, offsets))
}
async fn round_trip_test_compressed<const BUFFERED: bool>(
async fn round_trip_test_compressed(
blobs: &[Vec<u8>],
compression: bool,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
let ctx =
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
let (_temp_dir, pathbuf, offsets) =
write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
write_maybe_compressed(blobs, compression, &ctx).await?;
let file = VirtualFile::open(pathbuf, &ctx).await?;
println!("Done writing!");
let file = VirtualFile::open_v2(pathbuf, &ctx).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new_with_compression(rdr, compression);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
@@ -492,30 +442,27 @@ pub(crate) mod tests {
}
#[tokio::test]
async fn test_one() -> Result<(), Error> {
async fn test_one() -> anyhow::Result<()> {
let blobs = &[vec![12, 21, 22]];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test(blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_hello_simple() -> Result<(), Error> {
async fn test_hello_simple() -> anyhow::Result<()> {
let blobs = &[
vec![0, 1, 2, 3],
b"Hello, World!".to_vec(),
Vec::new(),
b"foobar".to_vec(),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false>(blobs, true).await?;
round_trip_test_compressed::<true>(blobs, true).await?;
round_trip_test(blobs).await?;
round_trip_test_compressed(blobs, true).await?;
Ok(())
}
#[tokio::test]
async fn test_really_big_array() -> Result<(), Error> {
async fn test_really_big_array() -> anyhow::Result<()> {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
@@ -524,25 +471,22 @@ pub(crate) mod tests {
vec![0xf3; 24 * PAGE_SZ],
b"foobar".to_vec(),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false>(blobs, true).await?;
round_trip_test_compressed::<true>(blobs, true).await?;
round_trip_test(blobs).await?;
round_trip_test_compressed(blobs, true).await?;
Ok(())
}
#[tokio::test]
async fn test_arrays_inc() -> Result<(), Error> {
async fn test_arrays_inc() -> anyhow::Result<()> {
let blobs = (0..PAGE_SZ / 8)
.map(|v| random_array(v * 16))
.collect::<Vec<_>>();
round_trip_test::<false>(&blobs).await?;
round_trip_test::<true>(&blobs).await?;
round_trip_test(&blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_arrays_random_size() -> Result<(), Error> {
async fn test_arrays_random_size() -> anyhow::Result<()> {
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
let blobs = (0..1024)
.map(|_| {
@@ -554,20 +498,18 @@ pub(crate) mod tests {
random_array(sz.into())
})
.collect::<Vec<_>>();
round_trip_test::<false>(&blobs).await?;
round_trip_test::<true>(&blobs).await?;
round_trip_test(&blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_arrays_page_boundary() -> Result<(), Error> {
async fn test_arrays_page_boundary() -> anyhow::Result<()> {
let blobs = &[
random_array(PAGE_SZ - 4),
random_array(PAGE_SZ - 4),
random_array(PAGE_SZ - 4),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test(blobs).await?;
Ok(())
}
}

View File

@@ -4,14 +4,12 @@
use std::ops::Deref;
use bytes::Bytes;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult};
#[cfg(test)]
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::VirtualFile;
use crate::virtual_file::{IoBuffer, VirtualFile};
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
@@ -247,17 +245,17 @@ pub trait BlockWriter {
/// 'buf' must be of size PAGE_SZ. Returns the block number the page was
/// written to.
///
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error>;
}
///
/// A simple in-memory buffer of blocks.
///
pub struct BlockBuf {
pub blocks: Vec<Bytes>,
pub blocks: Vec<IoBuffer>,
}
impl BlockWriter for BlockBuf {
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
let blknum = self.blocks.len();
self.blocks.push(buf);

View File

@@ -25,7 +25,7 @@ use std::{io, result};
use async_stream::try_stream;
use byteorder::{BE, ReadBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use bytes::BufMut;
use either::Either;
use futures::{Stream, StreamExt};
use hex;
@@ -34,6 +34,7 @@ use tracing::error;
use crate::context::RequestContext;
use crate::tenant::block_io::{BlockReader, BlockWriter};
use crate::virtual_file::{IoBuffer, IoBufferMut, owned_buffers_io::write::Buffer};
// The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
pub const VALUE_SZ: usize = 5;
@@ -787,12 +788,12 @@ impl<const L: usize> BuildNode<L> {
///
/// Serialize the node to on-disk format.
///
fn pack(&self) -> Bytes {
fn pack(&self) -> IoBuffer {
assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
assert!(self.num_children > 0);
let mut buf = BytesMut::new();
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
buf.put_u16(self.num_children);
buf.put_u8(self.level);
@@ -805,7 +806,7 @@ impl<const L: usize> BuildNode<L> {
assert!(buf.len() == self.size);
assert!(buf.len() <= PAGE_SZ);
buf.resize(PAGE_SZ, 0);
buf.extend_with(0, PAGE_SZ - buf.len());
buf.freeze()
}
@@ -839,7 +840,7 @@ pub(crate) mod tests {
#[derive(Clone, Default)]
pub(crate) struct TestDisk {
blocks: Vec<Bytes>,
blocks: Vec<IoBuffer>,
}
impl TestDisk {
fn new() -> Self {
@@ -857,7 +858,7 @@ pub(crate) mod tests {
}
}
impl BlockWriter for &mut TestDisk {
fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
fn write_blk(&mut self, buf: IoBuffer) -> io::Result<u32> {
let blknum = self.blocks.len();
self.blocks.push(buf);
Ok(blknum as u32)

View File

@@ -75,6 +75,7 @@ impl EphemeralFile {
bytes_written: 0,
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
file,
0,
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
cancel.child_token(),

View File

@@ -32,12 +32,14 @@ use super::{
remote_tenant_manifest_prefix, remote_tenant_path,
};
use crate::TEMP_FILE_SUFFIX;
use crate::assert_u64_eq_usize::UsizeIsU64;
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
};
use crate::tenant::Generation;
use crate::tenant::disk_btree::PAGE_SZ;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
@@ -227,6 +229,7 @@ async fn download_object(
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
0,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
cancel.child_token(),
@@ -251,13 +254,41 @@ async fn download_object(
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
}
let inner = buffered
.flush_and_into_inner(ctx)
let mut pad_amount = None;
let (bytes_amount, destination_file) = buffered
.shutdown(|mut buf| {
use crate::virtual_file::owned_buffers_io::write::Buffer;
let len = buf.pending();
let cap = buf.cap();
// pad zeros to the next io alignment requirement.
// TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that.
// We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here.
// Need to find a better API that allows writing the format we intend to.
let count = len.next_multiple_of(PAGE_SZ).min(cap) - len;
pad_amount = Some(count);
buf.extend_with(0, count);
Some(buf)
})
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
Ok(inner)
let pad_amount = pad_amount.expect("shutdown always invokes the closure").into_u64();
let set_len_arg = bytes_amount - pad_amount;
destination_file
.set_len(set_len_arg)
.await
.maybe_fatal_err("download_object set_len")
.with_context(|| {
format!("set len for file at {dst_path}: 0x{set_len_arg:x} = 0x{bytes_amount:x} - 0x{pad_amount:x}")
})
.map_err(DownloadError::Other)?;
Ok((set_len_arg, destination_file))
}
.await?;

View File

@@ -1521,12 +1521,11 @@ async fn load_heatmap(
path: &Utf8PathBuf,
ctx: &RequestContext,
) -> Result<Option<HeatMapTenant>, anyhow::Error> {
let mut file = match VirtualFile::open(path, ctx).await {
let st = match VirtualFile::read_to_string(path, ctx).await {
Ok(file) => file,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => Err(e)?,
};
let st = file.read_to_string(ctx).await?;
let htm = serde_json::from_str(&st)?;
Ok(Some(htm))
}

View File

@@ -5,6 +5,7 @@ use std::sync::Arc;
use bytes::Bytes;
use pageserver_api::key::{KEY_SIZE, Key};
use pageserver_api::value::Value;
use tokio_util::sync::CancellationToken;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
@@ -179,7 +180,7 @@ impl BatchLayerWriter {
/// An image writer that takes images and produces multiple image layers.
#[must_use]
pub struct SplitImageLayerWriter {
pub struct SplitImageLayerWriter<'a> {
inner: ImageLayerWriter,
target_layer_size: u64,
lsn: Lsn,
@@ -188,9 +189,12 @@ pub struct SplitImageLayerWriter {
tenant_shard_id: TenantShardId,
batches: BatchLayerWriter,
start_key: Key,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
}
impl SplitImageLayerWriter {
impl<'a> SplitImageLayerWriter<'a> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
@@ -198,6 +202,8 @@ impl SplitImageLayerWriter {
start_key: Key,
lsn: Lsn,
target_layer_size: u64,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
@@ -208,6 +214,8 @@ impl SplitImageLayerWriter {
tenant_shard_id,
&(start_key..Key::MAX),
lsn,
gate,
cancel.clone(),
ctx,
)
.await?,
@@ -217,6 +225,8 @@ impl SplitImageLayerWriter {
batches: BatchLayerWriter::new(conf).await?,
lsn,
start_key,
gate,
cancel,
})
}
@@ -239,6 +249,8 @@ impl SplitImageLayerWriter {
self.tenant_shard_id,
&(key..Key::MAX),
self.lsn,
self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -291,7 +303,7 @@ impl SplitImageLayerWriter {
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
/// will split them into multiple files based on size.
#[must_use]
pub struct SplitDeltaLayerWriter {
pub struct SplitDeltaLayerWriter<'a> {
inner: Option<(Key, DeltaLayerWriter)>,
target_layer_size: u64,
conf: &'static PageServerConf,
@@ -300,15 +312,19 @@ pub struct SplitDeltaLayerWriter {
lsn_range: Range<Lsn>,
last_key_written: Key,
batches: BatchLayerWriter,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
}
impl SplitDeltaLayerWriter {
impl<'a> SplitDeltaLayerWriter<'a> {
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
target_layer_size: u64,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
@@ -319,6 +335,8 @@ impl SplitDeltaLayerWriter {
lsn_range,
last_key_written: Key::MIN,
batches: BatchLayerWriter::new(conf).await?,
gate,
cancel,
})
}
@@ -344,6 +362,8 @@ impl SplitDeltaLayerWriter {
self.tenant_shard_id,
key,
self.lsn_range.clone(),
self.gate,
self.cancel.clone(),
ctx,
)
.await?,
@@ -362,6 +382,8 @@ impl SplitDeltaLayerWriter {
self.tenant_shard_id,
key,
self.lsn_range.clone(),
self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -469,6 +491,8 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
@@ -480,6 +504,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
@@ -546,6 +572,8 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
@@ -556,6 +584,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
@@ -643,6 +673,8 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
@@ -654,6 +686,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
@@ -730,6 +764,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();

View File

@@ -29,11 +29,11 @@
//!
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf};
@@ -45,13 +45,13 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -74,7 +74,8 @@ use crate::tenant::vectored_blob_io::{
VectoredReadPlanner,
};
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -112,6 +113,15 @@ impl From<&DeltaLayer> for Summary {
}
impl Summary {
/// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`.
pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
Self::ser_into(self, &mut buf)?;
// Pad zeroes to the buffer so the length is a multiple of the alignment.
buf.extend_with(0, buf.capacity() - buf.len());
Ok(buf.freeze())
}
pub(super) fn expected(
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -287,19 +297,19 @@ impl DeltaLayer {
key_start: Key,
lsn_range: &Range<Lsn>,
) -> Utf8PathBuf {
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
// Never reuse a filename in the lifetime of a pageserver process so that we need
// not worry about laggard Drop impl's async unlink hitting an already reused filename.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
conf.timeline_path(tenant_shard_id, timeline_id)
.join(format!(
"{}-XXX__{:016X}-{:016X}.{}.{}",
"{}-XXX__{:016X}-{:016X}.{:x}.{}",
key_start,
u64::from(lsn_range.start),
u64::from(lsn_range.end),
rand_string,
filename_disambiguator,
TEMP_FILE_SUFFIX,
))
}
@@ -390,22 +400,27 @@ struct DeltaLayerWriterInner {
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
blob_writer: BlobWriter<true>,
blob_writer: BlobWriter,
// Number of key-lsns in the layer.
num_keys: usize,
_gate_guard: utils::sync::gate::GateGuard,
}
impl DeltaLayerWriterInner {
///
/// Start building a new delta layer.
///
#[allow(clippy::too_many_arguments)]
async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename. We don't know
@@ -417,10 +432,17 @@ impl DeltaLayerWriterInner {
let path =
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
let mut file = VirtualFile::create(&path, ctx).await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
let file = Arc::new(VirtualFile::create_v2(&path, ctx).await?);
// Start at PAGE_SZ, make room for the header block
let blob_writer = BlobWriter::new(
file,
PAGE_SZ as u64,
gate,
cancel,
ctx,
info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -435,6 +457,7 @@ impl DeltaLayerWriterInner {
tree: tree_builder,
blob_writer,
num_keys: 0,
_gate_guard: gate.enter()?,
})
}
@@ -530,15 +553,33 @@ impl DeltaLayerWriterInner {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
let mut file = self.blob_writer.into_inner(ctx).await?;
let file = self
.blob_writer
.into_inner(|mut buf| {
let len = buf.pending();
let cap = buf.cap();
// pad zeros to the next io alignment requirement.
// TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that.
// We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here.
// Need to find a better API that allows writing the format we intend to.
let count = len.next_multiple_of(PAGE_SZ).min(cap) - len;
buf.extend_with(0, count);
Some(buf)
})
.await?;
// Write out the index
let (index_root_blk, block_buf) = self.tree.finish()?;
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
offset += PAGE_SZ as u64;
}
assert!(self.lsn_range.start < self.lsn_range.end);
// Fill in the summary on blk 0
@@ -553,11 +594,9 @@ impl DeltaLayerWriterInner {
index_root_blk,
};
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
let metadata = file
@@ -628,12 +667,15 @@ impl DeltaLayerWriter {
///
/// Start building a new delta layer.
///
#[allow(clippy::too_many_arguments)]
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
@@ -644,6 +686,8 @@ impl DeltaLayerWriter {
tenant_shard_id,
key_start,
lsn_range,
gate,
cancel,
ctx,
)
.await?,
@@ -719,12 +763,33 @@ impl DeltaLayerWriter {
impl Drop for DeltaLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
// We want to remove the virtual file here, so it's fine to not
// having completely flushed unwritten data.
let vfile = inner.blob_writer.into_inner_no_flush();
vfile.remove();
}
let Some(inner) = self.inner.take() else {
return;
};
tokio::spawn(async move {
let DeltaLayerWriterInner {
blob_writer,
_gate_guard,
..
} = inner;
let vfile = match blob_writer.into_inner(|_| None).await {
Ok(vfile) => vfile,
Err(e) => {
error!(err=%e, "failed to remove delta layer writer file");
drop(_gate_guard);
return;
}
};
if let Err(e) = std::fs::remove_file(vfile.path())
.maybe_fatal_err("failed to remove the virtual file")
{
error!(err=%e, path=%vfile.path(), "failed to remove delta layer writer file");
}
drop(_gate_guard);
});
}
}
@@ -751,7 +816,7 @@ impl DeltaLayer {
where
F: Fn(Summary) -> Summary,
{
let mut file = VirtualFile::open_with_options(
let file = VirtualFile::open_with_options_v2(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
@@ -768,11 +833,8 @@ impl DeltaLayer {
let new_summary = rewrite(actual_summary);
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here, but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let buf = new_summary.ser_into_page().context("serialize")?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
Ok(())
}
@@ -1600,8 +1662,8 @@ pub(crate) mod test {
use bytes::Bytes;
use itertools::MinMaxResult;
use pageserver_api::value::Value;
use rand::RngCore;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use rand::{Rng, RngCore};
use super::*;
use crate::DEFAULT_PG_VERSION;
@@ -1885,6 +1947,8 @@ pub(crate) mod test {
harness.tenant_shard_id,
entries_meta.key_range.start,
entries_meta.lsn_range.clone(),
&timeline.gate,
timeline.cancel.clone(),
&ctx,
)
.await?;
@@ -2079,6 +2143,8 @@ pub(crate) mod test {
tenant.tenant_shard_id,
Key::MIN,
Lsn(0x11)..truncate_at,
&branch.gate,
branch.cancel.clone(),
ctx,
)
.await
@@ -2213,6 +2279,8 @@ pub(crate) mod test {
tenant.tenant_shard_id,
*key_start,
(*lsn_min)..lsn_end,
&tline.gate,
tline.cancel.clone(),
ctx,
)
.await?;

View File

@@ -27,11 +27,11 @@
//! actual page images are stored in the "values" part.
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use bytes::Bytes;
@@ -43,13 +43,13 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -72,7 +72,8 @@ use crate::tenant::vectored_blob_io::{
VectoredReadPlanner,
};
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -111,6 +112,15 @@ impl From<&ImageLayer> for Summary {
}
impl Summary {
/// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`.
pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
Self::ser_into(self, &mut buf)?;
// Pad zeroes to the buffer so the length is a multiple of the alignment.
buf.extend_with(0, buf.capacity() - buf.len());
Ok(buf.freeze())
}
pub(super) fn expected(
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -251,14 +261,17 @@ impl ImageLayer {
tenant_shard_id: TenantShardId,
fname: &ImageLayerName,
) -> Utf8PathBuf {
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
// Never reuse a filename in the lifetime of a pageserver process so that we need
// not worry about laggard Drop impl's async unlink hitting an already reused filename.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
conf.timeline_path(&tenant_shard_id, &timeline_id)
.join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
.join(format!(
"{fname}.{:x}.{TEMP_FILE_SUFFIX}",
filename_disambiguator
))
}
///
@@ -348,7 +361,7 @@ impl ImageLayer {
where
F: Fn(Summary) -> Summary,
{
let mut file = VirtualFile::open_with_options(
let file = VirtualFile::open_with_options_v2(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
@@ -365,11 +378,8 @@ impl ImageLayer {
let new_summary = rewrite(actual_summary);
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let buf = new_summary.ser_into_page().context("serialize")?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
Ok(())
}
@@ -737,23 +747,28 @@ struct ImageLayerWriterInner {
// Number of keys in the layer.
num_keys: usize,
blob_writer: BlobWriter<false>,
blob_writer: BlobWriter,
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
#[cfg(feature = "testing")]
last_written_key: Key,
_gate_guard: utils::sync::gate::GateGuard,
}
impl ImageLayerWriterInner {
///
/// Start building a new image layer.
///
#[allow(clippy::too_many_arguments)]
async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename.
@@ -768,19 +783,28 @@ impl ImageLayerWriterInner {
},
);
trace!("creating image layer {}", path);
let mut file = {
VirtualFile::open_with_options(
&path,
virtual_file::OpenOptions::new()
.write(true)
.create_new(true),
ctx,
let file = {
Arc::new(
VirtualFile::open_with_options_v2(
&path,
virtual_file::OpenOptions::new()
.write(true)
.create_new(true),
ctx,
)
.await?,
)
.await?
};
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
// Start at `PAGE_SZ` to make room for the header block.
let blob_writer = BlobWriter::new(
file,
PAGE_SZ as u64,
gate,
cancel,
ctx,
info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -801,6 +825,7 @@ impl ImageLayerWriterInner {
num_keys: 0,
#[cfg(feature = "testing")]
last_written_key: Key::MIN,
_gate_guard: gate.enter()?,
};
Ok(writer)
@@ -886,15 +911,30 @@ impl ImageLayerWriterInner {
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
let mut file = self.blob_writer.into_inner();
let file = self
.blob_writer
.into_inner(|mut buf| {
let len = buf.pending();
let cap = buf.cap();
// pad zeros to the next io alignment requirement.
let count = len.next_multiple_of(PAGE_SZ).min(cap) - len;
buf.extend_with(0, count);
Some(buf)
})
.await?;
// Write out the index
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
let (index_root_blk, block_buf) = self.tree.finish()?;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer?
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
offset += PAGE_SZ as u64;
}
let final_key_range = if let Some(end_key) = end_key {
@@ -915,11 +955,9 @@ impl ImageLayerWriterInner {
index_root_blk,
};
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
let metadata = file
@@ -988,18 +1026,30 @@ impl ImageLayerWriter {
///
/// Start building a new image layer.
///
#[allow(clippy::too_many_arguments)]
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<ImageLayerWriter> {
Ok(Self {
inner: Some(
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
.await?,
ImageLayerWriterInner::new(
conf,
timeline_id,
tenant_shard_id,
key_range,
lsn,
gate,
cancel,
ctx,
)
.await?,
),
})
}
@@ -1050,9 +1100,33 @@ impl ImageLayerWriter {
impl Drop for ImageLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
inner.blob_writer.into_inner().remove();
}
let Some(inner) = self.inner.take() else {
return;
};
tokio::spawn(async move {
let ImageLayerWriterInner {
blob_writer,
_gate_guard,
..
} = inner;
let vfile = match blob_writer.into_inner(|_| None).await {
Ok(vfile) => vfile,
Err(e) => {
error!(err=%e, "failed to remove image layer writer file");
drop(_gate_guard);
return;
}
};
if let Err(e) = std::fs::remove_file(vfile.path())
.maybe_fatal_err("failed to remove the virtual file")
{
error!(err=%e, path=%vfile.path(), "failed to remove image layer writer file");
}
drop(_gate_guard);
});
}
}
@@ -1192,7 +1266,7 @@ mod test {
// This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
let input_end = Key::from_hex("000000067f00000001000000ae0000002000").unwrap();
let range = input_start..input_end;
// Build an image layer to filter
@@ -1203,6 +1277,8 @@ mod test {
harness.tenant_shard_id,
&range,
lsn,
&timeline.gate,
timeline.cancel.clone(),
&ctx,
)
.await
@@ -1235,7 +1311,7 @@ mod test {
let shard_identity = ShardIdentity::new(
ShardNumber(shard_number),
shard_count,
ShardStripeSize(0x8000),
ShardStripeSize(0x800),
)
.unwrap();
let harness = TenantHarness::create_custom(
@@ -1268,6 +1344,8 @@ mod test {
harness.tenant_shard_id,
&range,
lsn,
&timeline.gate,
timeline.cancel.clone(),
&ctx,
)
.await
@@ -1287,12 +1365,12 @@ mod test {
// This exact size and those below will need updating as/when the layer encoding changes, but
// should be deterministic for a given version of the format, as we used no randomness generating the input.
assert_eq!(original_size, 1597440);
assert_eq!(original_size, 122880);
match shard_number {
0 => {
// We should have written out just one stripe for our shard identity
assert_eq!(wrote_keys, 0x8000);
assert_eq!(wrote_keys, 0x800);
let replacement = replacement.unwrap();
// We should have dropped some of the data
@@ -1300,7 +1378,7 @@ mod test {
assert!(replacement.metadata().file_size > 0);
// Assert that we dropped ~3/4 of the data.
assert_eq!(replacement.metadata().file_size, 417792);
assert_eq!(replacement.metadata().file_size, 49152);
}
1 => {
// Shard 1 has no keys in our input range
@@ -1309,19 +1387,19 @@ mod test {
}
2 => {
// Shard 2 has one stripes in the input range
assert_eq!(wrote_keys, 0x8000);
assert_eq!(wrote_keys, 0x800);
let replacement = replacement.unwrap();
assert!(replacement.metadata().file_size < original_size);
assert!(replacement.metadata().file_size > 0);
assert_eq!(replacement.metadata().file_size, 417792);
assert_eq!(replacement.metadata().file_size, 49152);
}
3 => {
// Shard 3 has two stripes in the input range
assert_eq!(wrote_keys, 0x10000);
assert_eq!(wrote_keys, 0x1000);
let replacement = replacement.unwrap();
assert!(replacement.metadata().file_size < original_size);
assert!(replacement.metadata().file_size > 0);
assert_eq!(replacement.metadata().file_size, 811008);
assert_eq!(replacement.metadata().file_size, 73728);
}
_ => unreachable!(),
}
@@ -1346,6 +1424,8 @@ mod test {
tenant.tenant_shard_id,
&key_range,
lsn,
&tline.gate,
tline.cancel.clone(),
ctx,
)
.await?;

View File

@@ -719,6 +719,8 @@ impl InMemoryLayer {
ctx: &RequestContext,
key_range: Option<Range<Key>>,
l0_flush_global_state: &l0_flush::Inner,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
@@ -759,6 +761,8 @@ impl InMemoryLayer {
self.tenant_shard_id,
Key::MIN,
self.start_lsn..end_lsn,
gate,
cancel,
ctx,
)
.await?;

View File

@@ -4805,7 +4805,13 @@ impl Timeline {
let ctx = ctx.attached_child();
let work = async move {
let Some((desc, path)) = frozen_layer
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
.write_to_disk(
&ctx,
key_range,
self_clone.l0_flush_global_state.inner(),
&self_clone.gate,
self_clone.cancel.clone(),
)
.await?
else {
return Ok(None);
@@ -5343,6 +5349,8 @@ impl Timeline {
self.tenant_shard_id,
&img_range,
lsn,
&self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -6707,6 +6715,8 @@ impl Timeline {
self.tenant_shard_id,
&(min_key..end_key),
lsn,
&self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -6768,6 +6778,8 @@ impl Timeline {
self.tenant_shard_id,
deltas.key_range.start,
deltas.lsn_range,
&self.gate,
self.cancel.clone(),
ctx,
)
.await?;

View File

@@ -747,8 +747,8 @@ impl KeyHistoryRetention {
async fn pipe_to(
self,
key: Key,
delta_writer: &mut SplitDeltaLayerWriter,
mut image_writer: Option<&mut SplitImageLayerWriter>,
delta_writer: &mut SplitDeltaLayerWriter<'_>,
mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
stat: &mut CompactionStatistics,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -1140,6 +1140,7 @@ impl Timeline {
) -> Result<(), CompactionError> {
let mut drop_layers = Vec::new();
let mut layers_to_rewrite: Vec<Layer> = Vec::new();
let mut rewrite_max_exceeded: bool = false;
// We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
// layer is behind this Lsn, it indicates that the layer is being retained beyond the
@@ -1148,12 +1149,7 @@ impl Timeline {
// Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
// are rewriting layers.
let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
tracing::info!(
"starting shard ancestor compaction, latest_gc_cutoff: {}, pitr cutoff {}",
*latest_gc_cutoff,
self.gc_info.read().unwrap().cutoffs.time
);
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
let layers = self.layers.read().await;
for layer_desc in layers.layer_map()?.iter_historic_layers() {
@@ -1171,8 +1167,8 @@ impl Timeline {
// This ancestral layer only covers keys that belong to other shards.
// We include the full metadata in the log: if we had some critical bug that caused
// us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
info!(%layer, old_metadata=?layer.metadata(),
"dropping layer after shard split, contains no keys for this shard.",
debug!(%layer, old_metadata=?layer.metadata(),
"dropping layer after shard split, contains no keys for this shard",
);
if cfg!(debug_assertions) {
@@ -1234,9 +1230,10 @@ impl Timeline {
}
if layers_to_rewrite.len() >= rewrite_max {
tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
debug!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
layers_to_rewrite.len()
);
rewrite_max_exceeded = true;
continue;
}
@@ -1244,9 +1241,24 @@ impl Timeline {
layers_to_rewrite.push(layer);
}
// Drop read lock on layer map before we start doing time-consuming I/O
// Drop read lock on layer map before we start doing time-consuming I/O.
drop(layers);
// Drop out early if there's nothing to do.
if layers_to_rewrite.is_empty() && drop_layers.is_empty() {
return Ok(());
}
info!(
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers \
(latest_gc_cutoff={} pitr_cutoff={})",
layers_to_rewrite.len(),
drop_layers.len(),
*latest_gc_cutoff,
pitr_cutoff,
);
let started = Instant::now();
let mut replace_image_layers = Vec::new();
for layer in layers_to_rewrite {
@@ -1254,13 +1266,15 @@ impl Timeline {
return Err(CompactionError::ShuttingDown);
}
tracing::info!(layer=%layer, "Rewriting layer after shard split...");
info!(layer=%layer, "rewriting layer after shard split");
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&layer.layer_desc().key_range,
layer.layer_desc().image_layer_lsn(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -1292,7 +1306,7 @@ impl Timeline {
.map_err(CompactionError::Other)?;
let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
layer.metadata().file_size,
new_layer.metadata().file_size);
@@ -1304,6 +1318,12 @@ impl Timeline {
}
}
for layer in &drop_layers {
info!(%layer, old_metadata=?layer.metadata(),
"dropping layer after shard split (no keys for this shard)",
);
}
// At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
// metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
// to remote index) and be removed. This is inefficient but safe.
@@ -1319,6 +1339,7 @@ impl Timeline {
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
// Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
// load.
info!("shard ancestor compaction waiting for uploads");
match self.remote_client.wait_completion().await {
Ok(()) => (),
Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
@@ -1327,6 +1348,15 @@ impl Timeline {
}
}
info!(
"shard ancestor compaction done in {:.3}s{}",
started.elapsed().as_secs_f64(),
match rewrite_max_exceeded {
true => format!(", more work pending due to rewrite_max={rewrite_max}"),
false => String::new(),
}
);
fail::fail_point!("compact-shard-ancestors-persistent");
Ok(())
@@ -1861,6 +1891,8 @@ impl Timeline {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
lsn_range.clone()
},
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3055,6 +3087,8 @@ impl Timeline {
job_desc.compaction_key_range.start,
lowest_retain_lsn,
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3071,6 +3105,8 @@ impl Timeline {
self.tenant_shard_id,
lowest_retain_lsn..end_lsn,
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
)
.await
.context("failed to create delta layer writer")
@@ -3167,6 +3203,8 @@ impl Timeline {
self.tenant_shard_id,
desc.key_range.start,
desc.lsn_range.clone(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3184,6 +3222,8 @@ impl Timeline {
self.tenant_shard_id,
job_desc.compaction_key_range.end,
desc.lsn_range.clone(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3753,6 +3793,8 @@ impl CompactionJobExecutor for TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range.start,
lsn_range.clone(),
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;
@@ -3828,6 +3870,8 @@ impl TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range,
lsn,
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;

View File

@@ -231,6 +231,8 @@ async fn generate_tombstone_image_layer(
detached.tenant_shard_id,
&key_range,
image_lsn,
&detached.gate,
detached.cancel.clone(),
ctx,
)
.await
@@ -779,6 +781,8 @@ async fn copy_lsn_prefix(
target_timeline.tenant_shard_id,
layer.layer_desc().key_range.start,
layer.layer_desc().lsn_range.start..end_lsn,
&target_timeline.gate,
target_timeline.cancel.clone(),
ctx,
)
.await

View File

@@ -738,6 +738,8 @@ impl ChunkProcessingJob {
self.timeline.tenant_shard_id,
&self.range,
self.pgdata_lsn,
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;

View File

@@ -580,6 +580,7 @@ impl ConnectionManagerState {
);
Ok(())
}
WalReceiverError::Cancelled => Ok(()),
WalReceiverError::Other(e) => {
// give out an error to have task_mgr give it a really verbose logging
if cancellation.is_cancelled() {

View File

@@ -73,6 +73,7 @@ pub(super) enum WalReceiverError {
/// Generic error
Other(anyhow::Error),
ClosedGate,
Cancelled,
}
impl From<tokio_postgres::Error> for WalReceiverError {
@@ -200,6 +201,9 @@ pub(super) async fn handle_walreceiver_connection(
// with a similar error.
},
WalReceiverError::SuccessfulCompletion(_) => {}
WalReceiverError::Cancelled => {
debug!("Connection cancelled")
}
WalReceiverError::ClosedGate => {
// doesn't happen at runtime
}
@@ -273,7 +277,12 @@ pub(super) async fn handle_walreceiver_connection(
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
.await
.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let shard = vec![*timeline.get_shard_identity()];

View File

@@ -677,7 +677,6 @@ impl StreamingVectoredReadPlanner {
#[cfg(test)]
mod tests {
use anyhow::Error;
use super::super::blob_io::tests::{random_array, write_maybe_compressed};
use super::*;
@@ -960,13 +959,16 @@ mod tests {
}
}
async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
async fn round_trip_test_compressed(
blobs: &[Vec<u8>],
compression: bool,
) -> anyhow::Result<()> {
let ctx =
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
let (_temp_dir, pathbuf, offsets) =
write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
write_maybe_compressed(blobs, compression, &ctx).await?;
let file = VirtualFile::open(&pathbuf, &ctx).await?;
let file = VirtualFile::open_v2(&pathbuf, &ctx).await?;
let file_len = std::fs::metadata(&pathbuf)?.len();
// Multiply by two (compressed data might need more space), and add a few bytes for the header
@@ -1003,7 +1005,7 @@ mod tests {
}
#[tokio::test]
async fn test_really_big_array() -> Result<(), Error> {
async fn test_really_big_array() -> anyhow::Result<()> {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
@@ -1018,7 +1020,7 @@ mod tests {
}
#[tokio::test]
async fn test_arrays_inc() -> Result<(), Error> {
async fn test_arrays_inc() -> anyhow::Result<()> {
let blobs = (0..PAGE_SZ / 8)
.map(|v| random_array(v * 16))
.collect::<Vec<_>>();

View File

@@ -12,7 +12,7 @@
//! src/backend/storage/file/fd.c
//!
use std::fs::File;
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::io::{Error, ErrorKind};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
@@ -185,18 +185,14 @@ impl VirtualFile {
self.inner.sync_data().await
}
pub async fn set_len(&self, len: u64) -> Result<(), Error> {
self.inner.set_len(len).await
}
pub async fn metadata(&self) -> Result<Metadata, Error> {
self.inner.metadata().await
}
pub fn remove(self) {
self.inner.remove();
}
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
self.inner.seek(pos).await
}
pub async fn read_exact_at<Buf>(
&self,
slice: Slice<Buf>,
@@ -227,25 +223,31 @@ impl VirtualFile {
self.inner.write_all_at(buf, offset, ctx).await
}
pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
pub(crate) async fn read_to_string<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<usize, Error>) {
self.inner.write_all(buf, ctx).await
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
self.inner.read_to_end(buf, ctx).await
}
pub(crate) async fn read_to_string(
&mut self,
ctx: &RequestContext,
) -> Result<String, anyhow::Error> {
) -> std::io::Result<String> {
let file = VirtualFile::open(path, ctx).await?; // TODO: open_v2
let mut buf = Vec::new();
self.read_to_end(&mut buf, ctx).await?;
Ok(String::from_utf8(buf)?)
let mut tmp = vec![0; 128];
let mut pos: u64 = 0;
loop {
let slice = tmp.slice(..128);
let (slice, res) = file.inner.read_at(slice, pos, ctx).await;
match res {
Ok(0) => break,
Ok(n) => {
pos += n as u64;
buf.extend_from_slice(&slice[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
tmp = slice.into_inner();
}
String::from_utf8(buf).map_err(|_| {
std::io::Error::new(ErrorKind::InvalidData, "file contents are not valid UTF-8")
})
}
}
@@ -292,9 +294,6 @@ pub struct VirtualFileInner {
/// belongs to a different VirtualFile.
handle: RwLock<SlotHandle>,
/// Current file position
pos: u64,
/// File path and options to use to open it.
///
/// Note: this only contains the options needed to re-open it. For example,
@@ -608,7 +607,6 @@ impl VirtualFileInner {
let vfile = VirtualFileInner {
handle: RwLock::new(handle),
pos: 0,
path: path.to_owned(),
open_options: reopen_options,
};
@@ -675,6 +673,13 @@ impl VirtualFileInner {
})
}
pub async fn set_len(&self, len: u64) -> Result<(), Error> {
with_file!(self, StorageIoOperation::SetLen, |file_guard| {
let (_file_guard, res) = io_engine::get().set_len(file_guard, len).await;
res.maybe_fatal_err("set_len")
})
}
/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
@@ -742,38 +747,6 @@ impl VirtualFileInner {
})
}
pub fn remove(self) {
let path = self.path.clone();
drop(self);
std::fs::remove_file(path).expect("failed to remove the virtual file");
}
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match pos {
SeekFrom::Start(offset) => {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
.with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
if pos < 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"offset would be negative",
));
}
if pos > u64::MAX as i128 {
return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
}
self.pos = pos as u64;
}
}
Ok(self.pos)
}
/// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
///
/// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
@@ -857,59 +830,7 @@ impl VirtualFileInner {
(restore(buf), Ok(()))
}
/// Writes `buf` to the file at the current offset.
///
/// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller.
pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<usize, Error>) {
let buf = buf.into_raw_slice();
let bounds = buf.bounds();
let restore =
|buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
let nbytes = buf.len();
let mut buf = buf;
while !buf.is_empty() {
let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await;
buf = tmp.into_raw_slice();
match res {
Ok(0) => {
return (
restore(buf),
Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
);
}
Ok(n) => {
buf = buf.slice(n..);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return (restore(buf), Err(e)),
}
}
(restore(buf), Ok(nbytes))
}
async fn write<B: IoBuf + Send>(
&mut self,
buf: FullSlice<B>,
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, std::io::Error>) {
let pos = self.pos;
let (buf, res) = self.write_at(buf, pos, ctx).await;
let n = match res {
Ok(n) => n,
Err(e) => return (buf, Err(e)),
};
self.pos += n as u64;
(buf, Ok(n))
}
pub(crate) async fn read_at<Buf>(
pub(super) async fn read_at<Buf>(
&self,
buf: tokio_epoll_uring::Slice<Buf>,
offset: u64,
@@ -937,23 +858,11 @@ impl VirtualFileInner {
})
}
/// The function aborts the process if the error is fatal.
async fn write_at<B: IoBuf + Send>(
&self,
buf: FullSlice<B>,
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, Error>) {
let (slice, result) = self.write_at_inner(buf, offset, ctx).await;
let result = result.maybe_fatal_err("write_at");
(slice, result)
}
async fn write_at_inner<B: IoBuf + Send>(
&self,
buf: FullSlice<B>,
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, Error>) {
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
@@ -962,30 +871,13 @@ impl VirtualFileInner {
observe_duration!(StorageIoOperation::Write, {
let ((_file_guard, buf), result) =
io_engine::get().write_at(file_guard, offset, buf).await;
let result = result.maybe_fatal_err("write_at");
if let Ok(size) = result {
ctx.io_size_metrics().write.add(size.into_u64());
}
(buf, result)
})
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
let mut tmp = vec![0; 128];
loop {
let slice = tmp.slice(..128);
let (slice, res) = self.read_at(slice, self.pos, ctx).await;
match res {
Ok(0) => return Ok(()),
Ok(n) => {
self.pos += n as u64;
buf.extend_from_slice(&slice[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
tmp = slice.into_inner();
}
}
}
// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
@@ -1200,19 +1092,6 @@ impl FileGuard {
let _ = file.into_raw_fd();
res
}
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
fn with_std_file_mut<F, R>(&mut self, with: F) -> R
where
F: FnOnce(&mut File) -> R,
{
// SAFETY:
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
// - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
let res = with(&mut file);
let _ = file.into_raw_fd();
res
}
}
impl tokio_epoll_uring::IoFd for FileGuard {
@@ -1380,7 +1259,6 @@ static SYNC_MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8);
#[cfg(test)]
mod tests {
use std::io::Write;
use std::os::unix::fs::FileExt;
use std::sync::Arc;
@@ -1433,43 +1311,6 @@ mod tests {
MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
}
}
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all(buf, ctx).await;
res.map(|_| ())
}
MaybeVirtualFile::File(file) => file.write_all(&buf[..]),
}
}
// Helper function to slurp contents of a file, starting at the current position,
// into a string
async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
use std::io::Read;
let mut buf = String::new();
match self {
MaybeVirtualFile::VirtualFile(file) => {
let mut buf = Vec::new();
file.read_to_end(&mut buf, ctx).await?;
return Ok(String::from_utf8(buf).unwrap());
}
MaybeVirtualFile::File(file) => {
file.read_to_string(&mut buf)?;
}
}
Ok(buf)
}
// Helper function to slurp a portion of a file into a string
async fn read_string_at(
@@ -1565,48 +1406,23 @@ mod tests {
.await?;
file_a
.write_all(b"foobar".to_vec().slice_len(), &ctx)
.write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx)
.await?;
// cannot read from a file opened in write-only mode
let _ = file_a.read_string(&ctx).await.unwrap_err();
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
// Close the file and re-open for reading
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
// cannot write to a file opened in read-only mode
let _ = file_a
.write_all(b"bar".to_vec().slice_len(), &ctx)
.write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx)
.await
.unwrap_err();
// Try simple read
assert_eq!("foobar", file_a.read_string(&ctx).await?);
// It's positioned at the EOF now.
assert_eq!("", file_a.read_string(&ctx).await?);
// Test seeks.
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
assert_eq!("oobar", file_a.read_string(&ctx).await?);
assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
assert_eq!("ar", file_a.read_string(&ctx).await?);
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
assert_eq!("bar", file_a.read_string(&ctx).await?);
assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
assert_eq!("oobar", file_a.read_string(&ctx).await?);
// Test erroneous seeks to before byte 0
file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
// the erroneous seek should have left the position unchanged
assert_eq!("oobar", file_a.read_string(&ctx).await?);
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
// Create another test file, and try FileExt functions on it.
let path_b = testdir.join("file_b");
@@ -1632,9 +1448,6 @@ mod tests {
// Open a lot of files, enough to cause some evictions. (Or to be precise,
// open the same file many times. The effect is the same.)
//
// leave file_a positioned at offset 1 before we start
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
let mut vfiles = Vec::new();
for _ in 0..100 {
@@ -1644,7 +1457,7 @@ mod tests {
&ctx,
)
.await?;
assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
vfiles.push(vfile);
}
@@ -1652,8 +1465,8 @@ mod tests {
assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
// The underlying file descriptor for 'file_a' should be closed now. Try to read
// from it again. We left the file positioned at offset 1 above.
assert_eq!("oobar", file_a.read_string(&ctx).await?);
// from it again.
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
// Check that all the other FDs still work too. Use them in random order for
// good measure.
@@ -1747,7 +1560,7 @@ mod tests {
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string(&ctx).await.unwrap();
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);
@@ -1756,7 +1569,7 @@ mod tests {
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string(&ctx).await.unwrap();
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
assert_eq!(post, "bar");
assert!(!tmp_path.exists());
drop(file);
@@ -1781,7 +1594,7 @@ mod tests {
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string(&ctx).await.unwrap();
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);

View File

@@ -209,6 +209,22 @@ impl IoEngine {
}
}
}
pub(super) async fn set_len(
&self,
file_guard: FileGuard,
len: u64,
) -> (FileGuard, std::io::Result<()>) {
match self {
IoEngine::NotSet => panic!("not initialized"),
// TODO: ftruncate op for tokio-epoll-uring
IoEngine::StdFs | IoEngine::TokioEpollUring => {
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
(file_guard, res)
}
}
}
pub(super) async fn write_at<B: IoBuf + Send>(
&self,
file_guard: FileGuard,

View File

@@ -282,6 +282,17 @@ unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
}
}
impl<A: Alignment> std::io::Write for AlignedBufferMut<A> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {

View File

@@ -1,9 +1,11 @@
use tokio_epoll_uring::{IoBuf, IoBufMut};
use crate::virtual_file::{IoBuffer, IoBufferMut, PageWriteGuardBuf};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, PageWriteGuardBuf};
/// A marker trait for a mutable aligned buffer type.
pub trait IoBufAlignedMut: IoBufMut {}
pub trait IoBufAlignedMut: IoBufMut {
const ALIGN: usize = virtual_file::get_io_buffer_alignment();
}
/// A marker trait for an aligned buffer type.
pub trait IoBufAligned: IoBuf {}

View File

@@ -1,6 +1,7 @@
mod flush;
use std::sync::Arc;
use bytes::BufMut;
pub(crate) use flush::FlushControl;
use flush::FlushHandle;
pub(crate) use flush::FlushTaskError;
@@ -8,6 +9,7 @@ use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use super::io_buf_aligned::IoBufAligned;
use super::io_buf_aligned::IoBufAlignedMut;
use super::io_buf_ext::{FullSlice, IoBufExt};
use crate::context::RequestContext;
use crate::virtual_file::{IoBuffer, IoBufferMut};
@@ -64,7 +66,7 @@ pub struct BufferedWriter<B: Buffer, W> {
impl<B, Buf, W> BufferedWriter<B, W>
where
B: Buffer<IoBuf = Buf> + Send + 'static,
B: IoBufAlignedMut + Buffer<IoBuf = Buf> + Send + 'static,
Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
{
@@ -73,6 +75,7 @@ where
/// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
pub fn new(
writer: Arc<W>,
start_offset: u64,
buf_new: impl Fn() -> B,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
@@ -91,7 +94,7 @@ where
ctx.attached_child(),
flush_task_span,
),
bytes_submitted: 0,
bytes_submitted: start_offset,
}
}
@@ -116,21 +119,29 @@ where
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn flush_and_into_inner(
mut self,
ctx: &RequestContext,
) -> Result<(u64, Arc<W>), FlushTaskError> {
self.flush(ctx).await?;
pub async fn shutdown(
self,
mut handle_tail: impl FnMut(B) -> Option<B>,
) -> Result<(u64, W), FlushTaskError> {
let Self {
mutable: buf,
mutable: tail,
maybe_flushed: _,
writer,
mut flush_handle,
bytes_submitted: bytes_amount,
bytes_submitted: submit_offset,
} = self;
flush_handle.shutdown().await?;
assert!(buf.is_some());
let ctx = flush_handle.shutdown().await?;
let buf = tail.expect("must not use after an error");
let writer = Arc::into_inner(writer).expect("writer is the only strong reference");
let mut bytes_amount = submit_offset;
if let Some(buf) = handle_tail(buf) {
bytes_amount += buf.pending() as u64;
// TODO: infinite retries + maybe_fatal_err like we do in the flush loop; can we just send this
// as work into the flush loop, as part of flush_handle.shutdown()
let (_, res) = writer.write_all_at(buf.flush(), submit_offset, &ctx).await;
let _: () = res.unwrap(); // DO NOT MERGE, THIS CAN FAIL E.G. on ENOSPC
}
Ok((bytes_amount, writer))
}
@@ -235,6 +246,10 @@ pub trait Buffer {
/// panics if `other.len() > self.cap() - self.pending()`.
fn extend_from_slice(&mut self, other: &[u8]);
/// Add `count` bytes `val` into `self`.
/// Panics if `count > self.cap() - self.pending()`.
fn extend_with(&mut self, val: u8, count: usize);
/// Number of bytes in the buffer.
fn pending(&self) -> usize;
@@ -262,6 +277,14 @@ impl Buffer for IoBufferMut {
IoBufferMut::extend_from_slice(self, other);
}
fn extend_with(&mut self, val: u8, count: usize) {
if self.len() + count > self.cap() {
panic!("Buffer capacity exceeded");
}
IoBufferMut::put_bytes(self, val, count);
}
fn pending(&self) -> usize {
self.len()
}
@@ -334,6 +357,7 @@ mod tests {
let cancel = CancellationToken::new();
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
recorder,
0,
|| IoBufferMut::with_capacity(2),
gate.enter()?,
cancel,
@@ -350,7 +374,7 @@ mod tests {
writer.write_buffered_borrowed(b"j", ctx).await?;
writer.write_buffered_borrowed(b"klmno", ctx).await?;
let (_, recorder) = writer.flush_and_into_inner(ctx).await?;
let (_, recorder) = writer.shutdown(Some).await?;
assert_eq!(
recorder.get_writes(),
{

View File

@@ -1,5 +1,5 @@
use std::ops::ControlFlow;
use std::sync::Arc;
use std::{marker::PhantomData, sync::Arc};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span, warn};
@@ -21,7 +21,10 @@ pub struct FlushHandleInner<Buf, W> {
/// and receives recyled buffer.
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
/// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>,
join_handle: tokio::task::JoinHandle<Result<RequestContext, FlushTaskError>>,
// TODO: get rit of the type parameter?
_phantom: PhantomData<W>,
}
struct FlushRequest<Buf> {
@@ -144,6 +147,7 @@ where
inner: Some(FlushHandleInner {
channel: front,
join_handle,
_phantom: PhantomData,
}),
}
}
@@ -179,12 +183,13 @@ where
Err(self
.shutdown()
.await
.expect_err("flush task only disconnects duplex if it exits with an error"))
.err()
.expect("flush task only disconnects duplex if it exits with an error"))
}
/// Cleans up the channel, join the flush task.
pub async fn shutdown(&mut self) -> Result<Arc<W>, FlushTaskError> {
let handle = self
pub async fn shutdown(&mut self) -> Result<RequestContext, FlushTaskError> {
let handle: FlushHandleInner<Buf, W> = self
.inner
.take()
.expect("must not use after we returned an error");
@@ -243,7 +248,7 @@ where
}
/// Runs the background flush task.
async fn run(mut self) -> Result<Arc<W>, FlushTaskError> {
async fn run(mut self) -> Result<RequestContext, FlushTaskError> {
// Exit condition: channel is closed and there is no remaining buffer to be flushed
while let Some(request) = self.channel.recv().await {
#[cfg(test)]
@@ -313,8 +318,7 @@ where
continue;
}
}
Ok(self.writer)
Ok(self.ctx)
}
}
@@ -349,7 +353,7 @@ impl FlushNotStarted {
impl FlushInProgress {
/// Waits until background flush is done.
pub async fn wait_until_flush_is_done(self) -> FlushDone {
self.done_flush_rx.await.unwrap();
let _ = self.done_flush_rx.await;
FlushDone
}
}

View File

@@ -21,13 +21,13 @@
//! redo Postgres process, but some records it can handle directly with
//! bespoken Rust code.
use std::backtrace::Backtrace;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant, SystemTime};
use anyhow::{Result, bail};
use bytes::{Buf, Bytes};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
@@ -38,7 +38,7 @@ use postgres_ffi::{
fsm_logical_to_physical, pg_constants,
};
use tracing::*;
use utils::bin_ser::SerializeError;
use utils::bin_ser::{DeserializeError, SerializeError};
use utils::lsn::Lsn;
use utils::rate_limit::RateLimit;
use utils::{critical, failpoint_support};
@@ -104,12 +104,101 @@ struct WarnIngestLag {
timestamp_invalid_msg_ratelimit: RateLimit,
}
pub struct WalIngestError {
pub backtrace: std::backtrace::Backtrace,
pub kind: WalIngestErrorKind,
}
#[derive(thiserror::Error, Debug)]
pub enum WalIngestErrorKind {
#[error(transparent)]
#[allow(private_interfaces)]
PageReconstructError(#[from] PageReconstructError),
#[error(transparent)]
DeserializationFailure(#[from] DeserializeError),
#[error(transparent)]
SerializationFailure(#[from] SerializeError),
#[error("the request contains data not supported by pageserver: {0} @ {1}")]
InvalidKey(Key, Lsn),
#[error("twophase file for xid {0} already exists")]
FileAlreadyExists(u64),
#[error("slru segment {0:?}/{1} already exists")]
SlruAlreadyExists(SlruKind, u32),
#[error("relation already exists")]
RelationAlreadyExists(RelTag),
#[error("invalid reldir key {0}")]
InvalidRelDirKey(Key),
#[error(transparent)]
LogicalError(anyhow::Error),
#[error(transparent)]
EncodeAuxFileError(anyhow::Error),
#[error(transparent)]
MaybeRelSizeV2Error(anyhow::Error),
#[error("timeline shutting down")]
Cancelled,
}
impl<T> From<T> for WalIngestError
where
WalIngestErrorKind: From<T>,
{
fn from(value: T) -> Self {
WalIngestError {
backtrace: Backtrace::capture(),
kind: WalIngestErrorKind::from(value),
}
}
}
impl std::error::Error for WalIngestError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.kind.source()
}
}
impl core::fmt::Display for WalIngestError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
self.kind.fmt(f)
}
}
impl core::fmt::Debug for WalIngestError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
if f.alternate() {
f.debug_map()
.key(&"backtrace")
.value(&self.backtrace)
.key(&"kind")
.value(&self.kind)
.finish()
} else {
writeln!(f, "Error: {:?}", self.kind)?;
if self.backtrace.status() == std::backtrace::BacktraceStatus::Captured {
writeln!(f, "Stack backtrace: {:?}", self.backtrace)?;
}
Ok(())
}
}
}
#[macro_export]
macro_rules! ensure_walingest {
($($t:tt)*) => {
_ = || -> Result<(), anyhow::Error> {
anyhow::ensure!($($t)*);
Ok(())
}().map_err(WalIngestErrorKind::LogicalError)?;
};
}
impl WalIngest {
pub async fn new(
timeline: &Timeline,
startpoint: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<WalIngest> {
) -> Result<WalIngest, WalIngestError> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
@@ -145,7 +234,7 @@ impl WalIngest {
interpreted: InterpretedWalRecord,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<bool> {
) -> Result<bool, WalIngestError> {
WAL_INGEST.records_received.inc();
let prev_len = modification.len();
@@ -288,7 +377,7 @@ impl WalIngest {
}
/// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64, WalIngestError> {
let next_full_xid =
enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
@@ -298,9 +387,9 @@ impl WalIngest {
if xid > next_xid {
// Wraparound occurred, must be from a prev epoch.
if epoch == 0 {
bail!(
Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
"apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}"
);
)))?;
}
epoch -= 1;
}
@@ -313,7 +402,7 @@ impl WalIngest {
clear_vm_bits: ClearVmBits,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let ClearVmBits {
new_heap_blkno,
old_heap_blkno,
@@ -402,7 +491,7 @@ impl WalIngest {
create: DbaseCreate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let DbaseCreate {
db_id,
tablespace_id,
@@ -505,7 +594,7 @@ impl WalIngest {
dbase_drop: DbaseDrop,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let DbaseDrop {
db_id,
tablespace_ids,
@@ -523,7 +612,7 @@ impl WalIngest {
create: SmgrCreate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let SmgrCreate { rel } = create;
self.put_rel_creation(modification, rel, ctx).await?;
Ok(())
@@ -537,7 +626,7 @@ impl WalIngest {
truncate: XlSmgrTruncate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let XlSmgrTruncate {
blkno,
rnode,
@@ -689,7 +778,7 @@ impl WalIngest {
record: XactRecord,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let (xact_common, is_commit, is_prepared) = match record {
XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
let xid: u64 = if modification.tline.pg_version >= 17 {
@@ -813,7 +902,7 @@ impl WalIngest {
truncate: ClogTruncate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let ClogTruncate {
pageno,
oldest_xid,
@@ -889,7 +978,7 @@ impl WalIngest {
zero_page: ClogZeroPage,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let ClogZeroPage { segno, rpageno } = zero_page;
self.put_slru_page_image(
@@ -907,7 +996,7 @@ impl WalIngest {
&mut self,
modification: &mut DatadirModification,
xlrec: &XlMultiXactCreate,
) -> Result<()> {
) -> Result<(), WalIngestError> {
// Create WAL record for updating the multixact-offsets page
let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
@@ -1010,7 +1099,7 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
xlrec: &XlMultiXactTruncate,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let (maxsegment, startsegment, endsegment) =
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestMulti = xlrec.end_trunc_off;
@@ -1058,7 +1147,7 @@ impl WalIngest {
zero_page: MultiXactZeroPage,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let MultiXactZeroPage {
slru_kind,
segno,
@@ -1080,7 +1169,7 @@ impl WalIngest {
update: RelmapUpdate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let RelmapUpdate { update, buf } = update;
modification
@@ -1093,7 +1182,7 @@ impl WalIngest {
raw_record: RawXlogRecord,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let RawXlogRecord { info, lsn, mut buf } = raw_record;
let pg_version = modification.tline.pg_version;
@@ -1235,12 +1324,12 @@ impl WalIngest {
put: PutLogicalMessage,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let PutLogicalMessage { path, buf } = put;
modification.put_file(path.as_str(), &buf, ctx).await
}
fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> {
fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<(), WalIngestError> {
match record {
StandbyRecord::RunningXacts(running_xacts) => {
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
@@ -1258,7 +1347,7 @@ impl WalIngest {
&mut self,
record: ReploriginRecord,
modification: &mut DatadirModification<'_>,
) -> Result<()> {
) -> Result<(), WalIngestError> {
match record {
ReploriginRecord::Set(set) => {
modification
@@ -1278,7 +1367,7 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
rel: RelTag,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
modification.put_rel_creation(rel, 0, ctx).await?;
Ok(())
}
@@ -1291,7 +1380,7 @@ impl WalIngest {
blknum: BlockNumber,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
) -> Result<(), WalIngestError> {
self.handle_rel_extend(modification, rel, blknum, ctx)
.await?;
modification.put_rel_page_image(rel, blknum, img)?;
@@ -1305,7 +1394,7 @@ impl WalIngest {
blknum: BlockNumber,
rec: NeonWalRecord,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
self.handle_rel_extend(modification, rel, blknum, ctx)
.await?;
modification.put_rel_wal_record(rel, blknum, rec)?;
@@ -1318,7 +1407,7 @@ impl WalIngest {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
modification.put_rel_truncation(rel, nblocks, ctx).await?;
Ok(())
}
@@ -1329,7 +1418,7 @@ impl WalIngest {
rel: RelTag,
blknum: BlockNumber,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
) -> Result<(), WalIngestError> {
let new_nblocks = blknum + 1;
// Check if the relation exists. We implicitly create relations on first
// record.
@@ -1423,7 +1512,7 @@ impl WalIngest {
blknum: BlockNumber,
img: Bytes,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
if !self.shard.is_shard_zero() {
return Ok(());
}
@@ -1441,7 +1530,7 @@ impl WalIngest {
segno: u32,
blknum: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
// we don't use a cache for this like we do for relations. SLRUS are explcitly
// extended with ZEROPAGE records, not with commit records, so it happens
// a lot less frequently.
@@ -1509,6 +1598,7 @@ async fn get_relsize(
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
use anyhow::Result;
use postgres_ffi::RELSEG_SIZE;
use super::*;
@@ -1530,7 +1620,7 @@ mod tests {
}
#[tokio::test]
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<(), anyhow::Error> {
for i in 14..=16 {
dispatch_pgversion!(i, {
pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;

View File

@@ -65,6 +65,9 @@ static const struct config_enum_entry neon_compute_modes[] = {
/* GUCs */
char *neon_timeline;
char *neon_tenant;
char *neon_project_id;
char *neon_branch_id;
char *neon_endpoint_id;
int32 max_cluster_size;
char *page_server_connstring;
char *neon_auth_token;
@@ -1352,6 +1355,31 @@ pg_init_libpagestore(void)
0, /* no flags required */
check_neon_id, NULL, NULL);
DefineCustomStringVariable("neon.project_id",
"Neon project_id the server is running on",
NULL,
&neon_project_id,
"",
PGC_POSTMASTER,
0, /* no flags required */
check_neon_id, NULL, NULL);
DefineCustomStringVariable("neon.branch_id",
"Neon branch_id the server is running on",
NULL,
&neon_branch_id,
"",
PGC_POSTMASTER,
0, /* no flags required */
check_neon_id, NULL, NULL);
DefineCustomStringVariable("neon.endpoint_id",
"Neon endpoint_id the server is running on",
NULL,
&neon_endpoint_id,
"",
PGC_POSTMASTER,
0, /* no flags required */
check_neon_id, NULL, NULL);
DefineCustomIntVariable("neon.stripe_size",
"sharding stripe size",
NULL,

View File

@@ -99,6 +99,9 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
wp->config = config;
wp->api = api;
wp->state = WPS_COLLECTING_TERMS;
wp->mconf.generation = INVALID_GENERATION;
wp->mconf.members.len = 0;
wp->mconf.new_members.len = 0;
wp_log(LOG, "neon.safekeepers=%s", wp->config->safekeepers_list);
@@ -170,6 +173,8 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
if (wp->config->proto_version != 2 && wp->config->proto_version != 3)
wp_log(FATAL, "unsupported safekeeper protocol version %d", wp->config->proto_version);
if (wp->safekeepers_generation > INVALID_GENERATION && wp->config->proto_version < 3)
wp_log(FATAL, "enabling generations requires protocol version 3");
wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version);
/* Fill the greeting package */
@@ -214,7 +219,7 @@ WalProposerFree(WalProposer *wp)
static bool
WalProposerGenerationsEnabled(WalProposer *wp)
{
return wp->safekeepers_generation != 0;
return wp->safekeepers_generation != INVALID_GENERATION;
}
/*
@@ -723,13 +728,176 @@ SendProposerGreeting(Safekeeper *sk)
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV);
}
/*
* Assuming `sk` sent its node id, find such member(s) in wp->mconf and set ptr in
* members_safekeepers & new_members_safekeepers to sk.
*/
static void
UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
{
/* members_safekeepers etc are fixed size, sanity check mconf size */
if (wp->mconf.members.len > MAX_SAFEKEEPERS)
wp_log(FATAL, "too many members %d in mconf", wp->mconf.members.len);
if (wp->mconf.new_members.len > MAX_SAFEKEEPERS)
wp_log(FATAL, "too many new_members %d in mconf", wp->mconf.new_members.len);
/* node id is not known until greeting is received */
if (sk->state < SS_WAIT_VOTING)
return;
/* 0 is assumed to be invalid node id, should never happen */
if (sk->greetResponse.nodeId == 0)
{
wp_log(WARNING, "safekeeper %s:%s sent zero node id", sk->host, sk->port);
return;
}
for (uint32 i = 0; i < wp->mconf.members.len; i++)
{
SafekeeperId *sk_id = &wp->mconf.members.m[i];
if (wp->mconf.members.m[i].node_id == sk->greetResponse.nodeId)
{
/*
* If mconf or list of safekeepers to connect to changed (the
* latter always currently goes through restart though),
* ResetMemberSafekeeperPtrs is expected to be called before
* UpdateMemberSafekeeperPtr. So, other value suggests that we are
* connected to the same sk under different host name, complain
* about that.
*/
if (wp->members_safekeepers[i] != NULL && wp->members_safekeepers[i] != sk)
{
wp_log(WARNING, "safekeeper {id = %lu, ep = %s:%u } in members[%u] is already mapped to connection slot %lu",
sk_id->node_id, sk_id->host, sk_id->port, i, wp->members_safekeepers[i] - wp->safekeeper);
}
wp_log(LOG, "safekeeper {id = %lu, ep = %s:%u } in members[%u] mapped to connection slot %lu",
sk_id->node_id, sk_id->host, sk_id->port, i, sk - wp->safekeeper);
wp->members_safekeepers[i] = sk;
}
}
/* repeat for new_members */
for (uint32 i = 0; i < wp->mconf.new_members.len; i++)
{
SafekeeperId *sk_id = &wp->mconf.new_members.m[i];
if (wp->mconf.new_members.m[i].node_id == sk->greetResponse.nodeId)
{
if (wp->new_members_safekeepers[i] != NULL && wp->new_members_safekeepers[i] != sk)
{
wp_log(WARNING, "safekeeper {id = %lu, ep = %s:%u } in new_members[%u] is already mapped to connection slot %lu",
sk_id->node_id, sk_id->host, sk_id->port, i, wp->new_members_safekeepers[i] - wp->safekeeper);
}
wp_log(LOG, "safekeeper {id = %lu, ep = %s:%u } in new_members[%u] mapped to connection slot %lu",
sk_id->node_id, sk_id->host, sk_id->port, i, sk - wp->safekeeper);
wp->new_members_safekeepers[i] = sk;
}
}
}
/*
* Reset wp->members_safekeepers & new_members_safekeepers and refill them.
* Called after wp changes mconf.
*/
static void
ResetMemberSafekeeperPtrs(WalProposer *wp)
{
memset(&wp->members_safekeepers, 0, sizeof(Safekeeper *) * MAX_SAFEKEEPERS);
memset(&wp->new_members_safekeepers, 0, sizeof(Safekeeper *) * MAX_SAFEKEEPERS);
for (int i = 0; i < wp->n_safekeepers; i++)
{
if (wp->safekeeper[i].state >= SS_WAIT_VOTING)
UpdateMemberSafekeeperPtr(wp, &wp->safekeeper[i]);
}
}
static uint32
MsetQuorum(MemberSet *mset)
{
Assert(mset->len > 0);
return mset->len / 2 + 1;
}
/* Does n forms quorum in mset? */
static bool
MsetHasQuorum(MemberSet *mset, uint32 n)
{
return n >= MsetQuorum(mset);
}
/*
* TermsCollected helper for a single member set `mset`.
*
* `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers
* or new_members_safekeepers.
*/
static bool
TermsCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInfo s)
{
uint32 n_greeted = 0;
for (uint32 i = 0; i < wp->mconf.members.len; i++)
{
Safekeeper *sk = msk[i];
if (sk != NULL && sk->state == SS_WAIT_VOTING)
{
if (n_greeted > 0)
appendStringInfoString(s, ", ");
appendStringInfo(s, "{id = %lu, ep = %s:%s}", sk->greetResponse.nodeId, sk->host, sk->port);
n_greeted++;
}
}
appendStringInfo(s, ", %u/%u total", n_greeted, mset->len);
return MsetHasQuorum(mset, n_greeted);
}
/*
* Have we received greeting from enough (quorum) safekeepers to start voting?
*/
static bool
TermsCollected(WalProposer *wp)
{
return wp->n_connected >= wp->quorum;
StringInfoData s; /* str for logging */
bool collected = false;
/* legacy: generations disabled */
if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION)
{
collected = wp->n_connected >= wp->quorum;
if (collected)
{
wp->propTerm++;
wp_log(LOG, "walproposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT ", starting voting", wp->quorum, wp->propTerm);
}
return collected;
}
/*
* With generations enabled, we start campaign only when 1) some mconf is
* actually received 2) we have greetings from majority of members as well
* as from majority of new_members if it exists.
*/
if (wp->mconf.generation == INVALID_GENERATION)
return false;
initStringInfo(&s);
appendStringInfoString(&s, "mset greeters: ");
if (!TermsCollectedMset(wp, &wp->mconf.members, wp->members_safekeepers, &s))
goto res;
if (wp->mconf.new_members.len > 0)
{
appendStringInfoString(&s, ", new_mset greeters: ");
if (!TermsCollectedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers, &s))
goto res;
}
wp->propTerm++;
wp_log(LOG, "walproposer connected to quorum of safekeepers: %s, propTerm=" INT64_FORMAT ", starting voting", s.data, wp->propTerm);
collected = true;
res:
pfree(s.data);
return collected;
}
static void
@@ -753,13 +921,41 @@ RecvAcceptorGreeting(Safekeeper *sk)
pfree(mconf_toml);
/*
* Adopt mconf of safekeepers if it is higher. TODO: mconf change should
* restart wp if it started voting.
* Adopt mconf of safekeepers if it is higher.
*/
if (sk->greetResponse.mconf.generation > wp->mconf.generation)
{
/* sanity check before adopting, should never happen */
if (sk->greetResponse.mconf.members.len == 0)
{
wp_log(FATAL, "mconf %u has zero members", sk->greetResponse.mconf.generation);
}
/*
* If we at least started campaign, restart wp to get elected in the
* new mconf. Note: in principle once wp is already elected
* re-election is not required, but being conservative here is not
* bad.
*
* TODO: put mconf to shmem to immediately pick it up on start,
* otherwise if some safekeeper(s) misses latest mconf and gets
* connected the first, it may cause redundant restarts here.
*
* More generally, it would be nice to restart walproposer (wiping
* election state) without restarting the process. In particular, that
* would allow sync-safekeepers not to die here if it intersected with
* sk migration (as well as remove 1s delay).
*
* Note that assign_neon_safekeepers also currently restarts the
* process, so during normal migration walproposer may restart twice.
*/
if (wp->state >= WPS_CAMPAIGN)
{
wp_log(FATAL, "restarting to adopt mconf generation %d", sk->greetResponse.mconf.generation);
}
MembershipConfigurationFree(&wp->mconf);
MembershipConfigurationCopy(&sk->greetResponse.mconf, &wp->mconf);
ResetMemberSafekeeperPtrs(wp);
/* full conf was just logged above */
wp_log(LOG, "changed mconf to generation %u", wp->mconf.generation);
}
@@ -767,6 +963,9 @@ RecvAcceptorGreeting(Safekeeper *sk)
/* Protocol is all good, move to voting. */
sk->state = SS_WAIT_VOTING;
/* In greeting safekeeper sent its id; update mappings accordingly. */
UpdateMemberSafekeeperPtr(wp, sk);
/*
* Note: it would be better to track the counter on per safekeeper basis,
* but at worst walproposer would restart with 'term rejected', so leave
@@ -778,12 +977,9 @@ RecvAcceptorGreeting(Safekeeper *sk)
/* We're still collecting terms from the majority. */
wp->propTerm = Max(sk->greetResponse.term, wp->propTerm);
/* Quorum is acquried, prepare the vote request. */
/* Quorum is acquired, prepare the vote request. */
if (TermsCollected(wp))
{
wp->propTerm++;
wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
wp->state = WPS_CAMPAIGN;
wp->voteRequest.pam.tag = 'v';
wp->voteRequest.generation = wp->mconf.generation;
@@ -832,8 +1028,8 @@ SendVoteRequest(Safekeeper *sk)
&sk->outbuf, wp->config->proto_version);
/* We have quorum for voting, send our vote request */
wp_log(LOG, "requesting vote from %s:%s for generation %u term " UINT64_FORMAT, sk->host, sk->port,
wp->voteRequest.generation, wp->voteRequest.term);
wp_log(LOG, "requesting vote from sk {id = %lu, ep = %s:%s} for generation %u term " UINT64_FORMAT,
sk->greetResponse.nodeId, sk->host, sk->port, wp->voteRequest.generation, wp->voteRequest.term);
/* On failure, logging & resetting is handled */
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_WAIT_VERDICT);
/* If successful, wait for read-ready with SS_WAIT_VERDICT */
@@ -851,8 +1047,8 @@ RecvVoteResponse(Safekeeper *sk)
return;
wp_log(LOG,
"got VoteResponse from acceptor %s:%s, generation=%u, term=%lu, voteGiven=%u, last_log_term=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.generation, sk->voteResponse.term,
"got VoteResponse from sk {id = %lu, ep = %s:%s}, generation=%u, term=%lu, voteGiven=%u, last_log_term=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X",
sk->greetResponse.nodeId, sk->host, sk->port, sk->voteResponse.generation, sk->voteResponse.term,
sk->voteResponse.voteGiven,
GetHighestTerm(&sk->voteResponse.termHistory),
LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
@@ -899,6 +1095,53 @@ RecvVoteResponse(Safekeeper *sk)
}
}
/*
* VotesCollected helper for a single member set `mset`.
*
* `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers
* or new_members_safekeepers.
*/
static bool
VotesCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInfo s)
{
uint32 n_votes = 0;
for (uint32 i = 0; i < wp->mconf.members.len; i++)
{
Safekeeper *sk = msk[i];
if (sk != NULL && sk->state == SS_WAIT_ELECTED)
{
Assert(sk->voteResponse.voteGiven);
/*
* Find the highest vote. NULL check is for the legacy case where
* safekeeper might be not initialized with LSN at all and return
* 0 LSN in the vote response; we still want to set donor to
* something in this case.
*/
if (GetLastLogTerm(sk) > wp->donorLastLogTerm ||
(GetLastLogTerm(sk) == wp->donorLastLogTerm &&
sk->voteResponse.flushLsn > wp->propTermStartLsn) ||
wp->donor == NULL)
{
wp->donorLastLogTerm = GetLastLogTerm(sk);
wp->propTermStartLsn = sk->voteResponse.flushLsn;
wp->donor = sk;
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
if (n_votes > 0)
appendStringInfoString(s, ", ");
appendStringInfo(s, "{id = %lu, ep = %s:%s}", sk->greetResponse.nodeId, sk->host, sk->port);
n_votes++;
}
}
appendStringInfo(s, ", %u/%u total", n_votes, mset->len);
return MsetHasQuorum(mset, n_votes);
}
/*
* Checks if enough votes has been collected to get elected and if that's the
* case finds the highest vote, setting donor, donorLastLogTerm,
@@ -907,7 +1150,8 @@ RecvVoteResponse(Safekeeper *sk)
static bool
VotesCollected(WalProposer *wp)
{
int n_ready = 0;
StringInfoData s; /* str for logging */
bool collected = false;
/* assumed to be called only when not elected yet */
Assert(wp->state == WPS_CAMPAIGN);
@@ -916,25 +1160,62 @@ VotesCollected(WalProposer *wp)
wp->donorLastLogTerm = 0;
wp->truncateLsn = InvalidXLogRecPtr;
for (int i = 0; i < wp->n_safekeepers; i++)
/* legacy: generations disabled */
if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION)
{
if (wp->safekeeper[i].state == SS_WAIT_ELECTED)
{
n_ready++;
int n_ready = 0;
if (GetLastLogTerm(&wp->safekeeper[i]) > wp->donorLastLogTerm ||
(GetLastLogTerm(&wp->safekeeper[i]) == wp->donorLastLogTerm &&
wp->safekeeper[i].voteResponse.flushLsn > wp->propTermStartLsn))
for (int i = 0; i < wp->n_safekeepers; i++)
{
if (wp->safekeeper[i].state == SS_WAIT_ELECTED)
{
wp->donorLastLogTerm = GetLastLogTerm(&wp->safekeeper[i]);
wp->propTermStartLsn = wp->safekeeper[i].voteResponse.flushLsn;
wp->donor = i;
n_ready++;
if (GetLastLogTerm(&wp->safekeeper[i]) > wp->donorLastLogTerm ||
(GetLastLogTerm(&wp->safekeeper[i]) == wp->donorLastLogTerm &&
wp->safekeeper[i].voteResponse.flushLsn > wp->propTermStartLsn) ||
wp->donor == NULL)
{
wp->donorLastLogTerm = GetLastLogTerm(&wp->safekeeper[i]);
wp->propTermStartLsn = wp->safekeeper[i].voteResponse.flushLsn;
wp->donor = &wp->safekeeper[i];
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
}
collected = n_ready >= wp->quorum;
if (collected)
{
wp_log(LOG, "walproposer elected with %d/%d votes", n_ready, wp->n_safekeepers);
}
return collected;
}
return n_ready >= wp->quorum;
/*
* if generations are enabled we're expected to get to voting only when
* mconf is established.
*/
Assert(wp->mconf.generation != INVALID_GENERATION);
/*
* We must get votes from both msets if both are present.
*/
initStringInfo(&s);
appendStringInfoString(&s, "mset voters: ");
if (!VotesCollectedMset(wp, &wp->mconf.members, wp->members_safekeepers, &s))
goto res;
if (wp->mconf.new_members.len > 0)
{
appendStringInfoString(&s, ", new_mset voters: ");
if (!VotesCollectedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers, &s))
goto res;
}
wp_log(LOG, "walproposer elected, %s", s.data);
collected = true;
res:
pfree(s.data);
return collected;
}
/*
@@ -955,7 +1236,7 @@ HandleElectedProposer(WalProposer *wp)
* that only for logical replication (and switching logical walsenders to
* neon_walreader is a todo.)
*/
if (!wp->api.recovery_download(wp, &wp->safekeeper[wp->donor]))
if (!wp->api.recovery_download(wp, wp->donor))
{
wp_log(FATAL, "failed to download WAL for logical replicaiton");
}
@@ -1078,7 +1359,7 @@ ProcessPropStartPos(WalProposer *wp)
/*
* Proposer's term history is the donor's + its own entry.
*/
dth = &wp->safekeeper[wp->donor].voteResponse.termHistory;
dth = &wp->donor->voteResponse.termHistory;
wp->propTermHistory.n_entries = dth->n_entries + 1;
wp->propTermHistory.entries = palloc(sizeof(TermSwitchEntry) * wp->propTermHistory.n_entries);
if (dth->n_entries > 0)
@@ -1086,11 +1367,10 @@ ProcessPropStartPos(WalProposer *wp)
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm;
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propTermStartLsn;
wp_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
wp->quorum,
wp_log(LOG, "walproposer elected in term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
wp->propTerm,
LSN_FORMAT_ARGS(wp->propTermStartLsn),
wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port,
wp->donor->host, wp->donor->port,
LSN_FORMAT_ARGS(wp->truncateLsn));
/*
@@ -1508,6 +1788,14 @@ RecvAppendResponses(Safekeeper *sk)
readAnything = true;
/* should never happen: sk is expected to send ERROR instead */
if (sk->appendResponse.generation != wp->mconf.generation)
{
wp_log(FATAL, "safekeeper {id = %lu, ep = %s:%s} sent response with generation %u, expected %u",
sk->greetResponse.nodeId, sk->host, sk->port,
sk->appendResponse.generation, wp->mconf.generation);
}
if (sk->appendResponse.term > wp->propTerm)
{
/*
@@ -1624,30 +1912,101 @@ CalculateMinFlushLsn(WalProposer *wp)
}
/*
* Calculate WAL position acknowledged by quorum
* GetAcknowledgedByQuorumWALPosition for a single member set `mset`.
*
* `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers
* or new_members_safekeepers.
*/
static XLogRecPtr
GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
GetCommittedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk)
{
XLogRecPtr responses[MAX_SAFEKEEPERS];
/*
* Sort acknowledged LSNs
* Ascending sort acknowledged LSNs.
*/
for (int i = 0; i < wp->n_safekeepers; i++)
Assert(mset->len <= MAX_SAFEKEEPERS);
for (uint32 i = 0; i < mset->len; i++)
{
Safekeeper *sk = msk[i];
/*
* Like in Raft, we aren't allowed to commit entries from previous
* terms, so ignore reported LSN until it gets to epochStartLsn.
* terms, so ignore reported LSN until it gets to propTermStartLsn.
*
* Note: we ignore sk state, which is ok: before first ack flushLsn is
* 0, and later we just preserve value across reconnections. It would
* be ok to check for SS_ACTIVE as well.
*/
responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propTermStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0;
if (sk != NULL && sk->appendResponse.flushLsn >= wp->propTermStartLsn)
{
responses[i] = sk->appendResponse.flushLsn;
}
else
{
responses[i] = 0;
}
}
qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn);
qsort(responses, mset->len, sizeof(XLogRecPtr), CompareLsn);
/*
* Get the smallest LSN committed by quorum
* And get value committed by the quorum. A way to view this: to get the
* highest value committed on the quorum, in the ordered array we skip n -
* n_quorum elements to get to the first (lowest) value present on all sks
* of the highest quorum.
*/
return responses[wp->n_safekeepers - wp->quorum];
return responses[mset->len - MsetQuorum(mset)];
}
/*
* Calculate WAL position acknowledged by quorum, i.e. which may be regarded
* committed.
*
* Zero may be returned when there is no quorum of nodes recovered to term start
* lsn which sent feedback yet.
*/
static XLogRecPtr
GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
{
XLogRecPtr committed;
/* legacy: generations disabled */
if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION)
{
XLogRecPtr responses[MAX_SAFEKEEPERS];
/*
* Sort acknowledged LSNs
*/
for (int i = 0; i < wp->n_safekeepers; i++)
{
/*
* Like in Raft, we aren't allowed to commit entries from previous
* terms, so ignore reported LSN until it gets to
* propTermStartLsn.
*
* Note: we ignore sk state, which is ok: before first ack
* flushLsn is 0, and later we just preserve value across
* reconnections. It would be ok to check for SS_ACTIVE as well.
*/
responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propTermStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0;
}
qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn);
/*
* Get the smallest LSN committed by quorum
*/
return responses[wp->n_safekeepers - wp->quorum];
}
committed = GetCommittedMset(wp, &wp->mconf.members, wp->members_safekeepers);
if (wp->mconf.new_members.len > 0)
{
XLogRecPtr new_mset_committed = GetCommittedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers);
committed = Min(committed, new_mset_committed);
}
return committed;
}
/*
@@ -1662,7 +2021,7 @@ UpdateDonorShmem(WalProposer *wp)
int i;
XLogRecPtr donor_lsn = InvalidXLogRecPtr;
if (wp->n_votes < wp->quorum)
if (wp->state < WPS_ELECTED)
{
wp_log(WARNING, "UpdateDonorShmem called before elections are won");
return;
@@ -1673,9 +2032,9 @@ UpdateDonorShmem(WalProposer *wp)
* about its position immediately after election before any feedbacks are
* sent.
*/
if (wp->safekeeper[wp->donor].state >= SS_WAIT_ELECTED)
if (wp->donor->state >= SS_WAIT_ELECTED)
{
donor = &wp->safekeeper[wp->donor];
donor = wp->donor;
donor_lsn = wp->propTermStartLsn;
}
@@ -1746,13 +2105,13 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
}
/*
* Generally sync is done when majority switched the epoch so we committed
* epochStartLsn and made the majority aware of it, ensuring they are
* ready to give all WAL to pageserver. It would mean whichever majority
* is alive, there will be at least one safekeeper who is able to stream
* WAL to pageserver to make basebackup possible. However, since at the
* moment we don't have any good mechanism of defining the healthy and
* most advanced safekeeper who should push the wal into pageserver and
* Generally sync is done when majority reached propTermStartLsn so we
* committed it and made the majority aware of it, ensuring they are ready
* to give all WAL to pageserver. It would mean whichever majority is
* alive, there will be at least one safekeeper who is able to stream WAL
* to pageserver to make basebackup possible. However, since at the moment
* we don't have any good mechanism of defining the healthy and most
* advanced safekeeper who should push the wal into pageserver and
* basically the random one gets connected, to prevent hanging basebackup
* (due to pageserver connecting to not-synced-safekeeper) we currently
* wait for all seemingly alive safekeepers to get synced.
@@ -1774,7 +2133,7 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
n_synced++;
}
if (n_synced >= wp->quorum)
if (newCommitLsn >= wp->propTermStartLsn)
{
/* A quorum of safekeepers has been synced! */

View File

@@ -145,6 +145,7 @@ typedef uint64 NNodeId;
* This and following structs pair ones in membership.rs.
*/
typedef uint32 Generation;
#define INVALID_GENERATION 0
typedef struct SafekeeperId
{
@@ -771,7 +772,17 @@ typedef struct WalProposer
/* Current walproposer membership configuration */
MembershipConfiguration mconf;
/* (n_safekeepers / 2) + 1 */
/*
* Parallels mconf.members with pointers to the member's slot in
* safekeepers array of connections, or NULL if such member is not
* connected. Helps to avoid looking slot per id through all
* .safekeepers[] when doing quorum checks.
*/
Safekeeper *members_safekeepers[MAX_SAFEKEEPERS];
/* As above, but for new_members. */
Safekeeper *new_members_safekeepers[MAX_SAFEKEEPERS];
/* (n_safekeepers / 2) + 1. Used for static pre-generations quorum checks. */
int quorum;
/*
@@ -829,7 +840,7 @@ typedef struct WalProposer
term_t donorLastLogTerm;
/* Most advanced acceptor */
int donor;
Safekeeper *donor;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;

View File

@@ -226,9 +226,6 @@ struct Args {
/// Path to the JWT auth token used to authenticate with other safekeepers.
#[arg(long)]
auth_token_path: Option<Utf8PathBuf>,
#[arg(long, help = "Run in development mode (disables security checks)")]
dev: bool,
}
// Like PathBufValueParser, but allows empty string.
@@ -346,21 +343,6 @@ async fn main() -> anyhow::Result<()> {
}
};
if !args.dev {
let http_auth_enabled = args.http_auth_public_key_path.is_some();
let pg_auth_enabled = args.pg_auth_public_key_path.is_some();
let pg_tenant_only_auth_enabled = args.pg_tenant_only_auth_public_key_path.is_some();
if !http_auth_enabled || !pg_auth_enabled || !pg_tenant_only_auth_enabled {
bail!(
"Safekeeper refuses to start with HTTP, PostgreSQL, or tenant-only PostgreSQL API authentication disabled.\n\
Run with --dev to allow running without authentication.\n\
This is insecure and should only be used in development environments."
);
}
} else {
warn!("Starting in dev mode: this may be an insecure configuration.");
}
// Load JWT auth token to connect to other safekeepers for pull_timeline.
// First check if the env var is present, then check the arg with the path.
// We want to deprecate and remove the env var method in the future.

View File

@@ -31,6 +31,7 @@ pub async fn task_main_https(
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
let cert_resolver = ReloadingCertificateResolver::new(
"main",
&conf.ssl_key_file,
&conf.ssl_cert_file,
conf.ssl_cert_reload_period,

View File

@@ -22,6 +22,7 @@ use pageserver_api::controller_api::{
MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, SafekeeperSchedulingPolicyRequest,
ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
TimelineImportRequest,
};
use pageserver_api::models::{
DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
@@ -1235,8 +1236,18 @@ async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError
ForwardOutcome::NotForwarded(req) => req,
};
let state = get_state(&req);
json_response(StatusCode::OK, state.service.step_down().await)
// Spawn a background task: once we start stepping down, we must finish: if the client drops
// their request we should avoid stopping in some part-stepped-down state.
let handle = tokio::spawn(async move {
let state = get_state(&req);
state.service.step_down().await
});
let result = handle
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
json_response(StatusCode::OK, result)
}
async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -1276,6 +1287,37 @@ async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiE
)
}
async fn handle_timeline_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
check_permissions(&req, Scope::PageServerApi)?;
maybe_rate_limit(&req, tenant_id).await;
let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let import_req = json_request::<TimelineImportRequest>(&mut req).await?;
let state = get_state(&req);
if import_req.tenant_id != tenant_id || import_req.timeline_id != timeline_id {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"tenant id or timeline id mismatch: url={tenant_id}/{timeline_id}, body={}/{}",
import_req.tenant_id,
import_req.timeline_id
)));
}
json_response(
StatusCode::OK,
state.service.timeline_import(import_req).await?,
)
}
async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -1949,6 +1991,16 @@ pub fn make_router(
RequestName("debug_v1_tenant_locate"),
)
})
.post(
"/debug/v1/tenant/:tenant_id/timeline/:timeline_id/import",
|r| {
named_request_span(
r,
handle_timeline_import,
RequestName("debug_v1_timeline_import"),
)
},
)
.get("/debug/v1/scheduler", |r| {
named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
})

View File

@@ -472,6 +472,7 @@ async fn async_main() -> anyhow::Result<()> {
let https_listener = tcp_listener::bind(https_addr)?;
let resolver = ReloadingCertificateResolver::new(
"main",
&args.ssl_key_file,
&args.ssl_cert_file,
*args.ssl_cert_reload_period,

View File

@@ -61,7 +61,7 @@ use utils::completion::Barrier;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::sync::gate::Gate;
use utils::sync::gate::{Gate, GateGuard};
use utils::{failpoint_support, pausable_failpoint};
use crate::background_node_operations::{
@@ -594,6 +594,8 @@ struct TenantShardSplitAbort {
new_stripe_size: Option<ShardStripeSize>,
/// Until this abort op is complete, no other operations may be done on the tenant
_tenant_lock: TracingExclusiveGuard<TenantOperations>,
/// The reconciler gate for the duration of the split operation, and any included abort.
_gate: GateGuard,
}
#[derive(thiserror::Error, Debug)]
@@ -1460,7 +1462,7 @@ impl Service {
// Retry until shutdown: we must keep this request object alive until it is properly
// processed, as it holds a lock guard that prevents other operations trying to do things
// to the tenant while it is in a weird part-split state.
while !self.cancel.is_cancelled() {
while !self.reconcilers_cancel.is_cancelled() {
match self.abort_tenant_shard_split(&op).await {
Ok(_) => break,
Err(e) => {
@@ -1473,9 +1475,12 @@ impl Service {
// when we retry, so that the abort op will succeed. If the abort op is failing
// for some other reason, we will keep retrying forever, or until a human notices
// and does something about it (either fixing a pageserver or restarting the controller).
tokio::time::timeout(Duration::from_secs(5), self.cancel.cancelled())
.await
.ok();
tokio::time::timeout(
Duration::from_secs(5),
self.reconcilers_cancel.cancelled(),
)
.await
.ok();
}
}
}
@@ -1847,6 +1852,7 @@ impl Service {
};
if insert {
let config = attach_req.config.clone().unwrap_or_default();
let tsp = TenantShardPersistence {
tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
@@ -1855,7 +1861,7 @@ impl Service {
generation: attach_req.generation_override.or(Some(0)),
generation_pageserver: None,
placement_policy: serde_json::to_string(&PlacementPolicy::Attached(0)).unwrap(),
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
config: serde_json::to_string(&config).unwrap(),
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
@@ -1878,16 +1884,16 @@ impl Service {
Ok(()) => {
tracing::info!("Inserted shard {} in database", attach_req.tenant_shard_id);
let mut locked = self.inner.write().unwrap();
locked.tenants.insert(
let mut shard = TenantShard::new(
attach_req.tenant_shard_id,
TenantShard::new(
attach_req.tenant_shard_id,
ShardIdentity::unsharded(),
PlacementPolicy::Attached(0),
None,
),
ShardIdentity::unsharded(),
PlacementPolicy::Attached(0),
None,
);
shard.config = config;
let mut locked = self.inner.write().unwrap();
locked.tenants.insert(attach_req.tenant_shard_id, shard);
tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
}
}
@@ -1972,11 +1978,12 @@ impl Service {
.set_attached(scheduler, attach_req.node_id);
tracing::info!(
"attach_hook: tenant {} set generation {:?}, pageserver {}",
"attach_hook: tenant {} set generation {:?}, pageserver {}, config {:?}",
attach_req.tenant_shard_id,
tenant_shard.generation,
// TODO: this is an odd number of 0xf's
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff)),
attach_req.config,
);
// Trick the reconciler into not doing anything for this tenant: this helps
@@ -4910,7 +4917,7 @@ impl Service {
1,
10,
Duration::from_secs(5),
&self.cancel,
&self.reconcilers_cancel,
)
.await
{
@@ -5161,6 +5168,11 @@ impl Service {
)
.await;
let _gate = self
.reconcilers_gate
.enter()
.map_err(|_| ApiError::ShuttingDown)?;
let new_shard_count = ShardCount::new(split_req.new_shard_count);
let new_stripe_size = split_req.new_stripe_size;
@@ -5188,6 +5200,7 @@ impl Service {
new_shard_count,
new_stripe_size,
_tenant_lock,
_gate,
})
// Ignore error sending: that just means we're shutting down: aborts are ephemeral so it's fine to drop it.
.ok();
@@ -5527,7 +5540,10 @@ impl Service {
"failpoint".to_string()
)));
failpoint_support::sleep_millis_async!("shard-split-post-remote-sleep", &self.cancel);
failpoint_support::sleep_millis_async!(
"shard-split-post-remote-sleep",
&self.reconcilers_cancel
);
tracing::info!(
"Split {} into {}",
@@ -5585,7 +5601,7 @@ impl Service {
stripe_size,
preferred_az: preferred_az_id.as_ref().map(Cow::Borrowed),
},
&self.cancel,
&self.reconcilers_cancel,
)
.await
{
@@ -8670,9 +8686,24 @@ impl Service {
failpoint_support::sleep_millis_async!("sleep-on-step-down-handling");
self.inner.write().unwrap().step_down();
// TODO: would it make sense to have a time-out for this?
self.stop_reconciliations(StopReconciliationsReason::SteppingDown)
.await;
// Wait for reconciliations to stop, or terminate this process if they
// fail to stop in time (this indicates a bug in shutdown)
tokio::select! {
_ = self.stop_reconciliations(StopReconciliationsReason::SteppingDown) => {
tracing::info!("Reconciliations stopped, proceeding with step down");
}
_ = async {
failpoint_support::sleep_millis_async!("step-down-delay-timeout");
tokio::time::sleep(Duration::from_secs(10)).await
} => {
tracing::warn!("Step down timed out while waiting for reconciliation gate, terminating process");
// The caller may proceed to act as leader when it sees this request fail: reduce the chance
// of a split-brain situation by terminating this controller instead of leaving it up in a partially-shut-down state.
std::process::exit(1);
}
}
let mut global_observed = GlobalObservedState::default();
let locked = self.inner.read().unwrap();

View File

@@ -12,13 +12,16 @@ use crate::persistence::{
use crate::safekeeper::Safekeeper;
use anyhow::Context;
use http_utils::error::ApiError;
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
use pageserver_api::controller_api::{
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
};
use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
use utils::lsn::Lsn;
use super::Service;
@@ -298,6 +301,31 @@ impl Service {
timeline_id,
})
}
/// Directly insert the timeline into the database without reconciling it with safekeepers.
///
/// Useful if the timeline already exists on the specified safekeepers,
/// but we want to make it storage controller managed.
pub(crate) async fn timeline_import(&self, req: TimelineImportRequest) -> Result<(), ApiError> {
let persistence = TimelinePersistence {
tenant_id: req.tenant_id.to_string(),
timeline_id: req.timeline_id.to_string(),
start_lsn: Lsn::INVALID.into(),
generation: 1,
sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
new_sk_set: None,
cplane_notified_generation: 1,
deleted_at: None,
};
let inserted = self.persistence.insert_timeline(persistence).await?;
if inserted {
tracing::info!("imported timeline into db");
} else {
tracing::info!("didn't import timeline into db, as it is already present in db");
}
Ok(())
}
/// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
pub(super) async fn tenant_timeline_delete_safekeepers(
self: &Arc<Self>,

View File

@@ -14,6 +14,7 @@ import threading
import time
import uuid
from collections import defaultdict
from collections.abc import Mapping
from contextlib import closing, contextmanager
from dataclasses import dataclass
from datetime import datetime
@@ -79,7 +80,12 @@ from fixtures.remote_storage import (
default_remote_storage,
remote_storage_to_toml_dict,
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.http import (
MembershipConfiguration,
SafekeeperHttpClient,
SafekeeperId,
TimelineCreateRequest,
)
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
@@ -1980,10 +1986,13 @@ class NeonStorageController(MetricsGetter, LogUtils):
tenant_shard_id: TenantId | TenantShardId,
pageserver_id: int,
generation_override: int | None = None,
config: None | dict[str, Any] = None,
) -> int:
body = {"tenant_shard_id": str(tenant_shard_id), "node_id": pageserver_id}
if generation_override is not None:
body["generation_override"] = generation_override
if config is not None:
body["config"] = config
response = self.request(
"POST",
@@ -2878,13 +2887,14 @@ class NeonPageserver(PgProtocol, LogUtils):
self,
immediate: bool = False,
timeout_in_seconds: int | None = None,
extra_env_vars: dict[str, str] | None = None,
):
"""
High level wrapper for restart: restarts the process, and waits for
tenant state to stabilize.
"""
self.stop(immediate=immediate)
self.start(timeout_in_seconds=timeout_in_seconds)
self.start(timeout_in_seconds=timeout_in_seconds, extra_env_vars=extra_env_vars)
self.quiesce_tenants()
def quiesce_tenants(self):
@@ -2973,11 +2983,12 @@ class NeonPageserver(PgProtocol, LogUtils):
to call into the pageserver HTTP client.
"""
client = self.http_client()
if generation is None:
generation = self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
elif override_storage_controller_generation:
if generation is None or override_storage_controller_generation:
generation = self.env.storage_controller.attach_hook_issue(
tenant_id, self.id, generation
tenant_id,
self.id,
generation_override=generation if override_storage_controller_generation else None,
config=config,
)
return client.tenant_attach(
tenant_id,
@@ -4291,28 +4302,29 @@ class Endpoint(PgProtocol, LogUtils):
def respec_deep(self, **kwargs: Any) -> None:
"""
Update the endpoint.json file taking into account nested keys.
It does one level deep update. Should enough for most cases.
Distinct method from respec() to do not break existing functionality.
Update the spec.json file taking into account nested keys.
Distinct method from respec() to not break existing functionality.
NOTE: This method also updates the spec.json file, not endpoint.json.
We need it because neon_local also writes to spec.json, so intended
use-case is i) start endpoint with some config, ii) respec_deep(),
iii) call reconfigure() to apply the changes.
"""
def update(curr, patch):
for k, v in patch.items():
if isinstance(v, Mapping):
curr[k] = update(curr.get(k, {}), v)
else:
curr[k] = v
return curr
config_path = os.path.join(self.endpoint_path(), "spec.json")
with open(config_path) as f:
data_dict: dict[str, Any] = json.load(f)
log.debug("Current compute spec: %s", json.dumps(data_dict, indent=4))
for key, value in kwargs.items():
if isinstance(value, dict):
if key not in data_dict:
data_dict[key] = value
else:
data_dict[key] = {**data_dict[key], **value}
else:
data_dict[key] = value
update(data_dict, kwargs)
with open(config_path, "w") as file:
log.debug("Updating compute spec to: %s", json.dumps(data_dict, indent=4))
@@ -4839,6 +4851,50 @@ class Safekeeper(LogUtils):
wait_until(paused)
@staticmethod
def sks_to_safekeeper_ids(sks: list[Safekeeper]) -> list[SafekeeperId]:
return [SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only) for sk in sks]
@staticmethod
def mconf_sks(env: NeonEnv, mconf: MembershipConfiguration) -> list[Safekeeper]:
"""
List of Safekeepers which are members in `mconf`.
"""
members_ids = [m.id for m in mconf.members]
new_members_ids = [m.id for m in mconf.new_members] if mconf.new_members is not None else []
return [sk for sk in env.safekeepers if sk.id in members_ids or sk.id in new_members_ids]
@staticmethod
def create_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
ps: NeonPageserver,
mconf: MembershipConfiguration,
members_sks: list[Safekeeper],
):
"""
Manually create timeline on safekeepers with given (presumably inital)
mconf: figure out LSN from pageserver, bake request and execute it on
given safekeepers.
Normally done by storcon, but some tests want to do it manually so far.
"""
ps_http_cli = ps.http_client()
# figure out initial LSN.
ps_timeline_detail = ps_http_cli.timeline_detail(tenant_id, timeline_id)
init_lsn = ps_timeline_detail["last_record_lsn"]
log.info(f"initial LSN: {init_lsn}")
# sk timeline creation request expects minor version
pg_version = ps_timeline_detail["pg_version"] * 10000
# create inital mconf
create_r = TimelineCreateRequest(
tenant_id, timeline_id, mconf, pg_version, Lsn(init_lsn), commit_lsn=None
)
log.info(f"sending timeline create: {create_r.to_json()}")
for sk in members_sks:
sk.http_client().timeline_create(create_r)
class NeonBroker(LogUtils):
"""An object managing storage_broker instance"""

View File

@@ -126,8 +126,6 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
".*startup_reconcile: Could not scan node.*",
# Tests run in dev mode
".*Starting in dev mode.*",
".*Starting in dev mode - authentication security checks are disabled.*",
".*Starting in dev mode: this may be an insecure configuration.*",
# Tests that stop endpoints & use the storage controller's neon_local notification
# mechanism might fail (neon_local's stopping and endpoint isn't atomic wrt the storage
# controller's attempts to notify the endpoint).

View File

@@ -65,13 +65,11 @@ def single_timeline(
assert ps_http.tenant_list() == []
def attach(tenant):
# NB: create the new tenant in the storage controller with the correct tenant config. This
# will pick up the existing tenant data from remote storage. If we just attach it to the
# Pageserver, the storage controller will reset the tenant config to the default.
env.create_tenant(
tenant_id=tenant,
timeline_id=template_timeline,
conf=template_config,
env.pageserver.tenant_attach(
tenant,
config=template_config,
generation=100,
override_storage_controller_generation=True,
)
with concurrent.futures.ThreadPoolExecutor(max_workers=22) as executor:

View File

@@ -25,7 +25,7 @@ class Walreceiver:
@dataclass
class SafekeeperTimelineStatus:
mconf: Configuration | None
mconf: MembershipConfiguration | None
term: int
last_log_term: int
pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
@@ -78,17 +78,17 @@ class SafekeeperId:
@dataclass
class Configuration:
class MembershipConfiguration:
generation: int
members: list[SafekeeperId]
new_members: list[SafekeeperId] | None
@classmethod
def from_json(cls, d: dict[str, Any]) -> Configuration:
def from_json(cls, d: dict[str, Any]) -> MembershipConfiguration:
generation = d["generation"]
members = d["members"]
new_members = d.get("new_members")
return Configuration(generation, members, new_members)
return MembershipConfiguration(generation, members, new_members)
def to_json(self) -> str:
return json.dumps(self, cls=EnhancedJSONEncoder)
@@ -98,7 +98,7 @@ class Configuration:
class TimelineCreateRequest:
tenant_id: TenantId
timeline_id: TimelineId
mconf: Configuration
mconf: MembershipConfiguration
# not exactly PgVersion, for example 150002 for 15.2
pg_version: int
start_lsn: Lsn
@@ -110,13 +110,13 @@ class TimelineCreateRequest:
@dataclass
class TimelineMembershipSwitchResponse:
previous_conf: Configuration
current_conf: Configuration
previous_conf: MembershipConfiguration
current_conf: MembershipConfiguration
@classmethod
def from_json(cls, d: dict[str, Any]) -> TimelineMembershipSwitchResponse:
previous_conf = Configuration.from_json(d["previous_conf"])
current_conf = Configuration.from_json(d["current_conf"])
previous_conf = MembershipConfiguration.from_json(d["previous_conf"])
current_conf = MembershipConfiguration.from_json(d["current_conf"])
return TimelineMembershipSwitchResponse(previous_conf, current_conf)
@@ -194,7 +194,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
resj = res.json()
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
# It is always normally not None, it is allowed only to make forward compat tests happy.
mconf = Configuration.from_json(resj["mconf"]) if "mconf" in resj else None
mconf = MembershipConfiguration.from_json(resj["mconf"]) if "mconf" in resj else None
return SafekeeperTimelineStatus(
mconf=mconf,
term=resj["acceptor_state"]["term"],
@@ -223,7 +223,9 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
return self.timeline_status(tenant_id, timeline_id).commit_lsn
# Get timeline membership configuration.
def get_membership(self, tenant_id: TenantId, timeline_id: TimelineId) -> Configuration:
def get_membership(
self, tenant_id: TenantId, timeline_id: TimelineId
) -> MembershipConfiguration:
# make mypy happy
return self.timeline_status(tenant_id, timeline_id).mconf # type: ignore
@@ -275,7 +277,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
return res_json
def timeline_exclude(
self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
self, tenant_id: TenantId, timeline_id: TimelineId, to: MembershipConfiguration
) -> dict[str, Any]:
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/exclude",
@@ -287,7 +289,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
return res_json
def membership_switch(
self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
self, tenant_id: TenantId, timeline_id: TimelineId, to: MembershipConfiguration
) -> TimelineMembershipSwitchResponse:
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/membership",

View File

@@ -66,11 +66,11 @@ def test_basebackup_with_high_slru_count(
n_txns = 500000
def setup_wrapper(env: NeonEnv):
return setup_tenant_template(env, n_txns)
env = setup_pageserver_with_tenants(
neon_env_builder, f"large_slru_count-{n_tenants}-{n_txns}", n_tenants, setup_wrapper
neon_env_builder,
f"large_slru_count-{n_tenants}-{n_txns}",
n_tenants,
lambda env: setup_tenant_template(env, n_txns),
)
run_benchmark(env, pg_bin, record, duration)
@@ -80,10 +80,6 @@ def setup_tenant_template(env: NeonEnv, n_txns: int):
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "0s", # disable periodic compaction
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.create_tenant(set_default=True)

View File

@@ -1,5 +1,8 @@
import concurrent.futures
import dataclasses
import json
import re
import threading
import time
from dataclasses import dataclass
from pathlib import Path
@@ -31,15 +34,15 @@ class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
mode: str = "pipelined"
EXECUTION = ["concurrent-futures", "tasks"]
EXECUTION = ["concurrent-futures"]
NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for execution in EXECUTION:
NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 2, 4, 8, 16, 32]:
BATCHABLE: list[PageServicePipeliningConfig] = []
for max_batch_size in [32]:
for execution in EXECUTION:
BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
@@ -47,19 +50,6 @@ for max_batch_size in [1, 2, 4, 8, 16, 32]:
@pytest.mark.parametrize(
"tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
[
# non-batchable workloads
# (A separate benchmark will consider latency).
*[
(
50,
config,
TARGET_RUNTIME,
1,
128,
f"not batchable {dataclasses.asdict(config)}",
)
for config in NON_BATCHABLE
],
# batchable workloads should show throughput and CPU efficiency improvements
*[
(
@@ -137,7 +127,14 @@ def test_throughput(
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
endpoint = env.endpoints.create_start(
"main",
config_lines=[
# minimal lfc & small shared buffers to force requests to pageserver
"neon.max_file_cache_size=1MB",
"shared_buffers=10MB",
],
)
conn = endpoint.connect()
cur = conn.cursor()
@@ -155,7 +152,6 @@ def test_throughput(
tablesize = tablesize_mib * 1024 * 1024
npages = tablesize // (8 * 1024)
cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
# TODO: can we force postgres to do sequential scans?
#
# Run the workload, collect `Metrics` before and after, calculate difference, normalize.
@@ -211,31 +207,73 @@ def test_throughput(
).value,
)
def workload() -> Metrics:
def workload(disruptor_started: threading.Event) -> Metrics:
disruptor_started.wait()
start = time.time()
iters = 0
while time.time() - start < target_runtime or iters < 2:
log.info("Seqscan %d", iters)
if iters == 1:
# round zero for warming up
before = get_metrics()
cur.execute(
"select clear_buffer_cache()"
) # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
cur.execute("select sum(data::bigint) from t")
assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
iters += 1
after = get_metrics()
return (after - before).normalize(iters - 1)
def disruptor(disruptor_started: threading.Event, stop_disruptor: threading.Event):
conn = endpoint.connect()
cur = conn.cursor()
iters = 0
while True:
cur.execute("SELECT pg_logical_emit_message(true, 'test', 'advancelsn')")
if stop_disruptor.is_set():
break
disruptor_started.set()
iters += 1
time.sleep(0.001)
return iters
env.pageserver.patch_config_toml_nonrecursive(
{"page_service_pipelining": dataclasses.asdict(pipelining_config)}
)
env.pageserver.restart()
metrics = workload()
# set trace for log analysis below
env.pageserver.restart(extra_env_vars={"RUST_LOG": "info,pageserver::page_service=trace"})
log.info("Starting workload")
with concurrent.futures.ThreadPoolExecutor() as executor:
disruptor_started = threading.Event()
stop_disruptor = threading.Event()
disruptor_fut = executor.submit(disruptor, disruptor_started, stop_disruptor)
workload_fut = executor.submit(workload, disruptor_started)
metrics = workload_fut.result()
stop_disruptor.set()
ndisruptions = disruptor_fut.result()
log.info("Disruptor issued %d disrupting requests", ndisruptions)
log.info("Results: %s", metrics)
since_last_start: list[str] = []
for line in env.pageserver.logfile.read_text().splitlines():
if "git:" in line:
since_last_start = []
since_last_start.append(line)
stopping_batching_because_re = re.compile(
r"stopping batching because (LSN changed|of batch size|timeline object mismatch|batch key changed|same page was requested at different LSNs|.*)"
)
reasons_for_stopping_batching = {}
for line in since_last_start:
match = stopping_batching_because_re.search(line)
if match:
if match.group(1) not in reasons_for_stopping_batching:
reasons_for_stopping_batching[match.group(1)] = 0
reasons_for_stopping_batching[match.group(1)] += 1
log.info("Reasons for stopping batching: %s", reasons_for_stopping_batching)
#
# Sanity-checks on the collected data
#

View File

@@ -1,6 +1,5 @@
from __future__ import annotations
import copy
import json
import uuid
from typing import TYPE_CHECKING
@@ -16,7 +15,6 @@ if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/11395")
def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
@@ -44,7 +42,6 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
"refill_interval": "100ms",
"refill_amount": int(rate_limit_rps / 10),
"max": int(rate_limit_rps / 10),
"fair": True,
},
},
)
@@ -98,17 +95,12 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
_, marker_offset = wait_until(lambda: env.pageserver.assert_log_contains(marker, offset=None))
log.info("run pagebench")
duration_secs = 10
duration_secs = 20
actual_ncompleted = run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs)
log.info("validate the client is capped at the configured rps limit")
expect_ncompleted = duration_secs * rate_limit_rps
delta_abs = abs(expect_ncompleted - actual_ncompleted)
threshold = 0.05 * expect_ncompleted
assert threshold / rate_limit_rps < 0.1 * duration_secs, (
"test self-test: unrealistic expecations regarding precision in this test"
)
assert delta_abs < 0.05 * expect_ncompleted, (
assert pytest.approx(expect_ncompleted, 0.05) == actual_ncompleted, (
"the throttling deviates more than 5percent from the expectation"
)
@@ -122,6 +114,7 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
timeout=compaction_period,
)
log.info("validate the metrics")
smgr_query_seconds_post = ps_http.get_metric_value(smgr_metric_name, smgr_metrics_query)
assert smgr_query_seconds_post is not None
throttled_usecs_post = ps_http.get_metric_value(throttle_metric_name, throttle_metrics_query)
@@ -130,72 +123,13 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
actual_throttled_usecs = throttled_usecs_post - throttled_usecs_pre
actual_throttled_secs = actual_throttled_usecs / 1_000_000
log.info("validate that the metric doesn't include throttle wait time")
assert duration_secs >= 10 * actual_smgr_query_seconds, (
"smgr metrics should not include throttle wait time"
)
log.info("validate that the throttling wait time metrics is correct")
assert pytest.approx(actual_throttled_secs + actual_smgr_query_seconds, 0.1) == duration_secs, (
"most of the time in this test is spent throttled because the rate-limit's contribution to latency dominates"
"throttling and processing latency = total request time; this assert validates thi holds on average"
)
throttle_config_with_field_fair_set = {
"task_kinds": ["PageRequestHandler"],
"fair": True,
"initial": 27,
"refill_interval": "43s",
"refill_amount": 23,
"max": 42,
}
def assert_throttle_config_with_field_fair_set(conf):
"""
Field `fair` is ignored, so, responses don't contain it
"""
without_fair = copy.deepcopy(throttle_config_with_field_fair_set)
without_fair.pop("fair")
assert conf == without_fair
def test_throttle_fair_config_is_settable_but_ignored_in_mgmt_api(neon_env_builder: NeonEnvBuilder):
"""
To be removed after https://github.com/neondatabase/neon/pull/8539 is rolled out.
"""
env = neon_env_builder.init_start()
vps_http = env.storage_controller.pageserver_api()
# with_fair config should still be settable
vps_http.set_tenant_config(
env.initial_tenant,
{"timeline_get_throttle": throttle_config_with_field_fair_set},
)
conf = vps_http.tenant_config(env.initial_tenant)
assert_throttle_config_with_field_fair_set(conf.effective_config["timeline_get_throttle"])
assert_throttle_config_with_field_fair_set(
conf.tenant_specific_overrides["timeline_get_throttle"]
)
def test_throttle_fair_config_is_settable_but_ignored_in_config_toml(
neon_env_builder: NeonEnvBuilder,
):
"""
To be removed after https://github.com/neondatabase/neon/pull/8539 is rolled out.
"""
def set_tenant_config(ps_cfg):
tenant_config = ps_cfg.setdefault("tenant_config", {})
tenant_config["timeline_get_throttle"] = throttle_config_with_field_fair_set
neon_env_builder.pageserver_config_override = set_tenant_config
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
conf = ps_http.tenant_config(env.initial_tenant)
assert_throttle_config_with_field_fair_set(conf.effective_config["timeline_get_throttle"])
env.pageserver.allowed_errors.append(
r'.*ignoring unknown configuration item path="tenant_config\.timeline_get_throttle\.fair"*'
# without this assertion, the test would pass even if the throttling was completely broken
# but the request processing is so slow that it makes up for the latency that a correct throttling
# implementation would add
assert actual_smgr_query_seconds < 0.66 * duration_secs, (
"test self-test: request processing is consuming most of the wall clock time; this risks that we're not actually testing throttling"
)

View File

@@ -242,7 +242,13 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
pageserver.tenant_location_configure(tenant_id, location_conf)
last_state[pageserver.id] = (mode, generation)
if mode.startswith("Attached"):
# It's only valid to connect to the last generation. Newer generations may yank layer
# files used in older generations.
last_generation = max(
[s[1] for s in last_state.values() if s[1] is not None], default=None
)
if mode.startswith("Attached") and generation == last_generation:
# This is a basic test: we are validating that he endpoint works properly _between_
# configuration changes. A stronger test would be to validate that clients see
# no errors while we are making the changes.

View File

@@ -1,5 +1,6 @@
import os
import ssl
from datetime import datetime, timedelta
import pytest
import requests
@@ -151,3 +152,63 @@ def test_certificate_rotation(neon_env_builder: NeonEnvBuilder):
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
cur_cert = ssl.get_server_certificate(("localhost", port))
assert cur_cert == sk_cert
def test_server_and_cert_metrics(neon_env_builder: NeonEnvBuilder):
"""
Test metrics exported from http/https server and tls cert reloader.
"""
neon_env_builder.use_https_pageserver_api = True
neon_env_builder.pageserver_config_override = "ssl_cert_reload_period='100 ms'"
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.append(".*Error reloading certificate.*")
ps_client = env.pageserver.http_client()
# 1. Test connection started metric.
filter_https = {"scheme": "https"}
old_https_conn_count = (
ps_client.get_metric_value("http_server_connection_started_total", filter_https) or 0
)
addr = f"https://localhost:{env.pageserver.service_port.https}/v1/status"
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
new_https_conn_count = (
ps_client.get_metric_value("http_server_connection_started_total", filter_https) or 0
)
# The counter should increase after the request,
# but it may increase by more than one because of storcon requests.
assert new_https_conn_count > old_https_conn_count
# 2. Test tls connection error.
# Request without specified CA cert file should fail.
with pytest.raises(requests.exceptions.SSLError):
requests.get(addr)
tls_error_cnt = (
ps_client.get_metric_value("http_server_connection_errors_total", {"type": "tls"}) or 0
)
assert tls_error_cnt == 1
# 3. Test expiration time metric.
expiration_time = datetime.fromtimestamp(
ps_client.get_metric_value("tls_certs_expiration_time_seconds") or 0
)
now = datetime.now()
# neon_local generates certs valid for 100 years.
# Compare with +-1 year to not care about leap years.
assert now + timedelta(days=365 * 99) < expiration_time < now + timedelta(days=365 * 101)
# 4. Test cert reload failed metric.
reload_error_cnt = ps_client.get_metric_value("tls_certs_reload_failed_total")
assert reload_error_cnt == 0
os.remove(env.pageserver.workdir / "server.crt")
def reload_failed():
reload_error_cnt = ps_client.get_metric_value("tls_certs_reload_failed_total") or 0
assert reload_error_cnt > 0
wait_until(reload_failed)

View File

@@ -2892,10 +2892,12 @@ def test_storage_controller_leadership_transfer(
)
@pytest.mark.parametrize("step_down_times_out", [False, True])
def test_storage_controller_leadership_transfer_during_split(
neon_env_builder: NeonEnvBuilder,
storage_controller_proxy: StorageControllerProxy,
port_distributor: PortDistributor,
step_down_times_out: bool,
):
"""
Exercise a race between shard splitting and graceful leadership transfer. This is
@@ -2936,6 +2938,18 @@ def test_storage_controller_leadership_transfer_during_split(
)
env.storage_controller.reconcile_until_idle()
# We are testing scenarios where the step down API does not complete: either because it is stuck
# doing a shard split, or because it totally times out on some other failpoint.
env.storage_controller.allowed_errors.extend(
[
".*step_down.*request was dropped before completing.*",
".*step_down.*operation timed out.*",
".*Send step down request failed, will retry.*",
".*Send step down request still failed after.*retries.*",
".*Leader .+ did not respond to step-down request.*",
]
)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# Start a shard split
env.storage_controller.allowed_errors.extend(
@@ -2943,6 +2957,14 @@ def test_storage_controller_leadership_transfer_during_split(
)
pause_failpoint = "shard-split-pre-complete"
env.storage_controller.configure_failpoints((pause_failpoint, "pause"))
if not step_down_times_out:
# Prevent the timeout self-terminate code from executing: we will block step down on the
# shard split itself
env.storage_controller.configure_failpoints(
("step-down-delay-timeout", "return(3600000)")
)
split_fut = executor.submit(
env.storage_controller.tenant_shard_split, list(tenants)[0], shard_count * 2
)
@@ -2961,12 +2983,20 @@ def test_storage_controller_leadership_transfer_during_split(
timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port
)
if step_down_times_out:
# Step down will time out, original controller will terminate itself
env.storage_controller.allowed_errors.extend([".*terminating process.*"])
else:
# Step down does not time out: original controller hits its shard split completion
# code path and realises that it must not purge the parent shards from the database.
env.storage_controller.allowed_errors.extend([".*Enqueuing background abort.*"])
def passed_split_abort():
try:
log.info("Checking log for pattern...")
assert env.storage_controller.log_contains(
".*Using observed state received from leader.*"
)
# This log is indicative of entering startup_reconcile, which happens
# after the point we would abort shard splits
assert env.storage_controller.log_contains(".*Populating tenant shards.*")
except Exception:
log.exception("Failed to find pattern in log")
raise
@@ -2975,34 +3005,42 @@ def test_storage_controller_leadership_transfer_during_split(
wait_until(passed_split_abort, interval=0.1, status_interval=1.0)
assert env.storage_controller.log_contains(".*Aborting shard split.*")
# Proxy is still talking to original controller here: disable its pause failpoint so
# that its shard split can run to completion.
log.info("Disabling failpoint")
# Bypass the proxy: the python test HTTPServer is single threaded and still blocked
# on handling the shard split request.
env.storage_controller.request(
"PUT",
f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
json=[{"name": "shard-split-pre-complete", "actions": "off"}],
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
if step_down_times_out:
# We will let the old controller hit a timeout path where it terminates itself, rather than
# completing step_down and trying to complete a shard split
def old_controller_terminated():
assert env.storage_controller.log_contains(".*terminating process.*")
def previous_stepped_down():
assert (
env.storage_controller.get_leadership_status()
== StorageControllerLeadershipStatus.STEPPED_DOWN
wait_until(old_controller_terminated)
else:
# Proxy is still talking to original controller here: disable its pause failpoint so
# that its shard split can run to completion.
log.info("Disabling failpoint")
# Bypass the proxy: the python test HTTPServer is single threaded and still blocked
# on handling the shard split request.
env.storage_controller.request(
"PUT",
f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
json=[{"name": "shard-split-pre-complete", "actions": "off"}],
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
log.info("Awaiting step down")
wait_until(previous_stepped_down)
def previous_stepped_down():
assert (
env.storage_controller.get_leadership_status()
== StorageControllerLeadershipStatus.STEPPED_DOWN
)
# Let the shard split complete: this may happen _after_ the replacement has come up
# and tried to clean up the databases
log.info("Unblocking & awaiting shard split")
with pytest.raises(Exception, match="Unexpected child shard count"):
# This split fails when it tries to persist results, because it encounters
# changes already made by the new controller's abort-on-startup
split_fut.result()
log.info("Awaiting step down")
wait_until(previous_stepped_down)
# Let the shard split complete: this may happen _after_ the replacement has come up
# and tried to clean up the databases
log.info("Unblocking & awaiting shard split")
with pytest.raises(Exception, match="Unexpected child shard count"):
# This split fails when it tries to persist results, because it encounters
# changes already made by the new controller's abort-on-startup
split_fut.result()
log.info("Routing to new leader")
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}")
@@ -3020,13 +3058,14 @@ def test_storage_controller_leadership_transfer_during_split(
env.storage_controller.wait_until_ready()
env.storage_controller.consistency_check()
# Check that the stepped down instance forwards requests
# to the new leader while it's still running.
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
env.storage_controller.tenant_shard_dump()
env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
status = env.storage_controller.node_status(env.pageservers[0].id)
assert status["scheduling"] == "Pause"
if not step_down_times_out:
# Check that the stepped down instance forwards requests
# to the new leader while it's still running.
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
env.storage_controller.tenant_shard_dump()
env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
status = env.storage_controller.node_status(env.pageservers[0].id)
assert status["scheduling"] == "Pause"
def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):

View File

@@ -45,7 +45,7 @@ from fixtures.remote_storage import (
s3_storage,
)
from fixtures.safekeeper.http import (
Configuration,
MembershipConfiguration,
SafekeeperHttpClient,
SafekeeperId,
TimelineCreateRequest,
@@ -589,7 +589,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
for sk in env.safekeepers:
sk.start()
cli = sk.http_client()
mconf = Configuration(generation=0, members=[], new_members=None)
mconf = MembershipConfiguration(generation=0, members=[], new_members=None)
# set start_lsn to the beginning of the first segment to allow reading
# WAL from there (could you intidb LSN as well).
r = TimelineCreateRequest(
@@ -1948,7 +1948,7 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
sk_id_2 = SafekeeperId(11, "localhost", 5434) # just a mock
# Request to switch before timeline creation should fail.
init_conf = Configuration(generation=1, members=[sk_id_1], new_members=None)
init_conf = MembershipConfiguration(generation=1, members=[sk_id_1], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.membership_switch(tenant_id, timeline_id, init_conf)
@@ -1960,7 +1960,7 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
http_cli.timeline_create(create_r)
# Switch into some conf.
joint_conf = Configuration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
joint_conf = MembershipConfiguration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
resp = http_cli.membership_switch(tenant_id, timeline_id, joint_conf)
log.info(f"joint switch resp: {resp}")
assert resp.previous_conf.generation == 1
@@ -1973,24 +1973,26 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
assert after_restart.generation == 4
# Switch into non joint conf of which sk is not a member, must fail.
non_joint_not_member = Configuration(generation=5, members=[sk_id_2], new_members=None)
non_joint_not_member = MembershipConfiguration(
generation=5, members=[sk_id_2], new_members=None
)
with pytest.raises(requests.exceptions.HTTPError):
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint_not_member)
# Switch into good non joint conf.
non_joint = Configuration(generation=6, members=[sk_id_1], new_members=None)
non_joint = MembershipConfiguration(generation=6, members=[sk_id_1], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint)
log.info(f"non joint switch resp: {resp}")
assert resp.previous_conf.generation == 4
assert resp.current_conf.generation == 6
# Switch request to lower conf should be rejected.
lower_conf = Configuration(generation=3, members=[sk_id_1], new_members=None)
lower_conf = MembershipConfiguration(generation=3, members=[sk_id_1], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.membership_switch(tenant_id, timeline_id, lower_conf)
# Now, exclude sk from the membership, timeline should be deleted.
excluded_conf = Configuration(generation=7, members=[sk_id_2], new_members=None)
excluded_conf = MembershipConfiguration(generation=7, members=[sk_id_2], new_members=None)
http_cli.timeline_exclude(tenant_id, timeline_id, excluded_conf)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.timeline_status(tenant_id, timeline_id)
@@ -2010,11 +2012,6 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
ps = env.pageservers[0]
ps_http_cli = ps.http_client()
http_clis = [sk.http_client() for sk in env.safekeepers]
config_lines = [
"neon.safekeeper_proto_version = 3",
]
@@ -2023,22 +2020,11 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
# expected to fail because timeline is not created on safekeepers
with pytest.raises(Exception, match=r".*timed out.*"):
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3], timeout="2s")
# figure out initial LSN.
ps_timeline_detail = ps_http_cli.timeline_detail(tenant_id, timeline_id)
init_lsn = ps_timeline_detail["last_record_lsn"]
log.info(f"initial LSN: {init_lsn}")
# sk timeline creation request expects minor version
pg_version = ps_timeline_detail["pg_version"] * 10000
# create inital mconf
sk_ids = [SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only) for sk in env.safekeepers]
mconf = Configuration(generation=1, members=sk_ids, new_members=None)
create_r = TimelineCreateRequest(
tenant_id, timeline_id, mconf, pg_version, Lsn(init_lsn), commit_lsn=None
mconf = MembershipConfiguration(
generation=1, members=Safekeeper.sks_to_safekeeper_ids(env.safekeepers), new_members=None
)
log.info(f"sending timeline create: {create_r.to_json()}")
for sk_http_cli in http_clis:
sk_http_cli.timeline_create(create_r)
Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, env.safekeepers)
# Once timeline created endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")

View File

@@ -18,6 +18,7 @@ from fixtures.neon_fixtures import (
Safekeeper,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.safekeeper.http import MembershipConfiguration
from fixtures.utils import skip_in_debug_build
if TYPE_CHECKING:
@@ -452,20 +453,24 @@ def test_concurrent_computes(neon_env_builder: NeonEnvBuilder):
asyncio.run(run_concurrent_computes(env))
async def assert_query_hangs(endpoint: Endpoint, query: str):
"""
Start on endpoint query which is expected to hang and check that it does.
"""
conn = await endpoint.connect_async()
bg_query = asyncio.create_task(conn.execute(query))
await asyncio.sleep(2)
assert not bg_query.done()
return bg_query
# Stop safekeeper and check that query cannot be executed while safekeeper is down.
# Query will insert a single row into a table.
async def check_unavailability(
sk: Safekeeper, conn: asyncpg.Connection, key: int, start_delay_sec: int = 2
):
async def check_unavailability(sk: Safekeeper, ep: Endpoint, key: int, start_delay_sec: int = 2):
# shutdown one of two acceptors, that is, majority
sk.stop()
bg_query = asyncio.create_task(conn.execute(f"INSERT INTO t values ({key}, 'payload')"))
await asyncio.sleep(start_delay_sec)
# ensure that the query has not been executed yet
assert not bg_query.done()
bg_query = await assert_query_hangs(ep, f"INSERT INTO t values ({key}, 'payload')")
# start safekeeper and await the query
sk.start()
await bg_query
@@ -480,10 +485,10 @@ async def run_unavailability(env: NeonEnv, endpoint: Endpoint):
await conn.execute("INSERT INTO t values (1, 'payload')")
# stop safekeeper and check that query cannot be executed while safekeeper is down
await check_unavailability(env.safekeepers[0], conn, 2)
await check_unavailability(env.safekeepers[0], endpoint, 2)
# for the world's balance, do the same with second safekeeper
await check_unavailability(env.safekeepers[1], conn, 3)
await check_unavailability(env.safekeepers[1], endpoint, 3)
# check that we can execute queries after restart
await conn.execute("INSERT INTO t values (4, 'payload')")
@@ -514,15 +519,7 @@ async def run_recovery_uncommitted(env: NeonEnv):
# insert with only one safekeeper up to create tail of flushed but not committed WAL
sk1.stop()
sk2.stop()
conn = await ep.connect_async()
# query should hang, so execute in separate task
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1, 2000), 'payload'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# it must still be not finished
assert not bg_query.done()
await assert_query_hangs(ep, "insert into t select generate_series(1, 2000), 'payload'")
# note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers.
ep.stop_and_destroy()
@@ -559,15 +556,7 @@ async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int):
# insert with only one sk3 up to create tail of flushed but not committed WAL on it
sk1.stop()
sk2.stop()
conn = await ep.connect_async()
# query should hang, so execute in separate task
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1, 180000), 'Papaya'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# it must still be not finished
assert not bg_query.done()
await assert_query_hangs(ep, "insert into t select generate_series(1, 180000), 'Papaya'")
# note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers.
ep.stop_and_destroy()
@@ -607,6 +596,132 @@ def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_versi
asyncio.run(run_wal_truncation(env, safekeeper_proto_version))
async def quorum_sanity_single(
env: NeonEnv,
compute_sks_ids: list[int],
members_sks_ids: list[int],
new_members_sks_ids: list[int] | None,
sks_to_stop_ids: list[int],
should_work_when_stopped: bool,
):
"""
*_ids params contain safekeeper node ids; it is assumed they are issued
from 1 and sequentially assigned to env.safekeepers.
"""
members_sks = [env.safekeepers[i - 1] for i in members_sks_ids]
new_members_sks = (
[env.safekeepers[i - 1] for i in new_members_sks_ids] if new_members_sks_ids else None
)
sks_to_stop = [env.safekeepers[i - 1] for i in sks_to_stop_ids]
mconf = MembershipConfiguration(
generation=1,
members=Safekeeper.sks_to_safekeeper_ids(members_sks),
new_members=Safekeeper.sks_to_safekeeper_ids(new_members_sks) if new_members_sks else None,
)
members_sks = Safekeeper.mconf_sks(env, mconf)
tenant_id = env.initial_tenant
compute_sks_ids_str = "-".join([str(sk_id) for sk_id in compute_sks_ids])
members_sks_ids_str = "-".join([str(sk.id) for sk in mconf.members])
new_members_sks_ids_str = "-".join(
[str(sk.id) for sk in mconf.new_members] if mconf.new_members is not None else []
)
sks_to_stop_ids_str = "-".join([str(sk.id) for sk in sks_to_stop])
log.info(
f"running quorum_sanity_single with compute_sks={compute_sks_ids_str}, members_sks={members_sks_ids_str}, new_members_sks={new_members_sks_ids_str}, sks_to_stop={sks_to_stop_ids_str}, should_work_when_stopped={should_work_when_stopped}"
)
branch_name = f"test_quorum_single_c{compute_sks_ids_str}_m{members_sks_ids_str}_{new_members_sks_ids_str}_s{sks_to_stop_ids_str}"
timeline_id = env.create_branch(branch_name)
# create timeline on `members_sks`
Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, members_sks)
config_lines = [
"neon.safekeeper_proto_version = 3",
]
ep = env.endpoints.create(branch_name, config_lines=config_lines)
ep.start(safekeeper_generation=1, safekeepers=compute_sks_ids)
ep.safe_psql("create table t(key int, value text)")
# stop specified sks and check whether writes work
for sk in sks_to_stop:
sk.stop()
if should_work_when_stopped:
log.info("checking that writes still work")
ep.safe_psql("insert into t select generate_series(1, 100), 'Papaya'")
# restarting ep should also be fine
ep.stop()
ep.start()
ep.safe_psql("insert into t select generate_series(1, 100), 'plum'")
bg_query = None
else:
log.info("checking that writes hang")
bg_query = await assert_query_hangs(
ep, "insert into t select generate_series(1, 100), 'Papaya'"
)
# start again; now they should work
for sk in sks_to_stop:
sk.start()
if bg_query:
log.info("awaiting query")
await bg_query
# It's a bit tempting to iterate over all possible combinations, but let's stick
# with this for now.
async def run_quorum_sanity(env: NeonEnv):
# 3 members, all up, should work
await quorum_sanity_single(env, [1, 2, 3], [1, 2, 3], None, [], True)
# 3 members, 2/3 up, should work
await quorum_sanity_single(env, [1, 2, 3], [1, 2, 3], None, [3], True)
# 3 members, 1/3 up, should not work
await quorum_sanity_single(env, [1, 2, 3], [1, 2, 3], None, [2, 3], False)
# 3 members, all up, should work; wp redundantly talks to 4th.
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], None, [], True)
# 3 members, all up, should work with wp talking to 2 of these 3 + plus one redundant
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], None, [], True)
# 3 members, 2/3 up, could work but wp talks to different 3s, so it shouldn't
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], None, [3], False)
# joint conf of 1-2-3 and 4, all up, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [4], [], True)
# joint conf of 1-2-3 and 4, 4 down, shouldn't work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [4], [4], False)
# joint conf of 1-2-3 and 2-3-4, all up, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [], True)
# joint conf of 1-2-3 and 2-3-4, 1 and 4 down, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [1, 4], True)
# joint conf of 1-2-3 and 2-3-4, 2 down, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [2], True)
# joint conf of 1-2-3 and 2-3-4, 3 down, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [3], True)
# joint conf of 1-2-3 and 2-3-4, 1 and 2 down, shouldn't work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [1, 2], False)
# joint conf of 1-2-3 and 2-3-4, 2 and 4 down, shouldn't work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [2, 4], False)
# joint conf of 1-2-3 and 2-3-4 with wp talking to 2-3-4 only.
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [], True)
# with 1 down should still be ok
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [1], True)
# but with 2 down not ok
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [2], False)
# Test various combinations of membership configurations / neon.safekeepers
# (list of safekeepers endpoint connects to) values / up & down safekeepers and
# check that endpont can start and write data when we have quorum and can't when
# we don't.
def test_quorum_sanity(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 4
env = neon_env_builder.init_start()
asyncio.run(run_quorum_sanity(env))
async def run_segment_init_failure(env: NeonEnv):
env.create_branch("test_segment_init_failure")
ep = env.endpoints.create_start("test_segment_init_failure")