Compare commits

..

43 Commits

Author SHA1 Message Date
Tristan Partin
f879c66f14 BLAH 2025-03-13 15:12:35 -05:00
Tristan Partin
387b5c585e Commit 6 2025-03-13 15:12:35 -05:00
Tristan Partin
ad83156e2f Commit 5 2025-03-13 15:12:35 -05:00
Tristan Partin
3bcfaeb936 Commit 4 2025-03-13 15:12:35 -05:00
Tristan Partin
7c102b92fb Commit 3 2025-03-13 15:12:35 -05:00
Tristan Partin
2e59a41c98 Commit 2 2025-03-13 15:12:35 -05:00
Tristan Partin
358b59a5d7 Commit 1
Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-03-13 15:12:35 -05:00
Arpad Müller
b1a1be6a4c switch pytests and neon_local to control_plane_hooks_api (#11195)
We want to switch away from and deprecate the `--compute-hook-url` param
for the storcon in favour of `--control-plane-url` because it allows us
to construct urls with `notify-safekeepers`.

This PR switches the pytests and neon_local from a
`control_plane_compute_hook_api` to a new param named
`control_plane_hooks_api` which is supposed to point to the parent of
the `notify-attach` URL.

We still support reading the old url from disk to not be too disruptive
with existing deployments, but we just ignore it.

Also add docs for the `notify-safekeepers` upcall API.

Follow-up of #11173
Part of https://github.com/neondatabase/neon/issues/11163
2025-03-13 19:50:52 +00:00
Erik Grinaker
8afae9d03c pageserver: enable l0_flush_delay_threshold by default (#11214)
## Problem

`l0_flush_delay_threshold` has already been set to 30 in production for
a couple of weeks. Let's harmonize the default.

## Summary of changes

Update `DEFAULT_L0_FLUSH_DELAY_FACTOR` to 3 such that the default
`l0_flush_delay_threshold` is `3 * compaction_threshold`.

This differs from the production setting, which is hardcoded to 30 (with
`compaction_threshold` at 10), and is more appropriate for any tenants
that have custom `compaction_threshold` overrides.
2025-03-13 19:15:22 +00:00
JC Grünhage
066b0a1be9 fix(ci): correctly push neon-test-extensions in releases and to ghcr (#11225)
## Problem
ef0d4a48a adjusted how we build container images and how we push them,
and the neon-test-extensions image was overlooked. Additionally, is was
also missed in 1f0dea9a1, which pushed our container images to GHCR.

## Summary of changes
Push neon-test-extensions to GHCR and also push release tags for it.
2025-03-13 18:18:55 +00:00
Konstantin Knizhnik
398d2794eb Handle DEBUG_COMPARE_LOCAL mode in neon_zeroextend (#11220)
## Problem

DEBUG_COMPARE_LOCAL is not supported in neon_zeroextend added in PG16

## Summary of changes

Add support of DEBUG_COMPARE_LOCAL in neon_zeroextend

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-03-13 16:30:32 +00:00
Erik Grinaker
3c3b9dc919 pageserver: enable image_creation_preempt_threshold by default (#11216)
## Problem

This is already set in production, we should harmonize the default.

## Summary of changes

Default `image_creation_preempt_threshold` to 3.
2025-03-13 16:28:21 +00:00
Christian Schwarz
ed31dd2a3c pageserver: better observability for slow wait_lsn (#11176)
# Problem

We leave too few observability breadcrumbs in the case where wait_lsn is
exceptionally slow.

# Changes

- refactor: extract the monitoring logic out of `log_slow` into
`monitor_slow_future`
- add global + per-timeline counter for time spent waiting for wait_lsn
- It is updated while we're still waiting, similar to what we do for
page_service response flush.
- add per-timeline counterpair for started & finished wait_lsn count
- add slow-logging to leave breadcrumbs in logs, not just metrics

For the slow-logging, we need to consider not flooding the logs during a
broker or network outage/blip.
The solution is a "log-streak-level" concurrency limit per timeline.
At any given time, there is at most one slow wait_lsn that is logging
the "still running" and "completed" sequence of logs.
Other concurrent slow wait_lsn's don't log at all.
This leaves at least one breadcrumb in each timeline's logs if some
wait_lsn was exceptionally slow during a given period.
The full degree of slowness can then be determined by looking at the
per-timeline metric.

# Performance

Reran the `bench_log_slow` benchmark, no difference, so, existing call
sites are fine.

We do use a Semaphore, but only try_acquire it _after_ things have
already been determined to be slow. So, no baseline overhead
anticipated.

# Refs

-
https://github.com/neondatabase/cloud/issues/23486#issuecomment-2711587222
2025-03-13 15:03:53 +00:00
Conrad Ludgate
3dec117572 feat(compute_ctl): use TLS if configured (#10972)
Closes: https://github.com/neondatabase/cloud/issues/22998

If control-plane reports that TLS should be used, load the certificates
(and watch for updates), make sure postgres use them, and detects
updates.

Procedure:
1. Load certificates
2. Reconfigure postgres/pgbouncer
3. Loop on a timer until certificates have loaded
4. Go to 1

Notes:
1. We only run this procedure if requested on startup by control plane.
2. We needed to compile pgbouncer with openssl enabled
3. Postgres doesn't allow tls keys to be globally accessible - must be
read only to the postgres user. I couldn't convince the autoscaling team
to let me put this logic into the VM settings, so instead compute_ctl
will copy the keys to be read-only by postgres.
4. To mitigate a race condition, we also verify that the key matches the
cert.
2025-03-13 15:03:22 +00:00
Alex Chi Z.
b2286f5bcb fix(pageserver): don't panic if gc-compaction find no keys (#11200)
## Problem

There was a panic on staging that compaction didn't find any keys. This
is possible if all layers selected for compaction does not contain any
keys within the current shard.

## Summary of changes

Make panic an error. In the future, we can try creating an empty image
layer so that GC can clean up those layers. Otherwise, for now, we can
only rely on shard ancestor compaction to remove these data.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-03-13 14:38:45 +00:00
Erik Grinaker
c036fec065 pageserver: enable compaction_l0_first by default (#11212)
## Problem

`compaction_l0_first` has already been enabled in production for a
couple of weeks.

## Summary of changes

Enable `compaction_l0_first` by default.

Also set `CompactFlags::NoYield` in `timeline_checkpoint_handler`, to
ensure explicitly requested compaction runs to completion. This endpoint
is mainly used in tests, and caused some flakiness where tests expected
compaction to complete.
2025-03-13 14:28:42 +00:00
JC Grünhage
89c7e4e917 fix(ci): use paranthesis for error handling in jq when fetching release PRs (#11217)
## Problem
#11061 introduced code fetching previous releases. #11151 introduced jq
error handling, which has also been applied in #11061, but parenthesis
have been missed.

## Summary of changes
Add parenthesis around error handling code.
2025-03-13 13:40:43 +00:00
Erik Grinaker
5a245a837d storcon: retain stripe size when autosplitting sharded tenants (#11194)
## Problem

Autosplits always request `DEFAULT_STRIPE_SIZE` for splits. However,
splits do not allow changing the stripe size of already-sharded tenants,
and will error out if it differs.

In #11168, we are changing the stripe size, which could hit this when
attempting to autosplit already sharded tenants.

Touches #11168.

## Summary of changes

Pass `new_stripe_size: None` when autosplitting already sharded tenants.
Otherwise, pass `DEFAULT_STRIPE_SIZE` instead of the shard identity's
stripe size, since we want to use the current default rather than an
old, persisted default.
2025-03-13 13:28:10 +00:00
devin-ai-integration[bot]
efb1df4362 fix: Change metric_unit from 'microseconds' to 'μs' in test_compute_ctl_api.py (#11209)
# Fix metric_unit length in test_compute_ctl_api.py

## Description
This PR changes the metric_unit from "microseconds" to "μs" in
test_compute_ctl_api.py to fix the issue where perf test results were
not being stored in the database due to the string exceeding the 10
character limit of the metric_unit column in the perf_test_results
table.

## Problem
As reported in Slack, the perf test results were not being uploaded to
the database because the "microseconds" string (12 characters) exceeds
the 10 character limit of the metric_unit column in the
perf_test_results table.

## Solution
Replace "microseconds" with "μs" in all metric_unit parameters in the
test_compute_ctl_api.py file.

## Testing
The changes have been committed and pushed. The PR is ready for review.

Link to Devin run:
https://app.devin.ai/sessions/e29edd672bd34114b059915820e8a853
Requested by: Peter Bendel

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: peterbendel@neon.tech <peterbendel@neon.tech>
2025-03-13 10:17:01 +00:00
JC Grünhage
803e6f908a fix(ci): fix syntax of lint-release-pr (#11208)
## Problem
A small adjustment in #11061 broke the lint-release-pr.sh script, and
the new version was neither tested nor linted. This has been done now,
the script is once again tested and passing `shellcheck`.

## Summary of changes
Add missing `el` of `elif` condition chain.
2025-03-13 09:42:38 +00:00
JC Grünhage
afc9524bc7 fix(ci): run lint-release-pr on head-ref (#11206)
## Problem
#11061 changed release pr creation, and I missed that the workflow will
checkout a would-be-merge of the rc branch and the release branch
instead of the head ref, unless explicitly instructed otherwise.

## Summary of changes
Check out head ref for linting the release PRs.
2025-03-13 08:17:33 +00:00
JC Grünhage
507353404c fix(ci): pass emtpy body when creating release PRs (#11203)
## Problem
#11061 changed release pr creation, and I missed that creating PRs using
`gh` in non-interactive environments *requires* `--body` instead of
defaulting to an empty body.

## Summary of changes
Explicitly set an empty body when creating release PRs.
2025-03-12 23:54:43 +00:00
JC Grünhage
48be4df3f3 fix(ci): fetch all refs in release PR creation (#11201)
## Problem
#11061 changed release PR creation, and I missed that we need to
explicitly fetch the whole history so that the relevant git refs and
objects are available.

## Summary of changes
- Fetch all git refs including history by setting fetch-depth to 0
- Reference release branch as a remote branch, because we haven't
checked it out locally
2025-03-12 22:32:38 +00:00
Alex Chi Z.
c3b3b507f7 feat(pageserver): support detaching behavior v2 (#11158)
## Problem

close https://github.com/neondatabase/neon/issues/10310

## Summary of changes

This patch adds a new behavior for the detach_ancestor API: detach with
multi-level ancestor and no reparenting. Though we can potentially
support multi-level + do reparenting / single-level + no-reparenting in
the future, as it's not required for the recovery/snapshot epic, I'd
prefer keeping things simple now that we only handle the old one and the
new one instead of supporting the full feature matrix.

I only added a test case of successful detaching instead of testing
failures. I'd like to make this into staging and add more tests in the
future.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-03-12 22:27:23 +00:00
JC Grünhage
ef0d4a48a8 Reuse artifacts from release PRs (#11061)
## Problem
When we release our components, we perform builds in the release PR,
then test the components, then merge the PR, and then build everything
*again*, run tests *again*, and only then start deployments.

To speed things up, we want to perform builds and run tests in the PR,
and start deployments using the existing artifacts from the release PR.

To make that possible, we need to have both CI pipelines running on the
same commit hash, which requires fast forwarding release. That only
works, if we have a commit in the PR that has the current release branch
state as an ancestor.

## Summary of changes
- Changes to release PR creation:
- Remove templates and automatic bodies for release PRs. The previous
template wasn't used anymore, and the automatic body we created in the
pipeline didn't contain any useful content anymore after the changees
here.
- Make it possible to select the source branch. For releases that aren't
cut from `main`, like https://github.com/neondatabase/neon/pull/11051,
we need a way to trigger the new flow from a different branch.
- Determine `release-branch` automatically from the component name
instead of passing that as well.
- Changes to the merge queue job:
- Rename `get-changed-files` to `meta` in preparation of additional data
being fetched as part of that job
- Fail the merge queue if we're trying to merge into a branch other than
main - this is to prevent non-fast-forward merges.
- Label PRs to branches other than main as `fast-forward`, to trigger
the fast-forward job
- Add a fast-forward job that can be triggered with the `fast-forward`
label that performs a fast-forward merge. This only happens if the PR
has `mergeable_state == clean`, so CI having passed.
- Build and Test on releases now skips building images, skips testing
images and skips triggering e2e tests. We add new tags to the images
from the release PR to tag them as release images, and we push them to
the prod registries.
2025-03-12 21:00:59 +00:00
Alex Chi Z.
8a5a739af0 test(pageserver): add small tenant compaction (#11049)
## Problem

close https://github.com/neondatabase/neon/issues/10881

## Summary of changes

Mock a tenant with very small amount of data.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-03-12 20:34:19 +00:00
Tristan Partin
5eed0e4b94 Add docs to performance/test_logical_replication.py on how to run the suite (#10175)
These docs are in tandem with what was recently published on the
internal docs site.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-03-12 17:31:09 +00:00
Tristan Partin
bb3c0ff251 Make collecting the installed extensions metric async (#11071)
If the goal is to make compute_ctl completely asynchronous, then this is
one step to getting there.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-03-12 16:09:02 +00:00
Conrad Ludgate
7aec1364dd chore(proxy): remove enum and composite type queries (#11178)
In our json encoding, we only need to know about array types.
Information about composites or enums are not actually used.

Enums are quite popular, needing to type query them when not needed can
add some latency cost for no gain.
2025-03-12 15:47:17 +00:00
Tristan Partin
40672b739e Move maybe_add_request_id_header middleware into middleware module (#11187)
This matches the authorization middleware.

---------

Signed-off-by: Tristan Partin <tristan@neon.tech>
Co-authored-by: Mikhail Kot <mikhail@neon.tech>
2025-03-12 15:34:46 +00:00
Vlad Lazar
02a83913ec storcon: do not update observed state on node activation (#11155)
## Problem

When a node becomes active, we query its locations and update the
observed state in-place.
This can race with the observed state updates done when processing
reconcile results.

## Summary of changes

The argument for this reconciliation step is that is reduces the need
for background reconciliations.
I don't think is actually true anymore. There's two cases.

1. Restart of node after drain. Usually the node does not go through the
offline state here, so observed locations
were not marked as none. In any case, there should be a handful of
shards max on the node since we've just drained it.
2. Node comes back online after failure or network partition. When the
node is marked offline, we reschedule everything away from it. When it
later becomes active, the previous observed location is extraneous and
requires a reconciliation anyway.

Closes https://github.com/neondatabase/neon/issues/11148
2025-03-12 15:31:28 +00:00
Erik Grinaker
c7717c85c7 storcon,pageserver: use persisted stripe size when loading unsharded tenants (#11193)
## Problem

When the storage controller and Pageserver loads tenants from persisted
storage, it uses `ShardIdentity::unsharded()` for unsharded tenants.
However, this replaces the persisted stripe size of unsharded tenants
with the default stripe size.

This doesn't really matter for practical purposes, since the stripe size
is meaningless for unsharded tenants anyway, but can cause consistency
check failures if the persisted stripe size differs from the default.
This was seen in #11168, where we change the default stripe size.

Touches #11168.

## Summary of changes

Carry over the persisted stripe size from `TenantShardPersistence` for
unsharded tenants, and from `LocationConf` on Pageservers.

Also add bounds checks for type casts when loading persisted shard
metadata.
2025-03-12 15:16:54 +00:00
Erik Grinaker
1436b8469c pageserver: appease unused lint on macOS (#11192)
## Problem

`info_span!` is only used in a `linux` branch, causing the unused lint
to fire on macOS.

## Summary of changes

Fully qualify the `info_span!` use.
2025-03-12 14:34:29 +00:00
JC Grünhage
fc515e7be2 chore(deps): bump env_logger to 0.11.7 (#11188)
## Problem
`humantime` is unmaintained, we want to migrate to `jiff`, see
https://github.com/neondatabase/neon/issues/11179.

`env_logger` in older versions depend on `humantime`, and newer versions
depend on `jiff`, so we need to update it.

## Summary of changes
Update `env_logger` to the most recent release, which does not depend on
`humantime` anymore.
2025-03-12 14:26:52 +00:00
John Spray
7015dbbdf0 storcon_cli: remove pre-warm helper (#11183)
## Problem

This command was used when onboarding tenants to the storage controller.
We no longer do that, so the command can go.

## Summary of changes

- Remove `storcon_cli tenant-warmup` command
2025-03-12 14:02:11 +00:00
Dmitrii Kovalkov
73e37ae388 Suppress "request was dropped" errors in test_timeline_archive (#11190)
## Problem

Test `test_timeline_archive` is flaky because it makes requests that are
intended to fail. It sometimes leads to warning in pageserver's logs.
More details are in the issue.

- Closes: https://github.com/neondatabase/neon/issues/11177

## Summary of changes
- Suppress such errors.
2025-03-12 13:23:31 +00:00
Vlad Lazar
1c0ff3c04d utils: explicit OTEL export config and OTEL enablement via common entry point (#11139)
We want to export performance traces from the pageserver in OTEL format.
End goal is to see them in Grafana.

To this end, there are two changes here:
1. Update the `tracing-utils` crate to allow for explicitly specifying
the export configuration. Pageserver configuration is loaded from a file
on start-up. This allows us to use the same flow for export configs
there.
2. Update the `utils::logging::init` common entry point to set up OTEL
tracing infrastructure if requested. Note that an entirely different
tracing subscriber is used. This is to avoid interference with the
existing tracing set-up. For now, no service uses this functionality.

PR to plug this into the pageserver is
[here](https://github.com/neondatabase/neon/pull/11140).

Related https://github.com/neondatabase/neon/issues/9873
2025-03-12 11:07:49 +00:00
John Spray
7bf6397334 pageserver: remove legacy TimelineInfo::latest_gc_cutoff field (1/2) (#11149)
## Problem

This field was retained for backward compat only in
https://github.com/neondatabase/neon/pull/10707.

Once https://github.com/neondatabase/cloud/pull/25233 is released,
nothing external will be reading this field.

Internally, this was a mandatory field so storage controller is still
trying to decode it, so we must do this removal in two steps: this PR
makes the field optional, and after one release we can fully remove it.

Related: https://github.com/neondatabase/cloud/issues/24250

## Summary of changes

- Rename field to `_unused`
- Remove field from swagger
- Make field optional
2025-03-12 10:23:41 +00:00
Konstantin Knizhnik
f60ffe3021 Rebase compare local debug mode (#11174)
## Problem

DEBUG_COMPARE_LOCAL mode is broken

See
https://neondb.slack.com/archives/C03QLRH7PPD/p1732862608323269?thread_ts=1732711054.862919&cid=C03QLRH7PPD

## Summary of changes

Fix compile errors and unlogged build issues.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-03-12 05:52:18 +00:00
Arpad Müller
da2431f11f storcon: add --control-plane-url config option (#11173)
Adds the `--control-plane-url` config option to the storcon, which we
want to migrate to instead of using `notify-attach`.

Part of #11163
2025-03-12 02:30:56 +00:00
JC Grünhage
e8396034ac fix(ci): fail meta using jq halt_error if data is unexpectedly missing (#11151)
## Problem
When the githb API is having problems, we might not get data back, and
are happily setting vars as empty. This causes problems down the line.
See
https://github.com/neondatabase/neon/actions/runs/13718859397/job/38381946590?pr=11132#step:5:1
for example.

## Summary of changes
Fail the `meta` job if we don't get expected data back from github.
2025-03-11 22:59:30 +00:00
Tristan Partin
decd265c99 Revert notify to 6.0.0 (#11162)
The upgrade to 8.0.0 caused severe performance regressions in the
start_postgres_ms metric, which measures the time it takes from execing
Postgres to the time Postgres marks itself as ready in the
postmaster.pid file. We use the notify crate to watch for changes in the
pgdata directory and the postmaster.pid file.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-03-11 22:18:09 +00:00
Christian Schwarz
158db414bf buffered writer: handle write errors by retrying all write IO errors indefinitely (#10993)
# Problem

If the Pageserver ingest path
(InMemoryLayer=>EphemeralFile=>BufferedWriter)
encounters ENOSPC or any other write IO error when flushing the mutable
buffer
of the BufferedWriter, the buffered writer is left in a state where
subsequent _reads_
from the InMemoryLayer it will cause a `must not use after we returned
an error` panic.

The reason is that
1. the flush background task bails on flush failure, 
2. causing the `FlushHandle::flush` function to fail at channel.recv()
and
3. causing the `FlushHandle::flush` function to bail with the flush
error,
4. leaving its caller `BufferedWriter::flush` with
`BufferedWriter::mutable = None`,
5. once the InMemoryLayer's RwLock::write guard is dropped, subsequent
reads can enter,
6. those reads find `mutable = None` and cause the panic.

# Context

It has always been the contract that writes against the BufferedWriter
API
must not be retried because the writer/stream-style/append-only
interface makes no atomicity guarantees ("On error, did nothing or a
piece of the buffer get appended?").

The idea was that the error would bubble up to upper layers that can
throw away the buffered writer and create a new one. (See our [internal
error handling policy document on how to handle e.g.
`ENOSPC`](c870a50bc0/src/storage/handling_io_and_logical_errors.md (L36-L43))).

That _might_ be true for delta/image layer writers, I haven't checked.

But it's certainly not true for the ingest path: there are no provisions
to throw away an InMemoryLayer that encountered a write error an
reingest the WAL already written to it.

Adding such higher-level retries would involve either resetting
last_record_lsn to a lower value and restarting walreceiver. The code
isn't flexible enough to do that, and such complexity likely isn't worth
it given that write errors are rare.

# Solution

The solution in this PR is to retry _any_ failing write operation
_indefinitely_ inside the buffered writer flush task, except of course
those that are fatal as per `maybe_fatal_err`.

Retrying indefinitely ensures that `BufferedWriter::mutable` is never
left `None` in the case of IO errors, thereby solving the problem
described above.

It's a clear improvement over the status quo.

However, while we're retrying, we build up backpressure because the
`flush` is only double-buffered, not infinitely buffered.

Backpressure here is generally good to avoid resource exhaustion, **but
blocks reads** and hence stalls GetPage requests because InMemoryLayer
reads and writes are mutually exclusive.
That's orthogonal to the problem that is solved here, though.

## Caveats

Note that there are some remaining conditions in the flush background
task where it can bail with an error. I have annotated one of them with
a TODO comment.
Hence the `FlushHandle::flush` is still fallible and hence the overall
scenario of leaving `mutable = None` on the bail path is still possible.
We can clean that up in a later commit.

Note also that retrying indefinitely is great for temporary errors like
ENOSPC but likely undesirable in case the `std::io::Error` we get is
really due to higher-level logic bugs.
For example, we could fail to flush because the timeline or tenant
directory got deleted and VirtualFile's reopen fails with ENOENT.

Note finally that cancellation is not respected while we're retrying.
This means we will block timeline/tenant/pageserver shutdown.
The reason is that the existing cancellation story for the buffered
writer background task was to recv from flush op channel until the
sending side (FlushHandle) is explicitly shut down or dropped.

Failing to handle cancellation carries the operational risk that even if
a single timeline gets stuck because of a logic bug such as the one laid
out above, we must still restart the whole pageserver process.

# Alternatives Considered

As pointed out in the `Context` section, throwing away a InMemoryLayer
that encountered an error and reingesting the WAL is a lot of complexity
that IMO isn't justified for such an edge case.
Also, it's wasteful.
I think it's a local optimum.

A more general and simpler solution for ENOSPC is to `abort()` the
process and run eviction on startup before bringing up the rest of
pageserver.
I argued for it in the past, the pro arguments are still valid and
complete:
https://neondb.slack.com/archives/C033RQ5SPDH/p1716896265296329
The trouble at the time was implementing eviction on startup.
However, maybe things are simpler now that we are fully storcon-managed
and all tenants have secondaries.
For example, if pageserver `abort()`s on ENOSPC and then simply don't
respond to storcon heartbeats while we're running eviction on startup,
storcon will fail tenants over to the secondary anyway, giving us all
the time we need to clean up.

The downside is that if there's a systemic space management bug, above
proposal will just propagate the problem to other nodes. But I imagine
that because of the delays involved with filling up disks, the system
might reach a half-stable state, providing operators more time to react.

# Demo

Intermediary commit `a03f335121480afc0171b0f34606bdf929e962c5` is demoed
in this (internal) screen recording:
https://drive.google.com/file/d/1nBC6lFV2himQ8vRXDXrY30yfWmI2JL5J/view?usp=drive_link

# Perf Testing

Ran `bench_ingest` on tmpfs, no measurable difference.

Spans are uniquely owned by the flush task, and the span stack isn't too
deep, so, enter and exit should be cheap.
Plus, each flush takes ~150us with direct IO enabled, so, not _that_
high frequency event anyways.

# Refs
- fixes https://github.com/neondatabase/neon/issues/10856
2025-03-11 20:40:23 +00:00
99 changed files with 2109 additions and 995 deletions

View File

@@ -1,21 +0,0 @@
## Release 202Y-MM-DD
**NB: this PR must be merged only by 'Create a merge commit'!**
### Checklist when preparing for release
- [ ] Read or refresh [the release flow guide](https://www.notion.so/neondatabase/Release-general-flow-61f2e39fd45d4d14a70c7749604bd70b)
- [ ] Ask in the [cloud Slack channel](https://neondb.slack.com/archives/C033A2WE6BZ) that you are going to rollout the release. Any blockers?
- [ ] Does this release contain any db migrations? Destructive ones? What is the rollback plan?
<!-- List everything that should be done **before** release, any issues / setting changes / etc -->
### Checklist after release
- [ ] Make sure instructions from PRs included in this release and labeled `manual_release_instructions` are executed (either by you or by people who wrote them).
- [ ] Based on the merged commits write release notes and open a PR into `website` repo ([example](https://github.com/neondatabase/website/pull/219/files))
- [ ] Check [#dev-production-stream](https://neondb.slack.com/archives/C03F5SM1N02) Slack channel
- [ ] Check [stuck projects page](https://console.neon.tech/admin/projects?sort=last_active&order=desc&stuck=true)
- [ ] Check [recent operation failures](https://console.neon.tech/admin/operations?action=create_timeline%2Cstart_compute%2Cstop_compute%2Csuspend_compute%2Capply_config%2Cdelete_timeline%2Cdelete_tenant%2Ccreate_branch%2Ccheck_availability&sort=updated_at&order=desc&had_retries=some)
- [ ] Check [cloud SLO dashboard](https://neonprod.grafana.net/d/_oWcBMJ7k/cloud-slos?orgId=1)
- [ ] Check [compute startup metrics dashboard](https://neonprod.grafana.net/d/5OkYJEmVz/compute-startup-time)
<!-- List everything that should be done **after** release, any admin UI configuration / Grafana dashboard / alert changes / setting changes / etc -->

View File

@@ -1,14 +1,16 @@
import itertools
import json
import os
import sys
build_tag = os.environ["BUILD_TAG"]
branch = os.environ["BRANCH"]
dev_acr = os.environ["DEV_ACR"]
prod_acr = os.environ["PROD_ACR"]
dev_aws = os.environ["DEV_AWS"]
prod_aws = os.environ["PROD_AWS"]
aws_region = os.environ["AWS_REGION"]
source_tag = os.getenv("SOURCE_TAG")
target_tag = os.getenv("TARGET_TAG")
branch = os.getenv("BRANCH")
dev_acr = os.getenv("DEV_ACR")
prod_acr = os.getenv("PROD_ACR")
dev_aws = os.getenv("DEV_AWS")
prod_aws = os.getenv("PROD_AWS")
aws_region = os.getenv("AWS_REGION")
components = {
"neon": ["neon"],
@@ -39,24 +41,23 @@ registries = {
outputs: dict[str, dict[str, list[str]]] = {}
target_tags = [build_tag, "latest"] if branch == "main" else [build_tag]
target_stages = ["dev", "prod"] if branch.startswith("release") else ["dev"]
target_tags = [target_tag, "latest"] if branch == "main" else [target_tag]
target_stages = (
["dev", "prod"] if branch in ["release", "release-proxy", "release-compute"] else ["dev"]
)
for component_name, component_images in components.items():
for stage in target_stages:
outputs[f"{component_name}-{stage}"] = dict(
[
(
f"docker.io/neondatabase/{component_image}:{build_tag}",
[
f"{combo[0]}/{component_image}:{combo[1]}"
for combo in itertools.product(registries[stage], target_tags)
],
)
for component_image in component_images
outputs[f"{component_name}-{stage}"] = {
f"docker.io/neondatabase/{component_image}:{source_tag}": [
f"{registry}/{component_image}:{tag}"
for registry, tag in itertools.product(registries[stage], target_tags)
if not (registry == "docker.io/neondatabase" and tag == source_tag)
]
)
for component_image in component_images
}
with open(os.environ["GITHUB_OUTPUT"], "a") as f:
with open(os.getenv("GITHUB_OUTPUT", "/dev/null"), "a") as f:
for key, value in outputs.items():
f.write(f"{key}={json.dumps(value)}\n")
print(f"Image map for {key}:\n{json.dumps(value, indent=2)}\n\n", file=sys.stderr)

110
.github/scripts/lint-release-pr.sh vendored Executable file
View File

@@ -0,0 +1,110 @@
#!/usr/bin/env bash
set -euo pipefail
DOCS_URL="https://docs.neon.build/overview/repositories/neon.html"
message() {
if [[ -n "${GITHUB_PR_NUMBER:-}" ]]; then
gh pr comment --repo "${GITHUB_REPOSITORY}" "${GITHUB_PR_NUMBER}" --edit-last --body "$1" \
|| gh pr comment --repo "${GITHUB_REPOSITORY}" "${GITHUB_PR_NUMBER}" --body "$1"
fi
echo "$1"
}
report_error() {
message "$1
For more details, see the documentation: ${DOCS_URL}"
exit 1
}
case "$RELEASE_BRANCH" in
"release") COMPONENT="Storage" ;;
"release-proxy") COMPONENT="Proxy" ;;
"release-compute") COMPONENT="Compute" ;;
*)
report_error "Unknown release branch: ${RELEASE_BRANCH}"
;;
esac
# Identify main and release branches
MAIN_BRANCH="origin/main"
REMOTE_RELEASE_BRANCH="origin/${RELEASE_BRANCH}"
# Find merge base
MERGE_BASE=$(git merge-base "${MAIN_BRANCH}" "${REMOTE_RELEASE_BRANCH}")
echo "Merge base of ${MAIN_BRANCH} and ${RELEASE_BRANCH}: ${MERGE_BASE}"
# Get the HEAD commit (last commit in PR, expected to be the merge commit)
LAST_COMMIT=$(git rev-parse HEAD)
MERGE_COMMIT_MESSAGE=$(git log -1 --format=%s "${LAST_COMMIT}")
EXPECTED_MESSAGE_REGEX="^$COMPONENT release [0-9]{4}-[0-9]{2}-[0-9]{2}$"
if ! [[ "${MERGE_COMMIT_MESSAGE}" =~ ${EXPECTED_MESSAGE_REGEX} ]]; then
report_error "Merge commit message does not match expected pattern: '<component> release YYYY-MM-DD'
Expected component: ${COMPONENT}
Found: '${MERGE_COMMIT_MESSAGE}'"
fi
echo "✅ Merge commit message is correctly formatted: '${MERGE_COMMIT_MESSAGE}'"
LAST_COMMIT_PARENTS=$(git cat-file -p "${LAST_COMMIT}" | jq -sR '[capture("parent (?<parent>[0-9a-f]{40})"; "g") | .parent]')
if [[ "$(echo "${LAST_COMMIT_PARENTS}" | jq 'length')" -ne 2 ]]; then
report_error "Last commit must be a merge commit with exactly two parents"
fi
EXPECTED_RELEASE_HEAD=$(git rev-parse "${REMOTE_RELEASE_BRANCH}")
if echo "${LAST_COMMIT_PARENTS}" | jq -e --arg rel "${EXPECTED_RELEASE_HEAD}" 'index($rel) != null' > /dev/null; then
LINEAR_HEAD=$(echo "${LAST_COMMIT_PARENTS}" | jq -r '[.[] | select(. != $rel)][0]' --arg rel "${EXPECTED_RELEASE_HEAD}")
else
report_error "Last commit must merge the release branch (${RELEASE_BRANCH})"
fi
echo "✅ Last commit correctly merges the previous commit and the release branch"
echo "Top commit of linear history: ${LINEAR_HEAD}"
MERGE_COMMIT_TREE=$(git rev-parse "${LAST_COMMIT}^{tree}")
LINEAR_HEAD_TREE=$(git rev-parse "${LINEAR_HEAD}^{tree}")
if [[ "${MERGE_COMMIT_TREE}" != "${LINEAR_HEAD_TREE}" ]]; then
report_error "Tree of merge commit (${MERGE_COMMIT_TREE}) does not match tree of linear history head (${LINEAR_HEAD_TREE})
This indicates that the merge of ${RELEASE_BRANCH} into this branch was not performed using the merge strategy 'ours'"
fi
echo "✅ Merge commit tree matches the linear history head"
EXPECTED_PREVIOUS_COMMIT="${LINEAR_HEAD}"
# Now traverse down the history, ensuring each commit has exactly one parent
CURRENT_COMMIT="${EXPECTED_PREVIOUS_COMMIT}"
while [[ "${CURRENT_COMMIT}" != "${MERGE_BASE}" && "${CURRENT_COMMIT}" != "${EXPECTED_RELEASE_HEAD}" ]]; do
CURRENT_COMMIT_PARENTS=$(git cat-file -p "${CURRENT_COMMIT}" | jq -sR '[capture("parent (?<parent>[0-9a-f]{40})"; "g") | .parent]')
if [[ "$(echo "${CURRENT_COMMIT_PARENTS}" | jq 'length')" -ne 1 ]]; then
report_error "Commit ${CURRENT_COMMIT} must have exactly one parent"
fi
NEXT_COMMIT=$(echo "${CURRENT_COMMIT_PARENTS}" | jq -r '.[0]')
if [[ "${NEXT_COMMIT}" == "${MERGE_BASE}" ]]; then
echo "✅ Reached merge base (${MERGE_BASE})"
PR_BASE="${MERGE_BASE}"
elif [[ "${NEXT_COMMIT}" == "${EXPECTED_RELEASE_HEAD}" ]]; then
echo "✅ Reached release branch (${EXPECTED_RELEASE_HEAD})"
PR_BASE="${EXPECTED_RELEASE_HEAD}"
elif [[ -z "${NEXT_COMMIT}" ]]; then
report_error "Unexpected end of commit history before reaching merge base"
fi
# Move to the next commit in the chain
CURRENT_COMMIT="${NEXT_COMMIT}"
done
echo "✅ All commits are properly ordered and linear"
echo "✅ Release PR structure is valid"
echo
message "Commits that are part of this release:
$(git log --oneline "${PR_BASE}..${LINEAR_HEAD}")"

View File

@@ -17,6 +17,12 @@
({};
.[$entry.component] |= (if . == null or $entry.version > .version then $entry else . end))
# Ensure that each component exists, or fail
| (["storage", "compute", "proxy"] - (keys)) as $missing
| if ($missing | length) > 0 then
"Error: Found no release for \($missing | join(", "))!\n" | halt_error(1)
else . end
# Convert the resulting object into an array of formatted strings
| to_entries
| map("\(.key)=\(.value.full)")

View File

@@ -7,8 +7,8 @@ on:
description: 'Component name'
required: true
type: string
release-branch:
description: 'Release branch'
source-branch:
description: 'Source branch'
required: true
type: string
secrets:
@@ -30,17 +30,25 @@ jobs:
steps:
- uses: actions/checkout@v4
with:
ref: main
ref: ${{ inputs.source-branch }}
fetch-depth: 0
- name: Set variables
id: vars
env:
COMPONENT_NAME: ${{ inputs.component-name }}
RELEASE_BRANCH: ${{ inputs.release-branch }}
RELEASE_BRANCH: >-
${{
false
|| inputs.component-name == 'Storage' && 'release'
|| inputs.component-name == 'Proxy' && 'release-proxy'
|| inputs.component-name == 'Compute' && 'release-compute'
}}
run: |
today=$(date +'%Y-%m-%d')
echo "title=${COMPONENT_NAME} release ${today}" | tee -a ${GITHUB_OUTPUT}
echo "rc-branch=rc/${RELEASE_BRANCH}/${today}" | tee -a ${GITHUB_OUTPUT}
echo "release-branch=${RELEASE_BRANCH}" | tee -a ${GITHUB_OUTPUT}
- name: Configure git
run: |
@@ -49,31 +57,36 @@ jobs:
- name: Create RC branch
env:
RELEASE_BRANCH: ${{ steps.vars.outputs.release-branch }}
RC_BRANCH: ${{ steps.vars.outputs.rc-branch }}
TITLE: ${{ steps.vars.outputs.title }}
run: |
git checkout -b "${RC_BRANCH}"
git switch -c "${RC_BRANCH}"
# create an empty commit to distinguish workflow runs
# from other possible releases from the same commit
git commit --allow-empty -m "${TITLE}"
# Manually create a merge commit on the current branch, keeping the
# tree and setting the parents to the current HEAD and the HEAD of the
# release branch. This commit is what we'll fast-forward the release
# branch to when merging the release branch.
# For details on why, look at
# https://docs.neon.build/overview/repositories/neon.html#background-on-commit-history-of-release-prs
current_tree=$(git rev-parse 'HEAD^{tree}')
release_head=$(git rev-parse "origin/${RELEASE_BRANCH}")
current_head=$(git rev-parse HEAD)
merge_commit=$(git commit-tree -p "${current_head}" -p "${release_head}" -m "${TITLE}" "${current_tree}")
# Fast-forward the current branch to the newly created merge_commit
git merge --ff-only ${merge_commit}
git push origin "${RC_BRANCH}"
- name: Create a PR into ${{ inputs.release-branch }}
- name: Create a PR into ${{ steps.vars.outputs.release-branch }}
env:
GH_TOKEN: ${{ secrets.ci-access-token }}
RC_BRANCH: ${{ steps.vars.outputs.rc-branch }}
RELEASE_BRANCH: ${{ inputs.release-branch }}
RELEASE_BRANCH: ${{ steps.vars.outputs.release-branch }}
TITLE: ${{ steps.vars.outputs.title }}
run: |
cat << EOF > body.md
## ${TITLE}
**Please merge this Pull Request using 'Create a merge commit' button**
EOF
gh pr create --title "${TITLE}" \
--body-file "body.md" \
--body "" \
--head "${RC_BRANCH}" \
--base "${RELEASE_BRANCH}"

View File

@@ -21,9 +21,16 @@ on:
run-kind:
description: "The kind of run we're currently in. Will be one of `push-main`, `storage-release`, `compute-release`, `proxy-release`, `storage-rc-pr`, `compute-rc-pr`, `proxy-rc-pr`, `pr`, or `workflow-dispatch`"
value: ${{ jobs.tags.outputs.run-kind }}
release-pr-run-id:
description: "Only available if `run-kind in [storage-release, proxy-release, compute-release]`. Contains the run ID of the `Build and Test` workflow, assuming one with the current commit can be found."
value: ${{ jobs.tags.outputs.release-pr-run-id }}
permissions: {}
defaults:
run:
shell: bash -euo pipefail {0}
jobs:
tags:
runs-on: ubuntu-22.04
@@ -33,6 +40,7 @@ jobs:
proxy: ${{ steps.previous-releases.outputs.proxy }}
storage: ${{ steps.previous-releases.outputs.storage }}
run-kind: ${{ steps.run-kind.outputs.run-kind }}
release-pr-run-id: ${{ steps.release-pr-run-id.outputs.release-pr-run-id }}
permissions:
contents: read
steps:
@@ -83,7 +91,11 @@ jobs:
echo "tag=release-compute-$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
;;
pr|storage-rc-pr|compute-rc-pr|proxy-rc-pr)
BUILD_AND_TEST_RUN_ID=$(gh run list -b $CURRENT_BRANCH -c $CURRENT_SHA -w 'Build and Test' -L 1 --json databaseId --jq '.[].databaseId')
BUILD_AND_TEST_RUN_ID=$(gh api --paginate \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
"/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=${CURRENT_SHA}&branch=${CURRENT_BRANCH}" \
| jq '[.workflow_runs[] | select(.name == "Build and Test")][0].id // ("Error: No matching workflow run found." | halt_error(1))')
echo "tag=$BUILD_AND_TEST_RUN_ID" | tee -a $GITHUB_OUTPUT
;;
workflow-dispatch)
@@ -105,3 +117,13 @@ jobs:
"/repos/${GITHUB_REPOSITORY}/releases" \
| jq -f .github/scripts/previous-releases.jq -r \
| tee -a "${GITHUB_OUTPUT}"
- name: Get the release PR run ID
id: release-pr-run-id
if: ${{ contains(fromJson('["storage-release", "compute-release", "proxy-release"]'), steps.run-kind.outputs.run-kind) }}
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CURRENT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
RELEASE_PR_RUN_ID=$(gh api "/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=$CURRENT_SHA" | jq '[.workflow_runs[] | select(.name == "Build and Test") | select(.head_branch | test("^rc/release(-(proxy)|(compute))?/[0-9]{4}-[0-9]{2}-[0-9]{2}$"; "s"))] | first | .id // ("Falied to find Build and Test run from RC PR!" | halt_error(1))')
echo "release-pr-run-id=$RELEASE_PR_RUN_ID" | tee -a $GITHUB_OUTPUT

View File

@@ -476,7 +476,7 @@ jobs:
(
!github.event.pull_request.draft
|| contains( github.event.pull_request.labels.*.name, 'run-e2e-tests-in-draft')
|| contains(fromJSON('["push-main", "storage-release", "proxy-release", "compute-release"]'), needs.meta.outputs.run-kind)
|| needs.meta.outputs.run-kind == 'push-main'
) && !failure() && !cancelled()
}}
needs: [ check-permissions, push-neon-image-dev, push-compute-image-dev, meta ]
@@ -487,7 +487,7 @@ jobs:
neon-image-arch:
needs: [ check-permissions, build-build-tools-image, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "storage-rc-pr", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
strategy:
matrix:
arch: [ x64, arm64 ]
@@ -537,7 +537,7 @@ jobs:
neon-image:
needs: [ neon-image-arch, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "storage-rc-pr", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: ubuntu-22.04
permissions:
id-token: write # aws-actions/configure-aws-credentials
@@ -559,7 +559,7 @@ jobs:
compute-node-image-arch:
needs: [ check-permissions, build-build-tools-image, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
@@ -651,7 +651,7 @@ jobs:
compute-node-image:
needs: [ compute-node-image-arch, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
@@ -694,7 +694,7 @@ jobs:
vm-compute-node-image-arch:
needs: [ check-permissions, meta, compute-node-image ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
strategy:
fail-fast: false
@@ -747,7 +747,7 @@ jobs:
vm-compute-node-image:
needs: [ vm-compute-node-image-arch, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: ubuntu-22.04
strategy:
matrix:
@@ -773,7 +773,12 @@ jobs:
test-images:
needs: [ check-permissions, meta, neon-image, compute-node-image ]
# Depends on jobs that can get skipped
if: "!failure() && !cancelled()"
if: >-
${{
!failure()
&& !cancelled()
&& contains(fromJSON('["push-main", "pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind)
}}
strategy:
fail-fast: false
matrix:
@@ -800,7 +805,7 @@ jobs:
# Ensure that we don't have bad versions.
- name: Verify image versions
shell: bash # ensure no set -e for better error messages
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "storage-rc-pr", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
run: |
pageserver_version=$(docker run --rm neondatabase/neon:${{ needs.meta.outputs.build-tag }} "/bin/sh" "-c" "/usr/local/bin/pageserver --version")
@@ -821,19 +826,19 @@ jobs:
env:
TAG: >-
${{
contains(fromJSON('["compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind)
needs.meta.outputs.run-kind == 'compute-rc-pr'
&& needs.meta.outputs.previous-storage-release
|| needs.meta.outputs.build-tag
}}
COMPUTE_TAG: >-
${{
contains(fromJSON('["storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind)
contains(fromJSON('["storage-rc-pr", "proxy-rc-pr"]'), needs.meta.outputs.run-kind)
&& needs.meta.outputs.previous-compute-release
|| needs.meta.outputs.build-tag
}}
TEST_EXTENSIONS_TAG: >-
${{
contains(fromJSON('["storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind)
contains(fromJSON('["storage-rc-pr", "proxy-rc-pr"]'), needs.meta.outputs.run-kind)
&& 'latest'
|| needs.meta.outputs.build-tag
}}
@@ -885,7 +890,13 @@ jobs:
id: generate
run: python3 .github/scripts/generate_image_maps.py
env:
BUILD_TAG: "${{ needs.meta.outputs.build-tag }}"
SOURCE_TAG: >-
${{
contains(fromJson('["storage-release", "compute-release", "proxy-release"]'), needs.meta.outputs.run-kind)
&& needs.meta.outputs.release-pr-run-id
|| needs.meta.outputs.build-tag
}}
TARGET_TAG: ${{ needs.meta.outputs.build-tag }}
BRANCH: "${{ github.ref_name }}"
DEV_ACR: "${{ vars.AZURE_DEV_REGISTRY_NAME }}"
PROD_ACR: "${{ vars.AZURE_PROD_REGISTRY_NAME }}"
@@ -895,7 +906,7 @@ jobs:
push-neon-image-dev:
needs: [ meta, generate-image-maps, neon-image ]
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ !failure() && !cancelled() && contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -913,7 +924,7 @@ jobs:
push-compute-image-dev:
needs: [ meta, generate-image-maps, vm-compute-node-image ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ !failure() && !cancelled() && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -967,16 +978,55 @@ jobs:
acr-registry-name: ${{ vars.AZURE_PROD_REGISTRY_NAME }}
secrets: inherit
# This is a bit of a special case so we're not using a generated image map.
add-latest-tag-to-neon-extensions-test-image:
if: github.ref_name == 'main'
push-neon-test-extensions-image-ghcr:
if: ${{ contains(fromJSON('["push-main", "pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
needs: [ meta, compute-node-image ]
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: |
{
"docker.io/neondatabase/neon-test-extensions-v16:${{ needs.meta.outputs.build-tag }}": ["docker.io/neondatabase/neon-test-extensions-v16:latest"],
"docker.io/neondatabase/neon-test-extensions-v17:${{ needs.meta.outputs.build-tag }}": ["docker.io/neondatabase/neon-test-extensions-v17:latest"]
"docker.io/neondatabase/neon-test-extensions-v16:${{ needs.meta.outputs.build-tag }}": [
"ghcr.io/neondatabase/neon-test-extensions-v16:${{ needs.meta.outputs.build-tag }}"
],
"docker.io/neondatabase/neon-test-extensions-v17:${{ needs.meta.outputs.build-tag }}": [
"ghcr.io/neondatabase/neon-test-extensions-v17:${{ needs.meta.outputs.build-tag }}"
]
}
secrets: inherit
add-latest-tag-to-neon-test-extensions-image:
if: ${{ needs.meta.outputs.run-kind == 'push-main' }}
needs: [ meta, compute-node-image ]
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: |
{
"docker.io/neondatabase/neon-test-extensions-v16:${{ needs.meta.outputs.build-tag }}": [
"docker.io/neondatabase/neon-test-extensions-v16:latest",
"ghcr.io/neondatabase/neon-test-extensions-v16:latest"
],
"docker.io/neondatabase/neon-test-extensions-v17:${{ needs.meta.outputs.build-tag }}": [
"docker.io/neondatabase/neon-test-extensions-v17:latest",
"ghcr.io/neondatabase/neon-test-extensions-v17:latest"
]
}
secrets: inherit
add-release-tag-to-neon-test-extensions-image:
if: ${{ needs.meta.outputs.run-kind == 'compute-release' }}
needs: [ meta, compute-node-image ]
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: |
{
"docker.io/neondatabase/neon-test-extensions-v16:${{ needs.meta.outputs.release-pr-run-id }}": [
"docker.io/neondatabase/neon-test-extensions-v16:${{ needs.meta.outputs.build-tag }}",
"ghcr.io/neondatabase/neon-test-extensions-v16:${{ needs.meta.outputs.build-tag }}"
],
"docker.io/neondatabase/neon-test-extensions-v17:${{ needs.meta.outputs.release-pr-run-id }}": [
"docker.io/neondatabase/neon-test-extensions-v17:${{ needs.meta.outputs.build-tag }}",
"ghcr.io/neondatabase/neon-test-extensions-v17:${{ needs.meta.outputs.build-tag }}"
]
}
secrets: inherit
@@ -1235,7 +1285,7 @@ jobs:
# The job runs on `release` branch and copies compatibility data and Neon artifact from the last *release PR* to the latest directory
promote-compatibility-data:
needs: [ deploy ]
needs: [ meta, deploy ]
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
@@ -1245,37 +1295,6 @@ jobs:
runs-on: ubuntu-22.04
steps:
- name: Fetch GITHUB_RUN_ID and COMMIT_SHA for the last merged release PR
id: fetch-last-release-pr-info
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
branch_name_and_pr_number=$(gh pr list \
--repo "${GITHUB_REPOSITORY}" \
--base release \
--state merged \
--limit 10 \
--json mergeCommit,headRefName,number \
--jq ".[] | select(.mergeCommit.oid==\"${GITHUB_SHA}\") | { branch_name: .headRefName, pr_number: .number }")
branch_name=$(echo "${branch_name_and_pr_number}" | jq -r '.branch_name')
pr_number=$(echo "${branch_name_and_pr_number}" | jq -r '.pr_number')
run_id=$(gh run list \
--repo "${GITHUB_REPOSITORY}" \
--workflow build_and_test.yml \
--branch "${branch_name}" \
--json databaseId \
--limit 1 \
--jq '.[].databaseId')
last_commit_sha=$(gh pr view "${pr_number}" \
--repo "${GITHUB_REPOSITORY}" \
--json commits \
--jq '.commits[-1].oid')
echo "run-id=${run_id}" | tee -a ${GITHUB_OUTPUT}
echo "commit-sha=${last_commit_sha}" | tee -a ${GITHUB_OUTPUT}
- uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
@@ -1286,8 +1305,8 @@ jobs:
env:
BUCKET: neon-github-public-dev
AWS_REGION: eu-central-1
COMMIT_SHA: ${{ steps.fetch-last-release-pr-info.outputs.commit-sha }}
RUN_ID: ${{ steps.fetch-last-release-pr-info.outputs.run-id }}
COMMIT_SHA: ${{ github.sha }}
RUN_ID: ${{ needs.meta.outputs.release-pr-run-id }}
run: |
old_prefix="artifacts/${COMMIT_SHA}/${RUN_ID}"
new_prefix="artifacts/latest"
@@ -1376,5 +1395,5 @@ jobs:
|| needs.files-changed.result == 'skipped'
|| (needs.push-compute-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.push-neon-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind))
|| needs.test-images.result == 'skipped'
|| (needs.test-images.result == 'skipped' && contains(fromJSON('["push-main", "pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.trigger-custom-extensions-build-and-wait.result == 'skipped' && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))

36
.github/workflows/fast-forward.yml vendored Normal file
View File

@@ -0,0 +1,36 @@
name: Fast forward merge
on:
pull_request:
types: [labeled]
branches:
- release
- release-proxy
- release-compute
jobs:
fast-forward:
if: ${{ github.event.label.name == 'fast-forward' }}
runs-on: ubuntu-22.04
steps:
- name: Remove fast-forward label to PR
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
gh pr edit ${{ github.event.pull_request.number }} --repo "${GITHUB_REPOSITORY}" --remove-label "fast-forward"
- name: Fast forwarding
uses: sequoia-pgp/fast-forward@ea7628bedcb0b0b96e94383ada458d812fca4979
# See https://docs.github.com/en/graphql/reference/enums#mergestatestatus
if: ${{ github.event.pull_request.mergeable_state == 'clean' }}
with:
merge: true
comment: on-error
github_token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Comment if mergeable_state is not clean
if: ${{ github.event.pull_request.mergeable_state != 'clean' }}
run: |
gh pr comment ${{ github.event.pull_request.number }} \
--repo "${GITHUB_REPOSITORY}" \
--body "Not trying to forward pull-request, because \`mergeable_state\` is \`${{ github.event.pull_request.mergeable_state }}\`, not \`clean\`."

24
.github/workflows/lint-release-pr.yml vendored Normal file
View File

@@ -0,0 +1,24 @@
name: Lint Release PR
on:
pull_request:
branches:
- release
- release-proxy
- release-compute
jobs:
lint-release-pr:
runs-on: ubuntu-22.04
steps:
- name: Checkout PR branch
uses: actions/checkout@v4
with:
fetch-depth: 0 # Fetch full history for git operations
ref: ${{ github.event.pull_request.head.ref }}
- name: Run lint script
env:
RELEASE_BRANCH: ${{ github.base_ref }}
run: |
./.github/scripts/lint-release-pr.sh

View File

@@ -8,8 +8,6 @@ on:
- .github/workflows/build-build-tools-image.yml
- .github/workflows/pre-merge-checks.yml
merge_group:
branches:
- main
defaults:
run:
@@ -19,11 +17,13 @@ defaults:
permissions: {}
jobs:
get-changed-files:
meta:
runs-on: ubuntu-22.04
outputs:
python-changed: ${{ steps.python-src.outputs.any_changed }}
rust-changed: ${{ steps.rust-src.outputs.any_changed }}
branch: ${{ steps.group-metadata.outputs.branch }}
pr-number: ${{ steps.group-metadata.outputs.pr-number }}
steps:
- uses: actions/checkout@v4
@@ -58,12 +58,20 @@ jobs:
echo "${PYTHON_CHANGED_FILES}"
echo "${RUST_CHANGED_FILES}"
- name: Merge group metadata
if: ${{ github.event_name == 'merge_group' }}
id: group-metadata
env:
MERGE_QUEUE_REF: ${{ github.event.merge_group.head_ref }}
run: |
echo $MERGE_QUEUE_REF | jq -Rr 'capture("refs/heads/gh-readonly-queue/(?<branch>.*)/pr-(?<pr_number>[0-9]+)-[0-9a-f]{40}") | ["branch=" + .branch, "pr-number=" + .pr_number] | .[]' | tee -a "${GITHUB_OUTPUT}"
build-build-tools-image:
if: |
false
|| needs.get-changed-files.outputs.python-changed == 'true'
|| needs.get-changed-files.outputs.rust-changed == 'true'
needs: [ get-changed-files ]
|| needs.meta.outputs.python-changed == 'true'
|| needs.meta.outputs.rust-changed == 'true'
needs: [ meta ]
uses: ./.github/workflows/build-build-tools-image.yml
with:
# Build only one combination to save time
@@ -72,8 +80,8 @@ jobs:
secrets: inherit
check-codestyle-python:
if: needs.get-changed-files.outputs.python-changed == 'true'
needs: [ get-changed-files, build-build-tools-image ]
if: needs.meta.outputs.python-changed == 'true'
needs: [ meta, build-build-tools-image ]
uses: ./.github/workflows/_check-codestyle-python.yml
with:
# `-bookworm-x64` suffix should match the combination in `build-build-tools-image`
@@ -81,8 +89,8 @@ jobs:
secrets: inherit
check-codestyle-rust:
if: needs.get-changed-files.outputs.rust-changed == 'true'
needs: [ get-changed-files, build-build-tools-image ]
if: needs.meta.outputs.rust-changed == 'true'
needs: [ meta, build-build-tools-image ]
uses: ./.github/workflows/_check-codestyle-rust.yml
with:
# `-bookworm-x64` suffix should match the combination in `build-build-tools-image`
@@ -101,7 +109,7 @@ jobs:
statuses: write # for `github.repos.createCommitStatus(...)`
contents: write
needs:
- get-changed-files
- meta
- check-codestyle-python
- check-codestyle-rust
runs-on: ubuntu-22.04
@@ -129,7 +137,20 @@ jobs:
run: exit 1
if: |
false
|| (needs.check-codestyle-python.result == 'skipped' && needs.get-changed-files.outputs.python-changed == 'true')
|| (needs.check-codestyle-rust.result == 'skipped' && needs.get-changed-files.outputs.rust-changed == 'true')
|| (github.event_name == 'merge_group' && needs.meta.outputs.branch != 'main')
|| (needs.check-codestyle-python.result == 'skipped' && needs.meta.outputs.python-changed == 'true')
|| (needs.check-codestyle-rust.result == 'skipped' && needs.meta.outputs.rust-changed == 'true')
|| contains(needs.*.result, 'failure')
|| contains(needs.*.result, 'cancelled')
- name: Add fast-forward label to PR to trigger fast-forward merge
if: >-
${{
always()
&& github.event_name == 'merge_group'
&& contains(fromJson('["release", "release-proxy", "release-compute"]'), github.base_ref)
}}
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: >-
gh pr edit ${{ needs.meta.outputs.pr-number }} --repo "${GITHUB_REPOSITORY}" --add-label "fast-forward"

View File

@@ -38,7 +38,7 @@ jobs:
uses: ./.github/workflows/_create-release-pr.yml
with:
component-name: 'Storage'
release-branch: 'release'
source-branch: ${{ github.ref_name }}
secrets:
ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }}
@@ -51,7 +51,7 @@ jobs:
uses: ./.github/workflows/_create-release-pr.yml
with:
component-name: 'Proxy'
release-branch: 'release-proxy'
source-branch: ${{ github.ref_name }}
secrets:
ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }}
@@ -64,6 +64,6 @@ jobs:
uses: ./.github/workflows/_create-release-pr.yml
with:
component-name: 'Compute'
release-branch: 'release-compute'
source-branch: ${{ github.ref_name }}
secrets:
ci-access-token: ${{ secrets.CI_ACCESS_TOKEN }}

258
Cargo.lock generated
View File

@@ -191,7 +191,7 @@ checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
"synstructure",
]
@@ -203,7 +203,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -272,7 +272,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -283,7 +283,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -1021,7 +1021,7 @@ dependencies = [
"regex",
"rustc-hash 2.1.1",
"shlex",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -1248,7 +1248,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -1309,6 +1309,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"indexmap 2.0.1",
"jsonwebtoken",
"regex",
"remote_storage",
@@ -1339,6 +1340,7 @@ dependencies = [
"flate2",
"futures",
"http 1.1.0",
"indexmap 2.0.1",
"jsonwebtoken",
"metrics",
"nix 0.27.1",
@@ -1347,17 +1349,20 @@ dependencies = [
"once_cell",
"opentelemetry",
"opentelemetry_sdk",
"p256 0.13.2",
"postgres",
"postgres_initdb",
"regex",
"remote_storage",
"reqwest",
"ring",
"rlimit",
"rust-ini",
"serde",
"serde_json",
"serde_with",
"signal-hook",
"spki 0.7.3",
"tar",
"thiserror 1.0.69",
"tokio",
@@ -1377,6 +1382,7 @@ dependencies = [
"vm_monitor",
"walkdir",
"workspace_hack",
"x509-cert",
"zstd",
]
@@ -1703,7 +1709,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -1727,7 +1733,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim 0.10.0",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -1738,7 +1744,7 @@ checksum = "29a358ff9f12ec09c3e61fef9b5a9902623a695a46a917b07f269bff1445611a"
dependencies = [
"darling_core",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -1801,6 +1807,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c"
dependencies = [
"const-oid",
"der_derive",
"flagset",
"pem-rfc7468",
"zeroize",
]
@@ -1819,6 +1827,17 @@ dependencies = [
"rusticata-macros",
]
[[package]]
name = "der_derive"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8034092389675178f570469e6c3b0465d3d30b4505c294a6550db47f3c17ad18"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "deranged"
version = "0.3.11"
@@ -1888,7 +1907,7 @@ dependencies = [
"dsl_auto_type",
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -1908,7 +1927,7 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "209c735641a413bc68c4923a9d6ad4bcb3ca306b794edaa7eb0b3228a99ffb25"
dependencies = [
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -1937,7 +1956,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -1960,7 +1979,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -2105,7 +2124,7 @@ dependencies = [
"darling",
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -2115,28 +2134,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0"
dependencies = [
"log",
]
[[package]]
name = "env_logger"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580"
dependencies = [
"humantime",
"is-terminal",
"log",
"regex",
"termcolor",
]
[[package]]
name = "env_logger"
version = "0.11.2"
version = "0.11.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d"
checksum = "c3716d7a920fb4fac5d84e9d4bce8ceb321e9414b4409da61b07b75c1e3d0697"
dependencies = [
"anstream",
"anstyle",
"env_filter",
"jiff",
"log",
]
@@ -2157,7 +2167,7 @@ checksum = "3bf679796c0322556351f287a51b49e48f7c4986e727b5dd78c972d30e2e16cc"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -2291,6 +2301,12 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flagset"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3ea1ec5f8307826a5b71094dd91fc04d4ae75d5709b20ad351c7fb4815c86ec"
[[package]]
name = "flate2"
version = "1.0.26"
@@ -2417,7 +2433,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -2530,7 +2546,7 @@ checksum = "53010ccb100b96a67bc32c0175f0ed1426b31b655d562898e57325f81c023ac0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -3148,7 +3164,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -3241,7 +3257,7 @@ dependencies = [
"crossbeam-channel",
"crossbeam-utils",
"dashmap 6.1.0",
"env_logger 0.11.2",
"env_logger",
"indexmap 2.0.1",
"itoa",
"log",
@@ -3364,6 +3380,30 @@ dependencies = [
"tracing",
]
[[package]]
name = "jiff"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d699bc6dfc879fb1bf9bdff0d4c56f0884fc6f0d0eb0fba397a6d00cd9a6b85e"
dependencies = [
"jiff-static",
"log",
"portable-atomic",
"portable-atomic-util",
"serde",
]
[[package]]
name = "jiff-static"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d16e75759ee0aa64c57a56acbf43916987b20c77373cb7e808979e02b93c9f9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "jobserver"
version = "0.1.32"
@@ -3535,9 +3575,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.20"
version = "0.4.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e"
[[package]]
name = "lru"
@@ -3618,7 +3658,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -4308,6 +4348,7 @@ dependencies = [
"tokio-util",
"toml_edit",
"tracing",
"tracing-utils",
"url",
"utils",
"uuid",
@@ -4485,7 +4526,7 @@ dependencies = [
"parquet",
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -4587,7 +4628,7 @@ checksum = "f6e859e6e5bd50440ab63c47e3ebabc90f26251f7c73c3d3e837b74a1cc3fa67"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -4683,6 +4724,15 @@ version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6"
[[package]]
name = "portable-atomic-util"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
dependencies = [
"portable-atomic",
]
[[package]]
name = "postgres"
version = "0.19.7"
@@ -4790,7 +4840,7 @@ dependencies = [
"bytes",
"crc32c",
"criterion",
"env_logger 0.10.2",
"env_logger",
"log",
"memoffset 0.9.0",
"once_cell",
@@ -4889,7 +4939,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d3928fb5db768cb86f891ff014f0144589297e3c6a1aba6ed7cecfdace270c7"
dependencies = [
"proc-macro2",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -4903,9 +4953,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.92"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0"
checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84"
dependencies = [
"unicode-ident",
]
@@ -4980,7 +5030,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
dependencies = [
"bytes",
"heck",
"itertools 0.10.5",
"itertools 0.12.1",
"log",
"multimap",
"once_cell",
@@ -4989,7 +5039,7 @@ dependencies = [
"prost 0.12.6",
"prost-types 0.12.6",
"regex",
"syn 2.0.90",
"syn 2.0.100",
"tempfile",
]
@@ -5010,7 +5060,7 @@ dependencies = [
"prost 0.13.3",
"prost-types 0.13.3",
"regex",
"syn 2.0.90",
"syn 2.0.100",
"tempfile",
]
@@ -5021,10 +5071,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
dependencies = [
"anyhow",
"itertools 0.10.5",
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -5037,7 +5087,7 @@ dependencies = [
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -5084,7 +5134,7 @@ dependencies = [
"consumption_metrics",
"ecdsa 0.16.9",
"ed25519-dalek",
"env_logger 0.10.2",
"env_logger",
"fallible-iterator",
"flate2",
"framed-websockets",
@@ -5221,9 +5271,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.37"
version = "1.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af"
checksum = "c1f1914ce909e1658d9907913b4b91947430c7d9be598b15a1912935b8c04801"
dependencies = [
"proc-macro2",
]
@@ -5752,7 +5802,7 @@ dependencies = [
"regex",
"relative-path",
"rustc_version",
"syn 2.0.90",
"syn 2.0.100",
"unicode-ident",
]
@@ -5967,7 +6017,7 @@ dependencies = [
"crc32c",
"criterion",
"desim",
"env_logger 0.10.2",
"env_logger",
"fail",
"futures",
"hex",
@@ -6298,7 +6348,7 @@ checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -6380,7 +6430,7 @@ dependencies = [
"darling",
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -6395,9 +6445,9 @@ dependencies = [
[[package]]
name = "sha1"
version = "0.10.5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
@@ -6782,7 +6832,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -6833,9 +6883,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.90"
version = "2.0.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31"
checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0"
dependencies = [
"proc-macro2",
"quote",
@@ -6865,7 +6915,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -6916,15 +6966,6 @@ dependencies = [
"serde_json",
]
[[package]]
name = "termcolor"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6"
dependencies = [
"winapi-util",
]
[[package]]
name = "test-context"
version = "0.3.0"
@@ -6943,7 +6984,7 @@ checksum = "78ea17a2dc368aeca6f554343ced1b1e31f76d63683fa8016e5844bd7a5144a1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -6972,7 +7013,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -6983,7 +7024,7 @@ checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -7114,6 +7155,27 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tls_codec"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de2e01245e2bb89d6f05801c564fa27624dbd7b1846859876c7dad82e90bf6b"
dependencies = [
"tls_codec_derive",
"zeroize",
]
[[package]]
name = "tls_codec_derive"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2e76690929402faae40aebdda620a2c0e25dd6d3b9afe48867dfd95991f4bd"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "tokio"
version = "1.43.0"
@@ -7166,7 +7228,7 @@ checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -7399,7 +7461,7 @@ dependencies = [
"prost-build 0.13.3",
"prost-types 0.13.3",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -7514,7 +7576,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -7845,6 +7907,7 @@ dependencies = [
"tracing",
"tracing-error",
"tracing-subscriber",
"tracing-utils",
"walkdir",
]
@@ -7908,7 +7971,7 @@ dependencies = [
"anyhow",
"camino-tempfile",
"clap",
"env_logger 0.10.2",
"env_logger",
"log",
"postgres",
"postgres_ffi",
@@ -8013,7 +8076,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
"wasm-bindgen-shared",
]
@@ -8047,7 +8110,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@@ -8354,6 +8417,7 @@ name = "workspace_hack"
version = "0.1.0"
dependencies = [
"ahash",
"anstream",
"anyhow",
"base64 0.13.1",
"base64 0.21.7",
@@ -8364,12 +8428,17 @@ dependencies = [
"chrono",
"clap",
"clap_builder",
"const-oid",
"crypto-bigint 0.5.5",
"der 0.7.8",
"deranged",
"digest",
"displaydoc",
"ecdsa 0.16.9",
"either",
"elliptic-curve 0.13.8",
"env_filter",
"env_logger",
"fail",
"form_urlencoded",
"futures-channel",
@@ -8387,7 +8456,6 @@ dependencies = [
"hyper-util",
"indexmap 1.9.3",
"indexmap 2.0.1",
"itertools 0.10.5",
"itertools 0.12.1",
"lazy_static",
"libc",
@@ -8403,6 +8471,7 @@ dependencies = [
"num-rational",
"num-traits",
"once_cell",
"p256 0.13.2",
"parquet",
"prettyplease",
"proc-macro2",
@@ -8415,6 +8484,7 @@ dependencies = [
"reqwest",
"rustls 0.23.18",
"scopeguard",
"sec1 0.7.3",
"serde",
"serde_json",
"sha2",
@@ -8423,7 +8493,7 @@ dependencies = [
"spki 0.7.3",
"stable_deref_trait",
"subtle",
"syn 2.0.90",
"syn 2.0.100",
"sync_wrapper 0.1.2",
"tikv-jemalloc-ctl",
"tikv-jemalloc-sys",
@@ -8460,6 +8530,18 @@ version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51"
[[package]]
name = "x509-cert"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1301e935010a701ae5f8655edc0ad17c44bad3ac5ce8c39185f75453b720ae94"
dependencies = [
"const-oid",
"der 0.7.8",
"spki 0.7.3",
"tls_codec",
]
[[package]]
name = "x509-certificate"
version = "0.23.1"
@@ -8540,7 +8622,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
"synstructure",
]
@@ -8562,7 +8644,7 @@ checksum = "b3c129550b3e6de3fd0ba67ba5c81818f9805e58b8d7fee80a3a59d2c9fc601a"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -8582,15 +8664,15 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
"synstructure",
]
[[package]]
name = "zeroize"
version = "1.7.0"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d"
checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde"
dependencies = [
"serde",
"zeroize_derive",
@@ -8604,7 +8686,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]
@@ -8626,7 +8708,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
"syn 2.0.100",
]
[[package]]

View File

@@ -112,7 +112,7 @@ hyper0 = { package = "hyper", version = "0.14" }
hyper = "1.4"
hyper-util = "0.1"
tokio-tungstenite = "0.21.0"
indexmap = "2"
indexmap = { version = "2", features = ["serde"] }
indoc = "2"
ipnet = "2.10.0"
itertools = "0.10"
@@ -219,7 +219,7 @@ zerocopy = { version = "0.7", features = ["derive"] }
json-structural-diff = { version = "0.2.0" }
## TODO replace this with tracing
env_logger = "0.10"
env_logger = "0.11"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed

View File

@@ -1735,6 +1735,8 @@ RUN set -e \
libevent-dev \
libtool \
pkg-config \
libcurl4-openssl-dev \
libssl-dev \
&& apt clean && rm -rf /var/lib/apt/lists/*
# Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc)
@@ -1743,7 +1745,7 @@ RUN set -e \
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/pgbouncer/pgbouncer.git pgbouncer \
&& cd pgbouncer \
&& ./autogen.sh \
&& ./configure --prefix=/usr/local/pgbouncer --without-openssl \
&& ./configure --prefix=/usr/local/pgbouncer \
&& make -j $(nproc) dist_man_MANS= \
&& make install dist_man_MANS=

View File

@@ -26,6 +26,7 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
indexmap.workspace = true
jsonwebtoken.workspace = true
metrics.workspace = true
nix.workspace = true
@@ -34,16 +35,19 @@ num_cpus.workspace = true
once_cell.workspace = true
opentelemetry.workspace = true
opentelemetry_sdk.workspace = true
p256 = { version = "0.13", features = ["pem"] }
postgres.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["json"] }
ring = "0.17"
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
spki = { version = "0.7.3", features = ["std"] }
tar.workspace = true
tower.workspace = true
tower-http.workspace = true
reqwest = { workspace = true, features = ["json"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tokio-postgres.workspace = true
tokio-util.workspace = true
@@ -57,6 +61,7 @@ thiserror.workspace = true
url.workspace = true
uuid.workspace = true
walkdir.workspace = true
x509-cert = { version = "0.2.5" }
postgres_initdb.workspace = true
compute_api.workspace = true

View File

@@ -41,6 +41,7 @@ use crate::rsyslog::configure_audit_rsyslog;
use crate::spec::*;
use crate::swap::resize_swap;
use crate::sync_sk::{check_if_synced, ping_safekeeper};
use crate::tls::watch_cert_for_changes;
use crate::{config, extension_server, local_proxy};
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
@@ -112,6 +113,7 @@ pub struct ComputeNode {
// key: ext_archive_name, value: started download time, download_completed?
pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
pub compute_ctl_config: ComputeCtlConfig,
}
// store some metrics about download size that might impact startup time
@@ -135,8 +137,6 @@ pub struct ComputeState {
/// passed by the control plane with a /configure HTTP request.
pub pspec: Option<ParsedSpec>,
pub compute_ctl_config: ComputeCtlConfig,
/// If the spec is passed by a /configure request, 'startup_span' is the
/// /configure request's tracing span. The main thread enters it when it
/// processes the compute startup, so that the compute startup is considered
@@ -160,7 +160,6 @@ impl ComputeState {
last_active: None,
error: None,
pspec: None,
compute_ctl_config: ComputeCtlConfig::default(),
startup_span: None,
metrics: ComputeMetrics::default(),
}
@@ -314,7 +313,6 @@ impl ComputeNode {
let pspec = ParsedSpec::try_from(cli_spec).map_err(|msg| anyhow::anyhow!(msg))?;
new_state.pspec = Some(pspec);
}
new_state.compute_ctl_config = compute_ctl_config;
Ok(ComputeNode {
params,
@@ -323,6 +321,7 @@ impl ComputeNode {
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_download_progress: RwLock::new(HashMap::new()),
compute_ctl_config,
})
}
@@ -345,7 +344,7 @@ impl ComputeNode {
// requests while configuration is still in progress.
crate::http::server::Server::External {
port: this.params.external_http_port,
jwks: this.state.lock().unwrap().compute_ctl_config.jwks.clone(),
config: this.compute_ctl_config.clone(),
compute_id: this.params.compute_id.clone(),
}
.launch(&this);
@@ -524,6 +523,16 @@ impl ComputeNode {
// Collect all the tasks that must finish here
let mut pre_tasks = tokio::task::JoinSet::new();
// Make sure TLS certificates are properly loaded and in the right place.
if self.compute_ctl_config.tls.is_some() {
let this = self.clone();
pre_tasks.spawn(async move {
this.watch_cert_for_changes().await;
Ok::<(), anyhow::Error>(())
});
}
// If there are any remote extensions in shared_preload_libraries, start downloading them
if pspec.spec.remote_extensions.is_some() {
let (this, spec) = (self.clone(), pspec.spec.clone());
@@ -579,11 +588,13 @@ impl ComputeNode {
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
info!("tuning pgbouncer");
let pgbouncer_settings = pgbouncer_settings.clone();
let tls_config = self.compute_ctl_config.tls.clone();
// Spawn a background task to do the tuning,
// so that we don't block the main thread that starts Postgres.
let pgbouncer_settings = pgbouncer_settings.clone();
let _handle = tokio::spawn(async move {
let res = tune_pgbouncer(pgbouncer_settings).await;
let res = tune_pgbouncer(pgbouncer_settings, tls_config).await;
if let Err(err) = res {
error!("error while tuning pgbouncer: {err:?}");
// Continue with the startup anyway
@@ -645,9 +656,9 @@ impl ComputeNode {
if pspec.spec.mode == ComputeMode::Primary {
self.configure_as_primary(&compute_state)?;
let conf = self.get_conn_conf(None);
tokio::task::spawn_blocking(|| {
let res = get_installed_extensions(conf);
let conf = self.get_tokio_conn_conf(None);
tokio::task::spawn(async {
let res = get_installed_extensions(conf).await;
match res {
Ok(extensions) => {
info!(
@@ -855,12 +866,6 @@ impl ComputeNode {
info!("Storage auth token not set");
}
if let Some(spec) = &compute_state.pspec {
config.application_name(&format!("compute_ctl-{}", spec.spec.mode.to_type_str()));
} else {
config.application_name("compute_ctl");
}
// Connect to pageserver
let mut client = config.connect(NoTls)?;
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
@@ -1111,9 +1116,10 @@ impl ComputeNode {
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(
&pgdata_path.join("postgresql.conf"),
pgdata_path,
&pspec.spec,
self.params.internal_http_port,
&self.compute_ctl_config.tls,
)?;
// Syncing safekeepers is only safe with primary nodes: if a primary
@@ -1495,11 +1501,13 @@ impl ComputeNode {
if let Some(ref pgbouncer_settings) = spec.pgbouncer_settings {
info!("tuning pgbouncer");
let pgbouncer_settings = pgbouncer_settings.clone();
let tls_config = self.compute_ctl_config.tls.clone();
// Spawn a background task to do the tuning,
// so that we don't block the main thread that starts Postgres.
let pgbouncer_settings = pgbouncer_settings.clone();
tokio::spawn(async move {
let res = tune_pgbouncer(pgbouncer_settings).await;
let res = tune_pgbouncer(pgbouncer_settings, tls_config).await;
if let Err(err) = res {
error!("error while tuning pgbouncer: {err:?}");
}
@@ -1511,7 +1519,8 @@ impl ComputeNode {
// Spawn a background task to do the configuration,
// so that we don't block the main thread that starts Postgres.
let local_proxy = local_proxy.clone();
let mut local_proxy = local_proxy.clone();
local_proxy.tls = self.compute_ctl_config.tls.clone();
tokio::spawn(async move {
if let Err(err) = local_proxy::configure(&local_proxy) {
error!("error while configuring local_proxy: {err:?}");
@@ -1521,8 +1530,12 @@ impl ComputeNode {
// Write new config
let pgdata_path = Path::new(&self.params.pgdata);
let postgresql_conf_path = pgdata_path.join("postgresql.conf");
config::write_postgres_conf(&postgresql_conf_path, &spec, self.params.internal_http_port)?;
config::write_postgres_conf(
pgdata_path,
&spec,
self.params.internal_http_port,
&self.compute_ctl_config.tls,
)?;
if !spec.skip_pg_catalog_updates {
let max_concurrent_connections = spec.reconfigure_concurrency;
@@ -1593,6 +1606,56 @@ impl ComputeNode {
Ok(())
}
pub async fn watch_cert_for_changes(self: Arc<Self>) {
// update status on cert renewal
if let Some(tls_config) = &self.compute_ctl_config.tls {
let tls_config = tls_config.clone();
// wait until the cert exists.
let mut cert_watch = watch_cert_for_changes(tls_config.cert_path.clone()).await;
tokio::task::spawn_blocking(move || {
let handle = tokio::runtime::Handle::current();
'cert_update: loop {
// let postgres/pgbouncer/local_proxy know the new cert/key exists.
// we need to wait until it's configurable first.
let mut state = self.state.lock().unwrap();
'status_update: loop {
match state.status {
// let's update the state to config pending
ComputeStatus::ConfigurationPending | ComputeStatus::Running => {
state.set_status(
ComputeStatus::ConfigurationPending,
&self.state_changed,
);
break 'status_update;
}
// exit loop
ComputeStatus::Failed
| ComputeStatus::TerminationPending
| ComputeStatus::Terminated => break 'cert_update,
// wait
ComputeStatus::Init
| ComputeStatus::Configuration
| ComputeStatus::Empty => {
state = self.state_changed.wait(state).unwrap();
}
}
}
drop(state);
// wait for a new certificate update
if handle.block_on(cert_watch.changed()).is_err() {
break;
}
}
});
}
}
/// Update the `last_active` in the shared state, but ensure that it's a more recent one.
pub fn update_last_active(&self, last_active: Option<DateTime<Utc>>) {
let mut state = self.state.lock().unwrap();

View File

@@ -6,11 +6,13 @@ use std::io::Write;
use std::io::prelude::*;
use std::path::Path;
use compute_api::responses::TlsConfig;
use compute_api::spec::{ComputeAudit, ComputeMode, ComputeSpec, GenericOption};
use crate::pg_helpers::{
GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize, escape_conf_value,
};
use crate::tls::{self, SERVER_CRT, SERVER_KEY};
/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.
@@ -38,10 +40,12 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
/// Create or completely rewrite configuration file specified by `path`
pub fn write_postgres_conf(
path: &Path,
pgdata_path: &Path,
spec: &ComputeSpec,
extension_server_port: u16,
tls_config: &Option<TlsConfig>,
) -> Result<()> {
let path = pgdata_path.join("postgresql.conf");
// File::create() destroys the file content if it exists.
let mut file = File::create(path)?;
@@ -86,6 +90,20 @@ pub fn write_postgres_conf(
)?;
}
// tls
if let Some(tls_config) = tls_config {
writeln!(file, "ssl = on")?;
// postgres requires the keyfile to be in a secure file,
// currently too complicated to ensure that at the VM level,
// so we just copy them to another file instead. :shrug:
tls::update_key_path_blocking(pgdata_path, tls_config);
// these are the default, but good to be explicit.
writeln!(file, "ssl_cert_file = '{}'", SERVER_CRT)?;
writeln!(file, "ssl_key_file = '{}'", SERVER_KEY)?;
}
// Locales
if cfg!(target_os = "macos") {
writeln!(file, "lc_messages='C'")?;
@@ -99,7 +117,6 @@ pub fn write_postgres_conf(
writeln!(file, "lc_numeric='C.UTF-8'")?;
}
writeln!(file, "neon.endpoint_type={}", spec.mode.to_type_str())?;
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Static(lsn) => {

View File

@@ -1 +1,2 @@
pub(in crate::http) mod authorize;
pub(in crate::http) mod request_id;

View File

@@ -0,0 +1,16 @@
use axum::{extract::Request, middleware::Next, response::Response};
use uuid::Uuid;
use crate::http::headers::X_REQUEST_ID;
/// This middleware function allows compute_ctl to generate its own request ID
/// if one isn't supplied. The control plane will always send one as a UUID. The
/// neon Postgres extension on the other hand does not send one.
pub async fn maybe_add_request_id_header(mut request: Request, next: Next) -> Response {
let headers = request.headers_mut();
if !headers.contains_key(X_REQUEST_ID) {
headers.append(X_REQUEST_ID, Uuid::new_v4().to_string().parse().unwrap());
}
next.run(request).await
}

View File

@@ -5,20 +5,19 @@ use std::time::Duration;
use anyhow::Result;
use axum::Router;
use axum::extract::Request;
use axum::middleware::{self, Next};
use axum::response::{IntoResponse, Response};
use axum::middleware::{self};
use axum::response::IntoResponse;
use axum::routing::{get, post};
use compute_api::responses::ComputeCtlConfig;
use http::StatusCode;
use jsonwebtoken::jwk::JwkSet;
use tokio::net::TcpListener;
use tower::ServiceBuilder;
use tower_http::{
auth::AsyncRequireAuthorizationLayer, request_id::PropagateRequestIdLayer, trace::TraceLayer,
};
use tracing::{Span, error, info};
use uuid::Uuid;
use super::middleware::request_id::maybe_add_request_id_header;
use super::{
headers::X_REQUEST_ID,
middleware::authorize::Authorize,
@@ -42,7 +41,7 @@ pub enum Server {
},
External {
port: u16,
jwks: JwkSet,
config: ComputeCtlConfig,
compute_id: String,
},
}
@@ -80,7 +79,7 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
router
}
Server::External {
jwks, compute_id, ..
config, compute_id, ..
} => {
let unauthenticated_router =
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
@@ -96,7 +95,7 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
.route("/terminate", post(terminate::terminate))
.layer(AsyncRequireAuthorizationLayer::new(Authorize::new(
compute_id.clone(),
jwks.clone(),
config.jwks.clone(),
)));
router
@@ -219,15 +218,3 @@ impl Server {
tokio::spawn(self.serve(state));
}
}
/// This middleware function allows compute_ctl to generate its own request ID
/// if one isn't supplied. The control plane will always send one as a UUID. The
/// neon Postgres extension on the other hand does not send one.
async fn maybe_add_request_id_header(mut request: Request, next: Next) -> Response {
let headers = request.headers_mut();
if headers.get(X_REQUEST_ID).is_none() {
headers.append(X_REQUEST_ID, Uuid::new_v4().to_string().parse().unwrap());
}
next.run(request).await
}

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use anyhow::Result;
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use postgres::{Client, NoTls};
use tokio_postgres::{Client, Config, NoTls};
use crate::metrics::INSTALLED_EXTENSIONS;
@@ -10,7 +10,7 @@ use crate::metrics::INSTALLED_EXTENSIONS;
/// and to make database listing query here more explicit.
///
/// Limit the number of databases to 500 to avoid excessive load.
fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
async fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
// `pg_database.datconnlimit = -2` means that the database is in the
// invalid state
let databases = client
@@ -20,7 +20,8 @@ fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
AND datconnlimit <> - 2
LIMIT 500",
&[],
)?
)
.await?
.iter()
.map(|row| {
let db: String = row.get("datname");
@@ -36,20 +37,36 @@ fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
/// Same extension can be installed in multiple databases with different versions,
/// so we report a separate metric (number of databases where it is installed)
/// for each extension version.
pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result<InstalledExtensions> {
pub async fn get_installed_extensions(mut conf: Config) -> Result<InstalledExtensions> {
conf.application_name("compute_ctl:get_installed_extensions");
let mut client = conf.connect(NoTls)?;
let databases: Vec<String> = list_dbs(&mut client)?;
let databases: Vec<String> = {
let (mut client, connection) = conf.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
list_dbs(&mut client).await?
};
let mut extensions_map: HashMap<(String, String, String), InstalledExtension> = HashMap::new();
for db in databases.iter() {
conf.dbname(db);
let mut db_client = conf.connect(NoTls)?;
let extensions: Vec<(String, String, i32)> = db_client
let (client, connection) = conf.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
let extensions: Vec<(String, String, i32)> = client
.query(
"SELECT extname, extversion, extowner::integer FROM pg_catalog.pg_extension",
&[],
)?
)
.await?
.iter()
.map(|row| {
(

View File

@@ -26,3 +26,4 @@ pub mod spec;
mod spec_apply;
pub mod swap;
pub mod sync_sk;
pub mod tls;

View File

@@ -24,7 +24,8 @@ pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result
.with_writer(std::io::stderr);
// Initialize OpenTelemetry
let otlp_layer = tracing_utils::init_tracing("compute_ctl").await;
let otlp_layer =
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default()).await;
// Put it all together
tracing_subscriber::registry()

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::fmt::Write;
use std::fs;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::io::{BufRead, BufReader, Seek};
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::Child;
@@ -10,8 +10,10 @@ use std::str::FromStr;
use std::time::{Duration, Instant};
use anyhow::{Result, bail};
use compute_api::responses::TlsConfig;
use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
use futures::StreamExt;
use indexmap::IndexMap;
use ini::Ini;
use notify::{RecursiveMode, Watcher};
use postgres::config::Config;
@@ -333,10 +335,25 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
}
};
watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
// You cannot actually watch a file before it exists, so let's create the
// the postmaster.pid file for Postgres, so we can watch it even before
// Postgres actually starts. In the event that it already exists, just open
// the file for reading. Remember that we are racing Postgres here and that
// it doesn't matter who creates the postmaster.pid.
let mut file = match File::create(&pid_path) {
Ok(file) => file,
Err(e) => {
if e.kind() != std::io::ErrorKind::AlreadyExists {
return Err(anyhow::anyhow!(e));
}
File::open(&pid_path)?
}
};
watcher.watch(&pid_path, RecursiveMode::NonRecursive)?;
let started_at = Instant::now();
let mut postmaster_pid_seen = false;
loop {
if let Ok(Some(status)) = pg.try_wait() {
// Postgres exited, that is not what we expected, bail out earlier.
@@ -353,31 +370,18 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
debug!("swallowing extra event: {res:?}");
}
// Check that we can open pid file first.
if let Ok(file) = File::open(&pid_path) {
if !postmaster_pid_seen {
debug!("postmaster.pid appeared");
watcher
.unwatch(pgdata)
.expect("Failed to remove pgdata dir watch");
watcher
.watch(&pid_path, RecursiveMode::NonRecursive)
.expect("Failed to add postmaster.pid file watch");
postmaster_pid_seen = true;
}
file.seek(std::io::SeekFrom::Start(0)).unwrap();
let reader = BufReader::new(&file);
let last_line = reader.lines().last();
let file = BufReader::new(file);
let last_line = file.lines().last();
// Pid file could be there and we could read it, but it could be empty, for example.
if let Some(Ok(line)) = last_line {
let status = line.trim();
debug!("last line of postmaster.pid: {status:?}");
// Pid file could be there and we could read it, but it could be empty, for example.
if let Some(Ok(line)) = last_line {
let status = line.trim();
debug!("last line of postmaster.pid: {status:?}");
// Now Postgres is ready to accept connections
if status == "ready" {
break;
}
// Now Postgres is ready to accept connections
if status == "ready" {
break;
}
}
@@ -406,7 +410,7 @@ pub fn create_pgdata(pgdata: &str) -> Result<()> {
/// Update pgbouncer.ini with provided options
fn update_pgbouncer_ini(
pgbouncer_config: HashMap<String, String>,
pgbouncer_config: IndexMap<String, String>,
pgbouncer_ini_path: &str,
) -> Result<()> {
let mut conf = Ini::load_from_file(pgbouncer_ini_path)?;
@@ -427,7 +431,10 @@ fn update_pgbouncer_ini(
/// Tune pgbouncer.
/// 1. Apply new config using pgbouncer admin console
/// 2. Add new values to pgbouncer.ini to preserve them after restart
pub async fn tune_pgbouncer(pgbouncer_config: HashMap<String, String>) -> Result<()> {
pub async fn tune_pgbouncer(
mut pgbouncer_config: IndexMap<String, String>,
tls_config: Option<TlsConfig>,
) -> Result<()> {
let pgbouncer_connstr = if std::env::var_os("AUTOSCALING").is_some() {
// for VMs use pgbouncer specific way to connect to
// pgbouncer admin console without password
@@ -473,19 +480,21 @@ pub async fn tune_pgbouncer(pgbouncer_config: HashMap<String, String>) -> Result
}
};
// Apply new config
for (option_name, value) in pgbouncer_config.iter() {
let query = format!("SET {}={}", option_name, value);
// keep this log line for debugging purposes
info!("Applying pgbouncer setting change: {}", query);
if let Some(tls_config) = tls_config {
// pgbouncer starts in a half-ok state if it cannot find these files.
// It will default to client_tls_sslmode=deny, which causes proxy to error.
// There is a small window at startup where these files don't yet exist in the VM.
// Best to wait until it exists.
loop {
if let Ok(true) = tokio::fs::try_exists(&tls_config.key_path).await {
break;
}
tokio::time::sleep(Duration::from_millis(500)).await
}
if let Err(err) = client.simple_query(&query).await {
// Don't fail on error, just print it into log
error!(
"Failed to apply pgbouncer setting change: {}, {}",
query, err
);
};
pgbouncer_config.insert("client_tls_cert_file".to_string(), tls_config.cert_path);
pgbouncer_config.insert("client_tls_key_file".to_string(), tls_config.key_path);
pgbouncer_config.insert("client_tls_sslmode".to_string(), "allow".to_string());
}
// save values to pgbouncer.ini
@@ -501,6 +510,13 @@ pub async fn tune_pgbouncer(pgbouncer_config: HashMap<String, String>) -> Result
};
update_pgbouncer_ini(pgbouncer_config, &pgbouncer_ini_path)?;
info!("Applying pgbouncer setting change");
if let Err(err) = client.simple_query("RELOAD").await {
// Don't fail on error, just print it into log
error!("Failed to apply pgbouncer setting change, {err}",);
};
Ok(())
}

118
compute_tools/src/tls.rs Normal file
View File

@@ -0,0 +1,118 @@
use std::{io::Write, os::unix::fs::OpenOptionsExt, path::Path, time::Duration};
use anyhow::{Context, Result, bail};
use compute_api::responses::TlsConfig;
use ring::digest;
use spki::ObjectIdentifier;
use spki::der::{Decode, PemReader};
use x509_cert::Certificate;
#[derive(Clone, Copy)]
pub struct CertDigest(digest::Digest);
pub async fn watch_cert_for_changes(cert_path: String) -> tokio::sync::watch::Receiver<CertDigest> {
let mut digest = compute_digest(&cert_path).await;
let (tx, rx) = tokio::sync::watch::channel(digest);
tokio::spawn(async move {
while !tx.is_closed() {
let new_digest = compute_digest(&cert_path).await;
if digest.0.as_ref() != new_digest.0.as_ref() {
digest = new_digest;
_ = tx.send(digest);
}
tokio::time::sleep(Duration::from_secs(60)).await
}
});
rx
}
async fn compute_digest(cert_path: &str) -> CertDigest {
loop {
match try_compute_digest(cert_path).await {
Ok(d) => break d,
Err(e) => {
tracing::error!("could not read cert file {e:?}");
tokio::time::sleep(Duration::from_secs(1)).await
}
}
}
}
async fn try_compute_digest(cert_path: &str) -> Result<CertDigest> {
let data = tokio::fs::read(cert_path).await?;
// sha256 is extremely collision resistent. can safely assume the digest to be unique
Ok(CertDigest(digest::digest(&digest::SHA256, &data)))
}
pub const SERVER_CRT: &str = "server.crt";
pub const SERVER_KEY: &str = "server.key";
pub fn update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) {
loop {
match try_update_key_path_blocking(pg_data, tls_config) {
Ok(()) => break,
Err(e) => {
tracing::error!("could not create key file {e:?}");
std::thread::sleep(Duration::from_secs(1))
}
}
}
}
// Postgres requires the keypath be "secure". This means
// 1. Owned by the postgres user.
// 2. Have permission 600.
fn try_update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) -> Result<()> {
let key = std::fs::read_to_string(&tls_config.key_path)?;
let crt = std::fs::read_to_string(&tls_config.cert_path)?;
// to mitigate a race condition during renewal.
verify_key_cert(&key, &crt)?;
let mut key_file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.mode(0o600)
.open(pg_data.join(SERVER_KEY))?;
let mut crt_file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.mode(0o600)
.open(pg_data.join(SERVER_CRT))?;
key_file.write_all(key.as_bytes())?;
crt_file.write_all(crt.as_bytes())?;
Ok(())
}
fn verify_key_cert(key: &str, cert: &str) -> Result<()> {
const ECDSA_WITH_SHA256: ObjectIdentifier = ObjectIdentifier::new_unwrap("1.2.840.10045.4.3.2");
let cert = Certificate::decode(&mut PemReader::new(cert.as_bytes()).context("pem reader")?)
.context("decode cert")?;
match cert.signature_algorithm.oid {
ECDSA_WITH_SHA256 => {
let key = p256::SecretKey::from_sec1_pem(key).context("parse key")?;
let a = key.public_key().to_sec1_bytes();
let b = cert
.tbs_certificate
.subject_public_key_info
.subject_public_key
.raw_bytes();
if *a != *b {
bail!("private key file does not match certificate")
}
}
_ => bail!("unknown TLS key type"),
}
Ok(())
}

View File

@@ -36,7 +36,9 @@ use pageserver_api::config::{
use pageserver_api::controller_api::{
NodeAvailabilityWrapper, PlacementPolicy, TenantCreateRequest,
};
use pageserver_api::models::{ShardParameters, TimelineCreateRequest, TimelineInfo};
use pageserver_api::models::{
ShardParameters, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
};
use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
use postgres_backend::AuthType;
use postgres_connection::parse_host_port;
@@ -977,7 +979,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
neon_distrib_dir: None,
default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
storage_controller: None,
control_plane_compute_hook_api: None,
control_plane_hooks_api: None,
generate_local_ssl_certs: false,
}
};
@@ -1129,12 +1131,16 @@ async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> any
let tenant_id = get_tenant_id(args.tenant_id, env)?;
let tenant_conf: HashMap<_, _> =
args.config.iter().flat_map(|c| c.split_once(':')).collect();
let config = PageServerNode::parse_config(tenant_conf)?;
pageserver
.tenant_config(tenant_id, tenant_conf)
let req = TenantConfigRequest { tenant_id, config };
let storage_controller = StorageController::from_env(env);
storage_controller
.set_tenant_config(&req)
.await
.with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
println!("tenant {tenant_id} successfully configured on the pageserver");
println!("tenant {tenant_id} successfully configured via storcon");
}
}
Ok(())

View File

@@ -72,9 +72,9 @@ pub struct LocalEnv {
// be propagated into each pageserver's configuration.
pub control_plane_api: Url,
// Control plane upcall API for storage controller. If set, this will be propagated into the
// Control plane upcall APIs for storage controller. If set, this will be propagated into the
// storage controller's configuration.
pub control_plane_compute_hook_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
/// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
@@ -104,6 +104,7 @@ pub struct OnDiskConfig {
pub pageservers: Vec<PageServerConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub control_plane_compute_hook_api: Option<Url>,
branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
// Note: skip serializing because in compat tests old storage controller fails
@@ -136,7 +137,7 @@ pub struct NeonLocalInitConf {
pub pageservers: Vec<NeonLocalInitPageserverConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub control_plane_api: Option<Url>,
pub control_plane_compute_hook_api: Option<Option<Url>>,
pub control_plane_hooks_api: Option<Url>,
pub generate_local_ssl_certs: bool,
}
@@ -573,7 +574,8 @@ impl LocalEnv {
pageservers,
safekeepers,
control_plane_api,
control_plane_compute_hook_api,
control_plane_hooks_api,
control_plane_compute_hook_api: _,
branch_name_mappings,
generate_local_ssl_certs,
} = on_disk_config;
@@ -588,7 +590,7 @@ impl LocalEnv {
pageservers,
safekeepers,
control_plane_api: control_plane_api.unwrap(),
control_plane_compute_hook_api,
control_plane_hooks_api,
branch_name_mappings,
generate_local_ssl_certs,
}
@@ -695,7 +697,8 @@ impl LocalEnv {
pageservers: vec![], // it's skip_serializing anyway
safekeepers: self.safekeepers.clone(),
control_plane_api: Some(self.control_plane_api.clone()),
control_plane_compute_hook_api: self.control_plane_compute_hook_api.clone(),
control_plane_hooks_api: self.control_plane_hooks_api.clone(),
control_plane_compute_hook_api: None,
branch_name_mappings: self.branch_name_mappings.clone(),
generate_local_ssl_certs: self.generate_local_ssl_certs,
},
@@ -779,8 +782,8 @@ impl LocalEnv {
pageservers,
safekeepers,
control_plane_api,
control_plane_compute_hook_api,
generate_local_ssl_certs,
control_plane_hooks_api,
} = conf;
// Find postgres binaries.
@@ -827,7 +830,7 @@ impl LocalEnv {
pageservers: pageservers.iter().map(Into::into).collect(),
safekeepers,
control_plane_api: control_plane_api.unwrap(),
control_plane_compute_hook_api: control_plane_compute_hook_api.unwrap_or_default(),
control_plane_hooks_api,
branch_name_mappings: Default::default(),
generate_local_ssl_certs,
};

View File

@@ -14,7 +14,7 @@ use pageserver_api::controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse,
};
use pageserver_api::models::{TimelineCreateRequest, TimelineInfo};
use pageserver_api::models::{TenantConfigRequest, TimelineCreateRequest, TimelineInfo};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
@@ -558,10 +558,8 @@ impl StorageController {
args.push(format!("--public-key=\"{public_key}\""));
}
if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {
args.push(format!(
"--compute-hook-url={control_plane_compute_hook_api}"
));
if let Some(control_plane_hooks_api) = &self.env.control_plane_hooks_api {
args.push(format!("--control-plane-url={control_plane_hooks_api}"));
}
if let Some(split_threshold) = self.config.split_threshold.as_ref() {
@@ -878,4 +876,9 @@ impl StorageController {
)
.await
}
pub async fn set_tenant_config(&self, req: &TenantConfigRequest) -> anyhow::Result<()> {
self.dispatch(Method::PUT, "v1/tenant/config".to_string(), Some(req))
.await
}
}

View File

@@ -14,8 +14,8 @@ use pageserver_api::controller_api::{
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use pageserver_api::models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary, ShardParameters,
TenantConfig, TenantConfigPatchRequest, TenantConfigRequest, TenantShardSplitRequest,
EvictionPolicy, EvictionPolicyLayerAccessThreshold, ShardParameters, TenantConfig,
TenantConfigPatchRequest, TenantConfigRequest, TenantShardSplitRequest,
TenantShardSplitResponse,
};
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
@@ -158,12 +158,6 @@ enum Command {
#[arg(long)]
tenant_id: TenantId,
},
/// For a tenant which hasn't been onboarded to the storage controller yet, add it in secondary
/// mode so that it can warm up content on a pageserver.
TenantWarmup {
#[arg(long)]
tenant_id: TenantId,
},
TenantSetPreferredAz {
#[arg(long)]
tenant_id: TenantId,
@@ -871,94 +865,6 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
}
Command::TenantWarmup { tenant_id } => {
let describe_response = storcon_client
.dispatch::<(), TenantDescribeResponse>(
Method::GET,
format!("control/v1/tenant/{tenant_id}"),
None,
)
.await;
match describe_response {
Ok(describe) => {
if matches!(describe.policy, PlacementPolicy::Secondary) {
// Fine: it's already known to controller in secondary mode: calling
// again to put it into secondary mode won't cause problems.
} else {
anyhow::bail!("Tenant already present with policy {:?}", describe.policy);
}
}
Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _)) => {
// Fine: this tenant isn't know to the storage controller yet.
}
Err(e) => {
// Unexpected API error
return Err(e.into());
}
}
vps_client
.location_config(
TenantShardId::unsharded(tenant_id),
pageserver_api::models::LocationConfig {
mode: pageserver_api::models::LocationConfigMode::Secondary,
generation: None,
secondary_conf: Some(LocationConfigSecondary { warm: true }),
shard_number: 0,
shard_count: 0,
shard_stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE.0,
tenant_conf: TenantConfig::default(),
},
None,
true,
)
.await?;
let describe_response = storcon_client
.dispatch::<(), TenantDescribeResponse>(
Method::GET,
format!("control/v1/tenant/{tenant_id}"),
None,
)
.await?;
let secondary_ps_id = describe_response
.shards
.first()
.unwrap()
.node_secondary
.first()
.unwrap();
println!("Tenant {tenant_id} warming up on pageserver {secondary_ps_id}");
loop {
let (status, progress) = vps_client
.tenant_secondary_download(
TenantShardId::unsharded(tenant_id),
Some(Duration::from_secs(10)),
)
.await?;
println!(
"Progress: {}/{} layers, {}/{} bytes",
progress.layers_downloaded,
progress.layers_total,
progress.bytes_downloaded,
progress.bytes_total
);
match status {
StatusCode::OK => {
println!("Download complete");
break;
}
StatusCode::ACCEPTED => {
// Loop
}
_ => {
anyhow::bail!("Unexpected download status: {status}");
}
}
}
}
Command::TenantDrop { tenant_id, unclean } => {
if !unclean {
anyhow::bail!(

View File

@@ -101,15 +101,25 @@ changes such as a pageserver node becoming unavailable, or the tenant's shard co
postgres clients to handle such changes, the storage controller calls an API hook when a tenant's pageserver
location changes.
The hook is configured using the storage controller's `--compute-hook-url` CLI option. If the hook requires
JWT auth, the token may be provided with `--control-plane-jwt-token`. The hook will be invoked with a `PUT` request.
The hook is configured using the storage controller's `--control-plane-url` CLI option, from which the hook URL is computed.
In the Neon cloud service, this hook is implemented by Neon's internal cloud control plane. In `neon_local` systems
Currently, there is two hooks, each computed by appending the name to the provided control plane URL prefix:
- `notify-attach`, called whenever attachment for pageservers changes
- `notify-safekeepers`, called whenever attachment for safekeepers changes
If the hooks require JWT auth, the token may be provided with `--control-plane-jwt-token`.
The hooks will be invoked with a `PUT` request.
In the Neon cloud service, these hooks are implemented by Neon's internal cloud control plane. In `neon_local` systems,
the storage controller integrates directly with neon_local to reconfigure local postgres processes instead of calling
the compute hook.
When implementing an on-premise Neon deployment, you must implement a service that handles the compute hook. This is not complicated:
the request body has format of the `ComputeHookNotifyRequest` structure, provided below for convenience.
When implementing an on-premise Neon deployment, you must implement a service that handles the compute hooks. This is not complicated.
### `notify-attach` body
The `notify-attach` request body follows the format of the `ComputeHookNotifyRequest` structure, provided below for convenience.
```
struct ComputeHookNotifyRequestShard {
@@ -128,15 +138,15 @@ When a notification is received:
1. Modify postgres configuration for this tenant:
- set `neon.pageserver_connstr` to a comma-separated list of postgres connection strings to pageservers according to the `shards` list. The
- set `neon.pageserver_connstring` to a comma-separated list of postgres connection strings to pageservers according to the `shards` list. The
shards identified by `NodeId` must be converted to the address+port of the node.
- if stripe_size is not None, set `neon.stripe_size` to this value
- if stripe_size is not None, set `neon.shard_stripe_size` to this value
2. Send SIGHUP to postgres to reload configuration
3. Respond with 200 to the notification request. Do not return success if postgres was not updated: if an error is returned, the controller
will retry the notification until it succeeds..
### Example notification body
Example body:
```
{
@@ -148,3 +158,34 @@ When a notification is received:
],
}
```
### `notify-safekeepers` body
The `notify-safekeepers` request body forllows the format of the `SafekeepersNotifyRequest` structure, provided below for convenience.
```
pub struct SafekeeperInfo {
pub id: NodeId,
pub hostname: String,
}
pub struct SafekeepersNotifyRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub generation: u32,
pub safekeepers: Vec<SafekeeperInfo>,
}
```
When a notification is received:
1. Modify postgres configuration for this tenant:
- set `neon.safekeeper_connstrings` to an array of postgres connection strings to safekeepers according to the `safekeepers` list. The
safekeepers identified by `NodeId` must be converted to the address+port of the respective safekeeper.
The hostname is provided for debugging purposes, so we reserve changes to how we pass it.
- set `neon.safekeepers_generation` to the provided `generation` value.
2. Send SIGHUP to postgres to reload configuration
3. Respond with 200 to the notification request. Do not return success if postgres was not updated: if an error is returned, the controller
will retry the notification until it succeeds..

View File

@@ -7,6 +7,7 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
chrono.workspace = true
indexmap.workspace = true
jsonwebtoken.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -139,6 +139,7 @@ pub struct ComputeCtlConfig {
/// Set of JSON web keys that the compute can use to authenticate
/// communication from the control plane.
pub jwks: JwkSet,
pub tls: Option<TlsConfig>,
}
impl Default for ComputeCtlConfig {
@@ -147,10 +148,17 @@ impl Default for ComputeCtlConfig {
jwks: JwkSet {
keys: Vec::default(),
},
tls: None,
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TlsConfig {
pub key_path: String,
pub cert_path: String,
}
/// Response of the `/computes/{compute_id}/spec` control-plane API.
#[derive(Deserialize, Debug)]
pub struct ControlPlaneSpecResponse {

View File

@@ -5,12 +5,15 @@
//! and connect it to the storage nodes.
use std::collections::HashMap;
use indexmap::IndexMap;
use regex::Regex;
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::responses::TlsConfig;
/// String type alias representing Postgres identifier and
/// intended to be used for DB / role names.
pub type PgIdent = String;
@@ -125,7 +128,7 @@ pub struct ComputeSpec {
// information about available remote extensions
pub remote_extensions: Option<RemoteExtSpec>,
pub pgbouncer_settings: Option<HashMap<String, String>>,
pub pgbouncer_settings: Option<IndexMap<String, String>>,
// Stripe size for pageserver sharding, in pages
#[serde(default)]
@@ -272,18 +275,6 @@ pub enum ComputeMode {
Replica,
}
impl ComputeMode {
/// Convert the compute mode to a string that can be used to identify the type of compute,
/// which means that if it's a static compute, the LSN will not be included.
pub fn to_type_str(&self) -> &'static str {
match self {
ComputeMode::Primary => "primary",
ComputeMode::Static(_) => "static",
ComputeMode::Replica => "replica",
}
}
}
/// Log level for audit logging
/// Disabled, log, hipaa
/// Default is Disabled
@@ -369,6 +360,9 @@ pub struct LocalProxySpec {
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub jwks: Option<Vec<JwksSettings>>,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub tls: Option<TlsConfig>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]

View File

@@ -272,15 +272,16 @@ pub struct TenantConfigToml {
/// size exceeds `compaction_upper_limit * checkpoint_distance`.
pub compaction_upper_limit: usize,
pub compaction_algorithm: crate::models::CompactionAlgorithmSettings,
/// If true, compact down L0 across all tenant timelines before doing regular compaction.
/// If true, compact down L0 across all tenant timelines before doing regular compaction. L0
/// compaction must be responsive to avoid read amp during heavy ingestion. Defaults to true.
pub compaction_l0_first: bool,
/// If true, use a separate semaphore (i.e. concurrency limit) for the L0 compaction pass. Only
/// has an effect if `compaction_l0_first` is `true`.
/// has an effect if `compaction_l0_first` is true. Defaults to true.
pub compaction_l0_semaphore: bool,
/// Level0 delta layer threshold at which to delay layer flushes for compaction backpressure,
/// such that they take 2x as long, and start waiting for layer flushes during ephemeral layer
/// rolls. This helps compaction keep up with WAL ingestion, and avoids read amplification
/// blowing up. Should be >compaction_threshold. 0 to disable. Disabled by default.
/// Level0 delta layer threshold at which to delay layer flushes such that they take 2x as long,
/// and block on layer flushes during ephemeral layer rolls, for compaction backpressure. This
/// helps compaction keep up with WAL ingestion, and avoids read amplification blowing up.
/// Should be >compaction_threshold. 0 to disable. Defaults to 3x compaction_threshold.
pub l0_flush_delay_threshold: Option<usize>,
/// Level0 delta layer threshold at which to stall layer flushes. Must be >compaction_threshold
/// to avoid deadlock. 0 to disable. Disabled by default.
@@ -567,7 +568,9 @@ pub mod tenant_conf_defaults {
// be reduced later by optimizing L0 hole calculation to avoid loading all keys into memory). So
// with this config, we can get a maximum peak compaction usage of 9 GB.
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 20;
pub const DEFAULT_COMPACTION_L0_FIRST: bool = false;
// Enable L0 compaction pass and semaphore by default. L0 compaction must be responsive to avoid
// read amp.
pub const DEFAULT_COMPACTION_L0_FIRST: bool = true;
pub const DEFAULT_COMPACTION_L0_SEMAPHORE: bool = true;
pub const DEFAULT_COMPACTION_ALGORITHM: crate::models::CompactionAlgorithm =
@@ -584,9 +587,8 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
// If there are more than threshold * compaction_threshold (that is 3 * 10 in the default config) L0 layers, image
// layer creation will end immediately. Set to 0 to disable. The target default will be 3 once we
// want to enable this feature.
pub const DEFAULT_IMAGE_CREATION_PREEMPT_THRESHOLD: usize = 0;
// layer creation will end immediately. Set to 0 to disable.
pub const DEFAULT_IMAGE_CREATION_PREEMPT_THRESHOLD: usize = 3;
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";

View File

@@ -1225,9 +1225,10 @@ pub struct TimelineInfo {
pub last_record_lsn: Lsn,
pub prev_record_lsn: Option<Lsn>,
/// Legacy field for compat with control plane. Synonym of `min_readable_lsn`.
/// TODO: remove once control plane no longer reads it.
pub latest_gc_cutoff_lsn: Lsn,
/// Legacy field, retained for one version to enable old storage controller to
/// decode (it was a mandatory field).
#[serde(default, rename = "latest_gc_cutoff_lsn")]
pub _unused: Lsn,
/// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
/// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,

View File

@@ -112,6 +112,16 @@ impl ShardIdentity {
}
}
/// An unsharded identity with the given stripe size (if non-zero). This is typically used to
/// carry over a stripe size for an unsharded tenant from persistent storage.
pub fn unsharded_with_stripe_size(stripe_size: ShardStripeSize) -> Self {
let mut shard_identity = Self::unsharded();
if stripe_size.0 > 0 {
shard_identity.stripe_size = stripe_size;
}
shard_identity
}
/// A broken instance of this type is only used for `TenantState::Broken` tenants,
/// which are constructed in code paths that don't have access to proper configuration.
///

View File

@@ -135,8 +135,8 @@ impl Type {
pub enum Kind {
/// A simple type like `VARCHAR` or `INTEGER`.
Simple,
/// An enumerated type along with its variants.
Enum(Vec<String>),
/// An enumerated type.
Enum,
/// A pseudo-type.
Pseudo,
/// An array type along with the type of its elements.
@@ -146,9 +146,9 @@ pub enum Kind {
/// A multirange type along with the type of its elements.
Multirange(Type),
/// A domain type along with its underlying type.
Domain(Type),
/// A composite type along with information about its fields.
Composite(Vec<Field>),
Domain(Oid),
/// A composite type.
Composite(Oid),
}
/// Information about a field of a composite type.

View File

@@ -19,10 +19,10 @@ use crate::config::{Host, SslMode};
use crate::connection::{Request, RequestMessages};
use crate::query::RowStream;
use crate::simple_query::SimpleQueryStream;
use crate::types::{Oid, ToSql, Type};
use crate::types::{Oid, Type};
use crate::{
CancelToken, Error, ReadyForQueryStatus, Row, SimpleQueryMessage, Statement, Transaction,
TransactionBuilder, query, simple_query, slice_iter,
CancelToken, Error, ReadyForQueryStatus, SimpleQueryMessage, Statement, Transaction,
TransactionBuilder, query, simple_query,
};
pub struct Responses {
@@ -54,26 +54,18 @@ impl Responses {
/// A cache of type info and prepared statements for fetching type info
/// (corresponding to the queries in the [crate::prepare] module).
#[derive(Default)]
struct CachedTypeInfo {
pub(crate) struct CachedTypeInfo {
/// A statement for basic information for a type from its
/// OID. Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_QUERY) (or its
/// fallback).
typeinfo: Option<Statement>,
/// A statement for getting information for a composite type from its OID.
/// Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_COMPOSITE_QUERY).
typeinfo_composite: Option<Statement>,
/// A statement for getting information for a composite type from its OID.
/// Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_COMPOSITE_QUERY) (or
/// its fallback).
typeinfo_enum: Option<Statement>,
pub(crate) typeinfo: Option<Statement>,
/// Cache of types already looked up.
types: HashMap<Oid, Type>,
pub(crate) types: HashMap<Oid, Type>,
}
pub struct InnerClient {
sender: mpsc::UnboundedSender<Request>,
cached_typeinfo: Mutex<CachedTypeInfo>,
/// A buffer to use when writing out postgres commands.
buffer: Mutex<BytesMut>,
@@ -91,38 +83,6 @@ impl InnerClient {
})
}
pub fn typeinfo(&self) -> Option<Statement> {
self.cached_typeinfo.lock().typeinfo.clone()
}
pub fn set_typeinfo(&self, statement: &Statement) {
self.cached_typeinfo.lock().typeinfo = Some(statement.clone());
}
pub fn typeinfo_composite(&self) -> Option<Statement> {
self.cached_typeinfo.lock().typeinfo_composite.clone()
}
pub fn set_typeinfo_composite(&self, statement: &Statement) {
self.cached_typeinfo.lock().typeinfo_composite = Some(statement.clone());
}
pub fn typeinfo_enum(&self) -> Option<Statement> {
self.cached_typeinfo.lock().typeinfo_enum.clone()
}
pub fn set_typeinfo_enum(&self, statement: &Statement) {
self.cached_typeinfo.lock().typeinfo_enum = Some(statement.clone());
}
pub fn type_(&self, oid: Oid) -> Option<Type> {
self.cached_typeinfo.lock().types.get(&oid).cloned()
}
pub fn set_type(&self, oid: Oid, type_: &Type) {
self.cached_typeinfo.lock().types.insert(oid, type_.clone());
}
/// Call the given function with a buffer to be used when writing out
/// postgres commands.
pub fn with_buf<F, R>(&self, f: F) -> R
@@ -142,7 +102,6 @@ pub struct SocketConfig {
pub host: Host,
pub port: u16,
pub connect_timeout: Option<Duration>,
// pub keepalive: Option<KeepaliveConfig>,
}
/// An asynchronous PostgreSQL client.
@@ -151,6 +110,7 @@ pub struct SocketConfig {
/// through this client object.
pub struct Client {
inner: Arc<InnerClient>,
cached_typeinfo: CachedTypeInfo,
socket_config: SocketConfig,
ssl_mode: SslMode,
@@ -169,9 +129,9 @@ impl Client {
Client {
inner: Arc::new(InnerClient {
sender,
cached_typeinfo: Default::default(),
buffer: Default::default(),
}),
cached_typeinfo: Default::default(),
socket_config,
ssl_mode,
@@ -189,55 +149,6 @@ impl Client {
&self.inner
}
/// Executes a statement, returning a vector of the resulting rows.
///
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
/// provided, 1-indexed.
///
/// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
/// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
/// with the `prepare` method.
///
/// # Panics
///
/// Panics if the number of parameters provided does not match the number expected.
pub async fn query(
&self,
statement: Statement,
params: &[&(dyn ToSql + Sync)],
) -> Result<Vec<Row>, Error> {
self.query_raw(statement, slice_iter(params))
.await?
.try_collect()
.await
}
/// The maximally flexible version of [`query`].
///
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
/// provided, 1-indexed.
///
/// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
/// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
/// with the `prepare` method.
///
/// # Panics
///
/// Panics if the number of parameters provided does not match the number expected.
///
/// [`query`]: #method.query
pub async fn query_raw<'a, I>(
&self,
statement: Statement,
params: I,
) -> Result<RowStream, Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
query::query(&self.inner, statement, params).await
}
/// Pass text directly to the Postgres backend to allow it to sort out typing itself and
/// to save a roundtrip
pub async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
@@ -284,14 +195,10 @@ impl Client {
simple_query::batch_execute(self.inner(), query).await
}
pub async fn discard_all(&self) -> Result<ReadyForQueryStatus, Error> {
pub async fn discard_all(&mut self) -> Result<ReadyForQueryStatus, Error> {
// clear the prepared statements that are about to be nuked from the postgres session
{
let mut typeinfo = self.inner.cached_typeinfo.lock();
typeinfo.typeinfo = None;
typeinfo.typeinfo_composite = None;
typeinfo.typeinfo_enum = None;
}
self.cached_typeinfo.typeinfo = None;
self.batch_execute("discard all").await
}
@@ -359,8 +266,8 @@ impl Client {
}
/// Query for type information
pub async fn get_type(&self, oid: Oid) -> Result<Type, Error> {
crate::prepare::get_type(&self.inner, oid).await
pub(crate) async fn get_type_inner(&mut self, oid: Oid) -> Result<Type, Error> {
crate::prepare::get_type(&self.inner, &mut self.cached_typeinfo, oid).await
}
/// Determines if the connection to the server has already closed.

View File

@@ -22,7 +22,7 @@ pub trait GenericClient: private::Sealed {
I::IntoIter: ExactSizeIterator + Sync + Send;
/// Query for type information
async fn get_type(&self, oid: Oid) -> Result<Type, Error>;
async fn get_type(&mut self, oid: Oid) -> Result<Type, Error>;
}
impl private::Sealed for Client {}
@@ -38,8 +38,8 @@ impl GenericClient for Client {
}
/// Query for type information
async fn get_type(&self, oid: Oid) -> Result<Type, Error> {
crate::prepare::get_type(self.inner(), oid).await
async fn get_type(&mut self, oid: Oid) -> Result<Type, Error> {
self.get_type_inner(oid).await
}
}
@@ -56,7 +56,7 @@ impl GenericClient for Transaction<'_> {
}
/// Query for type information
async fn get_type(&self, oid: Oid) -> Result<Type, Error> {
self.client().get_type(oid).await
async fn get_type(&mut self, oid: Oid) -> Result<Type, Error> {
self.client_mut().get_type(oid).await
}
}

View File

@@ -9,10 +9,10 @@ use log::debug;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use crate::client::InnerClient;
use crate::client::{CachedTypeInfo, InnerClient};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::{Field, Kind, Oid, Type};
use crate::types::{Kind, Oid, Type};
use crate::{Column, Error, Statement, query, slice_iter};
pub(crate) const TYPEINFO_QUERY: &str = "\
@@ -23,23 +23,7 @@ INNER JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid
WHERE t.oid = $1
";
const TYPEINFO_ENUM_QUERY: &str = "\
SELECT enumlabel
FROM pg_catalog.pg_enum
WHERE enumtypid = $1
ORDER BY enumsortorder
";
pub(crate) const TYPEINFO_COMPOSITE_QUERY: &str = "\
SELECT attname, atttypid
FROM pg_catalog.pg_attribute
WHERE attrelid = $1
AND NOT attisdropped
AND attnum > 0
ORDER BY attnum
";
pub async fn prepare(
async fn prepare_typecheck(
client: &Arc<InnerClient>,
name: &'static str,
query: &str,
@@ -67,7 +51,7 @@ pub async fn prepare(
let mut parameters = vec![];
let mut it = parameter_description.parameters();
while let Some(oid) = it.next().map_err(Error::parse)? {
let type_ = get_type(client, oid).await?;
let type_ = Type::from_oid(oid).ok_or_else(Error::unexpected_message)?;
parameters.push(type_);
}
@@ -75,7 +59,7 @@ pub async fn prepare(
if let Some(row_description) = row_description {
let mut it = row_description.fields();
while let Some(field) = it.next().map_err(Error::parse)? {
let type_ = get_type(client, field.type_oid()).await?;
let type_ = Type::from_oid(field.type_oid()).ok_or_else(Error::unexpected_message)?;
let column = Column::new(field.name().to_string(), type_, field);
columns.push(column);
}
@@ -84,15 +68,6 @@ pub async fn prepare(
Ok(Statement::new(client, name, parameters, columns))
}
fn prepare_rec<'a>(
client: &'a Arc<InnerClient>,
name: &'static str,
query: &'a str,
types: &'a [Type],
) -> Pin<Box<dyn Future<Output = Result<Statement, Error>> + 'a + Send>> {
Box::pin(prepare(client, name, query, types))
}
fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Result<Bytes, Error> {
if types.is_empty() {
debug!("preparing query {}: {}", name, query);
@@ -108,16 +83,20 @@ fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Resu
})
}
pub async fn get_type(client: &Arc<InnerClient>, oid: Oid) -> Result<Type, Error> {
pub async fn get_type(
client: &Arc<InnerClient>,
typecache: &mut CachedTypeInfo,
oid: Oid,
) -> Result<Type, Error> {
if let Some(type_) = Type::from_oid(oid) {
return Ok(type_);
}
if let Some(type_) = client.type_(oid) {
return Ok(type_);
}
if let Some(type_) = typecache.types.get(&oid) {
return Ok(type_.clone());
};
let stmt = typeinfo_statement(client).await?;
let stmt = typeinfo_statement(client, typecache).await?;
let rows = query::query(client, stmt, slice_iter(&[&oid])).await?;
pin_mut!(rows);
@@ -136,100 +115,48 @@ pub async fn get_type(client: &Arc<InnerClient>, oid: Oid) -> Result<Type, Error
let relid: Oid = row.try_get(6)?;
let kind = if type_ == b'e' as i8 {
let variants = get_enum_variants(client, oid).await?;
Kind::Enum(variants)
Kind::Enum
} else if type_ == b'p' as i8 {
Kind::Pseudo
} else if basetype != 0 {
let type_ = get_type_rec(client, basetype).await?;
Kind::Domain(type_)
Kind::Domain(basetype)
} else if elem_oid != 0 {
let type_ = get_type_rec(client, elem_oid).await?;
let type_ = get_type_rec(client, typecache, elem_oid).await?;
Kind::Array(type_)
} else if relid != 0 {
let fields = get_composite_fields(client, relid).await?;
Kind::Composite(fields)
Kind::Composite(relid)
} else if let Some(rngsubtype) = rngsubtype {
let type_ = get_type_rec(client, rngsubtype).await?;
let type_ = get_type_rec(client, typecache, rngsubtype).await?;
Kind::Range(type_)
} else {
Kind::Simple
};
let type_ = Type::new(name, oid, kind, schema);
client.set_type(oid, &type_);
typecache.types.insert(oid, type_.clone());
Ok(type_)
}
fn get_type_rec<'a>(
client: &'a Arc<InnerClient>,
typecache: &'a mut CachedTypeInfo,
oid: Oid,
) -> Pin<Box<dyn Future<Output = Result<Type, Error>> + Send + 'a>> {
Box::pin(get_type(client, oid))
Box::pin(get_type(client, typecache, oid))
}
async fn typeinfo_statement(client: &Arc<InnerClient>) -> Result<Statement, Error> {
if let Some(stmt) = client.typeinfo() {
return Ok(stmt);
async fn typeinfo_statement(
client: &Arc<InnerClient>,
typecache: &mut CachedTypeInfo,
) -> Result<Statement, Error> {
if let Some(stmt) = &typecache.typeinfo {
return Ok(stmt.clone());
}
let typeinfo = "neon_proxy_typeinfo";
let stmt = prepare_rec(client, typeinfo, TYPEINFO_QUERY, &[]).await?;
let stmt = prepare_typecheck(client, typeinfo, TYPEINFO_QUERY, &[]).await?;
client.set_typeinfo(&stmt);
Ok(stmt)
}
async fn get_enum_variants(client: &Arc<InnerClient>, oid: Oid) -> Result<Vec<String>, Error> {
let stmt = typeinfo_enum_statement(client).await?;
query::query(client, stmt, slice_iter(&[&oid]))
.await?
.and_then(|row| async move { row.try_get(0) })
.try_collect()
.await
}
async fn typeinfo_enum_statement(client: &Arc<InnerClient>) -> Result<Statement, Error> {
if let Some(stmt) = client.typeinfo_enum() {
return Ok(stmt);
}
let typeinfo = "neon_proxy_typeinfo_enum";
let stmt = prepare_rec(client, typeinfo, TYPEINFO_ENUM_QUERY, &[]).await?;
client.set_typeinfo_enum(&stmt);
Ok(stmt)
}
async fn get_composite_fields(client: &Arc<InnerClient>, oid: Oid) -> Result<Vec<Field>, Error> {
let stmt = typeinfo_composite_statement(client).await?;
let rows = query::query(client, stmt, slice_iter(&[&oid]))
.await?
.try_collect::<Vec<_>>()
.await?;
let mut fields = vec![];
for row in rows {
let name = row.try_get(0)?;
let oid = row.try_get(1)?;
let type_ = get_type_rec(client, oid).await?;
fields.push(Field::new(name, type_));
}
Ok(fields)
}
async fn typeinfo_composite_statement(client: &Arc<InnerClient>) -> Result<Statement, Error> {
if let Some(stmt) = client.typeinfo_composite() {
return Ok(stmt);
}
let typeinfo = "neon_proxy_typeinfo_composite";
let stmt = prepare_rec(client, typeinfo, TYPEINFO_COMPOSITE_QUERY, &[]).await?;
client.set_typeinfo_composite(&stmt);
typecache.typeinfo = Some(stmt.clone());
Ok(stmt)
}

View File

@@ -72,4 +72,9 @@ impl<'a> Transaction<'a> {
pub fn client(&self) -> &Client {
self.client
}
/// Returns a reference to the underlying `Client`.
pub fn client_mut(&mut self) -> &mut Client {
self.client
}
}

View File

@@ -21,7 +21,7 @@
//! .with_writer(std::io::stderr);
//!
//! // Initialize OpenTelemetry. Exports tracing spans as OpenTelemetry traces
//! let otlp_layer = tracing_utils::init_tracing("my_application").await;
//! let otlp_layer = tracing_utils::init_tracing("my_application", tracing_utils::ExportConfig::default()).await;
//!
//! // Put it all together
//! tracing_subscriber::registry()
@@ -38,8 +38,12 @@ pub mod http;
use opentelemetry::KeyValue;
use opentelemetry::trace::TracerProvider;
use tracing::Subscriber;
use opentelemetry_otlp::WithExportConfig;
pub use opentelemetry_otlp::{ExportConfig, Protocol};
use tracing::level_filters::LevelFilter;
use tracing::{Dispatch, Subscriber};
use tracing_subscriber::Layer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::LookupSpan;
/// Set up OpenTelemetry exporter, using configuration from environment variables.
@@ -69,19 +73,28 @@ use tracing_subscriber::registry::LookupSpan;
///
/// This doesn't block, but is marked as 'async' to hint that this must be called in
/// asynchronous execution context.
pub async fn init_tracing<S>(service_name: &str) -> Option<impl Layer<S>>
pub async fn init_tracing<S>(
service_name: &str,
export_config: ExportConfig,
) -> Option<impl Layer<S>>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
if std::env::var("OTEL_SDK_DISABLED") == Ok("true".to_string()) {
return None;
};
Some(init_tracing_internal(service_name.to_string()))
Some(init_tracing_internal(
service_name.to_string(),
export_config,
))
}
/// Like `init_tracing`, but creates a separate tokio Runtime for the tracing
/// tasks.
pub fn init_tracing_without_runtime<S>(service_name: &str) -> Option<impl Layer<S>>
pub fn init_tracing_without_runtime<S>(
service_name: &str,
export_config: ExportConfig,
) -> Option<impl Layer<S>>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
@@ -112,16 +125,22 @@ where
));
let _guard = runtime.enter();
Some(init_tracing_internal(service_name.to_string()))
Some(init_tracing_internal(
service_name.to_string(),
export_config,
))
}
fn init_tracing_internal<S>(service_name: String) -> impl Layer<S>
fn init_tracing_internal<S>(service_name: String, export_config: ExportConfig) -> impl Layer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
// Sets up exporter from the OTEL_EXPORTER_* environment variables.
// Sets up exporter from the provided [`ExportConfig`] parameter.
// If the endpoint is not specified, it is loaded from the
// OTEL_EXPORTER_OTLP_ENDPOINT environment variable.
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_export_config(export_config)
.build()
.expect("could not initialize opentelemetry exporter");
@@ -151,3 +170,51 @@ where
pub fn shutdown_tracing() {
opentelemetry::global::shutdown_tracer_provider();
}
pub enum OtelEnablement {
Disabled,
Enabled {
service_name: String,
export_config: ExportConfig,
runtime: &'static tokio::runtime::Runtime,
},
}
pub struct OtelGuard {
pub dispatch: Dispatch,
}
impl Drop for OtelGuard {
fn drop(&mut self) {
shutdown_tracing();
}
}
/// Initializes OTEL infrastructure for performance tracing according to the provided configuration
///
/// Performance tracing is handled by a different [`tracing::Subscriber`]. This functions returns
/// an [`OtelGuard`] containing a [`tracing::Dispatch`] associated with a newly created subscriber.
/// Applications should use this dispatch for their performance traces.
///
/// The lifetime of the guard should match taht of the application. On drop, it tears down the
/// OTEL infra.
pub fn init_performance_tracing(otel_enablement: OtelEnablement) -> Option<OtelGuard> {
let otel_subscriber = match otel_enablement {
OtelEnablement::Disabled => None,
OtelEnablement::Enabled {
service_name,
export_config,
runtime,
} => {
let otel_layer = runtime
.block_on(init_tracing(&service_name, export_config))
.with_filter(LevelFilter::INFO);
let otel_subscriber = tracing_subscriber::registry().with(otel_layer);
let otel_dispatch = Dispatch::new(otel_subscriber);
Some(otel_dispatch)
}
};
otel_subscriber.map(|dispatch| OtelGuard { dispatch })
}

View File

@@ -42,6 +42,7 @@ toml_edit = { workspace = true, features = ["serde"] }
tracing.workspace = true
tracing-error.workspace = true
tracing-subscriber = { workspace = true, features = ["json", "registry"] }
tracing-utils.workspace = true
rand.workspace = true
scopeguard.workspace = true
strum.workspace = true

View File

@@ -49,7 +49,13 @@ pub fn bench_log_slow(c: &mut Criterion) {
// performance too. Use a simple noop future that yields once, to avoid any scheduler fast
// paths for a ready future.
if enabled {
b.iter(|| runtime.block_on(log_slow("ready", THRESHOLD, tokio::task::yield_now())));
b.iter(|| {
runtime.block_on(log_slow(
"ready",
THRESHOLD,
std::pin::pin!(tokio::task::yield_now()),
))
});
} else {
b.iter(|| runtime.block_on(tokio::task::yield_now()));
}

View File

@@ -165,6 +165,7 @@ pub fn init(
};
log_layer.with_filter(rust_log_env_filter())
});
let r = r.with(
TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
);
@@ -330,37 +331,90 @@ impl std::fmt::Debug for SecretString {
///
/// TODO: consider upgrading this to a warning, but currently it fires too often.
#[inline]
pub async fn log_slow<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O {
// TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
// won't fit on the stack.
let mut f = Box::pin(f);
pub async fn log_slow<F, O>(name: &str, threshold: Duration, f: std::pin::Pin<&mut F>) -> O
where
F: Future<Output = O>,
{
monitor_slow_future(
threshold,
threshold, // period = threshold
f,
|MonitorSlowFutureCallback {
ready,
is_slow,
elapsed_total,
elapsed_since_last_callback: _,
}| {
if !is_slow {
return;
}
if ready {
info!(
"slow {name} completed after {:.3}s",
elapsed_total.as_secs_f64()
);
} else {
info!(
"slow {name} still running after {:.3}s",
elapsed_total.as_secs_f64()
);
}
},
)
.await
}
/// Poll future `fut` to completion, invoking callback `cb` at the given `threshold` and every
/// `period` afterwards, and also unconditionally when the future completes.
#[inline]
pub async fn monitor_slow_future<F, O>(
threshold: Duration,
period: Duration,
mut fut: std::pin::Pin<&mut F>,
mut cb: impl FnMut(MonitorSlowFutureCallback),
) -> O
where
F: Future<Output = O>,
{
let started = Instant::now();
let mut attempt = 1;
let mut last_cb = started;
loop {
// NB: use timeout_at() instead of timeout() to avoid an extra clock reading in the common
// case where the timeout doesn't fire.
let deadline = started + attempt * threshold;
if let Ok(output) = tokio::time::timeout_at(deadline, &mut f).await {
// NB: we check if we exceeded the threshold even if the timeout never fired, because
// scheduling or execution delays may cause the future to succeed even if it exceeds the
// timeout. This costs an extra unconditional clock reading, but seems worth it to avoid
// false negatives.
let elapsed = started.elapsed();
if elapsed >= threshold {
info!("slow {name} completed after {:.3}s", elapsed.as_secs_f64());
}
let deadline = started + threshold + (attempt - 1) * period;
// TODO: still call the callback if the future panics? Copy how we do it for the page_service flush_in_progress counter.
let res = tokio::time::timeout_at(deadline, &mut fut).await;
let now = Instant::now();
let elapsed_total = now - started;
cb(MonitorSlowFutureCallback {
ready: res.is_ok(),
is_slow: elapsed_total >= threshold,
elapsed_total,
elapsed_since_last_callback: now - last_cb,
});
last_cb = now;
if let Ok(output) = res {
return output;
}
let elapsed = started.elapsed().as_secs_f64();
info!("slow {name} still running after {elapsed:.3}s",);
attempt += 1;
}
}
/// See [`monitor_slow_future`].
pub struct MonitorSlowFutureCallback {
/// Whether the future completed. If true, there will be no more callbacks.
pub ready: bool,
/// Whether the future is taking `>=` the specififed threshold duration to complete.
/// Monotonic: if true in one callback invocation, true in all subsequent onces.
pub is_slow: bool,
/// The time elapsed since the [`monitor_slow_future`] was first polled.
pub elapsed_total: Duration,
/// The time elapsed since the last callback invocation.
/// For the initial callback invocation, the time elapsed since the [`monitor_slow_future`] was first polled.
pub elapsed_since_last_callback: Duration,
}
#[cfg(test)]
mod tests {
use metrics::IntCounterVec;

View File

@@ -70,6 +70,7 @@ tokio-stream.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tracing.workspace = true
tracing-utils.workspace = true
url.workspace = true
walkdir.workspace = true
metrics.workspace = true

View File

@@ -12,7 +12,7 @@ pub(crate) fn setup_logging() {
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,
)
.expect("Failed to init test logging")
.expect("Failed to init test logging");
});
}

View File

@@ -111,6 +111,7 @@ fn main() -> anyhow::Result<()> {
} else {
TracingErrorLayerEnablement::Disabled
};
logging::init(
conf.log_format,
tracing_error_layer_enablement,

View File

@@ -1079,7 +1079,6 @@ components:
- last_record_lsn
- disk_consistent_lsn
- state
- latest_gc_cutoff_lsn
properties:
timeline_id:
type: string
@@ -1123,9 +1122,6 @@ components:
min_readable_lsn:
type: string
format: hex
latest_gc_cutoff_lsn:
type: string
format: hex
applied_gc_cutoff_lsn:
type: string
format: hex

View File

@@ -72,6 +72,7 @@ use crate::tenant::remote_timeline_client::{
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerName};
use crate::tenant::timeline::detach_ancestor::DetachBehavior;
use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
use crate::tenant::timeline::{
CompactFlags, CompactOptions, CompactRequest, CompactionError, Timeline, WaitLsnTimeout,
@@ -460,10 +461,7 @@ async fn build_timeline_info_common(
initdb_lsn,
last_record_lsn,
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
// Externally, expose the lowest LSN that can be used to create a branch as the "GC cutoff", although internally
// we distinguish between the "planned" GC cutoff (PITR point) and the "latest" GC cutoff (where we
// actually trimmed data to), which can pass each other when PITR is changed.
latest_gc_cutoff_lsn: min_readable_lsn,
_unused: Default::default(), // Unused, for legacy decode only
min_readable_lsn,
applied_gc_cutoff_lsn: *timeline.get_applied_gc_cutoff_lsn(),
current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
@@ -2394,6 +2392,7 @@ async fn timeline_checkpoint_handler(
let state = get_state(&request);
let mut flags = EnumSet::empty();
flags |= CompactFlags::NoYield; // run compaction to completion
if Some(true) == parse_query_param::<_, bool>(&request, "force_l0_compaction")? {
flags |= CompactFlags::ForceL0Compaction;
}
@@ -2508,6 +2507,8 @@ async fn timeline_detach_ancestor_handler(
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let behavior: Option<DetachBehavior> = parse_query_param(&request, "detach_behavior")?;
let behavior = behavior.unwrap_or_default();
let span = tracing::info_span!("detach_ancestor", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id);
@@ -2557,7 +2558,7 @@ async fn timeline_detach_ancestor_handler(
let ctx = &ctx.with_scope_timeline(&timeline);
let progress = timeline
.prepare_to_detach_from_ancestor(&tenant, options, ctx)
.prepare_to_detach_from_ancestor(&tenant, options, behavior, ctx)
.await?;
// uncomment to allow early as possible Tenant::drop
@@ -2572,6 +2573,7 @@ async fn timeline_detach_ancestor_handler(
tenant_shard_id,
timeline_id,
prepared,
behavior,
attempt,
ctx,
)

View File

@@ -465,12 +465,40 @@ pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {
pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wait_lsn_seconds",
"Time spent waiting for WAL to arrive",
"Time spent waiting for WAL to arrive. Updated on completion of the wait_lsn operation.",
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
});
pub(crate) static WAIT_LSN_START_FINISH_COUNTERPAIR: Lazy<IntCounterPairVec> = Lazy::new(|| {
register_int_counter_pair_vec!(
"pageserver_wait_lsn_started_count",
"Number of wait_lsn operations started.",
"pageserver_wait_lsn_finished_count",
"Number of wait_lsn operations finished.",
&["tenant_id", "shard_id", "timeline_id"],
)
.expect("failed to define a metric")
});
pub(crate) static WAIT_LSN_IN_PROGRESS_MICROS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_wait_lsn_in_progress_micros",
"Time spent waiting for WAL to arrive, by timeline_id. Updated periodically while waiting.",
&["tenant_id", "shard_id", "timeline_id"],
)
.expect("failed to define a metric")
});
pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_wait_lsn_in_progress_micros_global",
"Time spent waiting for WAL to arrive, globally. Updated periodically while waiting."
)
.expect("failed to define a metric")
});
static FLUSH_WAIT_UPLOAD_TIME: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"pageserver_flush_wait_upload_seconds",
@@ -2830,7 +2858,6 @@ impl StorageTimeMetrics {
}
}
#[derive(Debug)]
pub(crate) struct TimelineMetrics {
tenant_id: String,
shard_id: String,
@@ -2863,6 +2890,8 @@ pub(crate) struct TimelineMetrics {
pub valid_lsn_lease_count_gauge: UIntGauge,
pub wal_records_received: IntCounter,
pub storage_io_size: StorageIoSizeMetrics,
pub wait_lsn_in_progress_micros: GlobalAndPerTenantIntCounter,
pub wait_lsn_start_finish_counterpair: IntCounterPair,
shutdown: std::sync::atomic::AtomicBool,
}
@@ -3000,6 +3029,17 @@ impl TimelineMetrics {
let storage_io_size = StorageIoSizeMetrics::new(&tenant_id, &shard_id, &timeline_id);
let wait_lsn_in_progress_micros = GlobalAndPerTenantIntCounter {
global: WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS.clone(),
per_tenant: WAIT_LSN_IN_PROGRESS_MICROS
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap(),
};
let wait_lsn_start_finish_counterpair = WAIT_LSN_START_FINISH_COUNTERPAIR
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
TimelineMetrics {
tenant_id,
shard_id,
@@ -3032,6 +3072,8 @@ impl TimelineMetrics {
storage_io_size,
valid_lsn_lease_count_gauge,
wal_records_received,
wait_lsn_in_progress_micros,
wait_lsn_start_finish_counterpair,
shutdown: std::sync::atomic::AtomicBool::default(),
}
}
@@ -3224,6 +3266,15 @@ impl TimelineMetrics {
let _ = STORAGE_IO_SIZE.remove_label_values(&[op, tenant_id, shard_id, timeline_id]);
}
let _ =
WAIT_LSN_IN_PROGRESS_MICROS.remove_label_values(&[tenant_id, shard_id, timeline_id]);
{
let mut res = [Ok(()), Ok(())];
WAIT_LSN_START_FINISH_COUNTERPAIR
.remove_label_values(&mut res, &[tenant_id, shard_id, timeline_id]);
}
let _ = SMGR_QUERY_STARTED_PER_TENANT_TIMELINE.remove_label_values(&[
SmgrQueryType::GetPageAtLsn.into(),
tenant_id,
@@ -3836,27 +3887,29 @@ pub mod tokio_epoll_uring {
});
}
pub(crate) struct GlobalAndPerTenantIntCounter {
global: IntCounter,
per_tenant: IntCounter,
}
impl GlobalAndPerTenantIntCounter {
#[inline(always)]
pub(crate) fn inc(&self) {
self.inc_by(1)
}
#[inline(always)]
pub(crate) fn inc_by(&self, n: u64) {
self.global.inc_by(n);
self.per_tenant.inc_by(n);
}
}
pub(crate) mod tenant_throttling {
use metrics::{IntCounter, register_int_counter_vec};
use metrics::register_int_counter_vec;
use once_cell::sync::Lazy;
use utils::shard::TenantShardId;
pub(crate) struct GlobalAndPerTenantIntCounter {
global: IntCounter,
per_tenant: IntCounter,
}
impl GlobalAndPerTenantIntCounter {
#[inline(always)]
pub(crate) fn inc(&self) {
self.inc_by(1)
}
#[inline(always)]
pub(crate) fn inc_by(&self, n: u64) {
self.global.inc_by(n);
self.per_tenant.inc_by(n);
}
}
use super::GlobalAndPerTenantIntCounter;
pub(crate) struct Metrics<const KIND: usize> {
pub(super) count_accounted_start: GlobalAndPerTenantIntCounter,
@@ -4102,6 +4155,7 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) {
&CIRCUIT_BREAKERS_BROKEN,
&CIRCUIT_BREAKERS_UNBROKEN,
&PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL,
&WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS,
]
.into_iter()
.for_each(|c| {

View File

@@ -1106,12 +1106,19 @@ impl PageServerHandler {
};
// Dispatch the batch to the appropriate request handler.
let (mut handler_results, span) = log_slow(
batch.as_static_str(),
LOG_SLOW_GETPAGE_THRESHOLD,
self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx),
)
.await?;
let log_slow_name = batch.as_static_str();
let (mut handler_results, span) = {
// TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
// won't fit on the stack.
let mut boxpinned =
Box::pin(self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx));
log_slow(
log_slow_name,
LOG_SLOW_GETPAGE_THRESHOLD,
boxpinned.as_mut(),
)
.await?
};
// We purposefully don't count flush time into the smgr operation timer.
//

View File

@@ -5754,7 +5754,7 @@ pub(crate) mod harness {
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,
)
.expect("Failed to init test logging")
.expect("Failed to init test logging");
});
}

View File

@@ -219,7 +219,11 @@ impl LocationConf {
};
let shard = if conf.shard_count == 0 {
ShardIdentity::unsharded()
// NB: carry over the persisted stripe size instead of using the default. This doesn't
// matter for most practical purposes, since unsharded tenants don't use the stripe
// size, but can cause inconsistencies between storcon and Pageserver and cause manual
// splits without `new_stripe_size` to use an unintended stripe size.
ShardIdentity::unsharded_with_stripe_size(ShardStripeSize(conf.shard_stripe_size))
} else {
ShardIdentity::new(
ShardNumber(conf.shard_number),

View File

@@ -9,7 +9,7 @@ use camino::Utf8PathBuf;
use num_traits::Num;
use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, Slice};
use tracing::error;
use tracing::{error, info_span};
use utils::id::TimelineId;
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
@@ -76,6 +76,7 @@ impl EphemeralFile {
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
),
_gate_guard: gate.enter()?,
})

View File

@@ -300,9 +300,8 @@ impl TimelineMetadata {
/// Returns true if anything was changed
pub fn detach_from_ancestor(&mut self, branchpoint: &(TimelineId, Lsn)) {
if let Some(ancestor) = self.body.ancestor_timeline {
assert_eq!(ancestor, branchpoint.0);
}
// Detaching from ancestor now doesn't always detach directly to the direct ancestor, but we
// ensure the LSN is the same. So we don't check the timeline ID.
if self.body.ancestor_lsn != Lsn(0) {
assert_eq!(self.body.ancestor_lsn, branchpoint.1);
}

View File

@@ -1914,6 +1914,7 @@ impl TenantManager {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
prepared: PreparedTimelineDetach,
behavior: detach_ancestor::DetachBehavior,
mut attempt: detach_ancestor::Attempt,
ctx: &RequestContext,
) -> Result<HashSet<TimelineId>, detach_ancestor::Error> {
@@ -1957,7 +1958,14 @@ impl TenantManager {
.map_err(Error::NotFound)?;
let resp = timeline
.detach_from_ancestor_and_reparent(&tenant, prepared, ctx)
.detach_from_ancestor_and_reparent(
&tenant,
prepared,
attempt.ancestor_timeline_id,
attempt.ancestor_lsn,
behavior,
ctx,
)
.await?;
let mut slot_guard = slot_guard;

View File

@@ -229,6 +229,7 @@ async fn download_object(
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
ctx,
tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
);
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.

View File

@@ -67,6 +67,7 @@ use tracing::*;
use utils::generation::Generation;
use utils::guard_arc_swap::GuardArcSwap;
use utils::id::TimelineId;
use utils::logging::{MonitorSlowFutureCallback, monitor_slow_future};
use utils::lsn::{AtomicLsn, Lsn, RecordLsn};
use utils::postgres_client::PostgresClientProtocol;
use utils::rate_limit::RateLimit;
@@ -439,6 +440,8 @@ pub struct Timeline {
heatmap_layers_downloader: Mutex<Option<heatmap_layers_downloader::HeatmapLayersDownloader>>,
pub(crate) rel_size_v2_status: ArcSwapOption<RelSizeMigration>,
wait_lsn_log_slow: tokio::sync::Semaphore,
}
pub(crate) enum PreviousHeatmap {
@@ -1479,17 +1482,67 @@ impl Timeline {
WaitLsnTimeout::Default => self.conf.wait_lsn_timeout,
};
let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
let timer = crate::metrics::WAIT_LSN_TIME.start_timer();
let start_finish_counterpair_guard = self.metrics.wait_lsn_start_finish_counterpair.guard();
match self.last_record_lsn.wait_for_timeout(lsn, timeout).await {
let wait_for_timeout = self.last_record_lsn.wait_for_timeout(lsn, timeout);
let wait_for_timeout = std::pin::pin!(wait_for_timeout);
// Use threshold of 1 because even 1 second of wait for ingest is very much abnormal.
let log_slow_threshold = Duration::from_secs(1);
// Use period of 10 to avoid flooding logs during an outage that affects all timelines.
let log_slow_period = Duration::from_secs(10);
let mut logging_permit = None;
let wait_for_timeout = monitor_slow_future(
log_slow_threshold,
log_slow_period,
wait_for_timeout,
|MonitorSlowFutureCallback {
ready,
is_slow,
elapsed_total,
elapsed_since_last_callback,
}| {
self.metrics
.wait_lsn_in_progress_micros
.inc_by(u64::try_from(elapsed_since_last_callback.as_micros()).unwrap());
if !is_slow {
return;
}
// It's slow, see if we should log it.
// (We limit the logging to one per invocation per timeline to avoid excessive
// logging during an extended broker / networking outage that affects all timelines.)
if logging_permit.is_none() {
logging_permit = self.wait_lsn_log_slow.try_acquire().ok();
}
if logging_permit.is_none() {
return;
}
// We log it.
if ready {
info!(
"slow wait_lsn completed after {:.3}s",
elapsed_total.as_secs_f64()
);
} else {
info!(
"slow wait_lsn still running for {:.3}s",
elapsed_total.as_secs_f64()
);
}
},
);
let res = wait_for_timeout.await;
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
drop(logging_permit);
drop(start_finish_counterpair_guard);
drop(timer);
match res {
Ok(()) => Ok(()),
Err(e) => {
use utils::seqwait::SeqWaitError::*;
match e {
Shutdown => Err(WaitLsnError::Shutdown),
Timeout => {
// don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
drop(_timer);
let walreceiver_status = self.walreceiver_status();
Err(WaitLsnError::Timeout(format!(
"Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
@@ -2423,8 +2476,9 @@ impl Timeline {
}
fn get_l0_flush_delay_threshold(&self) -> Option<usize> {
// Disable L0 flushes by default. This and compaction needs further tuning.
const DEFAULT_L0_FLUSH_DELAY_FACTOR: usize = 0; // TODO: default to e.g. 3
// By default, delay L0 flushes at 3x the compaction threshold. The compaction threshold
// defaults to 10, and L0 compaction is generally able to keep L0 counts below 30.
const DEFAULT_L0_FLUSH_DELAY_FACTOR: usize = 3;
// If compaction is disabled, don't delay.
if self.get_compaction_period() == Duration::ZERO {
@@ -2452,8 +2506,9 @@ impl Timeline {
}
fn get_l0_flush_stall_threshold(&self) -> Option<usize> {
// Disable L0 stalls by default. In ingest benchmarks, we see image compaction take >10
// minutes, blocking L0 compaction, and we can't stall L0 flushes for that long.
// Disable L0 stalls by default. Stalling can cause unavailability if L0 compaction isn't
// responsive, and it can e.g. block on other compaction via the compaction semaphore or
// sibling timelines. We need more confidence before enabling this.
const DEFAULT_L0_FLUSH_STALL_FACTOR: usize = 0; // TODO: default to e.g. 5
// If compaction is disabled, don't stall.
@@ -2821,6 +2876,8 @@ impl Timeline {
heatmap_layers_downloader: Mutex::new(None),
rel_size_v2_status: ArcSwapOption::from_pointee(rel_size_v2_status),
wait_lsn_log_slow: tokio::sync::Semaphore::new(1),
};
result.repartition_threshold =
@@ -5388,9 +5445,10 @@ impl Timeline {
self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant,
options: detach_ancestor::Options,
behavior: detach_ancestor::DetachBehavior,
ctx: &RequestContext,
) -> Result<detach_ancestor::Progress, detach_ancestor::Error> {
detach_ancestor::prepare(self, tenant, options, ctx).await
detach_ancestor::prepare(self, tenant, behavior, options, ctx).await
}
/// Second step of detach from ancestor; detaches the `self` from it's current ancestor and
@@ -5406,9 +5464,21 @@ impl Timeline {
self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant,
prepared: detach_ancestor::PreparedTimelineDetach,
ancestor_timeline_id: TimelineId,
ancestor_lsn: Lsn,
behavior: detach_ancestor::DetachBehavior,
ctx: &RequestContext,
) -> Result<detach_ancestor::DetachingAndReparenting, detach_ancestor::Error> {
detach_ancestor::detach_and_reparent(self, tenant, prepared, ctx).await
detach_ancestor::detach_and_reparent(
self,
tenant,
prepared,
ancestor_timeline_id,
ancestor_lsn,
behavior,
ctx,
)
.await
}
/// Final step which unblocks the GC.

View File

@@ -3189,7 +3189,11 @@ impl Timeline {
}
// TODO: move the below part to the loop body
let last_key = last_key.expect("no keys produced during compaction");
let Some(last_key) = last_key else {
return Err(CompactionError::Other(anyhow!(
"no keys produced during compaction"
)));
};
stat.on_unique_key_visited();
let retention = self

View File

@@ -32,6 +32,9 @@ pub(crate) enum Error {
#[error("too many ancestors")]
TooManyAncestors,
#[error("ancestor is not empty")]
AncestorNotEmpty,
#[error("shutting down, please retry later")]
ShuttingDown,
@@ -89,7 +92,9 @@ impl From<Error> for ApiError {
fn from(value: Error) -> Self {
match value {
Error::NoAncestor => ApiError::Conflict(value.to_string()),
Error::TooManyAncestors => ApiError::BadRequest(anyhow::anyhow!("{value}")),
Error::TooManyAncestors | Error::AncestorNotEmpty => {
ApiError::BadRequest(anyhow::anyhow!("{value}"))
}
Error::ShuttingDown => ApiError::ShuttingDown,
Error::Archived(_) => ApiError::BadRequest(anyhow::anyhow!("{value}")),
Error::OtherTimelineDetachOngoing(_) | Error::FailedToReparentAll => {
@@ -127,13 +132,37 @@ pub(crate) struct PreparedTimelineDetach {
layers: Vec<Layer>,
}
/// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
#[derive(Debug)]
pub(crate) struct Options {
pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
pub(crate) copy_concurrency: std::num::NonZeroUsize,
}
/// Controls the detach ancestor behavior.
/// - When set to `NoAncestorAndReparent`, we will only detach a branch if its ancestor is a root branch. It will automatically reparent any children of the ancestor before and at the branch point.
/// - When set to `MultiLevelAndNoReparent`, we will detach a branch from multiple levels of ancestors, and no reparenting will happen at all.
#[derive(Debug, Clone, Copy, Default)]
pub enum DetachBehavior {
#[default]
NoAncestorAndReparent,
MultiLevelAndNoReparent,
}
impl std::str::FromStr for DetachBehavior {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"no_ancestor_and_reparent" => Ok(DetachBehavior::NoAncestorAndReparent),
"multi_level_and_no_reparent" => Ok(DetachBehavior::MultiLevelAndNoReparent),
"v1" => Ok(DetachBehavior::NoAncestorAndReparent),
"v2" => Ok(DetachBehavior::MultiLevelAndNoReparent),
_ => Err("cannot parse detach behavior"),
}
}
}
impl Default for Options {
fn default() -> Self {
Self {
@@ -147,7 +176,8 @@ impl Default for Options {
#[derive(Debug)]
pub(crate) struct Attempt {
pub(crate) timeline_id: TimelineId,
pub(crate) ancestor_timeline_id: TimelineId,
pub(crate) ancestor_lsn: Lsn,
_guard: completion::Completion,
gate_entered: Option<utils::sync::gate::GateGuard>,
}
@@ -167,25 +197,30 @@ impl Attempt {
pub(super) async fn prepare(
detached: &Arc<Timeline>,
tenant: &Tenant,
behavior: DetachBehavior,
options: Options,
ctx: &RequestContext,
) -> Result<Progress, Error> {
use Error::*;
let Some((ancestor, ancestor_lsn)) = detached
let Some((mut ancestor, mut ancestor_lsn)) = detached
.ancestor_timeline
.as_ref()
.map(|tl| (tl.clone(), detached.ancestor_lsn))
else {
let ancestor_id;
let ancestor_lsn;
let still_in_progress = {
let accessor = detached.remote_client.initialized_upload_queue()?;
// we are safe to inspect the latest uploaded, because we can only witness this after
// restart is complete and ancestor is no more.
let latest = accessor.latest_uploaded_index_part();
if latest.lineage.detached_previous_ancestor().is_none() {
let Some((id, lsn)) = latest.lineage.detached_previous_ancestor() else {
return Err(NoAncestor);
};
ancestor_id = id;
ancestor_lsn = lsn;
latest
.gc_blocking
@@ -196,7 +231,8 @@ pub(super) async fn prepare(
if still_in_progress {
// gc is still blocked, we can still reparent and complete.
// we are safe to reparent remaining, because they were locked in in the beginning.
let attempt = continue_with_blocked_gc(detached, tenant).await?;
let attempt =
continue_with_blocked_gc(detached, tenant, ancestor_id, ancestor_lsn).await?;
// because the ancestor of detached is already set to none, we have published all
// of the layers, so we are still "prepared."
@@ -224,13 +260,34 @@ pub(super) async fn prepare(
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn)?;
if ancestor.ancestor_timeline.is_some() {
if let DetachBehavior::MultiLevelAndNoReparent = behavior {
// If the ancestor has an ancestor, we might be able to fast-path detach it if the current ancestor does not have any data written/used by the detaching timeline.
while let Some(ancestor_of_ancestor) = ancestor.ancestor_timeline.clone() {
if ancestor_lsn != ancestor.ancestor_lsn {
// non-technical requirement; we could flatten still if ancestor LSN does not match but that needs
// us to copy and cut more layers.
return Err(AncestorNotEmpty);
}
// Use the ancestor of the ancestor as the new ancestor (only when the ancestor LSNs are the same)
ancestor_lsn = ancestor.ancestor_lsn; // Get the LSN first before resetting the `ancestor` variable
ancestor = ancestor_of_ancestor;
// TODO: do we still need to check if we don't want to reparent?
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn)?;
}
} else if ancestor.ancestor_timeline.is_some() {
// non-technical requirement; we could flatten N ancestors just as easily but we chose
// not to, at least initially
return Err(TooManyAncestors);
}
let attempt = start_new_attempt(detached, tenant).await?;
tracing::info!(
"attempt to detach the timeline from the ancestor: {}@{}, behavior={:?}",
ancestor.timeline_id,
ancestor_lsn,
behavior
);
let attempt = start_new_attempt(detached, tenant, ancestor.timeline_id, ancestor_lsn).await?;
utils::pausable_failpoint!("timeline-detach-ancestor::before_starting_after_locking-pausable");
@@ -450,8 +507,13 @@ pub(super) async fn prepare(
Ok(Progress::Prepared(attempt, prepared))
}
async fn start_new_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
let attempt = obtain_exclusive_attempt(detached, tenant)?;
async fn start_new_attempt(
detached: &Timeline,
tenant: &Tenant,
ancestor_timeline_id: TimelineId,
ancestor_lsn: Lsn,
) -> Result<Attempt, Error> {
let attempt = obtain_exclusive_attempt(detached, tenant, ancestor_timeline_id, ancestor_lsn)?;
// insert the block in the index_part.json, if not already there.
let _dont_care = tenant
@@ -466,13 +528,23 @@ async fn start_new_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attem
Ok(attempt)
}
async fn continue_with_blocked_gc(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
async fn continue_with_blocked_gc(
detached: &Timeline,
tenant: &Tenant,
ancestor_timeline_id: TimelineId,
ancestor_lsn: Lsn,
) -> Result<Attempt, Error> {
// FIXME: it would be nice to confirm that there is an in-memory version, since we've just
// verified there is a persistent one?
obtain_exclusive_attempt(detached, tenant)
obtain_exclusive_attempt(detached, tenant, ancestor_timeline_id, ancestor_lsn)
}
fn obtain_exclusive_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
fn obtain_exclusive_attempt(
detached: &Timeline,
tenant: &Tenant,
ancestor_timeline_id: TimelineId,
ancestor_lsn: Lsn,
) -> Result<Attempt, Error> {
use Error::{OtherTimelineDetachOngoing, ShuttingDown};
// ensure we are the only active attempt for this tenant
@@ -493,6 +565,8 @@ fn obtain_exclusive_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Atte
Ok(Attempt {
timeline_id: detached.timeline_id,
ancestor_timeline_id,
ancestor_lsn,
_guard: guard,
gate_entered: Some(_gate_entered),
})
@@ -795,6 +869,9 @@ pub(super) async fn detach_and_reparent(
detached: &Arc<Timeline>,
tenant: &Tenant,
prepared: PreparedTimelineDetach,
ancestor_timeline_id: TimelineId,
ancestor_lsn: Lsn,
behavior: DetachBehavior,
_ctx: &RequestContext,
) -> Result<DetachingAndReparenting, Error> {
let PreparedTimelineDetach { layers } = prepared;
@@ -822,7 +899,30 @@ pub(super) async fn detach_and_reparent(
"cannot (detach? reparent)? complete if the operation is not still ongoing"
);
let ancestor = match (detached.ancestor_timeline.as_ref(), recorded_branchpoint) {
let ancestor_to_detach = match detached.ancestor_timeline.as_ref() {
Some(mut ancestor) => {
while ancestor.timeline_id != ancestor_timeline_id {
match ancestor.ancestor_timeline.as_ref() {
Some(found) => {
if ancestor_lsn != ancestor.ancestor_lsn {
return Err(Error::DetachReparent(anyhow::anyhow!(
"cannot find the ancestor timeline to detach from: wrong ancestor lsn"
)));
}
ancestor = found;
}
None => {
return Err(Error::DetachReparent(anyhow::anyhow!(
"cannot find the ancestor timeline to detach from"
)));
}
}
}
Some(ancestor)
}
None => None,
};
let ancestor = match (ancestor_to_detach, recorded_branchpoint) {
(Some(ancestor), None) => {
assert!(
!layers.is_empty(),
@@ -895,6 +995,11 @@ pub(super) async fn detach_and_reparent(
Ancestor::Detached(ancestor, ancestor_lsn) => (ancestor, ancestor_lsn, false),
};
if let DetachBehavior::MultiLevelAndNoReparent = behavior {
// Do not reparent if the user requests to behave so.
return Ok(DetachingAndReparenting::Reparented(HashSet::new()));
}
let mut tasks = tokio::task::JoinSet::new();
// Returns a single permit semaphore which will be used to make one reparenting succeed,
@@ -1032,6 +1137,11 @@ pub(super) async fn complete(
}
/// Query against a locked `Tenant::timelines`.
///
/// A timeline is reparentable if:
///
/// - It is not the timeline being detached.
/// - It has the same ancestor as the timeline being detached. Note that the ancestor might not be the direct ancestor.
fn reparentable_timelines<'a, I>(
timelines: I,
detached: &'a Arc<Timeline>,

View File

@@ -1299,9 +1299,8 @@ impl OwnedAsyncWriter for VirtualFile {
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> std::io::Result<FullSlice<Buf>> {
let (buf, res) = VirtualFile::write_all_at(self, buf, offset, ctx).await;
res.map(|_| buf)
) -> (FullSlice<Buf>, std::io::Result<()>) {
VirtualFile::write_all_at(self, buf, offset, ctx).await
}
}

View File

@@ -31,7 +31,7 @@ pub trait OwnedAsyncWriter {
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> impl std::future::Future<Output = std::io::Result<FullSlice<Buf>>> + Send;
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
}
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
@@ -66,6 +66,7 @@ where
buf_new: impl Fn() -> B,
gate_guard: utils::sync::gate::GateGuard,
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> Self {
Self {
writer: writer.clone(),
@@ -75,6 +76,7 @@ where
buf_new(),
gate_guard,
ctx.attached_child(),
flush_task_span,
),
bytes_submitted: 0,
}
@@ -269,12 +271,12 @@ mod tests {
buf: FullSlice<Buf>,
offset: u64,
_: &RequestContext,
) -> std::io::Result<FullSlice<Buf>> {
) -> (FullSlice<Buf>, std::io::Result<()>) {
self.writes
.lock()
.unwrap()
.push((Vec::from(&buf[..]), offset));
Ok(buf)
(buf, Ok(()))
}
}
@@ -293,6 +295,7 @@ mod tests {
|| IoBufferMut::with_capacity(2),
gate.enter()?,
ctx,
tracing::Span::none(),
);
writer.write_buffered_borrowed(b"abc", ctx).await?;

View File

@@ -1,9 +1,14 @@
use std::ops::ControlFlow;
use std::sync::Arc;
use once_cell::sync::Lazy;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span, warn};
use utils::sync::duplex;
use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
use crate::context::RequestContext;
use crate::virtual_file::MaybeFatalIo;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
@@ -118,6 +123,7 @@ where
buf: B,
gate_guard: utils::sync::gate::GateGuard,
ctx: RequestContext,
span: tracing::Span,
) -> Self
where
B: Buffer<IoBuf = Buf> + Send + 'static,
@@ -125,11 +131,14 @@ where
// It is fine to buffer up to only 1 message. We only 1 message in-flight at a time.
let (front, back) = duplex::mpsc::channel(1);
let join_handle = tokio::spawn(async move {
FlushBackgroundTask::new(back, file, gate_guard, ctx)
.run(buf.flush())
.await
});
let join_handle = tokio::spawn(
async move {
FlushBackgroundTask::new(back, file, gate_guard, ctx)
.run(buf.flush())
.await
}
.instrument(span),
);
FlushHandle {
inner: Some(FlushHandleInner {
@@ -236,6 +245,7 @@ where
/// The passed in slice is immediately sent back to the flush handle through the duplex channel.
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
// Sends the extra buffer back to the handle.
// TODO: can this ever await and or fail? I think not.
self.channel.send(slice).await.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
})?;
@@ -251,10 +261,47 @@ where
}
// Write slice to disk at `offset`.
let slice = self
.writer
.write_all_at(request.slice, request.offset, &self.ctx)
.await?;
//
// Error handling happens according to the current policy of crashing
// on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
// (The upper layers of the Pageserver write path are not equipped to retry write errors
// becasuse they often deallocate the buffers that were already written).
//
// TODO: cancellation sensitiity.
// Without it, if we hit a bug where retrying is never successful,
// then we can't shut down the timeline/tenant/pageserver cleanly because
// layers of the Pageserver write path are holding the gate open for EphemeralFile.
//
// TODO: use utils::backoff::retry once async closures are actually usable
//
let mut slice_storage = Some(request.slice);
for attempt in 1.. {
let result = async {
if attempt > 1 {
info!("retrying flush");
}
let slice = slice_storage.take().expect(
"likely previous invocation of this future didn't get polled to completion",
);
let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
slice_storage = Some(slice);
let res = res.maybe_fatal_err("owned_buffers_io flush");
let Err(err) = res else {
return ControlFlow::Break(());
};
warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
static NO_CANCELLATION: Lazy<CancellationToken> = Lazy::new(CancellationToken::new);
utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await;
ControlFlow::Continue(())
}
.instrument(info_span!("flush_attempt", %attempt))
.await;
match result {
ControlFlow::Break(()) => break,
ControlFlow::Continue(()) => continue,
}
}
let slice = slice_storage.expect("loop must have run at least once");
#[cfg(test)]
{

View File

@@ -50,20 +50,6 @@
#define MIN_RECONNECT_INTERVAL_USEC 1000
#define MAX_RECONNECT_INTERVAL_USEC 1000000
enum NeonEndpointType {
EP_TYPE_PRIMARY = 1,
EP_TYPE_REPLICA,
EP_TYPE_STATIC
};
static const struct config_enum_entry neon_endpoint_types[] = {
{"primary", EP_TYPE_PRIMARY, false},
{"replica", EP_TYPE_REPLICA, false},
{"static", EP_TYPE_STATIC, false},
{NULL, 0, false}
};
/* GUCs */
char *neon_timeline;
char *neon_tenant;
@@ -76,8 +62,6 @@ int flush_every_n_requests = 8;
int neon_protocol_version = 2;
static int neon_endpoint_type = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
@@ -408,7 +392,7 @@ pageserver_connect(shardno_t shard_no, int elevel)
{
const char *keywords[4];
const char *values[4];
char pid_str[24];
char pid_str[16];
int n_pgsql_params;
TimestampTz now;
int64 us_since_last_attempt;
@@ -461,22 +445,7 @@ pageserver_connect(shardno_t shard_no, int elevel)
*/
keywords[n_pgsql_params] = "application_name";
{
int ret;
switch (neon_endpoint_type)
{
case EP_TYPE_PRIMARY:
ret = snprintf(pid_str, sizeof(pid_str), "%d-%s", MyProcPid, "primary");
break;
case EP_TYPE_REPLICA:
ret = snprintf(pid_str, sizeof(pid_str), "%d-%s", MyProcPid, "replica");
break;
case EP_TYPE_STATIC:
ret = snprintf(pid_str, sizeof(pid_str), "%d-%s", MyProcPid, "static");
break;
default:
ret = snprintf(pid_str, sizeof(pid_str), "%d", MyProcPid);
break;
}
int ret = snprintf(pid_str, sizeof(pid_str), "%d", MyProcPid);
if (ret < 0 || ret >= (int)(sizeof(pid_str)))
elog(FATAL, "stack-allocated buffer too small to hold pid");
}
@@ -1401,17 +1370,6 @@ pg_init_libpagestore(void)
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"neon.endpoint_type",
"The compute endpoint node type",
NULL,
&neon_endpoint_type,
EP_TYPE_PRIMARY,
neon_endpoint_types,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
relsize_hash_init();
if (page_server != NULL)

View File

@@ -76,6 +76,10 @@
#include "access/xlogrecovery.h"
#endif
#if PG_VERSION_NUM < 160000
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
/*
* If DEBUG_COMPARE_LOCAL is defined, we pass through all the SMGR API
* calls to md.c, and *also* do the calls to the Page Server. On every
@@ -1803,7 +1807,7 @@ static XLogRecPtr
log_newpage_copy(NRelFileInfo * rinfo, ForkNumber forkNum, BlockNumber blkno,
Page page, bool page_std)
{
PGAlignedBlock copied_buffer;
PGIOAlignedBlock copied_buffer;
memcpy(copied_buffer.data, page, BLCKSZ);
return log_newpage(rinfo, forkNum, blkno, copied_buffer.data, page_std);
@@ -1820,7 +1824,7 @@ static XLogRecPtr
log_newpages_copy(NRelFileInfo * rinfo, ForkNumber forkNum, BlockNumber blkno,
BlockNumber nblocks, Page *pages, bool page_std)
{
PGAlignedBlock copied_buffer[XLR_MAX_BLOCK_ID];
PGIOAlignedBlock copied_buffer[XLR_MAX_BLOCK_ID];
BlockNumber blknos[XLR_MAX_BLOCK_ID];
Page pageptrs[XLR_MAX_BLOCK_ID];
int nregistered = 0;
@@ -1858,7 +1862,7 @@ log_newpages_copy(NRelFileInfo * rinfo, ForkNumber forkNum, BlockNumber blkno,
static bool
PageIsEmptyHeapPage(char *buffer)
{
PGAlignedBlock empty_page;
PGIOAlignedBlock empty_page;
PageInit((Page) empty_page.data, BLCKSZ, 0);
@@ -2847,7 +2851,7 @@ static void
neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
int nblocks, bool skipFsync)
{
const PGAlignedBlock buffer = {0};
const PGIOAlignedBlock buffer = {0};
int remblocks = nblocks;
XLogRecPtr lsn = 0;
@@ -2894,6 +2898,11 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
relpath(reln->smgr_rlocator, forkNum),
InvalidBlockNumber)));
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
#endif
/* Don't log any pages if we're not allowed to do so. */
if (!XLogInsertAllowed())
return;
@@ -3389,15 +3398,16 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
{
char pageserver_masked[BLCKSZ];
char mdbuf[BLCKSZ];
char mdbuf_masked[BLCKSZ];
PGIOAlignedBlock mdbuf;
PGIOAlignedBlock mdbuf_masked;
XLogRecPtr request_lsn = request_lsns.request_lsn;
mdread(reln, forkNum, blkno, mdbuf);
mdread(reln, forkNum, blkno, mdbuf.data);
memcpy(pageserver_masked, buffer, BLCKSZ);
memcpy(mdbuf_masked, mdbuf, BLCKSZ);
memcpy(mdbuf_masked.data, mdbuf.data, BLCKSZ);
if (PageIsNew((Page) mdbuf))
if (PageIsNew((Page) mdbuf.data))
{
if (!PageIsNew((Page) pageserver_masked))
{
@@ -3416,41 +3426,41 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf));
hexdump_page(mdbuf.data));
}
else if (PageGetSpecialSize(mdbuf) == 0)
else if (PageGetSpecialSize(mdbuf.data) == 0)
{
/* assume heap */
RmgrTable[RM_HEAP_ID].rm_mask(mdbuf_masked, blkno);
RmgrTable[RM_HEAP_ID].rm_mask(mdbuf_masked.data, blkno);
RmgrTable[RM_HEAP_ID].rm_mask(pageserver_masked, blkno);
if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0)
if (memcmp(mdbuf_masked.data, pageserver_masked, BLCKSZ) != 0)
{
neon_log(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf_masked),
hexdump_page(mdbuf_masked.data),
hexdump_page(pageserver_masked));
}
}
else if (PageGetSpecialSize(mdbuf) == MAXALIGN(sizeof(BTPageOpaqueData)))
else if (PageGetSpecialSize(mdbuf.data) == MAXALIGN(sizeof(BTPageOpaqueData)))
{
if (((BTPageOpaqueData *) PageGetSpecialPointer(mdbuf))->btpo_cycleid < MAX_BT_CYCLE_ID)
if (((BTPageOpaqueData *) PageGetSpecialPointer(mdbuf.data))->btpo_cycleid < MAX_BT_CYCLE_ID)
{
/* assume btree */
RmgrTable[RM_BTREE_ID].rm_mask(mdbuf_masked, blkno);
RmgrTable[RM_BTREE_ID].rm_mask(mdbuf_masked.data, blkno);
RmgrTable[RM_BTREE_ID].rm_mask(pageserver_masked, blkno);
if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0)
if (memcmp(mdbuf_masked.data, pageserver_masked, BLCKSZ) != 0)
{
neon_log(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf_masked),
hexdump_page(mdbuf_masked.data),
hexdump_page(pageserver_masked));
}
}
@@ -3542,77 +3552,85 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
if (forknum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
{
char pageserver_masked[BLCKSZ];
char mdbuf[BLCKSZ];
char mdbuf_masked[BLCKSZ];
PGIOAlignedBlock mdbuf;
PGIOAlignedBlock mdbuf_masked;
XLogRecPtr request_lsn = request_lsns->request_lsn;
for (int i = 0; i < nblocks; i++)
{
BlockNumber blkno = blocknum + i;
if (!BITMAP_ISSET(read, i))
continue;
#if PG_MAJORVERSION_NUM >= 17
mdreadv(reln, forkNum, blkno + i, &mdbuf, 1);
{
void* mdbuffers[1] = { mdbuf.data };
mdreadv(reln, forknum, blkno, mdbuffers, 1);
}
#else
mdread(reln, forkNum, blkno + i, mdbuf);
mdread(reln, forknum, blkno, mdbuf.data);
#endif
memcpy(pageserver_masked, buffer, BLCKSZ);
memcpy(mdbuf_masked, mdbuf, BLCKSZ);
memcpy(pageserver_masked, buffers[i], BLCKSZ);
memcpy(mdbuf_masked.data, mdbuf.data, BLCKSZ);
if (PageIsNew((Page) mdbuf))
if (PageIsNew((Page) mdbuf.data))
{
if (!PageIsNew((Page) pageserver_masked))
{
neon_log(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum,
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(buffer));
hexdump_page(buffers[i]));
}
}
else if (PageIsNew((Page) buffer))
else if (PageIsNew((Page) buffers[i]))
{
neon_log(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum,
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf));
hexdump_page(mdbuf.data));
}
else if (PageGetSpecialSize(mdbuf) == 0)
else if (PageGetSpecialSize(mdbuf.data) == 0)
{
/* assume heap */
RmgrTable[RM_HEAP_ID].rm_mask(mdbuf_masked, blkno);
RmgrTable[RM_HEAP_ID].rm_mask(mdbuf_masked.data, blkno);
RmgrTable[RM_HEAP_ID].rm_mask(pageserver_masked, blkno);
if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0)
if (memcmp(mdbuf_masked.data, pageserver_masked, BLCKSZ) != 0)
{
neon_log(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum,
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf_masked),
hexdump_page(mdbuf_masked.data),
hexdump_page(pageserver_masked));
}
}
else if (PageGetSpecialSize(mdbuf) == MAXALIGN(sizeof(BTPageOpaqueData)))
else if (PageGetSpecialSize(mdbuf.data) == MAXALIGN(sizeof(BTPageOpaqueData)))
{
if (((BTPageOpaqueData *) PageGetSpecialPointer(mdbuf))->btpo_cycleid < MAX_BT_CYCLE_ID)
if (((BTPageOpaqueData *) PageGetSpecialPointer(mdbuf.data))->btpo_cycleid < MAX_BT_CYCLE_ID)
{
/* assume btree */
RmgrTable[RM_BTREE_ID].rm_mask(mdbuf_masked, blkno);
RmgrTable[RM_BTREE_ID].rm_mask(mdbuf_masked.data, blkno);
RmgrTable[RM_BTREE_ID].rm_mask(pageserver_masked, blkno);
if (memcmp(mdbuf_masked, pageserver_masked, BLCKSZ) != 0)
if (memcmp(mdbuf_masked.data, pageserver_masked, BLCKSZ) != 0)
{
neon_log(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum,
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf_masked),
hexdump_page(mdbuf_masked.data),
hexdump_page(pageserver_masked));
}
}
@@ -3664,6 +3682,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
switch (reln->smgr_relpersistence)
{
case 0:
#ifndef DEBUG_COMPARE_LOCAL
/* This is a bit tricky. Check if the relation exists locally */
if (mdexists(reln, forknum))
{
@@ -3682,6 +3701,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
*/
return;
}
#endif
break;
case RELPERSISTENCE_PERMANENT:
@@ -3732,6 +3752,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
switch (reln->smgr_relpersistence)
{
case 0:
#ifndef DEBUG_COMPARE_LOCAL
/* This is a bit tricky. Check if the relation exists locally */
if (mdexists(reln, forknum))
{
@@ -3747,6 +3768,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
*/
return;
}
#endif
break;
case RELPERSISTENCE_PERMANENT:
@@ -3768,7 +3790,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
#endif
}

View File

@@ -5,6 +5,7 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, bail, ensure};
use arc_swap::ArcSwapOption;
use camino::{Utf8Path, Utf8PathBuf};
use clap::Parser;
use compute_api::spec::LocalProxySpec;
@@ -27,6 +28,7 @@ use crate::config::{
};
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::messages::{EndpointJwksResponse, JwksSettings};
use crate::ext::TaskExt;
use crate::http::health_server::AppMetrics;
use crate::intern::RoleNameInt;
use crate::metrics::{Metrics, ThreadPoolMetrics};
@@ -190,7 +192,11 @@ pub async fn run() -> anyhow::Result<()> {
// 2. The config file is written but the signal hook is not yet received
// 3. local_proxy completes startup but has no config loaded, despite there being a registerd config.
refresh_config_notify.notify_one();
tokio::spawn(refresh_config_loop(args.config_path, refresh_config_notify));
tokio::spawn(refresh_config_loop(
config,
args.config_path,
refresh_config_notify,
));
maintenance_tasks.spawn(crate::http::health_server::task_main(
metrics_listener,
@@ -269,7 +275,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
};
Ok(Box::leak(Box::new(ProxyConfig {
tls_config: None,
tls_config: ArcSwapOption::from(None),
metric_collection: None,
http_config,
authentication_config: AuthenticationConfig {
@@ -311,14 +317,16 @@ enum RefreshConfigError {
Parse(#[from] serde_json::Error),
#[error(transparent)]
Validate(anyhow::Error),
#[error(transparent)]
Tls(anyhow::Error),
}
async fn refresh_config_loop(path: Utf8PathBuf, rx: Arc<Notify>) {
async fn refresh_config_loop(config: &ProxyConfig, path: Utf8PathBuf, rx: Arc<Notify>) {
let mut init = true;
loop {
rx.notified().await;
match refresh_config_inner(&path).await {
match refresh_config_inner(config, &path).await {
Ok(()) => {}
// don't log for file not found errors if this is the first time we are checking
// for computes that don't use local_proxy, this is not an error.
@@ -327,6 +335,9 @@ async fn refresh_config_loop(path: Utf8PathBuf, rx: Arc<Notify>) {
{
debug!(error=?e, ?path, "could not read config file");
}
Err(RefreshConfigError::Tls(e)) => {
error!(error=?e, ?path, "could not read TLS certificates");
}
Err(e) => {
error!(error=?e, ?path, "could not read config file");
}
@@ -336,7 +347,10 @@ async fn refresh_config_loop(path: Utf8PathBuf, rx: Arc<Notify>) {
}
}
async fn refresh_config_inner(path: &Utf8Path) -> Result<(), RefreshConfigError> {
async fn refresh_config_inner(
config: &ProxyConfig,
path: &Utf8Path,
) -> Result<(), RefreshConfigError> {
let bytes = tokio::fs::read(&path).await?;
let data: LocalProxySpec = serde_json::from_slice(&bytes)?;
@@ -406,5 +420,20 @@ async fn refresh_config_inner(path: &Utf8Path) -> Result<(), RefreshConfigError>
info!("successfully loaded new config");
JWKS_ROLE_MAP.store(Some(Arc::new(EndpointJwksResponse { jwks: jwks_set })));
if let Some(tls_config) = data.tls {
let tls_config = tokio::task::spawn_blocking(move || {
crate::tls::server_config::configure_tls(
&tls_config.key_path,
&tls_config.cert_path,
None,
false,
)
})
.await
.propagate_task_panic()
.map_err(RefreshConfigError::Tls)?;
config.tls_config.store(Some(Arc::new(tls_config)));
}
Ok(())
}

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use arc_swap::ArcSwapOption;
use futures::future::Either;
use remote_storage::RemoteStorageConfig;
use tokio::net::TcpListener;
@@ -563,6 +564,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
(None, None) => None,
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
};
let tls_config = ArcSwapOption::from(tls_config.map(Arc::new));
let backup_metric_collection_config = config::MetricBackupCollectionConfig {
remote_storage_config: args.metric_backup_collection_remote_storage.clone(),

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Ok, bail, ensure};
use arc_swap::ArcSwapOption;
use clap::ValueEnum;
use remote_storage::RemoteStorageConfig;
@@ -17,7 +18,7 @@ pub use crate::tls::server_config::{TlsConfig, configure_tls};
use crate::types::Host;
pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub tls_config: ArcSwapOption<TlsConfig>,
pub metric_collection: Option<MetricCollectionConfig>,
pub http_config: HttpConfig,
pub authentication_config: AuthenticationConfig,

View File

@@ -177,7 +177,8 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let proto = ctx.protocol();
let request_gauge = metrics.connection_requests.guard(proto);
let tls = config.tls_config.as_ref();
let tls = config.tls_config.load();
let tls = tls.as_deref();
let record_handshake_error = !ctx.has_private_peer_addr();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);

View File

@@ -46,7 +46,8 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
.expect("this should be a valid filter directive"),
);
let otlp_layer = tracing_utils::init_tracing("proxy").await;
let otlp_layer =
tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default()).await;
let json_log_layer = if logfmt == LogFormat::Json {
Some(JsonLoggingLayer::new(

View File

@@ -114,7 +114,7 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
let mut read_buf = read_buf.reader();
let mut res = Ok(());
let accept = tokio_rustls::TlsAcceptor::from(tls.to_server_config())
let accept = tokio_rustls::TlsAcceptor::from(tls.pg_config.clone())
.accept_with(raw, |session| {
// push the early data to the tls session
while !read_buf.get_ref().is_empty() {

View File

@@ -278,7 +278,8 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let proto = ctx.protocol();
let request_gauge = metrics.connection_requests.guard(proto);
let tls = config.tls_config.as_ref();
let tls = config.tls_config.load();
let tls = tls.as_deref();
let record_handshake_error = !ctx.has_private_peer_addr();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);

View File

@@ -96,16 +96,18 @@ fn generate_tls_config<'a>(
.with_safe_default_protocol_versions()
.context("ring should support the default protocol versions")?
.with_no_client_auth()
.with_single_cert(vec![cert.clone()], key.clone_key())?
.into();
.with_single_cert(vec![cert.clone()], key.clone_key())?;
let mut cert_resolver = CertResolver::new();
cert_resolver.add_cert(key, vec![cert], true)?;
let common_names = cert_resolver.get_common_names();
let config = Arc::new(config);
TlsConfig {
config,
http_config: config.clone(),
pg_config: config,
common_names,
cert_resolver: Arc::new(cert_resolver),
}

View File

@@ -19,6 +19,7 @@ use std::pin::{Pin, pin};
use std::sync::Arc;
use anyhow::Context;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use atomic_take::AtomicTake;
use bytes::Bytes;
@@ -117,18 +118,7 @@ pub async fn task_main(
auth_backend,
endpoint_rate_limiter: Arc::clone(&endpoint_rate_limiter),
});
let tls_acceptor: Arc<dyn MaybeTlsAcceptor> = match config.tls_config.as_ref() {
Some(config) => {
let mut tls_server_config = rustls::ServerConfig::clone(&config.to_server_config());
// prefer http2, but support http/1.1
tls_server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
Arc::new(tls_server_config)
}
None => {
warn!("TLS config is missing");
Arc::new(NoTls)
}
};
let tls_acceptor: Arc<dyn MaybeTlsAcceptor> = Arc::new(&config.tls_config);
let connections = tokio_util::task::task_tracker::TaskTracker::new();
connections.close(); // allows `connections.wait to complete`
@@ -216,22 +206,20 @@ pub(crate) type AsyncRW = Pin<Box<dyn AsyncReadWrite>>;
#[async_trait]
trait MaybeTlsAcceptor: Send + Sync + 'static {
async fn accept(self: Arc<Self>, conn: ChainRW<TcpStream>) -> std::io::Result<AsyncRW>;
async fn accept(&self, conn: ChainRW<TcpStream>) -> std::io::Result<AsyncRW>;
}
#[async_trait]
impl MaybeTlsAcceptor for rustls::ServerConfig {
async fn accept(self: Arc<Self>, conn: ChainRW<TcpStream>) -> std::io::Result<AsyncRW> {
Ok(Box::pin(TlsAcceptor::from(self).accept(conn).await?))
}
}
struct NoTls;
#[async_trait]
impl MaybeTlsAcceptor for NoTls {
async fn accept(self: Arc<Self>, conn: ChainRW<TcpStream>) -> std::io::Result<AsyncRW> {
Ok(Box::pin(conn))
impl MaybeTlsAcceptor for &'static ArcSwapOption<crate::config::TlsConfig> {
async fn accept(&self, conn: ChainRW<TcpStream>) -> std::io::Result<AsyncRW> {
match &*self.load() {
Some(config) => Ok(Box::pin(
TlsAcceptor::from(config.http_config.clone())
.accept(conn)
.await?,
)),
None => Ok(Box::pin(conn)),
}
}
}

View File

@@ -614,7 +614,9 @@ async fn handle_inner(
&config.authentication_config,
ctx,
request.headers(),
config.tls_config.as_ref(),
// todo: race condition?
// we're unlikely to change the common names.
config.tls_config.load().as_deref(),
)?;
info!(
user = conn_info.conn_info.user_info.user.as_str(),
@@ -860,7 +862,13 @@ impl QueryData {
let cancel_token = inner.cancel_token();
let res = match select(
pin!(query_to_json(config, &*inner, self, &mut 0, parsed_headers)),
pin!(query_to_json(
config,
&mut *inner,
self,
&mut 0,
parsed_headers
)),
pin!(cancel.cancelled()),
)
.await
@@ -944,7 +952,7 @@ impl BatchQueryData {
builder = builder.deferrable(true);
}
let transaction = builder
let mut transaction = builder
.start()
.await
.inspect_err(|_| {
@@ -957,7 +965,7 @@ impl BatchQueryData {
let json_output = match query_batch(
config,
cancel.child_token(),
&transaction,
&mut transaction,
self,
parsed_headers,
)
@@ -1009,7 +1017,7 @@ impl BatchQueryData {
async fn query_batch(
config: &'static HttpConfig,
cancel: CancellationToken,
transaction: &Transaction<'_>,
transaction: &mut Transaction<'_>,
queries: BatchQueryData,
parsed_headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
@@ -1047,7 +1055,7 @@ async fn query_batch(
async fn query_to_json<T: GenericClient>(
config: &'static HttpConfig,
client: &T,
client: &mut T,
data: QueryData,
current_size: &mut usize,
parsed_headers: HttpHeaders,

View File

@@ -9,17 +9,14 @@ use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use super::{PG_ALPN_PROTOCOL, TlsServerEndPoint};
pub struct TlsConfig {
pub config: Arc<rustls::ServerConfig>,
// unfortunate split since we cannot change the ALPN on demand.
// <https://github.com/rustls/rustls/issues/2260>
pub http_config: Arc<rustls::ServerConfig>,
pub pg_config: Arc<rustls::ServerConfig>,
pub common_names: HashSet<String>,
pub cert_resolver: Arc<CertResolver>,
}
impl TlsConfig {
pub fn to_server_config(&self) -> Arc<rustls::ServerConfig> {
self.config.clone()
}
}
/// Configure TLS for the main endpoint.
pub fn configure_tls(
key_path: &str,
@@ -71,8 +68,15 @@ pub fn configure_tls(
config.key_log = Arc::new(rustls::KeyLogFile::new());
}
let mut http_config = config.clone();
let mut pg_config = config;
http_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
pg_config.alpn_protocols = vec![b"postgresql".to_vec()];
Ok(TlsConfig {
config: Arc::new(config),
http_config: Arc::new(http_config),
pg_config: Arc::new(pg_config),
common_names,
cert_resolver,
})

View File

@@ -624,7 +624,16 @@ impl ComputeHook {
MaybeSendResult::Transmit((request, lock)) => (request, lock),
};
let result = if let Some(notify_url) = &self.config.compute_hook_url {
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
self.config.compute_hook_url.clone()
};
let result = if let Some(notify_url) = &compute_hook_url {
self.do_notify(notify_url, &request, cancel).await
} else {
self.do_notify_local(&request).await.map_err(|e| {

View File

@@ -71,6 +71,10 @@ struct Cli {
#[arg(long)]
compute_hook_url: Option<String>,
/// URL to control plane storage API prefix
#[arg(long)]
control_plane_url: Option<String>,
/// URL to connect to postgres, like postgresql://localhost:1234/storage_controller
#[arg(long)]
database_url: Option<String>,
@@ -313,11 +317,13 @@ async fn async_main() -> anyhow::Result<()> {
"Insecure config! One or more secrets is not set. This is only permitted in `--dev` mode"
);
}
StrictMode::Strict if args.compute_hook_url.is_none() => {
// Production systems should always have a compute hook set, to prevent falling
StrictMode::Strict
if args.compute_hook_url.is_none() && args.control_plane_url.is_none() =>
{
// Production systems should always have a control plane URL set, to prevent falling
// back to trying to use neon_local.
anyhow::bail!(
"`--compute-hook-url` is not set: this is only permitted in `--dev` mode"
"neither `--compute-hook-url` nor `--control-plane-url` are set: this is only permitted in `--dev` mode"
);
}
StrictMode::Strict => {
@@ -343,6 +349,7 @@ async fn async_main() -> anyhow::Result<()> {
control_plane_jwt_token: secrets.control_plane_jwt_token,
peer_jwt_token: secrets.peer_jwt_token,
compute_hook_url: args.compute_hook_url,
control_plane_url: args.control_plane_url,
max_offline_interval: args
.max_offline_interval
.map(humantime::Duration::into)

View File

@@ -1613,23 +1613,49 @@ pub(crate) struct TenantShardPersistence {
}
impl TenantShardPersistence {
fn get_shard_count(&self) -> Result<ShardCount, ShardConfigError> {
self.shard_count
.try_into()
.map(ShardCount)
.map_err(|_| ShardConfigError::InvalidCount)
}
fn get_shard_number(&self) -> Result<ShardNumber, ShardConfigError> {
self.shard_number
.try_into()
.map(ShardNumber)
.map_err(|_| ShardConfigError::InvalidNumber)
}
fn get_stripe_size(&self) -> Result<ShardStripeSize, ShardConfigError> {
self.shard_stripe_size
.try_into()
.map(ShardStripeSize)
.map_err(|_| ShardConfigError::InvalidStripeSize)
}
pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
if self.shard_count == 0 {
Ok(ShardIdentity::unsharded())
// NB: carry over the stripe size from the persisted record, to avoid consistency check
// failures if the persisted value differs from the default stripe size. The stripe size
// doesn't really matter for unsharded tenants anyway.
Ok(ShardIdentity::unsharded_with_stripe_size(
self.get_stripe_size()?,
))
} else {
Ok(ShardIdentity::new(
ShardNumber(self.shard_number as u8),
ShardCount::new(self.shard_count as u8),
ShardStripeSize(self.shard_stripe_size as u32),
self.get_shard_number()?,
self.get_shard_count()?,
self.get_stripe_size()?,
)?)
}
}
pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
pub(crate) fn get_tenant_shard_id(&self) -> anyhow::Result<TenantShardId> {
Ok(TenantShardId {
tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
shard_number: ShardNumber(self.shard_number as u8),
shard_count: ShardCount::new(self.shard_count as u8),
shard_number: self.get_shard_number()?,
shard_count: self.get_shard_count()?,
})
}
}

View File

@@ -363,6 +363,15 @@ pub struct Config {
/// assume it is running in a test environment and try to update neon_local.
pub compute_hook_url: Option<String>,
/// Prefix for storage API endpoints of the control plane. We use this prefix to compute
/// URLs that we use to send pageserver and safekeeper attachment locations.
/// If this is None, the compute hook will assume it is running in a test environment
/// and try to invoke neon_local instead.
///
/// For now, there is also `compute_hook_url` which allows configuration of the pageserver
/// specific endpoint, but it is in the process of being phased out.
pub control_plane_url: Option<String>,
/// Grace period within which a pageserver does not respond to heartbeats, but is still
/// considered active. Once the grace period elapses, the next heartbeat failure will
/// mark the pagseserver offline.
@@ -1995,21 +2004,41 @@ impl Service {
tracing::info!("Loaded {} LocationConfigs", configs.tenant_shards.len());
let mut cleanup = Vec::new();
let mut mismatched_locations = 0;
{
let mut locked = self.inner.write().unwrap();
for (tenant_shard_id, observed_loc) in configs.tenant_shards {
for (tenant_shard_id, reported) in configs.tenant_shards {
let Some(tenant_shard) = locked.tenants.get_mut(&tenant_shard_id) else {
cleanup.push(tenant_shard_id);
continue;
};
tenant_shard
let on_record = &mut tenant_shard
.observed
.locations
.insert(node.get_id(), ObservedStateLocation { conf: observed_loc });
.entry(node.get_id())
.or_insert_with(|| ObservedStateLocation { conf: None })
.conf;
// If the location reported by the node does not match our observed state,
// then we mark it as uncertain and let the background reconciliation loop
// deal with it.
//
// Note that this also covers net new locations reported by the node.
if *on_record != reported {
mismatched_locations += 1;
*on_record = None;
}
}
}
if mismatched_locations > 0 {
tracing::info!(
"Set observed state to None for {mismatched_locations} mismatched locations"
);
}
for tenant_shard_id in cleanup {
tracing::info!("Detaching {tenant_shard_id}");
match node
@@ -7865,6 +7894,9 @@ impl Service {
/// At most one tenant will be split per call: the one with the largest max logical size. It
/// will split 1 → 8 shards.
///
/// An unsharded tenant will get DEFAULT_STRIPE_SIZE, regardless of what its ShardIdentity says.
/// A sharded tenant will retain its stripe size, as splits do not allow changing it.
///
/// TODO: consider splitting based on total logical size rather than max logical size.
///
/// TODO: consider spawning multiple splits in parallel: this is only called once every 20
@@ -7910,6 +7942,16 @@ impl Service {
"Auto-splitting tenant for size threshold {split_threshold}: current size {split_candidate:?}"
);
// Retain the stripe size of sharded tenants, as splits don't allow changing it. Otherwise,
// use DEFAULT_STRIPE_SIZE for unsharded tenants -- their stripe size doesn't really matter,
// and if we change the default stripe size we want to use the new default rather than an
// old, persisted stripe size.
let new_stripe_size = match split_candidate.id.shard_count.count() {
0 => panic!("invalid shard count 0"),
1 => Some(ShardParameters::DEFAULT_STRIPE_SIZE),
2.. => None,
};
let this = self.clone();
tokio::spawn(
async move {
@@ -7923,7 +7965,7 @@ impl Service {
// because our max shard count is relatively low anyway. This policy
// will be adjusted in future once we support higher shard count.
new_shard_count: MAX_SHARDS.literal(),
new_stripe_size: Some(ShardParameters::DEFAULT_STRIPE_SIZE),
new_stripe_size,
},
)
.await

View File

@@ -19,7 +19,7 @@ if TYPE_CHECKING:
class ComputeReconfigure:
def __init__(self, server: HTTPServer):
self.server = server
self.control_plane_compute_hook_api = f"http://{server.host}:{server.port}/notify-attach"
self.control_plane_hooks_api = f"http://{server.host}:{server.port}/"
self.workloads: dict[TenantId, Any] = {}
self.on_notify: Callable[[Any], None] | None = None

View File

@@ -175,6 +175,9 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
counter("pageserver_tenant_throttling_count"),
counter("pageserver_timeline_wal_records_received"),
counter("pageserver_page_service_pagestream_flush_in_progress_micros"),
counter("pageserver_wait_lsn_in_progress_micros"),
counter("pageserver_wait_lsn_started_count"),
counter("pageserver_wait_lsn_finished_count"),
*histogram("pageserver_page_service_batch_size"),
*histogram("pageserver_page_service_pagestream_batch_wait_time_seconds"),
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,

View File

@@ -460,7 +460,7 @@ class NeonEnvBuilder:
self.overlay_mounts_created_by_us: list[tuple[str, Path]] = []
self.config_init_force: str | None = None
self.top_output_dir = top_output_dir
self.control_plane_compute_hook_api: str | None = None
self.control_plane_hooks_api: str | None = None
self.storage_controller_config: dict[Any, Any] | None = None
# Flag to enable https listener in pageserver, generate local ssl certs,
@@ -1116,7 +1116,7 @@ class NeonEnv:
self.control_plane_api: str = self.storage_controller.upcall_api_endpoint()
# For testing this with a fake HTTP server, enable passing through a URL from config
self.control_plane_compute_hook_api = config.control_plane_compute_hook_api
self.control_plane_hooks_api = config.control_plane_hooks_api
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
@@ -1137,8 +1137,8 @@ class NeonEnv:
if self.control_plane_api is not None:
cfg["control_plane_api"] = self.control_plane_api
if self.control_plane_compute_hook_api is not None:
cfg["control_plane_compute_hook_api"] = self.control_plane_compute_hook_api
if self.control_plane_hooks_api is not None:
cfg["control_plane_hooks_api"] = self.control_plane_hooks_api
storage_controller_config = self.storage_controller_config

View File

@@ -1070,11 +1070,14 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
tenant_id: TenantId | TenantShardId,
timeline_id: TimelineId,
batch_size: int | None = None,
behavior_v2: bool = False,
**kwargs,
) -> set[TimelineId]:
params = {}
params: dict[str, Any] = {}
if batch_size is not None:
params["batch_size"] = batch_size
if behavior_v2:
params["detach_behavior"] = "v2"
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/detach_ancestor",
params=params,

View File

@@ -41,24 +41,24 @@ def test_compute_ctl_api_latencies(
zenbenchmark.record(
"status_response_latency_p50_us",
status_response_latency_us[len(status_response_latency_us) // 2],
"microseconds",
"μs",
MetricReport.LOWER_IS_BETTER,
)
zenbenchmark.record(
"metrics_response_latency_p50_us",
metrics_response_latency_us[len(metrics_response_latency_us) // 2],
"microseconds",
"μs",
MetricReport.LOWER_IS_BETTER,
)
zenbenchmark.record(
"status_response_latency_p99_us",
status_response_latency_us[len(status_response_latency_us) * 99 // 100],
"microseconds",
"μs",
MetricReport.LOWER_IS_BETTER,
)
zenbenchmark.record(
"metrics_response_latency_p99_us",
metrics_response_latency_us[len(metrics_response_latency_us) * 99 // 100],
"microseconds",
"μs",
MetricReport.LOWER_IS_BETTER,
)

View File

@@ -23,6 +23,25 @@ if TYPE_CHECKING:
from psycopg2.extensions import connection, cursor
"""
These benchmarks stress test logical replication within Neon. In order to run
them locally, they require setting up some infrastructure. See
https://docs.neon.build/compute/logical_replication_benchmarks.html for how to
do that. After setting that up, run the following shell commands.
# These are the project IDs setup for the purposes of running these benchmarks
export BENCHMARK_PROJECT_ID_PUB=
export BENCHMARK_PROJECT_ID_SUB=
# See https://neon.tech/docs/manage/api-keys
export NEON_API_KEY=
# Fiddling with the --timeout parameter may be required depending on the
# performance of the benchmark
pytest -m remote_cluster 'test_runner/performance/test_logical_replication.py'
"""
@pytest.mark.timeout(1000)
def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg: VanillaPostgres):
env = neon_simple_env

View File

@@ -83,9 +83,7 @@ def test_storage_controller_many_tenants(
"max_offline": "30s",
"max_warming_up": "300s",
}
neon_env_builder.control_plane_compute_hook_api = (
compute_reconfigure_listener.control_plane_compute_hook_api
)
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
AZS = ["alpha", "bravo", "charlie"]

View File

@@ -23,8 +23,8 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder, make_httpserver):
)
env = neon_env_builder.init_start()
neon_env_builder.control_plane_compute_hook_api = (
f"http://{make_httpserver.host}:{make_httpserver.port}/notify-attach"
neon_env_builder.control_plane_hooks_api = (
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)
def ignore_notify(request: Request):

View File

@@ -524,6 +524,42 @@ def test_pageserver_gc_compaction_trigger(neon_env_builder: NeonEnvBuilder):
workload.validate(env.pageserver.id)
def test_pageserver_small_tenant_compaction(neon_env_builder: NeonEnvBuilder):
"""
Create a small tenant that rarely needs compaction and ensure that everything works.
"""
SMOKE_CONF = {
# Run both gc and gc-compaction.
"gc_period": "5s",
"compaction_period": "5s",
# No PiTR interval and small GC horizon
"pitr_interval": "0s",
"gc_horizon": 1024,
"lsn_lease_length": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
ps_http = env.pageserver.http_client()
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageserver.id)
log.info("Writing initial data ...")
workload.write_rows(10000, env.pageserver.id)
for _ in range(100):
workload.churn_rows(10, env.pageserver.id, upload=False, ingest=False)
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
ps_http.timeline_compact(tenant_id, timeline_id)
ps_http.timeline_gc(tenant_id, timeline_id, None)
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
# Stripe sizes in number of pages.
TINY_STRIPES = 16
LARGE_STRIPES = 32768

View File

@@ -87,8 +87,8 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=s3_storage(),
)
neon_env_builder.control_plane_compute_hook_api = (
f"http://{make_httpserver.host}:{make_httpserver.port}/notify-attach"
neon_env_builder.control_plane_hooks_api = (
f"http://{make_httpserver.host}:{make_httpserver.port}/"
)
def ignore_notify(request: Request):

View File

@@ -794,7 +794,7 @@ def test_sharding_split_stripe_size(
Check that modifying stripe size inline with a shard split works as expected
"""
(host, port) = httpserver_listen_address
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
neon_env_builder.control_plane_hooks_api = f"http://{host}:{port}"
neon_env_builder.num_pageservers = 1
# Set up fake HTTP notify endpoint: we will use this to validate that we receive
@@ -806,7 +806,7 @@ def test_sharding_split_stripe_size(
notifications.append(request.json)
return Response(status=200)
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
env = neon_env_builder.init_start(
initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size
@@ -1312,9 +1312,7 @@ def test_sharding_split_failures(
failure: Failure,
):
neon_env_builder.num_pageservers = 4
neon_env_builder.control_plane_compute_hook_api = (
compute_reconfigure_listener.control_plane_compute_hook_api
)
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
initial_shard_count = 2
split_shard_count = 4

View File

@@ -605,7 +605,7 @@ def test_storage_controller_compute_hook(
# when migrating.
neon_env_builder.num_pageservers = 2
(host, port) = httpserver_listen_address
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
neon_env_builder.control_plane_hooks_api = f"http://{host}:{port}"
# Set up fake HTTP notify endpoint
notifications = []
@@ -618,7 +618,7 @@ def test_storage_controller_compute_hook(
notifications.append(request.json)
return Response(status=status)
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
@@ -724,7 +724,7 @@ def test_storage_controller_stuck_compute_hook(
neon_env_builder.num_pageservers = 2
(host, port) = httpserver_listen_address
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
neon_env_builder.control_plane_hooks_api = f"http://{host}:{port}"
handle_params = {"status": 200}
@@ -736,7 +736,7 @@ def test_storage_controller_stuck_compute_hook(
notifications.append(request.json)
return Response(status=status)
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
@@ -871,7 +871,7 @@ def test_storage_controller_compute_hook_retry(
neon_env_builder.num_pageservers = 2
(host, port) = httpserver_listen_address
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
neon_env_builder.control_plane_hooks_api = f"http://{host}:{port}"
handle_params = {"status": 200}
@@ -883,7 +883,7 @@ def test_storage_controller_compute_hook_retry(
notifications.append(request.json)
return Response(status=status)
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
# Start running
env = neon_env_builder.init_configs()
@@ -993,7 +993,7 @@ def test_storage_controller_compute_hook_revert(
# when migrating.
neon_env_builder.num_pageservers = 2
(host, port) = httpserver_listen_address
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
neon_env_builder.control_plane_hooks_api = f"http://{host}:{port}"
# Set up fake HTTP notify endpoint
notifications = []
@@ -1006,7 +1006,7 @@ def test_storage_controller_compute_hook_revert(
notifications.append(request.json)
return Response(status=status)
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
@@ -1395,9 +1395,7 @@ def test_storage_controller_tenant_deletion(
"""
neon_env_builder.num_pageservers = 4
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.control_plane_compute_hook_api = (
compute_reconfigure_listener.control_plane_compute_hook_api
)
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
env.start()
@@ -1749,18 +1747,23 @@ def test_storage_controller_re_attach(neon_env_builder: NeonEnvBuilder):
# Restart the failed pageserver
victim_ps.start()
env.storage_controller.reconcile_until_idle()
# We expect that the re-attach call correctly tipped off the pageserver that its locations
# are all secondaries now.
locations = victim_ps.http_client().tenant_list_locations()["tenant_shards"]
assert len(locations) == 2
assert all(loc[1]["mode"] == "Secondary" for loc in locations)
# We expect that this situation resulted from the re_attach call, and not any explicit
# Reconciler runs: assert that the reconciliation count has not gone up since we restarted.
# We expect that this situation resulted from background reconciliations
# Reconciler runs: assert that the reconciliation count has gone up by exactly
# one for each shard
reconciles_after_restart = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
)
assert reconciles_after_restart == reconciles_before_restart
assert reconciles_before_restart is not None
assert reconciles_after_restart == reconciles_before_restart + 2
def test_storage_controller_shard_scheduling_policy(neon_env_builder: NeonEnvBuilder):

View File

@@ -436,7 +436,7 @@ def test_single_branch_get_tenant_size_grows(
# when our tenant is configured with a tiny pitr interval, dropping a table should
# cause synthetic size to go down immediately
tenant_config["pitr_interval"] = "0s"
env.pageserver.http_client().set_tenant_config(tenant_id, tenant_config)
env.storage_controller.pageserver_api().set_tenant_config(tenant_id, tenant_config)
(current_lsn, size) = get_current_consistent_size(
env, endpoint, size_debug_file, http_client, tenant_id, timeline_id
)

View File

@@ -42,6 +42,14 @@ def test_timeline_archive(neon_env_builder: NeonEnvBuilder, shard_count: int):
# If we run the unsharded version, talk to the storage controller
ps_http = env.storage_controller.pageserver_api()
for ps in env.pageservers:
# We make /archival_config requests that are intended to fail.
# It's expected that storcon drops requests to other pageservers after
# it gets the first error (https://github.com/neondatabase/neon/issues/11177)
ps.allowed_errors.append(
".*WARN.* path=/v1/tenant/.*/archival_config .*request was dropped before completing",
)
# first try to archive a non existing timeline for an existing tenant:
invalid_timeline_id = TimelineId.generate()
with pytest.raises(PageserverApiException, match="timeline not found") as exc:

View File

@@ -319,8 +319,9 @@ def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder):
# this does not contain Z in the end, so fromisoformat accepts it
# it is to be in line with the deletion timestamp.. well, almost.
when = original_ancestor[2][:26]
when_ts = datetime.datetime.fromisoformat(when)
assert when_ts < datetime.datetime.now()
when_ts = datetime.datetime.fromisoformat(when).replace(tzinfo=datetime.UTC)
now = datetime.datetime.utcnow().replace(tzinfo=datetime.UTC)
assert when_ts < now
assert len(lineage.get("reparenting_history", [])) == 0
elif expected_ancestor == timeline_id:
assert len(lineage.get("original_ancestor", [])) == 0
@@ -342,6 +343,138 @@ def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder):
wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline)
def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder):
"""
Test the v2 behavior of ancestor detach.
old main -------|---------X--------->
| | |
| | +-> after
| +--X empty snapshot branch
| |
| +-> branch-to-detach
|
+-> earlier
Ends up as:
old main -------|---------X--------->
| | |
| | +-> after
| +--X empty snapshot branch
|
+-> earlier
new main -------|---------|----> branch-to-detach
"""
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
ep.safe_psql("CREATE TABLE foo (i BIGINT);")
ep.safe_psql("CREATE TABLE audit AS SELECT 1 as starts;")
branchpoint_pipe = wait_for_last_flush_lsn(
env, ep, env.initial_tenant, env.initial_timeline
)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(0, 8191) g(i);")
branchpoint_x = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(8192, 16383) g(i);")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
earlier = env.create_branch(
"earlier", ancestor_branch_name="main", ancestor_start_lsn=branchpoint_pipe
)
snapshot_branchpoint = env.create_branch(
"snapshot_branchpoint", ancestor_branch_name="main", ancestor_start_lsn=branchpoint_x
)
branch_to_detach = env.create_branch(
"branch_to_detach",
ancestor_branch_name="snapshot_branchpoint",
ancestor_start_lsn=branchpoint_x,
)
after = env.create_branch("after", ancestor_branch_name="main", ancestor_start_lsn=None)
all_reparented = client.detach_ancestor(env.initial_tenant, branch_to_detach, behavior_v2=True)
assert set(all_reparented) == set()
env.pageserver.quiesce_tenants()
# checking the ancestor after is much faster than waiting for the endpoint not start
expected_result = [
("main", env.initial_timeline, None, 16384, 1),
("after", after, env.initial_timeline, 16384, 1),
("snapshot_branchpoint", snapshot_branchpoint, env.initial_timeline, 8192, 1),
("branch_to_detach", branch_to_detach, None, 8192, 1),
("earlier", earlier, env.initial_timeline, 0, 1),
]
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
for branch_name, queried_timeline, expected_ancestor, _, _ in expected_result:
details = client.timeline_detail(env.initial_tenant, queried_timeline)
ancestor_timeline_id = details["ancestor_timeline_id"]
if expected_ancestor is None:
assert ancestor_timeline_id is None
else:
assert (
TimelineId(ancestor_timeline_id) == expected_ancestor
), f"when checking branch {branch_name}, mapping={expected_result}"
index_part = env.pageserver_remote_storage.index_content(
env.initial_tenant, queried_timeline
)
lineage = index_part["lineage"]
assert lineage is not None
assert lineage.get("reparenting_history_overflown", "false") == "false"
if queried_timeline == branch_to_detach:
original_ancestor = lineage["original_ancestor"]
assert original_ancestor is not None
assert original_ancestor[0] == str(env.initial_timeline)
assert original_ancestor[1] == str(branchpoint_x)
# this does not contain Z in the end, so fromisoformat accepts it
# it is to be in line with the deletion timestamp.. well, almost.
when = original_ancestor[2][:26]
when_ts = datetime.datetime.fromisoformat(when).replace(tzinfo=datetime.UTC)
now = datetime.datetime.utcnow().replace(tzinfo=datetime.UTC)
assert when_ts < now
assert len(lineage.get("reparenting_history", [])) == 0
elif expected_ancestor == branch_to_detach:
assert len(lineage.get("original_ancestor", [])) == 0
assert lineage["reparenting_history"] == [str(env.initial_timeline)]
else:
assert len(lineage.get("original_ancestor", [])) == 0
assert len(lineage.get("reparenting_history", [])) == 0
for name, _, _, rows, starts in expected_result:
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
assert ep.safe_psql(f"SELECT count(*) FROM audit WHERE starts = {starts}")[0][0] == 1
# delete the new timeline to confirm it doesn't carry over the anything from the old timeline
client.timeline_delete(env.initial_tenant, branch_to_detach)
wait_timeline_detail_404(client, env.initial_tenant, branch_to_detach)
# delete the after timeline
client.timeline_delete(env.initial_tenant, after)
wait_timeline_detail_404(client, env.initial_tenant, after)
def test_detached_receives_flushes_while_being_detached(neon_env_builder: NeonEnvBuilder):
"""
Makes sure that the timeline is able to receive writes through-out the detach process.

View File

@@ -1,7 +1,7 @@
{
"v17": [
"17.4",
"e5e87b9f52d0eaeb83f3e2517bb9727aac37729b"
"4cf26c355142dc9009042dbc90e0231a6218fe0d"
],
"v16": [
"16.8",

View File

@@ -16,6 +16,7 @@ license.workspace = true
### BEGIN HAKARI SECTION
[dependencies]
ahash = { version = "0.8" }
anstream = { version = "0.6" }
anyhow = { version = "1", features = ["backtrace"] }
base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] }
base64-647d43efb71741da = { package = "base64", version = "0.21" }
@@ -25,11 +26,16 @@ camino = { version = "1", default-features = false, features = ["serde1"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
clap = { version = "4", features = ["derive", "env", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "env", "help", "std", "string", "suggestions", "usage"] }
const-oid = { version = "0.9", default-features = false, features = ["db", "std"] }
crypto-bigint = { version = "0.5", features = ["generic-array", "zeroize"] }
der = { version = "0.7", default-features = false, features = ["oid", "pem", "std"] }
der = { version = "0.7", default-features = false, features = ["derive", "flagset", "oid", "pem", "std"] }
deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] }
digest = { version = "0.10", features = ["mac", "oid", "std"] }
ecdsa = { version = "0.16", features = ["pem", "signing", "std", "verifying"] }
either = { version = "1" }
elliptic-curve = { version = "0.13", default-features = false, features = ["digest", "hazmat", "jwk", "pem", "std"] }
env_filter = { version = "0.1", default-features = false, features = ["regex"] }
env_logger = { version = "0.11" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
form_urlencoded = { version = "1" }
futures-channel = { version = "0.3", features = ["sink"] }
@@ -47,8 +53,7 @@ hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"]
hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "server", "service"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10" }
itertools = { version = "0.12" }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -63,6 +68,7 @@ num-iter = { version = "0.1", default-features = false, features = ["i128", "std
num-rational = { version = "0.4", default-features = false, features = ["num-bigint-std", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
p256 = { version = "0.13", features = ["jwk"] }
parquet = { version = "53", default-features = false, features = ["zstd"] }
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
rand = { version = "0.8", features = ["small_rng"] }
@@ -72,6 +78,7 @@ regex-syntax = { version = "0.8" }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
rustls = { version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] }
scopeguard = { version = "1" }
sec1 = { version = "0.7", features = ["pem", "serde", "std", "subtle"] }
serde = { version = "1", features = ["alloc", "derive"] }
serde_json = { version = "1", features = ["alloc", "raw_value"] }
sha2 = { version = "0.10", features = ["asm", "oid"] }
@@ -115,8 +122,7 @@ half = { version = "2", default-features = false, features = ["num-traits"] }
hashbrown = { version = "0.14", features = ["raw"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10" }
itertools = { version = "0.12" }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2" }